kube-apiserver代码分析 – API多版本再探

欢迎加入本站的kubernetes技术交流群,微信添加:加Blue_L。


在上一篇API多版本初探中已经描述了API资源是如何注册到路由当中,并大概介绍了api请求如何根据优先版本存储到etcd中。本篇文章主要介绍apiserver如何将etcd中的数据按照按照用户指定的版本返回。在探究同一个api group下不同版本资源兼容问题之外,还会探究不同api group下的不同资源是如何做到兼容的。本文基于kubernetes-1.19.11版本。

本文关注的重点内容有:

  1. apiserver如何读取解析etcd中的数据
  2. apiserver怎么返回用户指定的版本资源
  3. 不同api group怎么做到兼容,如netwoking.k8s.io/v1beta1下的Ingress资源和extensions/v1beta1下的Ingress资源

在阅读源码过程中在protobuf处理关键位置加入了打印堆栈日志,先找出整个函数调用链,再根据链路分析整个执行流程会更容易的多,具体修改和打印的栈信息可在后面的附录中查看。

首先看pkg/master/import_known_versions.go这个文件,该文件中有许多xxx/install的import。

import (
   // These imports are the API groups the API server will support.
   _ "k8s.io/kubernetes/pkg/apis/admission/install"
   _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
   _ "k8s.io/kubernetes/pkg/apis/apps/install"
   ...

来看下这个文件的作用。从文件的名称来看就知道是导入所有的所有已知的api版本,在install包的init函数中会将所有已定义的资源加载到runtime schema中,主要包含下面两类:

  1. 在pkg/apis/${group}/types.go中定义的类型
  2. 在k8s.io/api/${group}/${version}/types.go中定义的

第1类资源代表了内部版本的表示方法,是在etcd中读取的数据(或api请求数据)反序列化之后的表示类型。第2类资源为不同的group下特定version的类型,在写入etcd的(或api响应的)数据是根据特定group/version下的类型进行序列化的。

在install中有一个SetVersionPriority()方法,用来设置写入etcd是优先使用的版本。当然第2类资源在注册的时候还会加上一些设置默认值和转换到内部版本或从内部版本转换的方法来实现内部版本和外部版本之间的互相转换。这些方法的定义在pkg/apis/${group}/${version}下的default.go,conversion.go,zz_generated.xxx.go中定义的。

接下来以Ingress资源来说明转换的读取,转换,返回响应的流程,首先通过加日志查看当前runtime schema中可以看到有三个Ingress类型的定义。分别对应的是extensions/v1beta1,networking.k8s.io/v1beta1和内部版本,可以看到内部版本有两个,分别是extensions/__internal和networking.k8s.io/__internal,但是其他俩对应的实际类型都是pkg/apis/networking/types.go中定义的Ingress类型。

再来回顾一下加载api资源注册路由部分,有一个InstallAPIs()方法,该参数中的networkingrest.RESTStorageProvider{}来自pkg/registry/networking/rest/storage_settings.go中定义的storage接口,这个storage接口设置了ingresses资源对应ingressStorage。

func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) { 
   storage := map[string]rest.Storage{} 
   // ingresses 
   ingressStorage, ingressStatusStorage, err := ingressstore.NewREST(restOptionsGetter) 
   if err != nil { 
      return storage, err 
   } 
   storage["ingresses"] = ingressStorage 
   storage["ingresses/status"] = ingressStatusStorage 
 
   // ingressclasses 
   ingressClassStorage, err := ingressclassstore.NewREST(restOptionsGetter) 
   if err != nil { 
      return storage, err 
   } 
   storage["ingressclasses"] = ingressClassStorage 
 
   return storage, nil 
}

这个ingressStorage来自pkg/registry/networking/ingress/storage/storage.go,该接口中有下列一个New方法

// New creates an instance of the StatusREST object 
func (r *StatusREST) New() runtime.Object { 
   return &networking.Ingress{} 
}

这个返回的networking.Ingress也是(k8s.io/kubernetes)pkg/apis/networking/types.go中定义的Ingress类型。

rest handler根据URL中资源路径得到在etcd中对应的key,使用runtime中的universal decoder根据前4个字节魔术数字判断应使用protobuf解码该value。在protobuf进行数据的反序列化时,首先会解析出数据中的GroupVersionKind字段,如extensions/v1beta1Ingress或networking.k8s.io,然后在runtime注册表中找到这个GroupVersionKind对用的类型,分别对应的是k8s.io/api/networking/v1beta1/register.go和k8s.io/api/extensions/v1beta1/register.go。protobuf将数据反序列化为对应的Struct,并设置缺省字段默认值。

然后解码器将其转换为api资源中注册的版本(k8s.io/kubernetes/pkg/apis/networking),比如__internal版本,当然并不是每个资源都有__internal版本,如apps/v1,对于读取apps/v1beta1数据来说apps/v1就是相当于内部版本,因为api资源中注册的也是这个下面的定义。内部版本的概念其实是说读取之后的存储的struct,这个struct会在通过转换函数转换成客户端请求的版本。

修改k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go,加上一些打印日志。

func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { 
   // If the into object is unstructured and expresses an opinion about its group/version, 
   // create a new instance of the type so we always exercise the conversion path (skips short-circuiting on `into == obj`) 
   decodeInto := into 
   if into != nil { 
      if _, ok := into.(runtime.Unstructured); ok && !into.GetObjectKind().GroupVersionKind().GroupVersion().Empty() { 
         decodeInto = reflect.New(reflect.TypeOf(into).Elem()).Interface().(runtime.Object) 
      } 
   } 
 
   obj, gvk, err := c.decoder.Decode(data, defaultGVK, decodeInto) 
   if err != nil { 
      return nil, gvk, err 
   } 
   klog.V(1).Info("type obj %v, type into %v", reflect.TypeOf(obj), reflect.TypeOf(decodeInto)) 
 
   if d, ok := obj.(runtime.NestedObjectDecoder); ok { 
      if err := d.DecodeNestedObjects(runtime.WithoutVersionDecoder{c.decoder}); err != nil { 
         return nil, gvk, err 
      } 
   } 
 
   // if we specify a target, use generic conversion. 
   if into != nil { 
      // perform defaulting if requested 
      if c.defaulter != nil { 
         c.defaulter.Default(obj) 
      } 
 
      // Short-circuit conversion if the into object is same object 
      if into == obj { 
         return into, gvk, nil 
      } 
      klog.V(1).Info("decodeVersion %v", c.decodeVersion) 
      if err := c.convertor.Convert(obj, into, c.decodeVersion); err != nil { 
         return nil, gvk, err 
      } 
      klog.V(1).Info("return type %v", reflect.TypeOf(into)) 
      return into, gvk, nil 
   } 
 
   // perform defaulting if requested 
   if c.defaulter != nil { 
      c.defaulter.Default(obj) 
   } 
 
   out, err := c.convertor.ConvertToVersion(obj, c.decodeVersion) 
   if err != nil { 
      return nil, gvk, err 
   } 
   return out, gvk, nil 
}

curl http://localhost:8080/apis/extensions/v1beta1/namespaces/default/ingresses/minimal-ingress 观察日志打印内容

I0525 14:09:42.242142   11564 versioning.go:140] type obj %v, type into %v*v1beta1.Ingress *networking.Ingress
I0525 14:09:42.242152   11564 versioning.go:159] decodeVersion %v{{networking.k8s.io __internal} [{networking.k8s.io } {networking.k8s.io }] true}
I0525 14:09:42.242165   11564 versioning.go:163] return type %v*networking.Ingress
I0525 14:09:42.242188   11564 response.go:134] transformedObject *networking.Ingress
I0525 14:09:42.242192   11564 response.go:140] target kind networking.k8s.io/v1, Kind=Ingress, {0xc00058a0e0 [{[application/json] application/json [json] true 0xc0002fc0f0 0xc0002fc1e0 []  {} 0xc0002fc0f0} {[application/yaml] application/yaml [yaml] true 0xc0002fc3c0 <nil> []  <nil> <nil>} {[application/vnd.kubernetes.protobuf] application/vnd.kubernetes.protobuf [pb] false 0xc000550000 <nil> []  {} 0xc000542060}] 0xc000474150 [{application/json application json true 0xc0002fc0f0 0xc0002fc1e0 0xc00057ced0} {application/yaml application yaml true 0xc0002fc3c0 <nil> <nil>} {application/vnd.kubernetes.protobuf application vnd.kubernetes.protobuf false 0xc000550000 <nil> 0xc00057cf60}] 0xc0002fc0f0}

此时,无论etcd中存储的是什么版本,在解码器解码之后都变成了内部版本。接下来就是apiserver如何将这个内部版本转换为客户端请求的版本。

先从k8s.io/apiserver/pkg/endpoints/handlers/get.go开始看

// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, e rest.Exporter, scope *RequestScope) http.HandlerFunc {
   return getResourceHandler(scope,
      func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
         return r.Get(ctx, name, &options)
      })
}

func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
   return func(w http.ResponseWriter, req *http.Request) {
      result, err := getter(ctx, name, req, trace)
      transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
   }
}

k8s.io/apiserver/pkg/endpoints/handlers/response.go

// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope *RequestScope, trace *utiltrace.Trace, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
   options, err := optionsForTransform(mediaType, req)
   if err != nil {
      scope.err(err, w, req)
      return
   }
   obj, err := transformObject(ctx, result, options, mediaType, scope, req)
   klog.V(1).Infof("transformedObject %v", reflect.TypeOf(obj))
   if err != nil {
      scope.err(err, w, req)
      return
   }
   kind, serializer, _ := targetEncodingForTransform(scope, mediaType, req)
   klog.V(1).Infof("target kind %v, %v", kind, serializer)
   responsewriters.WriteObjectNegotiated(serializer, scope, kind.GroupVersion(), w, req, statusCode, obj)
}

执行下列命令访问不同group version下的地址请求不同版本的资源,看到日志中的target kind是相应的请求的版本。 curl curl http://localhost:8080/apis/networking.k8s.io/v1beta1/namespaces/default/ingresses/minimal-ingress

I0525 14:09:55.237780   11564 response.go:134] transformedObject *networking.Ingress
I0525 14:09:55.237785   11564 response.go:140] target kind networking.k8s.io/v1beta1, Kind=Ingress, {0xc00058a0e0 [{[application/json] application/json [json] true 0xc0002fc0f0 0xc0002fc1e0 []  {} 0xc0002fc0f0} {[application/yaml] application/yaml [yaml] true 0xc0002fc3c0 <nil> []  <nil> <nil>} {[application/vnd.kubernetes.protobuf] application/vnd.kubernetes.protobuf [pb] false 0xc000550000 <nil> []  {} 0xc000542060}] 0xc000474150 [{application/json application json true 0xc0002fc0f0 0xc0002fc1e0 0xc00057ced0} {application/yaml application yaml true 0xc0002fc3c0 <nil> <nil>} {application/vnd.kubernetes.protobuf application vnd.kubernetes.protobuf false 0xc000550000 <nil> 0xc00057cf60}] 0xc0002fc0f0}

curl http://localhost:8080/apis/extensions/v1beta1/namespaces/default/ingresses/minimal-ingress

I0525 14:11:24.986629   11564 response.go:134] transformedObject *networking.Ingress
I0525 14:11:24.986633   11564 response.go:140] target kind extensions/v1beta1, Kind=Ingress, {0xc00058a0e0 [{[application/json] application/json [json] true 0xc0002fc0f0 0xc0002fc1e0 []  {} 0xc0002fc0f0} {[application/yaml] application/yaml [yaml] true 0xc0002fc3c0 <nil> []  <nil> <nil>} {[application/vnd.kubernetes.protobuf] application/vnd.kubernetes.protobuf [pb] false 0xc000550000 <nil> []  {} 0xc000542060}] 0xc000474150 [{application/json application json true 0xc0002fc0f0 0xc0002fc1e0 0xc00057ced0} {application/yaml application yaml true 0xc0002fc3c0 <nil> <nil>} {application/vnd.kubernetes.protobuf application vnd.kubernetes.protobuf false 0xc000550000 <nil> 0xc00057cf60}] 0xc0002fc0f0}

具体的转换由k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go中的WriteObjectNegotiated方法实现,大概逻辑就是从runtime中获取一个转换pair下的所有转换函数,并以此执行。如

(networking.Ingress,extensions/vbeta1.Ingress)-> [funcA, funcB] (networking.Ingress,networking.k8s.io/1beta1.Ingress)-> [funcA, funcB, funcC]

到此为止,在文章开头处提到的三个重点问题都得到了答案。另外对于很多细节问题还没有深入探索,如default,convert函数怎么写,code-generator如何自动生成代码,以及Serializer的详细编解码过程等,后面有机会在深入分析。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注