kube-apiserver代码分析 – API多版本初探

kube-apiserver是k8s中最为核心的组件,对外暴露restful接口,实现对集群中各种资源的增删改查操作。kubelet,kube-*组件都通过apiserver获取自己感兴趣的资源做处理,系统中所有的组件都只负责自己的部分,最终会促使各种资源到达期望的状态。

apiserver中的api资源是由组构成的,叫做ApiGroup,如apps,extensions,每个资源组下面又有不同类型的资源,称为Kind,如Deployment。每个分组下还会有不同的版本,在相同分组的不同版本下面相同的Kind的资源可能会有字段的增删等变更。因此apiserver需要能够正确处理不同版本资源之间的兼容性处理。从废弃策略规则#2中可以了解到,不同版本之间的资源可以互相转换,并且不丢失任何信息,接下来分析apiserver是具体怎么实现的。本文基于k8s的1.8.0版本版本的代码。

首先看下apiserver的启动流程,下面是关键的函数调用链,去掉了不相关代码和条件判断等。

main() cmd/kube-apiserver/apiserver.go:31
  NewAPIServerCommand() cmd/kube-apiserver/app/server.go:99
    Run() cmd/kube-apiserver/app/server.go:151
      CreateServerChain() cmd/kube-apiserver/app/server.go:169
        # 返回的config.ExtraConfig中保存了
        CreateKubeAPIServerConfig() cmd/kube-apiserver/app/server.go:273
          buildGenericConfig() cmd/kube-apiserver/app/server.go:417
            # 加载当前版本默认api资源配置
            genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource() cmd/kube-apiserver/app/server.go:431
            # 将参数配置中的--runtime-config应用到默认api资源配置
            s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme) cmd/kube-apiserver/app/server.go:449
            # 设置存储端后端对应的api操作接口
            storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
            # 将apiserver和storage的配置返回了,就是说apiserver和storage都是使用的用户自定义的配置对默认配置进行覆盖后的配置
            # 返回的 genericConfig.MergedResourceConfig保存了api启用配置,类型为*storage.ResourceConfig
            # 返回的storageFactory.APIResourceConfig也保存了api启用配置,类型也为*storage.ResourceConfig
        CreateKubeAPIServer() cmd/kube-apiserver/app/server.go:191
          kubeAPIServerConfig.Complete().New() cmd/kube-apiserver/app/server.go:219
            # 启用legacy api(/api/v1)
            m.InstallLegacyAPI() pkg/master/master.go:405
            # 启用api(/apis/{groupn})
            m.InstallAPIs() 
              m.GenericAPIServer.InstallAPIGroups() pkg/master/master.go:553
``

其中m.InstallLegacyAPI()是安装旧版接口,及/api/v1接口路径下的接口。我们主要看m.InstallAPIS这部分,这部分安装的接口的都在/apis/{group}路径下。

现在深入到InstallAPIGroups()这个函数中以及看后面的详细调用链。

// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
    // 安装api资源
    s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)

    // type APIServerHandler struct {
    //    FullHandlerChain   http.Handler
    //    GoRestfulContainer *restful.Container
    //    NonGoRestfulMux    *mux.PathRecorderMux
    //    Director           http.Handler
    // }
    // 其中restful.Container为go的restful框架中server实例
    //
    // type APIGroup struct {
    //    Name                       string
    //    Versions                   []GroupVersionForDiscovery
    //    PreferredVersion           GroupVersionForDiscovery
    //    ServerAddressByClientCIDRs []ServerAddressByClientCIDR
    // }
    //
    // type APIGroupHandler struct {
    //     serializer runtime.NegotiatedSerializer
    //     group      v1.APIGroup
    // }
    // NewApiGroupHandler返回上述结构体,serializer为序列化和反序列化器
    //
    // type NegotiatedSerializer interface {
    //     SupportedMediaTypes() []SerializerInfo
    //     EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
    //     DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
    // }
    // 序列化器主要用来处理http请求,可将runtime.Oject对象编码为指定版本或从指定版本得到runtime.Object
    // schema代表了k8s资源对象的内部表示形式
    //
    // APIGroupHandler实现了gorestful框架的WebService接口
    s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}

这个函数主要将该group安装到 /apis/<groupName>路径下,访问这个路径能够获取所有支持的版本和描述等信息,不必重点看这块。

func (s *APIGroupHandler) WebService() *restful.WebService {
    mediaTypes, _ := negotiation.MediaTypesForSerializer(s.serializer)
    ws := new(restful.WebService)
    ws.Path(APIGroupPrefix + "/" + s.group.Name)
    ws.Doc("get information of a group")
    ws.Route(ws.GET("/").To(s.handle).
        Doc("get information of a group").
        Operation("getAPIGroup").
        Produces(mediaTypes...).
        Consumes(mediaTypes...).
        Writes(metav1.APIGroup{}))
    return ws
}

// handle returns a handler which will return the api.GroupAndVersion of the group.
func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
    s.ServeHTTP(resp.ResponseWriter, req.Request)
}

func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

下面是安装各种api资源的地方,将各group下的所有支持的不同版本注册到REST服务当中。

// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
        if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
            klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
            continue
        }

        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if apiGroupInfo.OptionsExternalVersion != nil {
            apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
        }
        apiGroupVersion.OpenAPIModels = openAPIModels
        apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes

        if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
            return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
        }
    }

    return nil
}

func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    installer := &APIInstaller{
        group:             g,
        prefix:            prefix,
        minRequestTimeout: g.MinRequestTimeout,
    }

    apiResources, ws, registrationErrors := installer.Install()
    versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
    versionDiscoveryHandler.AddToWebService(ws)
    container.Add(ws)
    return utilerrors.NewAggregate(registrationErrors)
}


func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    var apiResources []metav1.APIResource
    var errors []error
    ws := a.newWebService()

    // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
    paths := make([]string, len(a.group.Storage))
    var i int = 0
    for path := range a.group.Storage {
        paths[i] = path
        i++
    }
    sort.Strings(paths)
    for _, path := range paths {
        // 将storage中定义的路径映射到api接口的路径当中,并传递对应的rest.Storage到handler函数当中。
        // 在registerResourceHandlers当中会对这些路径注册不同的增删改查的操作
        apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
        if err != nil {
            errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
        }
        if apiResource != nil {
            apiResources = append(apiResources, *apiResource)
        }
    }
    return apiResources, ws, errors
}

// 这个函数比较长,拿出最关键的关于POST和GET方法的路由注册
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
        // 这个地方是针对创建资源的handler注册,将资源路径->handler注册到restful的WebService中。handler的是由restfulCreateResource创建的,其中的creater参数是rest.Creater接口,上面提到的rest.Storage里实现了对应的处理方法,如pkg/registry/apps/deployment/storage/storage.go中的DeploymentStorage,改接口中内嵌了REST结构,并且REST中内嵌了vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go中的Store结构,改结构实现了pkg/api/rest.StandardStorage中的标准接口。
        case "POST": // Create a resource.
            var handler restful.RouteFunction
            if isNamedCreater {
                handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
            } else {
                handler = restfulCreateResource(creater, reqScope, admit)
            }
            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
            article := GetArticleForNoun(kind, " ")
            doc := "create" + article + kind
            if isSubresource {
                doc = "create " + subresource + " of" + article + kind
            }
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                // TODO: in some cases, the API may return a v1.Status instead of the versioned object
                // but currently go-restful can't handle multiple different objects being returned.
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            routes = append(routes, route)
}

func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
    return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    // 这行就是把请求的内容按照请求的版本解码出来,解码之后是一个runtime.Object对象
    obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
    return func(w http.ResponseWriter, req *http.Request) {
        requestFunc := func() (runtime.Object, error) {
            // 这里的r.Create就调用到了Storage的创建接口了
            return r.Create(
                ctx,
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                options,
            )
        }
        result, err := requestFunc()
    }
}

接下来就是使用etcd的实现将runtime.Object写到etcd中,关于etcd的存储初始化,没有具体跟踪。在写入etcd的时候数据也是经过反序列化的,apiserver中提供了protobuf和json两种序列化方法,目前看默认是使用protobuf对etcd中的数据进行编解码(从etcd获取二进制格式数据饭钱是代码中protobuf codec定义的魔术字段\x6b\x38\x73\x00)。

k8s中的资源对象从api的http请求中通过反序列化为runtime.Ojbect对象,在将这个对象序列化到etcd中,存到etcd中时会优先使用代码中定义的优先版本进行存储。如下面代码中把SetVersionPriority指定的优先级对换一下位置再看etcd中存储的数据及为v1beta2版本(测试用Deployment)。

+++ b/pkg/apis/apps/install/install.go
@@ -38,5 +38,5 @@ func Install(scheme *runtime.Scheme) {
        utilruntime.Must(v1beta1.AddToScheme(scheme))
        utilruntime.Must(v1beta2.AddToScheme(scheme))
        utilruntime.Must(v1.AddToScheme(scheme))
-       utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
+       utilruntime.Must(scheme.SetVersionPriority(v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion))
 }

另外apiserver在从etcd读取数据的时候是通过Decoder来解码数据的,上面说到实际数据前面有4个字节的protobuf的魔术数字,因此apiserver可以自动这个数字自动探测应该使用哪个解码器。数据解码之后也是runtime.Object对象,然后再序列化成客户端指定的json/yaml等格式。关于资源对象的编解码部分都在k8s.io/apimachinery这个包下,后续再详细对这个包进行分析介绍。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注