kube-apiserver是k8s中最为核心的组件,对外暴露restful接口,实现对集群中各种资源的增删改查操作。kubelet,kube-*组件都通过apiserver获取自己感兴趣的资源做处理,系统中所有的组件都只负责自己的部分,最终会促使各种资源到达期望的状态。
apiserver中的api资源是由组构成的,叫做ApiGroup,如apps,extensions,每个资源组下面又有不同类型的资源,称为Kind,如Deployment。每个分组下还会有不同的版本,在相同分组的不同版本下面相同的Kind的资源可能会有字段的增删等变更。因此apiserver需要能够正确处理不同版本资源之间的兼容性处理。从废弃策略的规则#2中可以了解到,不同版本之间的资源可以互相转换,并且不丢失任何信息,接下来分析apiserver是具体怎么实现的。本文基于k8s的1.8.0版本版本的代码。
首先看下apiserver的启动流程,下面是关键的函数调用链,去掉了不相关代码和条件判断等。
main() cmd/kube-apiserver/apiserver.go:31
NewAPIServerCommand() cmd/kube-apiserver/app/server.go:99
Run() cmd/kube-apiserver/app/server.go:151
CreateServerChain() cmd/kube-apiserver/app/server.go:169
# 返回的config.ExtraConfig中保存了
CreateKubeAPIServerConfig() cmd/kube-apiserver/app/server.go:273
buildGenericConfig() cmd/kube-apiserver/app/server.go:417
# 加载当前版本默认api资源配置
genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource() cmd/kube-apiserver/app/server.go:431
# 将参数配置中的--runtime-config应用到默认api资源配置
s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme) cmd/kube-apiserver/app/server.go:449
# 设置存储端后端对应的api操作接口
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
# 将apiserver和storage的配置返回了,就是说apiserver和storage都是使用的用户自定义的配置对默认配置进行覆盖后的配置
# 返回的 genericConfig.MergedResourceConfig保存了api启用配置,类型为*storage.ResourceConfig
# 返回的storageFactory.APIResourceConfig也保存了api启用配置,类型也为*storage.ResourceConfig
CreateKubeAPIServer() cmd/kube-apiserver/app/server.go:191
kubeAPIServerConfig.Complete().New() cmd/kube-apiserver/app/server.go:219
# 启用legacy api(/api/v1)
m.InstallLegacyAPI() pkg/master/master.go:405
# 启用api(/apis/{groupn})
m.InstallAPIs()
m.GenericAPIServer.InstallAPIGroups() pkg/master/master.go:553
``
其中m.InstallLegacyAPI()是安装旧版接口,及/api/v1接口路径下的接口。我们主要看m.InstallAPIS这部分,这部分安装的接口的都在/apis/{group}路径下。
现在深入到InstallAPIGroups()这个函数中以及看后面的详细调用链。
// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
// 安装api资源
s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
// type APIServerHandler struct {
// FullHandlerChain http.Handler
// GoRestfulContainer *restful.Container
// NonGoRestfulMux *mux.PathRecorderMux
// Director http.Handler
// }
// 其中restful.Container为go的restful框架中server实例
//
// type APIGroup struct {
// Name string
// Versions []GroupVersionForDiscovery
// PreferredVersion GroupVersionForDiscovery
// ServerAddressByClientCIDRs []ServerAddressByClientCIDR
// }
//
// type APIGroupHandler struct {
// serializer runtime.NegotiatedSerializer
// group v1.APIGroup
// }
// NewApiGroupHandler返回上述结构体,serializer为序列化和反序列化器
//
// type NegotiatedSerializer interface {
// SupportedMediaTypes() []SerializerInfo
// EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
// DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
// }
// 序列化器主要用来处理http请求,可将runtime.Oject对象编码为指定版本或从指定版本得到runtime.Object
// schema代表了k8s资源对象的内部表示形式
//
// APIGroupHandler实现了gorestful框架的WebService接口
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
这个函数主要将该group安装到 /apis/<groupName>路径下,访问这个路径能够获取所有支持的版本和描述等信息,不必重点看这块。
func (s *APIGroupHandler) WebService() *restful.WebService {
mediaTypes, _ := negotiation.MediaTypesForSerializer(s.serializer)
ws := new(restful.WebService)
ws.Path(APIGroupPrefix + "/" + s.group.Name)
ws.Doc("get information of a group")
ws.Route(ws.GET("/").To(s.handle).
Doc("get information of a group").
Operation("getAPIGroup").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroup{}))
return ws
}
// handle returns a handler which will return the api.GroupAndVersion of the group.
func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
s.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}
下面是安装各种api资源的地方,将各group下的所有支持的不同版本注册到REST服务当中。
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
continue
}
apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
if apiGroupInfo.OptionsExternalVersion != nil {
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
}
apiGroupVersion.OpenAPIModels = openAPIModels
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
}
return nil
}
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var errors []error
ws := a.newWebService()
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
// 将storage中定义的路径映射到api接口的路径当中,并传递对应的rest.Storage到handler函数当中。
// 在registerResourceHandlers当中会对这些路径注册不同的增删改查的操作
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if err != nil {
errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
}
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
}
return apiResources, ws, errors
}
// 这个函数比较长,拿出最关键的关于POST和GET方法的路由注册
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
// 这个地方是针对创建资源的handler注册,将资源路径->handler注册到restful的WebService中。handler的是由restfulCreateResource创建的,其中的creater参数是rest.Creater接口,上面提到的rest.Storage里实现了对应的处理方法,如pkg/registry/apps/deployment/storage/storage.go中的DeploymentStorage,改接口中内嵌了REST结构,并且REST中内嵌了vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go中的Store结构,改结构实现了pkg/api/rest.StandardStorage中的标准接口。
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
}
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
// 这行就是把请求的内容按照请求的版本解码出来,解码之后是一个runtime.Object对象
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
return func(w http.ResponseWriter, req *http.Request) {
requestFunc := func() (runtime.Object, error) {
// 这里的r.Create就调用到了Storage的创建接口了
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
result, err := requestFunc()
}
}
接下来就是使用etcd的实现将runtime.Object写到etcd中,关于etcd的存储初始化,没有具体跟踪。在写入etcd的时候数据也是经过反序列化的,apiserver中提供了protobuf和json两种序列化方法,目前看默认是使用protobuf对etcd中的数据进行编解码(从etcd获取二进制格式数据饭钱是代码中protobuf codec定义的魔术字段\x6b\x38\x73\x00)。
k8s中的资源对象从api的http请求中通过反序列化为runtime.Ojbect对象,在将这个对象序列化到etcd中,存到etcd中时会优先使用代码中定义的优先版本进行存储。如下面代码中把SetVersionPriority指定的优先级对换一下位置再看etcd中存储的数据及为v1beta2版本(测试用Deployment)。
+++ b/pkg/apis/apps/install/install.go
@@ -38,5 +38,5 @@ func Install(scheme *runtime.Scheme) {
utilruntime.Must(v1beta1.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))
utilruntime.Must(v1.AddToScheme(scheme))
- utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
+ utilruntime.Must(scheme.SetVersionPriority(v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion))
}
另外apiserver在从etcd读取数据的时候是通过Decoder来解码数据的,上面说到实际数据前面有4个字节的protobuf的魔术数字,因此apiserver可以自动这个数字自动探测应该使用哪个解码器。数据解码之后也是runtime.Object对象,然后再序列化成客户端指定的json/yaml等格式。关于资源对象的编解码部分都在k8s.io/apimachinery这个包下,后续再详细对这个包进行分析介绍。