【笔记】Operator课程(1-6)
本文最后更新于 395 天前,其中的信息可能已经有所发展或是发生改变。

白丁云原生:k8s编程-operator篇【已完结】

clientgo架构

image-20230326150144483

  1. 以前以为client-go只是通过restclient去调用api server,其实不然,它还具备controller的逻辑,以实现更强大的k8s操作。

RESTClient原理

  1. 创建了所有版本的client,例如corev1,体现k8s向后兼容,只要正式版发布的接口不会删除,beta预发布的会在几个迭代后删除
  2. restclient就是httpclient,可用来直接调用k8s api的url
  3. clientset封装了对k8s资源的操作

Reflector原理

  1. 通过listwatch从api server获取资源列表,并监听其变化

  2. 通过debug定位监听阻塞的地方,在go标准库里,json stream。具体方法未能定位到

  3. 获取变更事件后,调用了store接口,保存数据。例如delta fifo queue

    switch event.Type {
            case watch.Added:
                err := store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
                }
            case watch.Modified:
                err := store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
                }
  4. resource version标识k8s资源版本,体现变更,watch的时候用到

DeltaFIFO原理

  1. 实现store接口,记录历史操作

    type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
    }
    1. 其他实现类

      1. ExpirationCache

        1. 过期原理:保存的结构体里包括时间戳
        c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now(), key})
        1. 相关知识:redis清理过期key
      2. UndeltaStore

        1. 数据变更时,推送保存的所有数据
        func (u *UndeltaStore) Add(obj interface{}) error {
            if err := u.Store.Add(obj); err != nil {
                return err
            }
            u.PushFunc(u.Store.List())
            return nil
        }
        
        func (u *UndeltaStore) Update(obj interface{}) error {
            if err := u.Store.Update(obj); err != nil {
                return err
            }
            u.PushFunc(u.Store.List())
            return nil
        }
        1. 代理模式
      3. FIFO

        1. 和delta fifo的区别是没有保存delta(历史操作)
  2. 使用数组加map,实现有序+O(1)读取,空间换时间

    type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond
    items map[string]Deltas
    queue []string
    ...
    }
    1. map保存了delta数组,也就是操作记录(命令模式)

      type DeltaType string
      
      const (
      Added   DeltaType = "Added"
      Updated DeltaType = "Updated"
      Deleted DeltaType = "Deleted"
      Replaced DeltaType = "Replaced"
      Sync DeltaType = "Sync"
      )
      
      type Delta struct {
      Type   DeltaType
      Object interface{}
      }
    2. sync.Cond实现阻塞队列

  3. 记录历史操作

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    ...
    
    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)
    
    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } 
    ...
  4. 取出历史记录并处理

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.closed {
                return nil, ErrFIFOClosed
            }

      // 阻塞获取
            f.cond.Wait()
        }
        isInInitialList := !f.hasSynced_locked()
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // This should never happen
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        // Only log traces if the queue depth is greater than 10 and it takes more than
        // 100 milliseconds to process one item from the queue.
        // Queue depth never goes high because processing an item is locking the queue,
        // and new items can't be added until processing finish.
        // https://github.com/kubernetes/kubernetes/issues/103789
        if depth > 10 {
            trace := utiltrace.New("DeltaFIFO Pop Process",
                utiltrace.Field{Key: "ID", Value: id},
                utiltrace.Field{Key: "Depth", Value: depth},
                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
            defer trace.LogIfLong(100 * time.Millisecond)
        }

    // 处理
        err := process(item, isInInitialList)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
作者:Yuyy
博客:https://yuyy.info
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇