启动流程
- 寻找要使用的主机网卡,该网卡会在创建flannel.1设备时作为dev参数使用,表示flannel.1设备绑定的网卡为该网卡,vxlan的出站和入站数据都会经过这个网卡。
- 接下来是创建网络管理器,网络管理器用来获取网络配置,本机的pod网络等。
- 创建后端插件,图中实例化的是vxlan后端,其类型是Backend,有一个RegisterNetwork方法;vxlan后端在这个方法中创建vxlan设备并设置其ip地址,然后返回一个backend.Network结构。
- 设置iptables的伪装和转发规则,其中的-A POSTROUTING ! -s 10.244.0.0/16 -d 10.244.0.0/16 -j MASQUERADE这条iptables规则导致host到容器网络的访问源地址使用flannel.1的ip。
- 运行backend.Network.Run()方法持续不断地监听所有网络变更的事件,根据事件刷新相关的route,arp和fdb表到主机上。
- 定时对本机的租约进行续租(etcd时才用)。
功能组件
SubnetManager
网络管理器用来管理当前主机和集群主机的网络配置。
接口说明
GetNetworkConfig(ctx context.Context) (*Config, error)
获取当前节点的网络配置信息:
// Config is the network configuration decoded from JSON by ParseConfig.
type Config struct {
// Network is the overall network that per-host subnets are carved from.
Network ip.IP4Net
// SubnetMin is the lower bound for host subnets. When left zero,
// ParseConfig sets it to Network.IP plus one subnet size, skipping the
// first subnet of the network.
SubnetMin ip.IP4
// SubnetMax is the upper bound for host subnets. When left zero,
// ParseConfig sets it to Network.Next().IP minus one subnet size.
SubnetMax ip.IP4
// SubnetLen is the prefix length of each host subnet. When left zero,
// ParseConfig chooses 24, or Network.PrefixLen+1 if the network is /24 or
// smaller.
SubnetLen uint
// BackendType is never read from or written to JSON (tag "-"); ParseConfig
// fills it in from the result of parseBackendType(Backend).
BackendType string `json:"-"`
// Backend holds the raw, still-encoded backend configuration.
Backend json.RawMessage `json:",omitempty"`
}
// ParseConfig produces the Config described above: it decodes the JSON
// document s and fills in defaults for every field that was left unset.
//
// Defaults:
//   - SubnetLen: 24, or Network.PrefixLen+1 when the network is /24 or smaller.
//   - SubnetMin: the second subnet of Network (the first one is skipped).
//   - SubnetMax: Network.Next().IP minus one subnet size.
//   - BackendType: derived from Backend by parseBackendType.
//
// An error is returned when s is not valid JSON, when SubnetLen is shorter
// than the network prefix or longer than 32 bits, when an explicit
// SubnetMin/SubnetMax lies outside Network, or when the backend type cannot be
// parsed.
func ParseConfig(s string) (*Config, error) {
	cfg := new(Config)
	if err := json.Unmarshal([]byte(s), cfg); err != nil {
		return nil, err
	}

	if cfg.SubnetLen > 0 {
		if cfg.SubnetLen < cfg.Network.PrefixLen {
			return nil, errors.New("HostSubnet is larger network than Network")
		}
	} else {
		// try to give each host a /24 but if the whole network
		// is /24 or smaller, half the network
		if cfg.Network.PrefixLen < 24 {
			cfg.SubnetLen = 24
		} else {
			cfg.SubnetLen = cfg.Network.PrefixLen + 1
		}
	}

	// SubnetLen is unsigned, so 32 - SubnetLen would wrap around for values
	// above 32 (e.g. the default chosen for a /32 network) and the shift below
	// would silently produce a subnet size of zero.
	if cfg.SubnetLen > 32 {
		return nil, errors.New("SubnetLen must not be greater than 32")
	}

	subnetSize := ip.IP4(1 << (32 - cfg.SubnetLen))

	if cfg.SubnetMin == ip.IP4(0) {
		// skip over the first subnet otherwise it causes problems. e.g.
		// if Network is 10.100.0.0/16, having an interface with 10.0.0.0
		// makes ping think it's a broadcast address (not sure why)
		cfg.SubnetMin = cfg.Network.IP + subnetSize
	} else if !cfg.Network.Contains(cfg.SubnetMin) {
		return nil, errors.New("SubnetMin is not in the range of the Network")
	}

	if cfg.SubnetMax == ip.IP4(0) {
		cfg.SubnetMax = cfg.Network.Next().IP - subnetSize
	} else if !cfg.Network.Contains(cfg.SubnetMax) {
		return nil, errors.New("SubnetMax is not in the range of the Network")
	}

	bt, err := parseBackendType(cfg.Backend)
	if err != nil {
		return nil, err
	}
	cfg.BackendType = bt

	return cfg, nil
}
AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error)
获取当前节点的租约信息,Lease结构如下,在vxlan插件注册网络时会调用这个方法获取当前节点的pod网络信息。
// Lease describes the subnet held by a node together with the attributes the
// node registered for it.
type Lease struct {
// Subnet is the leased subnet.
Subnet ip.IP4Net
// Attrs are the attributes supplied by the lease holder.
Attrs LeaseAttrs
// Expiration is the time at which the lease expires.
Expiration time.Time
// NOTE(review): Asof is presumably the datastore index at which this lease
// was observed — confirm against the subnet manager implementations.
Asof uint64
}
RenewLease(ctx context.Context, lease *Lease) error
续租当前节点的租约,只针对etcd时有效。
WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)
监听当前节点的租约变化,只针对etcd时有效。
WatchLeases(ctx context.Context, cursor interface{}) (LeaseWatchResult, error)
监听所有节点的租约信息变化,以便在当前节点上更新路由表,arp表和fdb表。
基于k8s的网络管理器
// NewSubnetManager is shown here as an excerpt; the parts marked "..." are elided.
func NewSubnetManager(apiUrl, kubeconfig string) (subnet.Manager, error) {
...
// On creation, a nodes cache from the client-go package is started in the
// background right away; list-watch events from the apiserver are passed
// through ksm.events to WatchLeases() for handling.
sm, err := newKubeSubnetManager(c, sc, nodeName)
go sm.Run(context.Background())
...
}
// newKubeSubnetManager builds a kubeSubnetManager for nodeName that uses c to
// talk to the API server and sc as its network configuration.
//
// It sets up (but does not start) an indexer informer over Node objects.
// Added and deleted nodes are passed to handleAddLeaseEvent with
// subnet.EventAdded and subnet.EventRemoved respectively; updates go to
// handleUpdateLeaseEvent. The events channel is buffered to 5000 entries.
func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
	ksm := &kubeSubnetManager{
		client:     c,
		nodeName:   nodeName,
		subnetConf: sc,
		events:     make(chan subnet.Event, 5000),
	}

	// List/watch source for all Node objects.
	nodeSource := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return ksm.client.CoreV1().Nodes().List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return ksm.client.CoreV1().Nodes().Watch(options)
		},
	}

	// Callbacks invoked by the informer for node changes.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
		},
		UpdateFunc: ksm.handleUpdateLeaseEvent,
		DeleteFunc: func(obj interface{}) {
			ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
		},
	}

	nodeIndexer, nodeController := cache.NewIndexerInformer(
		nodeSource,
		&v1.Node{},
		resyncPeriod,
		handlers,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	ksm.nodeStore = listers.NewNodeLister(nodeIndexer)
	ksm.nodeController = nodeController
	return ksm, nil
}
// Run logs a start-up message and then runs the node controller, handing it
// ctx.Done() as its stop channel.
func (ksm *kubeSubnetManager) Run(ctx context.Context) {
	glog.Infof("Starting kube subnet manager")

	stopCh := ctx.Done()
	ksm.nodeController.Run(stopCh)
}
// GetNetworkConfig returns the subnet configuration the manager was created
// with. It never fails and does not use ctx.
func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
	conf := ksm.subnetConf
	return conf, nil
}
// AcquireLease returns the lease for the local node, derived from the node's
// Spec.PodCIDR as found in the node cache.
//
// If the node's flannel annotations (backend data, backend type, public IP,
// kube-managed flag) differ from attrs, a strategic merge patch carrying the
// updated annotations is sent to the API server before the lease is returned.
// The returned lease expires 24 hours from now.
//
// An error is returned when the node is not in the cache, cannot be copied,
// has no pod CIDR assigned or an unparsable one, when attrs.BackendData cannot
// be marshalled, or when building or sending the patch fails.
func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
	cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
	if err != nil {
		return nil, err
	}

	// Work on a copy so the object owned by the cache is never mutated.
	nobj, err := api.Scheme.DeepCopy(cachedNode)
	if err != nil {
		return nil, err
	}
	n := nobj.(*v1.Node)

	if n.Spec.PodCIDR == "" {
		return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
	}

	bd, err := attrs.BackendData.MarshalJSON()
	if err != nil {
		return nil, err
	}

	_, cidr, err := net.ParseCIDR(n.Spec.PodCIDR)
	if err != nil {
		return nil, err
	}

	if n.Annotations[backendDataAnnotation] != string(bd) ||
		n.Annotations[backendTypeAnnotation] != attrs.BackendType ||
		n.Annotations[backendPublicIPAnnotation] != attrs.PublicIP.String() ||
		n.Annotations[subnetKubeManagedAnnotation] != "true" {

		// Reading a nil map is fine, but assigning to one panics; a node
		// without any annotations must get a map before the writes below.
		if n.Annotations == nil {
			n.Annotations = make(map[string]string)
		}
		n.Annotations[backendTypeAnnotation] = attrs.BackendType
		n.Annotations[backendDataAnnotation] = string(bd)
		n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
		n.Annotations[subnetKubeManagedAnnotation] = "true"

		// Diff the cached node against the modified copy so that only the
		// changed annotations are sent.
		oldData, err := json.Marshal(cachedNode)
		if err != nil {
			return nil, err
		}

		newData, err := json.Marshal(n)
		if err != nil {
			return nil, err
		}

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
		}

		_, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
		if err != nil {
			return nil, err
		}
	}

	return &subnet.Lease{
		Subnet:     ip.FromIPNet(cidr),
		Attrs:      *attrs,
		Expiration: time.Now().Add(24 * time.Hour),
	}, nil
}
// WatchLeases blocks until either a single event arrives on the manager's
// events channel or ctx is done.
//
// On an event, the result carries exactly that one event. When ctx is done, an
// empty result and context.Canceled are returned. The cursor argument is not
// used.
func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
	var result subnet.LeaseWatchResult

	select {
	case <-ctx.Done():
		return result, context.Canceled
	case ev := <-ksm.events:
		result.Events = []subnet.Event{ev}
		return result, nil
	}
}
Backend Network
上图中的GetBackend()方法可根据配置获取不同的网络后端支持,如基于云vpc,vxlan等模式。
vxlan模式
本文章中主要看vxlan模式。Backend.RegisterNetwork()方法会返回一个Network结构,两个结构的定义如下:
// Backend is a network backend that can register a network for a given
// subnet configuration.
type Backend interface {
// Called when the backend should create or begin managing a new network
RegisterNetwork(ctx context.Context, config *subnet.Config) (Network, error)
}
// Network is the value returned by Backend.RegisterNetwork.
type Network interface {
// Lease returns the subnet lease associated with this network.
Lease() *subnet.Lease
// MTU returns the MTU as an int.
// NOTE(review): presumably the MTU of the backend's device — confirm per backend.
MTU() int
// Run runs the network with the given context.
// NOTE(review): presumably blocks until ctx is done — confirm per backend.
Run(ctx context.Context)
}
该结构主要有三个方法,其中最重要的是Run()方法。这个方法中会调用SubnetManager.WatchLeases()监听整个集群网络的变更事件,并根据不同事件刷新路由表,arp表和fdb表等。 在vxlan Network.Run()中有一个变量值得注意,就是directRoutingOK,这个变量表示是否使用直接路由模式(DirectRouting,类似于host-gw模式),即不经过vxlan,这种情况下只刷新route表,这对性能应该有提升。该变量是否为true是根据查询路由的结果来判定的,类似使用ip route get ${ip},逻辑是如果只有一个路由,并且网关地址为空,则说明是在同一个网络中,具备直接路由能力。
[root@debian ~]# ip route get 10.66.0.250
10.66.0.250 dev eth0 src 10.66.0.240
对于网络事件的监听处理逻辑,可参考vxlan_network.go中handleSubnetEvents的完整实现。值得说明的是,整个route,arp和fdb表是由flannel静态写入和更新的,这些在arp和fdb中的记录不会过期,并且在flannel重启时,这些表依然存在,这就实现了flannel在无网络中断的情况下进行升级操作:
// handleSubnetEvents applies a batch of subnet lease events to the host's
// route, ARP and FDB entries.
//
// Events whose backend type is not "vxlan", or whose backend data cannot be
// decoded, are logged and skipped. For every other event one of two paths is
// taken:
//   - direct routing: when enabled on the device and the route lookup for the
//     peer's public IP yields a single route without a gateway, only a route
//     to the subnet via the public IP is added or removed;
//   - vxlan: otherwise the ARP entry, the FDB entry and the vxlan route for
//     the peer are added or removed.
//
// Failures are logged; the function returns nothing and continues with the
// next event.
func (nw *network) handleSubnetEvents(batch []subnet.Event) {
for _, event := range batch {
sn := event.Lease.Subnet
attrs := event.Lease.Attrs
if attrs.BackendType != "vxlan" {
log.Warningf("ignoring non-vxlan subnet(%s): type=%v", sn, attrs.BackendType)
continue
}
var vxlanAttrs vxlanLeaseAttrs
if err := json.Unmarshal(attrs.BackendData, &vxlanAttrs); err != nil {
log.Error("error decoding subnet lease JSON: ", err)
continue
}
// This route is used when traffic should be vxlan encapsulated
vxlanRoute := netlink.Route{
LinkIndex: nw.dev.link.Attrs().Index,
Scope: netlink.SCOPE_UNIVERSE,
Dst: sn.ToIPNet(),
Gw: sn.IP.ToIP(),
}
vxlanRoute.SetFlag(syscall.RTNH_F_ONLINK)
// directRouting is where the remote host is on the same subnet so vxlan isn't required.
directRoute := netlink.Route{
Dst: sn.ToIPNet(),
Gw: attrs.PublicIP.ToIP(),
}
var directRoutingOK = false
if nw.dev.directRouting {
// A failed route lookup skips the event entirely, for additions and
// removals alike.
routes, err := netlink.RouteGet(attrs.PublicIP.ToIP())
if err != nil {
log.Errorf("Couldn't lookup route to %v: %v", attrs.PublicIP, err)
continue
}
if len(routes) == 1 && routes[0].Gw == nil {
// There is only a single route and there's no gateway (i.e. it's directly connected)
directRoutingOK = true
}
}
switch event.Type {
case subnet.EventAdded:
if directRoutingOK {
log.V(2).Infof("Adding direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)
if err := netlink.RouteReplace(&directRoute); err != nil {
log.Errorf("Error adding route to %v via %v: %v", sn, attrs.PublicIP, err)
continue
}
} else {
log.V(2).Infof("adding subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, net.HardwareAddr(vxlanAttrs.VtepMAC))
// Order matters: ARP entry first, then FDB entry, then the route; each
// failure rolls back what was already added for this event.
if err := nw.dev.AddARP(neighbor{IP: sn.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("AddARP failed: ", err)
continue
}
if err := nw.dev.AddFDB(neighbor{IP: attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("AddFDB failed: ", err)
// Try to clean up the ARP entry then continue
if err := nw.dev.DelARP(neighbor{IP: event.Lease.Subnet.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("DelARP failed: ", err)
}
continue
}
// Set the route - the kernel would ARP for the Gw IP address if it hadn't already been set above so make sure
// this is done last.
if err := netlink.RouteReplace(&vxlanRoute); err != nil {
log.Errorf("failed to add vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)
// Try to clean up both the ARP and FDB entries then continue
if err := nw.dev.DelARP(neighbor{IP: event.Lease.Subnet.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("DelARP failed: ", err)
}
if err := nw.dev.DelFDB(neighbor{IP: event.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("DelFDB failed: ", err)
}
continue
}
}
case subnet.EventRemoved:
if directRoutingOK {
log.V(2).Infof("Removing direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)
if err := netlink.RouteDel(&directRoute); err != nil {
log.Errorf("Error deleting route to %v via %v: %v", sn, attrs.PublicIP, err)
}
} else {
log.V(2).Infof("removing subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, net.HardwareAddr(vxlanAttrs.VtepMAC))
// Try to remove all entries - don't bail out if one of them fails.
if err := nw.dev.DelARP(neighbor{IP: sn.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("DelARP failed: ", err)
}
if err := nw.dev.DelFDB(neighbor{IP: attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
log.Error("DelFDB failed: ", err)
}
if err := netlink.RouteDel(&vxlanRoute); err != nil {
log.Errorf("failed to delete vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)
}
}
default:
log.Error("internal error: unknown event type: ", int(event.Type))
}
}
}