This article walks through the code for leader election with client-go. First, let's look at the demonstration below. The Local terminal acquires the lease first, which we can confirm by running the kubectl get lease command. When I terminate the program in the Local terminal, the Local (2) terminal takes over the lease. Finally, when I close the tunnel connection between my computer and the Kubernetes cluster, the current leader keeps trying to reconnect and renew the lease, but gives up after RenewDeadline.
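As a quick sanity check, this is roughly what inspecting the lease looks like while one instance is leading (the lock name and namespace match the defaults used in the code below; the holder value is a placeholder for whatever id the leading instance generated):

kubectl get lease leader-election-example -n default
NAME                      HOLDER                       AGE
leader-election-example   &lt;uuid-of-current-leader&gt;    30s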
The code below is based largely on the official client-go example; I added some extra logic and wrote more detailed comments.
package main
import (
"context"
"flag"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
var id string
var leaseLockName string
var leaseLockNamespace string
flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
flag.StringVar(&leaseLockName, "lock-name", "leader-election-example", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lock-namespace", "default", "the lease lock resource namespace")
flag.Parse()
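// Load the client configuration from the usual kubeconfig locations
// (the KUBECONFIG environment variable or ~/.kube/config by default),
// which is what lets this demo run from a developer machine against a remote cluster.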
clientConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
panic(err.Error())
}
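// NewForConfigOrDie panics instead of returning an error, which is acceptable for a small demo.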
clientSet := kubernetes.NewForConfigOrDie(clientConfig)
run := func(ctx context.Context) {
// complete your controller loop here
klog.Info("Controller loop...")
select {
// ctx is cancelled when we lose leadership, e.g. after a termination signal, or after a
// network failure that prevents the client from renewing the lease.
// A fencing mechanism may be required: if the cleanup we do after ctx is done takes longer
// than LeaseDuration, two instances could end up operating on the same resource.
case <-ctx.Done():
klog.Infof("Context done, we are not the leader, fencing method may required")
return
}
}
// use a Go context, so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
// deferring cancel() ensures the context is also cancelled when main returns or the program panics
defer cancel()
// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: clientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
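// LeaseDuration: how long non-leader candidates wait after the last observed renewal before they try to take over the lease.
// RenewDeadline: how long the acting leader keeps retrying to refresh the lease before giving up leadership.
// RetryPeriod: how long each client waits between retries of lock actions.
// The values below are intentionally short so the demo fails over quickly; Kubernetes components typically use 15s/10s/2s.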
LeaseDuration: 6 * time.Second,
RenewDeadline: 3 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
// OnStartedLeading runs in a new goroutine once we acquire the lease; after that the library keeps renewing the lease periodically in the background
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
// run() should keep running; if it returns while we still hold
// the lease, we remain the leader but do no work
run(ctx)
},
// OnStoppedLeading runs after Run() exits, i.e. when we lose the lease or the context is cancelled.
// Usually we simply exit and let Kubernetes restart the program, so we don't need to
// worry much about internal state: everything starts again from the beginning.
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", id)
// Usually we flush logs and exit here. If we don't, control falls through to the end of
// RunOrDie()/Run(), the main goroutine returns, and the program exits anyway.
//os.Exit(0)
},
// OnNewLeader runs in a new goroutine whenever a new leader is observed, usually just to log leader information
OnNewLeader: func(identity string) {
// we're notified when a new leader is elected
klog.Infof("My id %s, new leader id %s", id, identity)
if identity == id {
// I just got the lock
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})
}
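To reproduce the demo above, run two instances in separate terminals with distinct ids (the flags are the ones defined in main(); the file name main.go is an assumption):

go run main.go -id instance-1
go run main.go -id instance-2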
Note: this was tested with client-go v0.29.1.