剖析zookeeper sdk,高效的多路复用

JoeyJazz Fantasy Art Wallpaper

背景

最近遇到zk无法使用,zk断开后无法重连等问题,排查了下用到的zk sdk,顺便写文章记录下这个sdk是如何高效复用连接的。

流程分析

获取连接

image-20241119195141572

  1. 服务启动时,会创建zk connection,本质上就是一个TCP连接。

  2. 如果失败,就从zk配置里取下一个zk server地址,进行连接。

    func (c *Conn) connect() {
    log.Printf("Start to get zk connection from %v", c.servers)
    c.serverIndex = (c.serverIndex + 1) % len(c.servers)
    startIndex := c.serverIndex
    c.setState(StateConnecting)
    for {
        zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout)
        if err == nil {
            log.Printf("Get new zk connection: %s", c.servers[c.serverIndex])
            c.conn = zkConn
            c.setState(StateConnected)
            return
        }
    
        log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err)
    
        c.serverIndex = (c.serverIndex + 1) % len(c.servers)
        if c.serverIndex == startIndex {
            c.flushUnsentRequests(ErrNoServer)
            time.Sleep(time.Second)
        }
    }
    }

创建写入请求到连接的协程一

image-20241122151710012

  1. sendChan是一个channel,协程一会循环消费sendChan里的zk命令请求。

  2. 上层应用执行zk命令时,通过会调用sdk,封装成一个请求,写到sendChan里。

  3. 将请求数据的长度写到byte数组前4位,服务端读取时先读前4位,得到请求数据长度,再读取对应长度的数据。从而避免粘包问题

  4. 将req对象放到一个map里,key为自增的xid,通过这个xid关联responce,从而实现并发请求,乱序响应。和http2.0里的stream id一样,是多路复用的一种方式。

    HTTP 协议演进

创建从连接读取响应值的协程二

image-20241122110638816

  1. 从TCP连接里读取服务端响应,先读取当前响应值的长度,再读取全部数据。
  2. 响应值包含本次请求的xid。
  3. 根据xid区分响应类型,例如watch事件,客户端会得到事件类型和path。
    1. 如何避免watch丢事件?
      1. zk的watch是一次性的,客户端收到事件后需要重新watch才能收到下一个事件。那么在重新watch前这个时间窗口内发生的事件就会丢失
      2. 解决方案是通过面向终态来解决。
        1. 方案一:每次watch到事件后,调用 getData 或 getChildren 主动获取最新数据进行处理。基于 Zookeeper 高级封装的 Apache Curator 的 PersistentWatcher 就是这么处理的。
        2. 方案二:在方案一的基础上增加一步,在执行watch后,到事件到来前,先调用一次 getData 或 getChildren 主动获取最新数据进行处理。这样能避免漏掉上一次watch到时候后获取最新数据处理,到第二次watch前这个时间窗口内的事件。这个概率很小,大部分情况方案一就够了。
  4. 收到响应值后,往channel req.recvChan发送信号,通知阻塞的上层应用。

上层应用调用zk sdk发送请求

image-20241122143151582

  1. 构造请求体
  2. 将请求体写到channel sendChan
  3. 阻塞从 channel req1.recvChan 获取响应值
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
    r := <-c.queueRequest(opcode, req, res, recvFunc)
    return r.zxid, r.err
}

func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
    rq := &request{
        xid:        c.nextXid(),
        opcode:     opcode,
        pkt:        req,
        recvStruct: res,
        recvChan:   make(chan response, 1),
        recvFunc:   recvFunc,
    }
    c.sendChan <- rq
    return rq.recvChan
}

整体流程

image-20241119193520678

总结

zookeeper sdk和zookeeper服务端建立一个TCP连接,通过它并发执行zk命令。每个请求都包含一个xid,服务端乱序返回的响应值也包含xid,从而关联上请求和响应,实现高性能的多路复用。

作者:Yuyy
博客:https://yuyy.info
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇