flannel源码简要分析

启动运行流程

启动流程

  1. 寻找要使用的主机网卡,该网卡会在创建flannel.1设备时作为dev参数使用,表示flannel.1设备绑定的网卡为该网卡,vxlan的出站和入站数据都会经过这个网卡。
  2. 接下来是创建网络管理器,网络管理器用来获取网络配置,本机的pod网络等。
  3. 创建后端插件,图中示例的是vxlan后端,类型是Backend,有一个RegisterNetwork方法。vxlan后端在这个方法中创建vxlan设备并设置其ip地址,最后返回一个backend.Network结构。
  4. 设置iptables的伪装和转发规则,其中的-A POSTROUTING ! -s 10.244.0.0/16 -d 10.244.0.0/16 -j MASQUERADE这条iptables规则导致host到容器网络的访问源地址使用flannel.1的ip。
  5. 运行backend.Network.Run()方法持续不断地监听所有网络变更的事件,根据事件刷新相关的route,arp和fdb表到主机上。
  6. 定时对本机的租约进行续租(etcd时才用)。

功能组件

SubnetManager

网络管理器用来管理当前主机和集群主机的网络配置。

接口说明

GetNetworkConfig(ctx context.Context) (*Config, error)

获取当前节点的网络配置信息:

// Config is the network configuration that ParseConfig decodes from JSON.
// Fields left at their zero value in the JSON are given defaults by
// ParseConfig.
type Config struct {
   // Network is the overall network. ParseConfig requires an explicit
   // SubnetMin/SubnetMax to be contained in it.
   Network     ip.IP4Net
   // SubnetMin is the lowest subnet address. When zero, ParseConfig sets it
   // to Network.IP plus one subnet size (i.e. the first subnet is skipped).
   SubnetMin   ip.IP4
   // SubnetMax is the highest subnet address. When zero, ParseConfig sets it
   // to Network.Next().IP minus one subnet size.
   SubnetMax   ip.IP4
   // SubnetLen is the prefix length of each host subnet. When zero,
   // ParseConfig picks 24 if the network prefix is shorter than 24, otherwise
   // the network prefix length plus one.
   SubnetLen   uint
   // BackendType is never read from JSON (tag "-"); ParseConfig fills it in
   // from the result of parseBackendType(Backend).
   BackendType string          `json:"-"`
   // Backend is the backend section of the configuration, kept as raw,
   // undecoded JSON.
   Backend     json.RawMessage `json:",omitempty"`
}

// ParseConfig is the function that produces a Config: it decodes the JSON
// document s and fills in every subnet field the caller left unset.
//
// Defaults applied:
//   - SubnetLen: 24 when the network prefix is shorter than /24, otherwise
//     one bit longer than the network prefix (half of the network).
//   - SubnetMin: Network.IP plus one subnet size, so the first subnet is
//     skipped.
//   - SubnetMax: Network.Next().IP minus one subnet size.
//
// An error is returned when the JSON cannot be decoded, when an explicit
// SubnetLen is shorter than the network prefix, when an explicit SubnetMin or
// SubnetMax is not contained in Network, or when parseBackendType fails.
func ParseConfig(s string) (*Config, error) {
   conf := &Config{}
   if err := json.Unmarshal([]byte(s), conf); err != nil {
      return nil, err
   }

   netPrefix := conf.Network.PrefixLen
   switch {
   case conf.SubnetLen == 0:
      // No explicit size: aim for a /24 per host, but if the whole network
      // is /24 or smaller, hand out half of the network instead.
      conf.SubnetLen = 24
      if netPrefix >= 24 {
         conf.SubnetLen = netPrefix + 1
      }
   case conf.SubnetLen < netPrefix:
      return nil, errors.New("HostSubnet is larger network than Network")
   }

   // Number of addresses covered by one host subnet.
   subnetSize := ip.IP4(1 << (32 - conf.SubnetLen))

   switch {
   case conf.SubnetMin == ip.IP4(0):
      // Start at the second subnet. Using the first one causes trouble:
      // e.g. with Network 10.100.0.0/16, an interface holding the network's
      // base address makes ping treat it as a broadcast address.
      conf.SubnetMin = conf.Network.IP + subnetSize
   case !conf.Network.Contains(conf.SubnetMin):
      return nil, errors.New("SubnetMin is not in the range of the Network")
   }

   switch {
   case conf.SubnetMax == ip.IP4(0):
      conf.SubnetMax = conf.Network.Next().IP - subnetSize
   case !conf.Network.Contains(conf.SubnetMax):
      return nil, errors.New("SubnetMax is not in the range of the Network")
   }

   backendType, err := parseBackendType(conf.Backend)
   if err != nil {
      return nil, err
   }
   conf.BackendType = backendType

   return conf, nil
}
AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error)

获取当前节点的租约信息,Lease结构如下,在vxlan插件注册网络时会调用这个方法获取当前节点的pod网络信息。

// Lease describes the subnet leased to a node together with the attributes
// published for it.
type Lease struct {
   // Subnet is the leased subnet. The Kubernetes-based AcquireLease derives
   // it from the node's Spec.PodCIDR.
   Subnet     ip.IP4Net
   // Attrs carries the lease attributes; handleSubnetEvents reads
   // BackendType, BackendData and PublicIP from it.
   Attrs      LeaseAttrs
   // Expiration is when the lease expires. The Kubernetes-based AcquireLease
   // sets it to 24 hours after the call.
   Expiration time.Time

   // NOTE(review): Asof is neither set nor read in the code shown here;
   // presumably a store index for the etcd-based manager — confirm.
   Asof uint64
}
RenewLease(ctx context.Context, lease *Lease) error

续租当前节点的租约,只针对etcd时有效。

WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)

监听当前节点的租约变化,只针对etcd时有效。

WatchLeases(ctx context.Context, cursor interface{}) (LeaseWatchResult, error)

监听所有节点的租约信息变化,以便在当前节点上相应地修改路由表,arp表和fdb表。

基于k8s的网络管理器

// NewSubnetManager creates the Kubernetes-based subnet manager. This is an
// excerpt: the parts marked "..." are elided.
func NewSubnetManager(apiUrl, kubeconfig string) (subnet.Manager, error) {
   ...
   // On creation, a node cache built with the client-go package is started
   // directly in the background; the API server's list-watch events are
   // passed through ksm.events to WatchLeases() for handling.
   sm, err := newKubeSubnetManager(c, sc, nodeName)
   go sm.Run(context.Background())
   ...
}

// newKubeSubnetManager assembles a kubeSubnetManager for nodeName.
//
// It stores the client and network configuration, allocates the event channel
// (buffer of 5000), and builds a node informer whose add/update/delete
// callbacks are the manager's lease-event handlers. The informer is only
// constructed here; it starts running when Run is called.
func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
   ksm := &kubeSubnetManager{
      client:     c,
      nodeName:   nodeName,
      subnetConf: sc,
      events:     make(chan subnet.Event, 5000),
   }

   // List and watch all Node objects through the API server.
   nodeListWatch := &cache.ListWatch{
      ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
         return ksm.client.CoreV1().Nodes().List(opts)
      },
      WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
         return ksm.client.CoreV1().Nodes().Watch(opts)
      },
   }

   // Additions and deletions share one handler, distinguished by event type.
   handlers := cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
      },
      UpdateFunc: ksm.handleUpdateLeaseEvent,
      DeleteFunc: func(obj interface{}) {
         ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
      },
   }

   indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

   indexer, controller := cache.NewIndexerInformer(nodeListWatch, &v1.Node{}, resyncPeriod, handlers, indexers)
   ksm.nodeController = controller
   ksm.nodeStore = listers.NewNodeLister(indexer)
   return ksm, nil
}

// Run logs that the manager is starting and then runs the node controller,
// handing it ctx's Done channel as the stop channel. It returns when
// nodeController.Run returns.
func (ksm *kubeSubnetManager) Run(ctx context.Context) {
   glog.Infof("Starting kube subnet manager")
   stopCh := ctx.Done()
   ksm.nodeController.Run(stopCh)
}

// GetNetworkConfig returns the network configuration stored in the manager.
// ctx is not used and the returned error is always nil.
func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
   conf := ksm.subnetConf
   return conf, nil
}

// AcquireLease returns the lease for the local node, built from the node's
// Spec.PodCIDR as found in the node store.
//
// Before returning it makes sure the node's flannel annotations (backend
// type, backend data, public IP and the kube-managed marker) match attrs. If
// any of them differs, the annotations are updated on a copy of the node and
// the difference is sent to the API server as a strategic merge patch.
//
// An error is returned when the node cannot be fetched or copied, when its
// pod CIDR is unassigned or unparsable, when marshalling fails, or when the
// patch cannot be created or applied. The returned lease expires 24 hours
// after the call.
func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
   cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
   if err != nil {
      return nil, err
   }
   // Work on a deep copy so the object held by the node store is not mutated;
   // the untouched original also serves as the "old" side of the patch.
   nobj, err := api.Scheme.DeepCopy(cachedNode)
   if err != nil {
      return nil, err
   }
   n := nobj.(*v1.Node)

   if n.Spec.PodCIDR == "" {
      return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
   }
   bd, err := attrs.BackendData.MarshalJSON()
   if err != nil {
      return nil, err
   }
   _, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
   if err != nil {
      return nil, err
   }

   backendData := string(bd)
   publicIP := attrs.PublicIP.String()
   if n.Annotations[backendDataAnnotation] != backendData ||
      n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
      n.Annotations[backendPublicIPAnnotation] != publicIP ||
      n.Annotations[subnetKubeManagedAnnotation] != "true" {
      // Reading from a nil map is fine, but assigning to one panics, so make
      // sure the map exists before writing the annotations.
      if n.Annotations == nil {
         n.Annotations = make(map[string]string)
      }
      n.Annotations[backendTypeAnnotation] = attrs.BackendType
      n.Annotations[backendDataAnnotation] = backendData
      n.Annotations[backendPublicIPAnnotation] = publicIP
      n.Annotations[subnetKubeManagedAnnotation] = "true"

      oldData, err := json.Marshal(cachedNode)
      if err != nil {
         return nil, err
      }

      newData, err := json.Marshal(n)
      if err != nil {
         return nil, err
      }

      patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
      if err != nil {
         return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
      }

      _, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
      if err != nil {
         return nil, err
      }
   }
   return &subnet.Lease{
      Subnet:     ip.FromIPNet(cidr),
      Attrs:      *attrs,
      Expiration: time.Now().Add(24 * time.Hour),
   }, nil
}

// WatchLeases blocks until a lease event arrives on ksm.events or ctx is done.
//
// When an event arrives, the result carries exactly that one event. When ctx
// is done first, an empty result and context.Canceled are returned. The
// cursor argument is not used.
func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
   select {
   case <-ctx.Done():
      return subnet.LeaseWatchResult{}, context.Canceled
   case ev := <-ksm.events:
      result := subnet.LeaseWatchResult{Events: []subnet.Event{ev}}
      return result, nil
   }
}

Backend Network

上图中的GetBackend()方法可根据配置获取不同的网络后端支持,如基于云vpc,vxlan等模式。

vxlan模式

本文章中主要看vxlan模式。Backend.RegisterNetwork()方法会返回一个Network结构,两个结构的定义如下:

// Backend is implemented by each network backend (vxlan among them).
type Backend interface {
   // RegisterNetwork is called when the backend should create or begin
   // managing a new network described by config. It returns the Network
   // handle for that network, or an error.
   RegisterNetwork(ctx context.Context, config *subnet.Config) (Network, error)
}

// Network is the handle returned by Backend.RegisterNetwork.
type Network interface {
   // Lease returns the subnet lease associated with this network.
   Lease() *subnet.Lease
   // MTU returns the MTU as an int.
   // NOTE(review): which device or path the value applies to is not visible
   // here — confirm against the backend implementation.
   MTU() int
   // Run is the long-running part of the network. Per the surrounding
   // article, the vxlan implementation watches lease changes and refreshes
   // the route, ARP and FDB tables until ctx ends.
   Run(ctx context.Context)
}

该结构主要有三个方法,其中最重要的是Run()方法。这个方法中会调用SubnetManager.WatchLeases()监听整个集群网络的变更事件,并根据不同事件刷新路由表,arp表和fdb表等。 在vxlan Network.Run()中有一个变量值得注意,就是directRoutingOK,这个变量表示是否使用直接路由模式(DirectRouting,类似于host-gw模式),即不经过vxlan,这种情况下只刷新route表,这对性能应该有提升。该变量是否为true是根据查询路由的结果来判定的,类似使用ip route get ${ip},逻辑是如果只有一条路由,并且网关地址为空,则说明是在同一个网络中,具备直接路由能力。

[root@debian ~]# ip route get 10.66.0.250
10.66.0.250 dev eth0 src 10.66.0.240

对于网络事件的监听处理逻辑可参考vxlan_network.go中handleSubnetEvents的完整实现,值得说明的是,整个route,arp和fdb表是由flannel静态写入和更新的,这些在arp和fdb中的记录不会过期,并且在flannel重启时,这些表依然存在,这就实现了flannel无网络中断进行升级操作:

// handleSubnetEvents applies a batch of lease events to the host's route, ARP
// and FDB state.
//
// Events whose backend type is not "vxlan", or whose backend data cannot be
// decoded, are logged and skipped. For every remaining event one of two modes
// is used:
//
//   - direct routing: only a plain route to the subnet via the remote
//     node's public IP is added or removed. This mode is chosen when
//     nw.dev.directRouting is set and the route lookup for the public IP
//     yields exactly one route without a gateway.
//   - vxlan: an ARP entry (subnet IP -> VTEP MAC), an FDB entry (VTEP MAC ->
//     public IP) and an on-link route through the vxlan device are added or
//     removed.
//
// Failures are logged and never abort the batch. A failed add rolls back the
// entries that were already installed for that event.
func (nw *network) handleSubnetEvents(batch []subnet.Event) {
   for _, ev := range batch {
      sn := ev.Lease.Subnet
      attrs := ev.Lease.Attrs
      if attrs.BackendType != "vxlan" {
         log.Warningf("ignoring non-vxlan subnet(%s): type=%v", sn, attrs.BackendType)
         continue
      }

      var vxlanAttrs vxlanLeaseAttrs
      if err := json.Unmarshal(attrs.BackendData, &vxlanAttrs); err != nil {
         log.Error("error decoding subnet lease JSON: ", err)
         continue
      }

      // Neighbor entries shared by the add, roll-back and remove paths.
      vtepMAC := net.HardwareAddr(vxlanAttrs.VtepMAC)
      arpEntry := neighbor{IP: sn.IP, MAC: vtepMAC}
      fdbEntry := neighbor{IP: attrs.PublicIP, MAC: vtepMAC}

      // Route used when traffic to the subnet has to be vxlan encapsulated.
      vxlanRoute := netlink.Route{
         LinkIndex: nw.dev.link.Attrs().Index,
         Scope:     netlink.SCOPE_UNIVERSE,
         Dst:       sn.ToIPNet(),
         Gw:        sn.IP.ToIP(),
      }
      vxlanRoute.SetFlag(syscall.RTNH_F_ONLINK)

      // Route used when the remote host is on the same subnet, so no vxlan
      // encapsulation is required.
      directRoute := netlink.Route{
         Dst: sn.ToIPNet(),
         Gw:  attrs.PublicIP.ToIP(),
      }

      directRoutingOK := false
      if nw.dev.directRouting {
         routes, err := netlink.RouteGet(attrs.PublicIP.ToIP())
         if err != nil {
            log.Errorf("Couldn't lookup route to %v: %v", attrs.PublicIP, err)
            continue
         }
         // A single route without a gateway means the peer is directly
         // connected.
         directRoutingOK = len(routes) == 1 && routes[0].Gw == nil
      }

      switch {
      case ev.Type == subnet.EventAdded && directRoutingOK:
         log.V(2).Infof("Adding direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)
         if err := netlink.RouteReplace(&directRoute); err != nil {
            log.Errorf("Error adding route to %v via %v: %v", sn, attrs.PublicIP, err)
         }

      case ev.Type == subnet.EventAdded:
         log.V(2).Infof("adding subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, vtepMAC)
         if err := nw.dev.AddARP(arpEntry); err != nil {
            log.Error("AddARP failed: ", err)
            continue
         }

         if err := nw.dev.AddFDB(fdbEntry); err != nil {
            log.Error("AddFDB failed: ", err)

            // Roll back the ARP entry installed just above.
            if err := nw.dev.DelARP(arpEntry); err != nil {
               log.Error("DelARP failed: ", err)
            }
            continue
         }

         // The route goes in last: the kernel would ARP for the gateway IP
         // if the ARP entry had not already been installed.
         if err := netlink.RouteReplace(&vxlanRoute); err != nil {
            log.Errorf("failed to add vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)

            // Roll back both the ARP and the FDB entries.
            if err := nw.dev.DelARP(arpEntry); err != nil {
               log.Error("DelARP failed: ", err)
            }
            if err := nw.dev.DelFDB(fdbEntry); err != nil {
               log.Error("DelFDB failed: ", err)
            }
         }

      case ev.Type == subnet.EventRemoved && directRoutingOK:
         log.V(2).Infof("Removing direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)
         if err := netlink.RouteDel(&directRoute); err != nil {
            log.Errorf("Error deleting route to %v via %v: %v", sn, attrs.PublicIP, err)
         }

      case ev.Type == subnet.EventRemoved:
         log.V(2).Infof("removing subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, vtepMAC)

         // Attempt every removal; one failure must not prevent the others.
         if err := nw.dev.DelARP(arpEntry); err != nil {
            log.Error("DelARP failed: ", err)
         }
         if err := nw.dev.DelFDB(fdbEntry); err != nil {
            log.Error("DelFDB failed: ", err)
         }
         if err := netlink.RouteDel(&vxlanRoute); err != nil {
            log.Errorf("failed to delete vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)
         }

      default:
         log.Error("internal error: unknown event type: ", int(ev.Type))
      }
   }
}

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注