client-go代码分析之informer和sharedInformer分析

在茫茫代码中,很容易迷失。经过一段时间的艰苦探索,终于对client-go有了一定的了解,能够从整体上把相关功能串起来。本文主要是informer相关的内容,因为这是client-go比较核心的功能。后面还会发一篇关于client-go编程使用的文章,会从kubernetes.Interface,dynamic,discovery,workequeue和informer工厂函数等方面介绍,再结合一个operator作为实例进行分析(虽然大部分operator都使用kube-builder等类似的工具构建,但是本着求根问底的精神还是要分析一下)。 相关存储结构 在开始介绍之前,先介绍一下client-go中提供存储相关的接口,这些接口用来缓存api对象。Store是最基础的接口,提供增删改查等,还有一些继承于Store的接口,比Store可能会多实现一些方法,可以针对不同的场景进行使用。 Store ExpirationStore UndeltaStore Queue FIFO DeltaFIFO Indexer cache 不同Store的引用 Store接口的具体定义可以看vendor/k8s.io/client-go/tools/cache/store.go这个文件。在IDE的帮助下,找到了所有实现了这个Store的相关实现。本文主要关心client-go,所以主要跟进client-go下面的相关实现。 delta_fifo.go NewDeltaFIFO() NewDeltaFIFOWithOptions() informer和sharedinformer使用到了 vendor/k8s.io/client-go/tools/cache/controller.go:NewInformer() vendor/k8s.io/client-go/tools/cache/shared_informer.go:sharedIndexInformer.Run() expiration_cache.go NewTTLStore() pkg/kubelet/util/cache/object_cache.go:NewObjectCache() NewExpirationStore() pkg/credentialprovider/aws/aws_credentials.go:newECRProvider() pkg/credentialprovider/azure/azure_credentials.go:NewACRProvider() fifo.go 该文件中定义了Queue接口,继承了Store,并且在该文件中通过FIFO实现了这个queue,另外一个实现是在delta_fifo.go中的DeltaFIFO NewFIFO() 代码中未找到该引用 index.go 这个文件定义了Indexer接口,继承了Store,在store.go中通过cache实现了该接口 store.go NewStore() staging/src/k8s.io/client-go/tools/cache/controller.go:NewInformer() staging/src/k8s.io/client-go/tools/cache/undelta_store.go:NewUndeltaStore() NewIndexer() staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go:newWatchCache() staging/src/k8s.io/client-go/tools/cache/controller.go:NewIndexerInformer() staging/src/k8s.io/client-go/tools/cache/reflector.go:NewNamespaceKeyedIndexerAndReflector() 该函数在IDE中显示未被使用 staging/src/k8s.io/client-go/tools/cache/shared_informer.go:NewSharedIndexInformer() undelata_store.go NewUndeltaStore() pkg/kubelet/config/apiserver.go:newSourceApiserverFromLW() informer简要介绍 client-go中提供了普通informer和sharedInformer两种informer给我们使用。使用informer可以快速的构建各种资源的控制器,来对k8s进行扩展。informer提供了资源变化时执行回调的功能,可以在新增资源,修改资源和是删除资源时执行相应的控制器逻辑。 使用sharedInformer可以对同一个资源注册多个控制器,每个控制器独立执行自己的业务逻辑,互不干扰。多个控制器之间底层公用一个存储和一个到apiserver的连接,可以减少apiserver的压力的和本地内存的占用。informer则不具备这些能力,对于简单控制器可以使用informer,对于较复杂的控制器,会有针对同一个资源多个控制逻辑的推荐使用sharedInformer。 这两个informer在实现的复杂度上有很大差异,sharedInformer相对于普通informer来说要复杂的多。因此本文先从普通informer开始来进行分析,普通informer还分两种,一种是普通的informer,另外一种indexerinfomer。两种informer的区别在于前者使用Store作为对象存储,而后者采用Indexer作为对象存储,通过索引可以加快搜索。 普通informer分析 接下来我们以普通的informer进行分析,关于informer的使用可以参考这里。代码位于k8s.io/client-go/tools/cache/controller.go文件中,该文件中定义了Controller接口,并通过controller进行了实现。 controller分析 先看下接口定义以及controller包含的关键数据结构: // 这两个函数用来创建informer实例 func NewInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler,) (Store, Controller) {} func NewIndexerInformer(lw ListerWatcher, objType…

flannel源码简要分析

启动运行流程 启动流程 寻找要使用的主机网卡,该网卡会在创建flannel.1设备时作为dev参数使用,表示flannel.1设备绑定的网卡为该网卡,vxlan的出站和入站数据都会经过这个网卡。 接下来是创建网络管理器,网络管理器用来获取网络配置,本机的pod网络等。 创建后端插件,图中实例的是vxlan后端,类型是Backend,有一个RegisterNetwork方法,在vxlan后端中在这个方法中创建vxlan设备以及设置其ip地址,并返回一个backend.Network结构。 设置iptables的伪装和转发规则,其中的-A POSTROUTING ! -s 10.244.0.0/16 -d 10.244.0.0/16 -j MASQUERADE这条iptables规则导致host到容器网络的访问源地址使用flannel.1的ip。 运行backend.Networ.Run()方法持续不断的监听所有网络变更的事件,根据事件刷新相关的route,arp和fdb表到主机上。 定时对本机的租约进行续租(etcd时才用)。 功能组件 SubnetManager 网络管理器用来管理当前主机和集群主机的网络配置。 接口说明 GetNetworkConfig(ctx context.Context) (*Config, error) 获取当前节点的网络配置信息: type Config struct { Network ip.IP4Net SubnetMin ip.IP4 SubnetMax ip.IP4 SubnetLen uint BackendType string `json:"-"` Backend json.RawMessage `json:",omitempty"` } // 该配置由下列函数生成 func ParseConfig(s string) (*Config, error) { cfg := new(Config) err := json.Unmarshal([]byte(s), cfg) if err != nil { return nil, err } if cfg.SubnetLen…

k8s调研学习方向

网络 flannel/calico 组网和calico的ipam的实现 cilium 全面了解这个网络插件 metallb 全面了解这个网络插件,底层实现 kube-proxy 主要关注iptables规则,ipset和ipvs的使用 存储 csi 关注这个接口的具体定义实现 juicefs 关注源码层面的原理 glusterfs 试用与了解 minio 源码级研究 etcd 源码级研究 核心组件 kube-apiserver 主要关注存储的实现,性能与横向扩展 kubelet 主要关注底层创建pod的完整流程,包括cgroup,存储,cri,网络配置等 kube-scheduler 主要看调度的整体流程,以及基于批的调度实现 扩展组件 client-go 关注list-watch机制,informer原理,apiserver端的实现 operator和controller-runtime库,以及自己实现一个operator,并支持api多版本…

kube-apiserver代码分析 – API多版本再探

在上一篇API多版本初探中已经描述了API资源是如何注册到路由当中,并大概介绍了api请求如何根据优先版本存储到etcd中。本篇文章主要介绍apiserver如何将etcd中的数据按照按照用户指定的版本返回。在探究同一个api group下不同版本资源兼容问题之外,还会探究不同api group下的不同资源是如何做到兼容的。本文基于kubernetes-1.19.11版本。 本文关注的重点内容有: apiserver如何读取解析etcd中的数据 apiserver怎么返回用户指定的版本资源 不同api group怎么做到兼容,如netwoking.k8s.io/v1beta1下的Ingress资源和extensions/v1beta1下的Ingress资源 在阅读源码过程中在protobuf处理关键位置加入了打印堆栈日志,先找出整个函数调用链,再根据链路分析整个执行流程会更容易的多,具体修改和打印的栈信息可在后面的附录中查看。 首先看pkg/master/import_known_versions.go这个文件,该文件中有许多xxx/install的import。 import ( // These imports are the API groups the API server will support. _ "k8s.io/kubernetes/pkg/apis/admission/install" _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install" _ "k8s.io/kubernetes/pkg/apis/apps/install" ... 来看下这个文件的作用。从文件的名称来看就知道是导入所有的所有已知的api版本,在install包的init函数中会将所有已定义的资源加载到runtime schema中,主要包含下面两类: 在pkg/apis/${group}/types.go中定义的类型 在k8s.io/api/${group}/${version}/types.go中定义的 第1类资源代表了内部版本的表示方法,是在etcd中读取的数据(或api请求数据)反序列化之后的表示类型。第2类资源为不同的group下特定version的类型,在写入etcd的(或api响应的)数据是根据特定group/version下的类型进行序列化的。 在install中有一个SetVersionPriority()方法,用来设置写入etcd是优先使用的版本。当然第2类资源在注册的时候还会加上一些设置默认值和转换到内部版本或从内部版本转换的方法来实现内部版本和外部版本之间的互相转换。这些方法的定义在pkg/apis/${group}/${version}下的default.go,conversion.go,zz_generated.xxx.go中定义的。 接下来以Ingress资源来说明转换的读取,转换,返回响应的流程,首先通过加日志查看当前runtime schema中可以看到有三个Ingress类型的定义。分别对应的是extensions/v1beta1,networking.k8s.io/v1beta1和内部版本,可以看到内部版本有两个,分别是extensions/__internal和networking.k8s.io/__internal,但是其他俩对应的实际类型都是pkg/apis/networking/types.go中定义的Ingress类型。 再来回顾一下加载api资源注册路由部分,有一个InstallAPIs()方法,该参数中的networkingrest.RESTStorageProvider{}来自pkg/registry/networking/rest/storage_settings.go中定义的storage接口,这个storage接口设置了ingresses资源对应ingressStorage。 func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) { storage := map[string]rest.Storage{} // ingresses ingressStorage, ingressStatusStorage, err := ingressstore.NewREST(restOptionsGetter) if err != nil { return storage, err } storage["ingresses"]…

kube-apiserver代码分析 – API多版本初探

kube-apiserver是k8s中最为核心的组件,对外暴露restful接口,实现对集群中各种资源的增删改查操作。kubelet,kube-*组件都通过apiserver获取自己感兴趣的资源做处理,系统中所有的组件都只负责自己的部分,最终会促使各种资源到达期望的状态。 apiserver中的api资源是由组构成的,叫做ApiGroup,如apps,extensions,每个资源组下面又有不同类型的资源,称为Kind,如Deployment。每个分组下还会有不同的版本,在相同分组的不同版本下面相同的Kind的资源可能会有字段的增删等变更。因此apiserver需要能够正确处理不同版本资源之间的兼容性处理。从废弃策略的规则#2中可以了解到,不同版本之间的资源可以互相转换,并且不丢失任何信息,接下来分析apiserver是具体怎么实现的。本文基于k8s的1.8.0版本版本的代码。 首先看下apiserver的启动流程,下面是关键的函数调用链,去掉了不相关代码和条件判断等。 main() cmd/kube-apiserver/apiserver.go:31 NewAPIServerCommand() cmd/kube-apiserver/app/server.go:99 Run() cmd/kube-apiserver/app/server.go:151 CreateServerChain() cmd/kube-apiserver/app/server.go:169 # 返回的config.ExtraConfig中保存了 CreateKubeAPIServerConfig() cmd/kube-apiserver/app/server.go:273 buildGenericConfig() cmd/kube-apiserver/app/server.go:417 # 加载当前版本默认api资源配置 genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource() cmd/kube-apiserver/app/server.go:431 # 将参数配置中的--runtime-config应用到默认api资源配置 s.APIEnablement.ApplyTo(genericConfig, master.DefaultAPIResourceConfigSource(), legacyscheme.Scheme) cmd/kube-apiserver/app/server.go:449 # 设置存储端后端对应的api操作接口 storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig # 将apiserver和storage的配置返回了,就是说apiserver和storage都是使用的用户自定义的配置对默认配置进行覆盖后的配置 # 返回的 genericConfig.MergedResourceConfig保存了api启用配置,类型为*storage.ResourceConfig # 返回的storageFactory.APIResourceConfig也保存了api启用配置,类型也为*storage.ResourceConfig CreateKubeAPIServer() cmd/kube-apiserver/app/server.go:191 kubeAPIServerConfig.Complete().New() cmd/kube-apiserver/app/server.go:219 # 启用legacy api(/api/v1) m.InstallLegacyAPI() pkg/master/master.go:405 # 启用api(/apis/{groupn}) m.InstallAPIs() m.GenericAPIServer.InstallAPIGroups() pkg/master/master.go:553 `` 其中m.InstallLegacyAPI()是安装旧版接口,及/api/v1接口路径下的接口。我们主要看m.InstallAPIS这部分,这部分安装的接口的都在/apis/{group}路径下。 现在深入到InstallAPIGroups()这个函数中以及看后面的详细调用链。 // Exposes given api groups in the API. func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos…

linux配置多级服务器登录和隧道映射

通常,办公环境的电脑无法直接连接到开发测试服务器,往往需要进行多次ssh跳转。这时可通过配置ssh支持自动跳转登录功能。假设有2台服务器A和B在我们的开发测试环境中,本地我们只能连接到A服务器,而A服务器可以连接到B服务器。编辑~/.ssh/config文件,输入下列内容: Host serverA User root HostName 10.1.2.3 IdentityFile /home/myusername/.ssh/id_rsa Port 22 Host serverB User root HostName 192.168.1.3 IdentityFile /home/myusername/.ssh/id_rsa port 22 ProxyJump mypub 此时,在本地即可通过ssh serverB直接连接到服务器B的ssh服务。如果要使用免密方式进行登录。则需要执行ssh-copy-id root@serverA和ssh-copy-id root@serverB进行配置免密。 在进行了这样的配置之后,我们也可以更加方便的建立隧道来使用了。比如在服务器上有一个8080的http服务,想要在本地浏览器里访问。那么,可以执行下列命令建立隧道: ssh -Nf -Llocalhost:8080:192.168.1.3:8080 serverB 此命令会将serverB上的8080端口映射到本地的8080端口上,在浏览器中我们只需要输入http://localhost:8080可以访问到serverB上的web服务。 同样,也可以通过-R选项将本地端口映射到serverB的某个端口上,比如在微信公众号开发测试的时候,可以将配置在公众平台的服务器地址上的服务映射到本地。 为了避免网络连接中断导致隧道断开连接,可以使用systemd来保活,隧道进程退出后自动将其拉起: # cat /lib/systemd/system//my-http-proxy.service [Unit] Describe=Kmr nginx agent After=network.target [Service] LimitNOFILE=10000 Type=simple User=root Group=root ExecStart=/usr/bin/ssh -oExitOnForwardFailure=yes -oPubkeyAuthentication=yes -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null -oServerAliveInterval=5 -oServerAliveCountMax=3 -Llocalhost:8080:localhost:8080 -N root@serverB Restart = always RestartSec = 1s StartLimitInterval = 0 [Install] WantedBy=network.target 执行systemctl…

linux下tcp keepalive相关参数调整测试

首先说明下面三个和keepalive相关的内核参数以及默认的值 # sysctl -a | grep keepalive # 在会后一次发送数据包后多久向对方发起探测 net.ipv4.tcp_keepalive_time = 7200 # 在没有收到对方确认时,会按照这个时间间隔再次探测 net.ipv4.tcp_keepalive_intvl = 75 # 在没有收到对方确认时,进行探测的次数 net.ipv4.tcp_keepalive_probes = 9 下面通过在本地环境上测试这些参数,首先将本地的默认keepalive参数进行修改 # sysctl -a | grep keepalive net.ipv4.tcp_keepalive_intvl = 5 net.ipv4.tcp_keepalive_probes = 9 net.ipv4.tcp_keepalive_time = 20 下载并编译带keepalive功能支持的netcat命令行工具 git clone https://github.com/cyberelf/netcat-keepalive.git cd netcat-keepalive/ make linux 运行tcpdump进行抓包 tcpdump -iany port 18888 启动服务端监听 ./nckl-linux -v4K -l 18888 使用nc去连接 nc -v -p55666 localhost 18888 可以看到抓包内容如下 root@debian:/home/blue# tcpdump -iany port 18888 tcpdump: verbose…

记一次k8s集群pod一直terminating问题的排查

现象描述 pod一直处于terminating状态,或者很久才能删除,内核日志中持续打印unregister_netdevice: waiting for XXX to become free. Usage count = 1。 故障诊断 经过定位和排查,定位到是内核的一个bug导致网络设备无法删除。具体参考https://github.com/torvalds/linux/commit/ee60ad219f5c7c4fb2f047f88037770063ef785f。 另外在github的k8s的issues里也有该bug的相关讨论。有人给出了付现这个问题的方式,以及验证上面提到的修复方法是否有效。下面是按照他给出的方案做的复现和验证。具体可参考https://github.com/moby/moby/issues/5618#issuecomment-549333485。 问题排查 从kubelet内核日志来看是在删除pod的网卡设备时因为内核的引用计数bug,导致无法删除。后续对网卡信息的查询和再次删除操作应该也会导致超时失败(根据日志推断,暂时还未在代码中找到对应调用,线上环境也没法重启调整日志级别和调试)。 首先需要看一个概念:PLEG。 PLEG (pod lifecycle event generator) 是 kubelet 中一个非常重要的模块,它主要完成以下几个目标: 从 runtime 中获取 pod 当前状态,产生 pod lifecycle events 从 runtime 中获取 pod 当前状态,更新 kubelet pod cache 接下来分析一下造成问题的原因应该是k8s的PLEG在同步pod信息时,可能要查询网卡详情(ip地址),由于内核bug导致超时,致使syncLoop中每执行一次遍历的时间过长(4分钟左右),因此新建pod和删除pod的时候,node上的信息和server上的信息更新不及时。用busybox测试创建和删除时,通过docker ps可以看到响应容器很快就可以启动或删除掉。 从图中可以看到该日志:Calico CNI deleting device in netns /proc/16814/ns/net这条。这是在pod执行删除是产生的。在正常情况下后面会有删除完成的日志信息,如下图: 但上面的日志里的无此信息,并且10s后打印了unregister_netdevice xxx的日志。这里是触发了内核bug。通过ps aux | grep calico也可以看到在对应时间有一个calico进程启动去执行操作,目前这个进程还在(10.209.33.105),这里估计k8s也有bug,没有wait pid,导致calico成为僵尸进程。 下图是kubelet日志。其中的PLEG is not healthy日志也是在对应的时间点出现。 问题本地复现 要在本地复现这个问题,首先需要给内核打补丁来协助复现。 diff --git a/net/ipv4/route.c b/net/ipv4/route.c index a0163c5..6b9e7ee…

轮指和修指甲小计

轮指 拇指要放松才能弹均匀,四个手指头下意识去独立控制,ima拨弦时要快速发力。90左右慢速练习时要把声音弹足够大和清晰。 指甲 由于m型指甲,斜坡不用太长,斜坡太长会有指甲摩擦琴弦的声音,最高坡点在手指中心偏向小指方向一点点的位置。指甲不用太长,从正面看相对于指肚多出1至2毫米即可。拨弦方向在标准姿势稍垂直于琴弦一点点,可以有效利用m型指甲中间凹点带动下压琴弦。合理的指甲形状可以通过拨弦时的阻力确定,阻力太小太顺滑发不出圆润的声音,因为无法有效的下压琴弦,要调整到一个阻力合适的形状。…

古典吉他学习第十七十八节课总结

课上主要看练习效果。注意练习的时候声音要大,要放开。注意保留指,旋律音不要用保留指,是多少时值就响多久。伴奏是和弦的话可以。要区分出旋律和伴奏,也要注意多个声部以及旋律在声部之间的变换。还需在深入。 对于节奏的把握需要继续练,尤其附点音符。 练习 1. 和声音程 过 2. 《铜壶的四周》 八分音符 = 75 90 105 3. 《古典舞蹈》 八分音符 = 70 80 90 4. 《土拨鼠》 八分音符 = 70 80 90 5. 视奏《小精灵》、《多年以后》、《冒险记》…