本文最后更新于 738 天前,其中的信息可能已经有所发展或是发生改变。
clientgo架构
- 以前以为client-go只是通过restclient去调用api server,其实不然,它还具备controller的逻辑,以实现更强大的k8s操作。
RESTClient原理
- 创建了所有版本的client,例如corev1,体现k8s向后兼容,只要正式版发布的接口不会删除,beta预发布的会在几个迭代后删除
- restclient就是httpclient,可用来直接调用k8s api的url
- clientset封装了对k8s资源的操作
Reflector原理
-
通过listwatch从api server获取资源列表,并监听其变化
-
通过debug定位监听阻塞的地方,在go标准库里,json stream。具体方法未能定位到
-
获取变更事件后,调用了store接口,保存数据。例如delta fifo queue
switch event.Type { case watch.Added: err := store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Modified: err := store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err)) }
-
resource version标识k8s资源版本,体现变更,watch的时候用到
DeltaFIFO原理
-
实现store接口,记录历史操作
type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) Replace([]interface{}, string) error Resync() error }
-
其他实现类
-
ExpirationCache
- 过期原理:保存的结构体里包括时间戳
c.cacheStorage.Add(key, &TimestampedEntry{obj, c.clock.Now(), key})
- 相关知识:redis清理过期key
-
UndeltaStore
- 数据变更时,推送保存的所有数据
func (u *UndeltaStore) Add(obj interface{}) error { if err := u.Store.Add(obj); err != nil { return err } u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Update(obj interface{}) error { if err := u.Store.Update(obj); err != nil { return err } u.PushFunc(u.Store.List()) return nil }
- 代理模式
-
FIFO
- 和delta fifo的区别是没有保存delta(历史操作)
-
-
-
使用数组加map,实现有序+O(1)读取,空间换时间
type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond items map[string]Deltas queue []string ... }
-
map保存了delta数组,也就是操作记录(命令模式)
type DeltaType string const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Replaced DeltaType = "Replaced" Sync DeltaType = "Sync" ) type Delta struct { Type DeltaType Object interface{} }
-
sync.Cond实现阻塞队列
-
-
记录历史操作
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } ... oldDeltas := f.items[id] newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } ...
-
取出历史记录并处理
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
// 阻塞获取
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// 处理
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}