client-go代码分析之informer和sharedInformer分析

在茫茫代码中,很容易迷失。经过一段时间的艰苦探索,终于对client-go有了一定的了解,能够从整体上把相关功能串起来。本文主要是informer相关的内容,因为这是client-go比较核心的功能。后面还会发一篇关于client-go编程使用的文章,会从kubernetes.Interface,dynamic,discovery,workequeue和informer工厂函数等方面介绍,再结合一个operator作为实例进行分析(虽然大部分operator都使用kube-builder等类似的工具构建,但是本着求根问底的精神还是要分析一下)。

相关存储结构

在开始介绍之前,先介绍一下client-go中提供存储相关的接口,这些接口用来缓存api对象。Store是最基础的接口,提供增删改查等,还有一些继承于Store的接口,比Store可能会多实现一些方法,可以针对不同的场景进行使用。

  • Store
    • ExpirationStore
    • UndeltaStore
    • Queue
      • FIFO
      • DeltaFIFO
    • Indexer
      • cache

不同Store的引用

Store接口的具体定义可以看vendor/k8s.io/client-go/tools/cache/store.go这个文件。在IDE的帮助下,找到了所有实现了这个Store的相关实现。本文主要关心client-go,所以主要跟进client-go下面的相关实现。

  • delta_fifo.go
    • NewDeltaFIFO()
    • NewDeltaFIFOWithOptions() informer和sharedinformer使用到了
      • vendor/k8s.io/client-go/tools/cache/controller.go:NewInformer()
      • vendor/k8s.io/client-go/tools/cache/shared_informer.go:sharedIndexInformer.Run()
  • expiration_cache.go
    • NewTTLStore()
      • pkg/kubelet/util/cache/object_cache.go:NewObjectCache()
    • NewExpirationStore()
      • pkg/credentialprovider/aws/aws_credentials.go:newECRProvider()
      • pkg/credentialprovider/azure/azure_credentials.go:NewACRProvider()
  • fifo.go 该文件中定义了Queue接口,继承了Store,并且在该文件中通过FIFO实现了这个queue,另外一个实现是在delta_fifo.go中的DeltaFIFO
    • NewFIFO() 代码中未找到该引用
  • index.go 这个文件定义了Indexer接口,继承了Store,在store.go中通过cache实现了该接口
  • store.go
    • NewStore()
      • staging/src/k8s.io/client-go/tools/cache/controller.go:NewInformer()
      • staging/src/k8s.io/client-go/tools/cache/undelta_store.go:NewUndeltaStore()
    • NewIndexer()
      • staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go:newWatchCache()
      • staging/src/k8s.io/client-go/tools/cache/controller.go:NewIndexerInformer()
      • staging/src/k8s.io/client-go/tools/cache/reflector.go:NewNamespaceKeyedIndexerAndReflector() 该函数在IDE中显示未被使用
      • staging/src/k8s.io/client-go/tools/cache/shared_informer.go:NewSharedIndexInformer()
  • undelata_store.go
    • NewUndeltaStore()
      • pkg/kubelet/config/apiserver.go:newSourceApiserverFromLW()

informer简要介绍

client-go中提供了普通informer和sharedInformer两种informer给我们使用。使用informer可以快速的构建各种资源的控制器,来对k8s进行扩展。informer提供了资源变化时执行回调的功能,可以在新增资源,修改资源和是删除资源时执行相应的控制器逻辑。

使用sharedInformer可以对同一个资源注册多个控制器,每个控制器独立执行自己的业务逻辑,互不干扰。多个控制器之间底层公用一个存储和一个到apiserver的连接,可以减少apiserver的压力的和本地内存的占用。informer则不具备这些能力,对于简单控制器可以使用informer,对于较复杂的控制器,会有针对同一个资源多个控制逻辑的推荐使用sharedInformer。

这两个informer在实现的复杂度上有很大差异,sharedInformer相对于普通informer来说要复杂的多。因此本文先从普通informer开始来进行分析,普通informer还分两种,一种是普通的informer,另外一种indexerinfomer。两种informer的区别在于前者使用Store作为对象存储,而后者采用Indexer作为对象存储,通过索引可以加快搜索。

普通informer分析

接下来我们以普通的informer进行分析,关于informer的使用可以参考这里。代码位于k8s.io/client-go/tools/cache/controller.go文件中,该文件中定义了Controller接口,并通过controller进行了实现。

controller分析

先看下接口定义以及controller包含的关键数据结构:

// 这两个函数用来创建informer实例
func NewInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler,) (Store, Controller) {}
func NewIndexerInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers,) (Indexer, Controller) {}

type Controller interface {
   // Run主要做两件事情。第一个是创建并启动一个Reflector,从ListerWatcher接口
   // 获取对象和通知并放到Queue中,并可能还会调用Queue的Resync方法(是在Reflector调用的)。
   // 第二件事情就是不断从Queue中弹出对象并通过ProcessFunc进行处理。
   Run(stopCh <-chan struct{})

   // 调用Queue的HasSynced判断Queue是否完成首次同步。
   HasSynced() bool

   // 最新的资源版本
   LastSyncResourceVersion() string
}

type controller struct {
   config         Config
   reflector      *Reflector
   reflectorMutex sync.RWMutex
   clock          clock.Clock
}

// 这个函数中最重要的是Process函数。DeltaFIFO.Pop()内会调用这个函数。这个函数的操作就是
// 判断Delta类型并同步到clientState中(Store或Indexer类型),并调用对应的handler函数。
// 注意clientState和DeltaFIFO的不同,clientState是在handler运行上下文使用的,在handler
// 运行时,必须是可以在clientState里获取到这个对象的。而DeltaFIFO中存储的是资源在某一时刻
// 在apiserver中的快照加上后续的变更信息。
func newInformer(
   lw ListerWatcher,
   objType runtime.Object,
   resyncPeriod time.Duration,
   h ResourceEventHandler,
   clientState Store,
) Controller {
   // This will hold incoming changes. Note how we pass clientState in as a
   // KeyLister, that way resync operations will result in the correct set
   // of update/delete deltas.
   fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
      KnownObjects:          clientState,
      EmitDeltaTypeReplaced: true,
   })

   cfg := &Config{
      Queue:            fifo,
      ListerWatcher:    lw,
      ObjectType:       objType,
      FullResyncPeriod: resyncPeriod,
      RetryOnError:     false,

      Process: func(obj interface{}) error {
         // from oldest to newest
         for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Replaced, Added, Updated:
               if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                  if err := clientState.Update(d.Object); err != nil {
                     return err
                  }
                  h.OnUpdate(old, d.Object)
               } else {
                  if err := clientState.Add(d.Object); err != nil {
                     return err
                  }
                  h.OnAdd(d.Object)
               }
            case Deleted:
               if err := clientState.Delete(d.Object); err != nil {
                  return err
               }
               h.OnDelete(d.Object)
            }
         }
         return nil
      },
   }
   return New(cfg)
}

可以看到Controller接口中最主要的方法就是Run(),该函数的功能参考上面注释。下面来详细看下这个函数的主要代码。首先创建了一个Reflector,然后运行Reflector和processLoop函数,并阻塞等待停止信号。其中Reflector可以看成是生产者,processLoop可以看成是消费者。

func (c *controller) Run(stopCh <-chan struct{}) {
   // 不相关代码注释掉...
   // Reflector通过ListerWatcher从apiserver获取资源以及通知,并放入到Queue中。
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   // 不相关代码注释掉...
   var wg wait.Group
   wg.StartWithChannel(stopCh, r.Run)
   wait.Until(c.processLoop, time.Second, stopCh)
   wg.Wait()
}

// processLoop做的事情很简单,就是不断的调用Queue.Pop函数来逐个处理Queue中的对象。
func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      if err != nil {
         if err == ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

reflector分析

接下来的话,我们再来看Reflector是如何工作的,我们来分析一下Reflector.Run()方法。Run中通过backoffManager监视ListAndWatch的运行,异常时可以重新运行。

func (r *Reflector) Run(stopCh <-chan struct{}) {
   wait.BackoffUntil(func() {
      if err := r.ListAndWatch(stopCh); err != nil {
         r.watchErrorHandler(r, err)
      }
   }, r.backoffManager, true, stopCh)
}

下面来看下核心ListAndWatch的运行。

// 这块值得我们注意,这个store参数在controller调用的时候传递的DeltaFIFO类型。
// processLoop不断从DeltaFIFO里拿数据然后进行处理。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
   options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

   if err := func() error {
      var list runtime.Object
      go func() {
         pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
            return r.listerWatcher.List(opts)
         }))
         // 不相关代码注释掉...主要是一些参数设置和分页相关的逻辑。
         list, paginatedResult, err = pager.List(context.Background(), options)
         if isExpiredError(err) || isTooLargeResourceVersionError(err) {
            list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
         }
      }()
      // 不相关代码注释掉...
      // 完这个函数,正常情况下已经拿到了一个完整的资源列表。
      items, err := meta.ExtractList(list)
      // 这里是将完整的列表同步到Queue中
      if err := r.syncWith(items, resourceVersion); err != nil {
         return fmt.Errorf("unable to sync list result: %v", err)
      }
   }(); err != nil {
      return err
   }
   // 不相关代码注释掉...
   // 下面这段代码比较重要,在将首次完整列表同步到Store中后,启动了一个resync的协程。
   // 这个协程等待各个cancelCh,stopCh和resyncCh的消息,前两个的处理是调用cleanup()并返回。
   // 其中最重要的是resyncCh的消息,如果接收到该channel消息的话,会执行Store.Resync()函数。
   // 在最开始我们介绍了不同类型的存储,其中有一个DeltaFIFO,它的Resync()操作是将clientState
   // 中所有已知的对象重新还未在队列中的重新加入到队列中,因此Controller中的processLoop可以定
   // 期的执行全量资源的同步操作。
   resyncerrc := make(chan error, 1)
   cancelCh := make(chan struct{})
   defer close(cancelCh)
   go func() {
      resyncCh, cleanup := r.resyncChan()
      defer func() {
         cleanup() // Call the last one written into cleanup
      }()
      for {
         select {
         case <-resyncCh:
         case <-stopCh:
            return
         case <-cancelCh:
            return
         }
         if r.ShouldResync == nil || r.ShouldResync() {
            klog.V(4).Infof("%s: forcing resync", r.name)
            if err := r.store.Resync(); err != nil {
               resyncerrc <- err
               return
            }
         }
         cleanup()
         resyncCh, cleanup = r.resyncChan()
      }
   }()
   // 接下来就是Watch操作
   for {
      // 不相关代码注释掉...
      w, err := r.listerWatcher.Watch(options)
      // 不相关代码注释掉...
      if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
         // 不相关代码注释掉...
         return nil
      }
   }
}

// 初始全量资源同步函数
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
   found := make([]interface{}, 0, len(items))
   for _, item := range items {
      found = append(found, item)
   }
   return r.store.Replace(found, resourceVersion)
}

// 资源变动操作,对于资源的增加,删除和更新分别调用了Store的对应操作。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   // 不相关代码注释掉...
loop:
   for {
      select {
      // ...
      case event, ok := <-w.ResultChan():
         // 不相关代码注释掉...
         switch event.Type {
         case watch.Added:
            err := r.store.Add(event.Object)
         case watch.Modified:
            err := r.store.Update(event.Object)
         case watch.Deleted:
            err := r.store.Delete(event.Object)
         }
      }
   }
   // 不相关代码注释掉...
}

至此,informer的整体流程分析已经差不多了,但是其中还是有很多细节还没有讲到的,笔者也还没有真正完全理解这个完整的代码,还需要各位读者亲自跟着代码去梳理。同时对讲的不对的地方能够积极提出。

sharedinformer分析

现在,我们再来梳理一下sharedinformer的原理,这块会更加复杂一些。下面是一张整理好的图片,关于代码的调用逻辑,关键数据结构和关键代码。在后面讲解过程中主要以这张图来展开详细说明。图片不太清楚的话也可以下载这个附件解压后在draw.io打开。

在详细介绍之前先从整体上给将SharedInformer进行一个拆分,大概可分为下列几个功能组件:

  • sharedInformer
  • sharedProcessor
  • processorListener

可以通过sharedinformer注册事件处理函数。sharedprocessor用来对接受的事件做分发处理。processorlistener则是具体执行事件处理的单元。总体逻辑就是sharedprocessor出DeltaFIFO中获取事件,然后分发的各个processorlistener上去执行。每通过sharedinformer注册事件处理函数时就会创建一个processorlistener。

processorlistener分析

现在我们从最末端的事件处理函数来分析,先来看下processorlistener是如何获取和处理事件的。下面是processorlistner接口和一些主要函数:

type processorListener struct {
        // 这个是run()读取用的
	nextCh chan interface{}
        // 这个是sharedprocessor分发事件用的
	addCh  chan interface{}
        // 事件处理函数
	handler ResourceEventHandler
        // 里面存还未进行分发的事件,从addCh过来之后暂存到这里。
	pendingNotifications buffer.RingGrowing

	requestedResyncPeriod time.Duration
	resyncPeriod time.Duration
	nextResync time.Time
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

// 这个函数的作用就是不断从nextCh获取消息然后进行处理。
func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// 这个函数的主要租用是在pendingNotifications中获取等待处理的事件发送到p.nextCh中,这样run()
// 函数就可以通过这个channel获取到事件并进行处理。pop()函数在sharedinformer启动或注册新的handler时启动运行。
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

shareprocessor分析

现在再来看下shareprocessor是如何处理事件的。

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
        // 上面介绍的processorlistener
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

// 注册事件处理函数并启动
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

// 分发事件到各个listener中
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}
// 启动所有的listeners
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

sharedinformer分析

下面是SharedInformer接口:

type SharedInformer interface {
   AddEventHandler(handler ResourceEventHandler)
   AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
   GetStore() Store
   GetController() Controller
   Run(stopCh <-chan struct{})
   HasSynced() bool
   LastSyncResourceVersion() string
   SetWatchErrorHandler(handler WatchErrorHandler) error
}

type SharedIndexInformer interface {
   SharedInformer
   AddIndexers(indexers Indexers) error
   GetIndexer() Indexer
}

接下来看SharedInformer的创建方法

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   realClock := &clock.RealClock{}
   sharedIndexInformer := &sharedIndexInformer{
      // 共享的处理器
      processor:                       &sharedProcessor{clock: realClock},
      indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
      // 获取资源的ListAndWatch接口
      listerWatcher:                   lw,
      objectType:                      exampleObject,
      resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
      defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
      cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
      clock:                           realClock,
   }
   return sharedIndexInformer
}

// 创建sharedinformer只是传递了一个空的Indexers。
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

下面看sharedinformer的run()方法

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

        // 给Reflector用的
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
                // 处理Reflector的Queue用的,这个函数很关键。从DeltaFIFO获取事件
                // 然后通过sharedprocessor进行分发处理
		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// 创建controller,参考上面普通informer的Controller部分
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

最后再来看HandleDeltas这个函数

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// DeltaFIFO拿出来的都是Deltas类型
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
                        // 如果存储中已经有了这个对象,则执行更新操作
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				// 这个switch就是来判断是不是resync操作
				isSync := false
				switch {
				case d.Type == Sync:
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				// 分发更新事件
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				// 分发添加事件
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			// 分发删除事件
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

截止到这里,sharedinformer的大体流程上就分析完了。可以看到,和普通informer不同的地方是从Reflector的Queue消费消息的处理逻辑,普通informer是直接就地处理,而sharedinformer是使用sharedprocessor分发的各个listener中,每个listener单独处理。

这部分代码也有很多细节地方笔者也还没有去完全研究完,比如多个事件处理函数在注册的时候都传递了resyncPeriod参数,如果这个值不同,他们怎么做到互相隔离的(Reflector只能设置一个同步时间,sharedinformer如何设置的这个时间)。这个client-go的设计还是比较复杂的,随着后面的学习和使用如有新的点继续在这里完善。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注