client-go leader election example with explanation

This article walks through the code of a client-go leader election example. First, let's look at the demonstration below.

First, the Local terminal acquired the lease, which we can confirm by running the kubectl get lease command. Then I terminated the program in the Local terminal, and the Local(2) terminal took over the lease. Finally, I closed the tunnel connection between my computer and my Kubernetes cluster; the current leader kept trying to reconnect and renew the lease, but gave up after RenewDeadline passed.
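If you prefer to check the holder from Go instead of kubectl, here is a minimal sketch that reads the Lease object and prints its current holder (it assumes the default lock name leader-election-example and the default namespace used by the example below):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// load kubeconfig the same way the main example does
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
	if err != nil {
		panic(err.Error())
	}
	clientSet := kubernetes.NewForConfigOrDie(config)

	// read the Lease object that the leader election code keeps renewing
	lease, err := clientSet.CoordinationV1().Leases("default").Get(context.TODO(), "leader-election-example", metav1.GetOptions{})
	if err != nil {
		panic(err.Error())
	}

	holder := "<none>"
	if lease.Spec.HolderIdentity != nil {
		holder = *lease.Spec.HolderIdentity
	}
	fmt.Println("current lease holder:", holder)
}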

Following is the code; most of it comes from the official client-go example, and I added some extra logic and more detailed explanations.

package main

import (
	"context"
	"flag"
	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	var id string
	var leaseLockName string
	var leaseLockNamespace string

	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	flag.StringVar(&leaseLockName, "lock-name", "leader-election-example", "the lease lock resource name")
	flag.StringVar(&leaseLockNamespace, "lock-namespace", "default", "the lease lock resource namespace")
	flag.Parse()

	clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
	if err != nil {
		panic(err.Error())
	}

	clientSet := kubernetes.NewForConfigOrDie(clientConfig)

	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		// the context is cancelled when, for example, a termination signal is received or a
		// network failure keeps the client from renewing the lease.
		// a fencing mechanism may be required: after the context is done we may still be doing
		// cleanup work, and if that takes longer than LeaseDuration another instance can become
		// leader and operate on the same resource concurrently
		// (a variant that exposes IsLeader() for a cheap check is sketched after this listing)
		<-ctx.Done()
		klog.Info("Context done, we are no longer the leader; a fencing mechanism may be required")
	}

	// use a Go context, so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	// RunOrDie() blocks until the context is cancelled or leadership is lost; the deferred cancel() also runs if the program panics
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		Client: clientSet.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
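		// LeaseDuration is how long non-leader candidates wait after the last observed
		// renewal before they try to acquire the (possibly stale) lease; RenewDeadline is
		// how long the acting leader keeps retrying to refresh the lease before giving up;
		// RetryPeriod is how long clients wait between tries of actions.
		// LeaseDuration must be larger than RenewDeadline; the short values below are
		// chosen for this demonstration only.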
		LeaseDuration:   6 * time.Second,
		RenewDeadline:   3 * time.Second,
		RetryPeriod:     1 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			// OnStartedLeading runs in a new goroutine once we acquire the lease; the lease is then renewed periodically in the background
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				// run() should block for the lifetime of the program; if it returns early,
				// we are still the leader but do nothing
				run(ctx)
			},
			// OnStoppedLeading runs after Run() exits, i.e. after the context is cancelled or the lease
			// cannot be renewed. Usually we just exit and let Kubernetes restart the program, so we
			// don't need to worry about in-memory state; everything starts from scratch on the next run.
			OnStoppedLeading: func() {
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				// Usually we flush logs and exit here; if we don't, execution continues past RunOrDie()
				// (or Run()), the main goroutine returns, and the program exits anyway.
				//os.Exit(0)
			},
			// OnNewLeader runs in a new goroutine whenever the observed leader changes, usually for logging leader information
			OnNewLeader: func(identity string) {
				// we're notified when new leader elected
				klog.Infof("My id %s, new leader id %s", id, identity)
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

Note: this was tested with client-go v0.29.1.
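As noted in the comments inside run(), a cancelled context by itself is not a fencing mechanism. If you want to query leadership status from other parts of the program, one option is to build the elector with leaderelection.NewLeaderElector() instead of RunOrDie() and call IsLeader() or GetLeader() on it. The following is a minimal sketch of that variant; it would replace the RunOrDie() call in main() above, reuses the same lock, ctx, and id, and is only a cheap guard rather than a substitute for real fencing:

	elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   6 * time.Second,
		RenewDeadline:   3 * time.Second,
		RetryPeriod:     1 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			// both callbacks are required by NewLeaderElector()
			OnStartedLeading: func(ctx context.Context) { klog.Info("started leading") },
			OnStoppedLeading: func() { klog.Infof("leader lost: %s", id) },
		},
	})
	if err != nil {
		panic(err.Error())
	}

	// Run() blocks until the context is cancelled or leadership is lost,
	// so start it in its own goroutine
	go elector.Run(ctx)

	// a cheap leadership check before touching a shared resource; the lease
	// could still expire right after this check, so it is not bulletproof
	if elector.IsLeader() {
		klog.Infof("we (%s) hold the lease; observed leader: %s", id, elector.GetLeader())
	}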
