| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package leaderelection implements leader election of a set of endpoints. |
| // It uses an annotation in the endpoints object to store the record of the |
| // election state. |
| // |
| // This implementation does not guarantee that only one client is acting as a |
| // leader (a.k.a. fencing). A client observes timestamps captured locally to |
| // infer the state of the leader election. Thus the implementation is tolerant |
| // to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate. |
| // |
| // However, the level of tolerance to skew rate can be configured by setting |
| // RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a |
| // maximum tolerated ratio of time passed on the fastest node to time passed on |
| // the slowest node can be approximately achieved with a configuration that sets |
| // the same ratio of LeaseDuration to RenewDeadline. For example, if a user wanted |
| // to tolerate some nodes progressing forward in time twice as fast as other nodes, |
| // the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds. |
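| // |
| // A minimal sketch of that configuration (the values are illustrative, not |
| // defaults; the RetryPeriod shown is an assumed value chosen to satisfy the |
| // RenewDeadline > RetryPeriod*JitterFactor constraint enforced below): |
| // |
| //     lec := leaderelection.LeaderElectionConfig{ |
| //         LeaseDuration: 60 * time.Second, |
| //         RenewDeadline: 30 * time.Second, |
| //         RetryPeriod:   10 * time.Second, |
| //     } |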
| // |
| // While not required, some method of clock synchronization between nodes in the |
| // cluster is highly recommended. It's important to keep in mind when configuring |
| // this client that the tolerance to skew rate varies inversely to master |
| // availability. |
| // |
| // Larger clusters often have a more lenient SLA for API latency. This should be |
| // taken into account when configuring the client. The rate of leader transitions |
| // should be monitored and RetryPeriod and LeaseDuration should be increased |
| // until the rate is stable and acceptably low. It's important to keep in mind |
| // when configuring this client that the tolerance to API latency varies inversely |
| // to master availability. |
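| // |
| // A minimal usage sketch, assuming lock is an already-constructed |
| // resourcelock.Interface and ctx is a context.Context cancelled on shutdown |
| // (the duration values are illustrative, not recommendations): |
| // |
| //     leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ |
| //         Lock:          lock, |
| //         LeaseDuration: 15 * time.Second, |
| //         RenewDeadline: 10 * time.Second, |
| //         RetryPeriod:   2 * time.Second, |
| //         Callbacks: leaderelection.LeaderCallbacks{ |
| //             OnStartedLeading: func(ctx context.Context) { |
| //                 // Do leader-only work here until ctx is cancelled. |
| //             }, |
| //             OnStoppedLeading: func() { |
| //                 // Leadership was lost (or ctx was cancelled); clean up. |
| //             }, |
| //         }, |
| //     }) |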
| // |
| // DISCLAIMER: this is an alpha API. This library will likely change significantly |
| // or even be removed entirely in subsequent releases. Depend on this API at |
| // your own risk. |
| package leaderelection |
| |
| import ( |
| "context" |
| "fmt" |
| "reflect" |
| "time" |
| |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/util/clock" |
| "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| rl "k8s.io/client-go/tools/leaderelection/resourcelock" |
| |
| "k8s.io/klog" |
| ) |
| |
| const ( |
| JitterFactor = 1.2 |
| ) |
| |
| // NewLeaderElector creates a LeaderElector from a LeaderElectionConfig |
| func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { |
| if lec.LeaseDuration <= lec.RenewDeadline { |
| return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline") |
| } |
| if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { |
| return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") |
| } |
| if lec.LeaseDuration < 1 { |
| return nil, fmt.Errorf("leaseDuration must be greater than zero") |
| } |
| if lec.RenewDeadline < 1 { |
| return nil, fmt.Errorf("renewDeadline must be greater than zero") |
| } |
| if lec.RetryPeriod < 1 { |
| return nil, fmt.Errorf("retryPeriod must be greater than zero") |
| } |
| |
| if lec.Lock == nil { |
| return nil, fmt.Errorf("Lock must not be nil.") |
| } |
| return &LeaderElector{ |
| config: lec, |
| clock: clock.RealClock{}, |
| }, nil |
| } |
| |
| type LeaderElectionConfig struct { |
| // Lock is the resource that will be used for locking |
| Lock rl.Interface |
| |
| // LeaseDuration is the duration that non-leader candidates will |
| // wait to force acquire leadership. This is measured against the |
| // time of the last observed ack. |
| LeaseDuration time.Duration |
| // RenewDeadline is the duration that the acting master will retry |
| // refreshing leadership before giving up. |
| RenewDeadline time.Duration |
| // RetryPeriod is the duration the LeaderElector clients should wait |
| // between attempts of an action. |
| RetryPeriod time.Duration |
| |
| // Callbacks are callbacks that are triggered during certain lifecycle |
| // events of the LeaderElector |
| Callbacks LeaderCallbacks |
| |
| // WatchDog is the associated health checker. |
| // WatchDog may be nil if it is not needed/configured. |
| WatchDog *HealthzAdaptor |
| |
| // Name is the name of the resource lock for debugging |
| Name string |
| } |
| |
| // LeaderCallbacks are callbacks that are triggered during certain |
| // lifecycle events of the LeaderElector. These are invoked asynchronously. |
| // |
| // possible future callbacks: |
| // * OnChallenge() |
| type LeaderCallbacks struct { |
| // OnStartedLeading is called when a LeaderElector client starts leading |
| OnStartedLeading func(context.Context) |
| // OnStoppedLeading is called when a LeaderElector client stops leading |
| OnStoppedLeading func() |
| // OnNewLeader is called when the client observes a leader that is |
| // not the previously observed leader. This includes the first observed |
| // leader when the client starts. |
| OnNewLeader func(identity string) |
| } |
| |
| // LeaderElector is a leader election client. |
| type LeaderElector struct { |
| config LeaderElectionConfig |
| // internal bookkeeping |
| observedRecord rl.LeaderElectionRecord |
| observedTime time.Time |
| // used to implement OnNewLeader(); may lag slightly behind the |
| // value of observedRecord.HolderIdentity if the transition has |
| // not yet been reported. |
| reportedLeader string |
| |
| // clock is a wrapper around time to allow for less flaky testing |
| clock clock.Clock |
| |
| // name is the name of the resource lock for debugging |
| name string |
| } |
| |
| // Run starts the leader election loop |
| func (le *LeaderElector) Run(ctx context.Context) { |
| defer func() { |
| runtime.HandleCrash() |
| le.config.Callbacks.OnStoppedLeading() |
| }() |
| if !le.acquire(ctx) { |
| return // ctx signaled done |
| } |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| go le.config.Callbacks.OnStartedLeading(ctx) |
| le.renew(ctx) |
| } |
| |
| // RunOrDie starts a client with the provided config or panics if the config |
| // fails to validate. |
| func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { |
| le, err := NewLeaderElector(lec) |
| if err != nil { |
| panic(err) |
| } |
| if lec.WatchDog != nil { |
| lec.WatchDog.SetLeaderElection(le) |
| } |
| le.Run(ctx) |
| } |
| |
| // GetLeader returns the identity of the last observed leader or returns the empty string if |
| // no leader has yet been observed. |
| func (le *LeaderElector) GetLeader() string { |
| return le.observedRecord.HolderIdentity |
| } |
| |
| // IsLeader returns true if the last observed leader was this client else returns false. |
| func (le *LeaderElector) IsLeader() bool { |
| return le.observedRecord.HolderIdentity == le.config.Lock.Identity() |
| } |
| |
| // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. |
| // Returns false if ctx signals done. |
| func (le *LeaderElector) acquire(ctx context.Context) bool { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| succeeded := false |
| desc := le.config.Lock.Describe() |
| klog.Infof("attempting to acquire leader lease %v...", desc) |
| wait.JitterUntil(func() { |
| succeeded = le.tryAcquireOrRenew() |
| le.maybeReportTransition() |
| if !succeeded { |
| klog.V(4).Infof("failed to acquire lease %v", desc) |
| return |
| } |
| le.config.Lock.RecordEvent("became leader") |
| klog.Infof("successfully acquired lease %v", desc) |
| cancel() |
| }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) |
| return succeeded |
| } |
| |
| // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. |
| func (le *LeaderElector) renew(ctx context.Context) { |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| wait.Until(func() { |
| timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) |
| defer timeoutCancel() |
| err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { |
| done := make(chan bool, 1) |
| go func() { |
| defer close(done) |
| done <- le.tryAcquireOrRenew() |
| }() |
| |
| select { |
| case <-timeoutCtx.Done(): |
| return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err()) |
| case result := <-done: |
| return result, nil |
| } |
| }, timeoutCtx.Done()) |
| |
| le.maybeReportTransition() |
| desc := le.config.Lock.Describe() |
| if err == nil { |
| klog.V(5).Infof("successfully renewed lease %v", desc) |
| return |
| } |
| le.config.Lock.RecordEvent("stopped leading") |
| klog.Infof("failed to renew lease %v: %v", desc, err) |
| cancel() |
| }, le.config.RetryPeriod, ctx.Done()) |
| } |
| |
| // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, |
| // else it tries to renew the lease if it has already been acquired. Returns true |
| // on success else returns false. |
| func (le *LeaderElector) tryAcquireOrRenew() bool { |
| now := metav1.Now() |
| leaderElectionRecord := rl.LeaderElectionRecord{ |
| HolderIdentity: le.config.Lock.Identity(), |
| LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), |
| RenewTime: now, |
| AcquireTime: now, |
| } |
| |
| // 1. obtain or create the ElectionRecord |
| oldLeaderElectionRecord, err := le.config.Lock.Get() |
| if err != nil { |
| if !errors.IsNotFound(err) { |
| klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) |
| return false |
| } |
| if err = le.config.Lock.Create(leaderElectionRecord); err != nil { |
| klog.Errorf("error initially creating leader election record: %v", err) |
| return false |
| } |
| le.observedRecord = leaderElectionRecord |
| le.observedTime = le.clock.Now() |
| return true |
| } |
| |
| // 2. Record obtained, check the Identity & Time |
| if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { |
| le.observedRecord = *oldLeaderElectionRecord |
| le.observedTime = le.clock.Now() |
| } |
| if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && |
| !le.IsLeader() { |
| klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) |
| return false |
| } |
| |
| // 3. We're going to try to update. The leaderElectionRecord is set to its |
| // default here. Let's correct it before updating. |
| if le.IsLeader() { |
| leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime |
| leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions |
| } else { |
| leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 |
| } |
| |
| // update the lock itself |
| if err = le.config.Lock.Update(leaderElectionRecord); err != nil { |
| klog.Errorf("Failed to update lock: %v", err) |
| return false |
| } |
| le.observedRecord = leaderElectionRecord |
| le.observedTime = le.clock.Now() |
| return true |
| } |
| |
| func (le *LeaderElector) maybeReportTransition() { |
| if le.observedRecord.HolderIdentity == le.reportedLeader { |
| return |
| } |
| le.reportedLeader = le.observedRecord.HolderIdentity |
| if le.config.Callbacks.OnNewLeader != nil { |
| go le.config.Callbacks.OnNewLeader(le.reportedLeader) |
| } |
| } |
| |
| // Check reports an error if this client is the leader and its lease appears |
| // to be expired by more than maxTolerableExpiredLease. |
| func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { |
| if !le.IsLeader() { |
| // Currently not concerned with the case that we are hot standby |
| return nil |
| } |
| // If the lease has gone unrenewed for more than maxTolerableExpiredLease |
| // beyond the lease duration, start reporting ourselves as unhealthy. We |
| // should have died by then, but conditions like deadlock can prevent |
| // this. (See #70819) |
| if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { |
| return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) |
| } |
| |
| return nil |
| } |
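| |
| // A minimal sketch of wiring Check into a health check, assuming the |
| // HealthzAdaptor defined alongside this package (e.g. constructed via |
| // NewLeaderHealthzAdaptor) is set as the config's WatchDog before running: |
| // |
| //     adaptor := leaderelection.NewLeaderHealthzAdaptor(20 * time.Second) |
| //     lec.WatchDog = adaptor |
| //     leaderelection.RunOrDie(ctx, lec) |
| //     // adaptor can then back an HTTP healthz endpoint; it reports an |
| //     // error once the lease is expired beyond the tolerated duration. |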