背景
最近遇到zk无法使用,zk断开后无法重连等问题,排查了下用到的zk sdk,顺便写文章记录下这个sdk是如何高效复用连接的。
流程分析
获取连接
-
服务启动时,会创建zk connection,本质上就是一个TCP连接。
-
如果失败,就从zk配置里取下一个zk server地址,进行连接。
func (c *Conn) connect() { log.Printf("Start to get zk connection from %v", c.servers) c.serverIndex = (c.serverIndex + 1) % len(c.servers) startIndex := c.serverIndex c.setState(StateConnecting) for { zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout) if err == nil { log.Printf("Get new zk connection: %s", c.servers[c.serverIndex]) c.conn = zkConn c.setState(StateConnected) return } log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err) c.serverIndex = (c.serverIndex + 1) % len(c.servers) if c.serverIndex == startIndex { c.flushUnsentRequests(ErrNoServer) time.Sleep(time.Second) } } }
创建写入请求到连接的协程一
-
sendChan是一个channel,协程一会循环消费sendChan里的zk命令请求。
-
上层应用执行zk命令时,通过会调用sdk,封装成一个请求,写到sendChan里。
-
将请求数据的长度写到byte数组前4位,服务端读取时先读前4位,得到请求数据长度,再读取对应长度的数据。从而避免粘包问题。
-
将req对象放到一个map里,key为自增的xid,通过这个xid关联responce,从而实现并发请求,乱序响应。和http2.0里的stream id一样,是多路复用的一种方式。
创建从连接读取响应值的协程二
- 从TCP连接里读取服务端响应,先读取当前响应值的长度,再读取全部数据。
- 响应值包含本次请求的xid。
- 根据xid区分响应类型,例如watch事件,客户端会得到事件类型和path。
- 如何避免watch丢事件?
- zk的watch是一次性的,客户端收到事件后需要重新watch才能收到下一个事件。那么在重新watch前这个时间窗口内发生的事件就会丢失
- 解决方案是通过面向终态来解决。
- 方案一:每次watch到事件后,调用 getData 或 getChildren 主动获取最新数据进行处理。基于 Zookeeper 高级封装的 Apache Curator 的 PersistentWatcher 就是这么处理的。
- 方案二:在方案一的基础上增加一步,在执行watch后,到事件到来前,先调用一次 getData 或 getChildren 主动获取最新数据进行处理。这样能避免漏掉上一次watch到时候后获取最新数据处理,到第二次watch前这个时间窗口内的事件。这个概率很小,大部分情况方案一就够了。
- 如何避免watch丢事件?
- 收到响应值后,往channel req.recvChan发送信号,通知阻塞的上层应用。
上层应用调用zk sdk发送请求
- 构造请求体
- 将请求体写到channel sendChan
- 阻塞从 channel req1.recvChan 获取响应值
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
r := <-c.queueRequest(opcode, req, res, recvFunc)
return r.zxid, r.err
}
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
c.sendChan <- rq
return rq.recvChan
}
整体流程
总结
zookeeper sdk和zookeeper服务端建立一个TCP连接,通过它并发执行zk命令。每个请求都包含一个xid,服务端乱序返回的响应值也包含xid,从而关联上请求和响应,实现高性能的多路复用。