在茫茫代码中,很容易迷失。经过一段时间的艰苦探索,终于对client-go有了一定的了解,能够从整体上把相关功能串起来。本文主要是informer相关的内容,因为这是client-go比较核心的功能。后面还会发一篇关于client-go编程使用的文章,会从kubernetes.Interface,dynamic,discovery,workequeue和informer工厂函数等方面介绍,再结合一个operator作为实例进行分析(虽然大部分operator都使用kube-builder等类似的工具构建,但是本着求根问底的精神还是要分析一下)。
相关存储结构
在开始介绍之前,先介绍一下client-go中提供存储相关的接口,这些接口用来缓存api对象。Store是最基础的接口,提供增删改查等,还有一些继承于Store的接口,比Store可能会多实现一些方法,可以针对不同的场景进行使用。
- Store
- ExpirationStore
- UndeltaStore
- Queue
- FIFO
- DeltaFIFO
- Indexer
- cache
不同Store的引用
Store接口的具体定义可以看vendor/k8s.io/client-go/tools/cache/store.go这个文件。在IDE的帮助下,找到了所有实现了这个Store的相关实现。本文主要关心client-go,所以主要跟进client-go下面的相关实现。
- delta_fifo.go
- NewDeltaFIFO()
- NewDeltaFIFOWithOptions() informer和sharedinformer使用到了
- vendor/k8s.io/client-go/tools/cache/controller.go:NewInformer()
- vendor/k8s.io/client-go/tools/cache/shared_informer.go:sharedIndexInformer.Run()
- expiration_cache.go
- NewTTLStore()
- pkg/kubelet/util/cache/object_cache.go:NewObjectCache()
- NewExpirationStore()
- pkg/credentialprovider/aws/aws_credentials.go:newECRProvider()
- pkg/credentialprovider/azure/azure_credentials.go:NewACRProvider()
- NewTTLStore()
- fifo.go 该文件中定义了Queue接口,继承了Store,并且在该文件中通过FIFO实现了这个queue,另外一个实现是在delta_fifo.go中的DeltaFIFO
- NewFIFO() 代码中未找到该引用
- index.go 这个文件定义了Indexer接口,继承了Store,在store.go中通过cache实现了该接口
- store.go
- NewStore()
- staging/src/k8s.io/client-go/tools/cache/controller.go:NewInformer()
- staging/src/k8s.io/client-go/tools/cache/undelta_store.go:NewUndeltaStore()
- NewIndexer()
- staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go:newWatchCache()
- staging/src/k8s.io/client-go/tools/cache/controller.go:NewIndexerInformer()
- staging/src/k8s.io/client-go/tools/cache/reflector.go:NewNamespaceKeyedIndexerAndReflector() 该函数在IDE中显示未被使用
- staging/src/k8s.io/client-go/tools/cache/shared_informer.go:NewSharedIndexInformer()
- NewStore()
- undelata_store.go
- NewUndeltaStore()
- pkg/kubelet/config/apiserver.go:newSourceApiserverFromLW()
- NewUndeltaStore()
informer简要介绍
client-go中提供了普通informer和sharedInformer两种informer给我们使用。使用informer可以快速的构建各种资源的控制器,来对k8s进行扩展。informer提供了资源变化时执行回调的功能,可以在新增资源,修改资源和是删除资源时执行相应的控制器逻辑。
使用sharedInformer可以对同一个资源注册多个控制器,每个控制器独立执行自己的业务逻辑,互不干扰。多个控制器之间底层公用一个存储和一个到apiserver的连接,可以减少apiserver的压力的和本地内存的占用。informer则不具备这些能力,对于简单控制器可以使用informer,对于较复杂的控制器,会有针对同一个资源多个控制逻辑的推荐使用sharedInformer。
这两个informer在实现的复杂度上有很大差异,sharedInformer相对于普通informer来说要复杂的多。因此本文先从普通informer开始来进行分析,普通informer还分两种,一种是普通的informer,另外一种indexerinfomer。两种informer的区别在于前者使用Store作为对象存储,而后者采用Indexer作为对象存储,通过索引可以加快搜索。
普通informer分析
接下来我们以普通的informer进行分析,关于informer的使用可以参考这里。代码位于k8s.io/client-go/tools/cache/controller.go文件中,该文件中定义了Controller接口,并通过controller进行了实现。
controller分析
先看下接口定义以及controller包含的关键数据结构:
// 这两个函数用来创建informer实例
func NewInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler,) (Store, Controller) {}
func NewIndexerInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers,) (Indexer, Controller) {}
type Controller interface {
// Run主要做两件事情。第一个是创建并启动一个Reflector,从ListerWatcher接口
// 获取对象和通知并放到Queue中,并可能还会调用Queue的Resync方法(是在Reflector调用的)。
// 第二件事情就是不断从Queue中弹出对象并通过ProcessFunc进行处理。
Run(stopCh <-chan struct{})
// 调用Queue的HasSynced判断Queue是否完成首次同步。
HasSynced() bool
// 最新的资源版本
LastSyncResourceVersion() string
}
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
// 这个函数中最重要的是Process函数。DeltaFIFO.Pop()内会调用这个函数。这个函数的操作就是
// 判断Delta类型并同步到clientState中(Store或Indexer类型),并调用对应的handler函数。
// 注意clientState和DeltaFIFO的不同,clientState是在handler运行上下文使用的,在handler
// 运行时,必须是可以在clientState里获取到这个对象的。而DeltaFIFO中存储的是资源在某一时刻
// 在apiserver中的快照加上后续的变更信息。
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
可以看到Controller接口中最主要的方法就是Run(),该函数的功能参考上面注释。下面来详细看下这个函数的主要代码。首先创建了一个Reflector,然后运行Reflector和processLoop函数,并阻塞等待停止信号。其中Reflector可以看成是生产者,processLoop可以看成是消费者。
func (c *controller) Run(stopCh <-chan struct{}) {
// 不相关代码注释掉...
// Reflector通过ListerWatcher从apiserver获取资源以及通知,并放入到Queue中。
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// 不相关代码注释掉...
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// processLoop做的事情很简单,就是不断的调用Queue.Pop函数来逐个处理Queue中的对象。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
reflector分析
接下来的话,我们再来看Reflector是如何工作的,我们来分析一下Reflector.Run()方法。Run中通过backoffManager监视ListAndWatch的运行,异常时可以重新运行。
func (r *Reflector) Run(stopCh <-chan struct{}) {
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
}
下面来看下核心ListAndWatch的运行。
// 这块值得我们注意,这个store参数在controller调用的时候传递的DeltaFIFO类型。
// processLoop不断从DeltaFIFO里拿数据然后进行处理。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
var list runtime.Object
go func() {
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// 不相关代码注释掉...主要是一些参数设置和分页相关的逻辑。
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
}()
// 不相关代码注释掉...
// 完这个函数,正常情况下已经拿到了一个完整的资源列表。
items, err := meta.ExtractList(list)
// 这里是将完整的列表同步到Queue中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
}(); err != nil {
return err
}
// 不相关代码注释掉...
// 下面这段代码比较重要,在将首次完整列表同步到Store中后,启动了一个resync的协程。
// 这个协程等待各个cancelCh,stopCh和resyncCh的消息,前两个的处理是调用cleanup()并返回。
// 其中最重要的是resyncCh的消息,如果接收到该channel消息的话,会执行Store.Resync()函数。
// 在最开始我们介绍了不同类型的存储,其中有一个DeltaFIFO,它的Resync()操作是将clientState
// 中所有已知的对象重新还未在队列中的重新加入到队列中,因此Controller中的processLoop可以定
// 期的执行全量资源的同步操作。
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 接下来就是Watch操作
for {
// 不相关代码注释掉...
w, err := r.listerWatcher.Watch(options)
// 不相关代码注释掉...
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
// 不相关代码注释掉...
return nil
}
}
}
// 初始全量资源同步函数
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
// 资源变动操作,对于资源的增加,删除和更新分别调用了Store的对应操作。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
// 不相关代码注释掉...
loop:
for {
select {
// ...
case event, ok := <-w.ResultChan():
// 不相关代码注释掉...
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
}
}
}
// 不相关代码注释掉...
}
至此,informer的整体流程分析已经差不多了,但是其中还是有很多细节还没有讲到的,笔者也还没有真正完全理解这个完整的代码,还需要各位读者亲自跟着代码去梳理。同时对讲的不对的地方能够积极提出。
sharedinformer分析
现在,我们再来梳理一下sharedinformer的原理,这块会更加复杂一些。下面是一张整理好的图片,关于代码的调用逻辑,关键数据结构和关键代码。在后面讲解过程中主要以这张图来展开详细说明。图片不太清楚的话也可以下载这个附件解压后在draw.io打开。
在详细介绍之前先从整体上给将SharedInformer进行一个拆分,大概可分为下列几个功能组件:
- sharedInformer
- sharedProcessor
- processorListener
可以通过sharedinformer注册事件处理函数。sharedprocessor用来对接受的事件做分发处理。processorlistener则是具体执行事件处理的单元。总体逻辑就是sharedprocessor出DeltaFIFO中获取事件,然后分发的各个processorlistener上去执行。每通过sharedinformer注册事件处理函数时就会创建一个processorlistener。
processorlistener分析
现在我们从最末端的事件处理函数来分析,先来看下processorlistener是如何获取和处理事件的。下面是processorlistner接口和一些主要函数:
type processorListener struct {
// 这个是run()读取用的
nextCh chan interface{}
// 这个是sharedprocessor分发事件用的
addCh chan interface{}
// 事件处理函数
handler ResourceEventHandler
// 里面存还未进行分发的事件,从addCh过来之后暂存到这里。
pendingNotifications buffer.RingGrowing
requestedResyncPeriod time.Duration
resyncPeriod time.Duration
nextResync time.Time
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
// 这个函数的作用就是不断从nextCh获取消息然后进行处理。
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
// 这个函数的主要租用是在pendingNotifications中获取等待处理的事件发送到p.nextCh中,这样run()
// 函数就可以通过这个channel获取到事件并进行处理。pop()函数在sharedinformer启动或注册新的handler时启动运行。
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
shareprocessor分析
现在再来看下shareprocessor是如何处理事件的。
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
// 上面介绍的processorlistener
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
// 注册事件处理函数并启动
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
// 分发事件到各个listener中
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// 启动所有的listeners
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
sharedinformer分析
下面是SharedInformer接口:
type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler)
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
SetWatchErrorHandler(handler WatchErrorHandler) error
}
type SharedIndexInformer interface {
SharedInformer
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
接下来看SharedInformer的创建方法
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 共享的处理器
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// 获取资源的ListAndWatch接口
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
// 创建sharedinformer只是传递了一个空的Indexers。
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
下面看sharedinformer的run()方法
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 给Reflector用的
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 处理Reflector的Queue用的,这个函数很关键。从DeltaFIFO获取事件
// 然后通过sharedprocessor进行分发处理
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 创建controller,参考上面普通informer的Controller部分
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
最后再来看HandleDeltas这个函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// DeltaFIFO拿出来的都是Deltas类型
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 如果存储中已经有了这个对象,则执行更新操作
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 这个switch就是来判断是不是resync操作
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 分发更新事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 分发添加事件
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分发删除事件
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
截止到这里,sharedinformer的大体流程上就分析完了。可以看到,和普通informer不同的地方是从Reflector的Queue消费消息的处理逻辑,普通informer是直接就地处理,而sharedinformer是使用sharedprocessor分发的各个listener中,每个listener单独处理。
这部分代码也有很多细节地方笔者也还没有去完全研究完,比如多个事件处理函数在注册的时候都传递了resyncPeriod参数,如果这个值不同,他们怎么做到互相隔离的(Reflector只能设置一个同步时间,sharedinformer如何设置的这个时间)。这个client-go的设计还是比较复杂的,随着后面的学习和使用如有新的点继续在这里完善。