blob: e6f831475c4a3e0072106daf7d5104401808e41c [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package leader
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
"github.com/apache/dubbo-admin/pkg/core/logger"
)
// Timing defaults used by NewLeaderElection when no Option overrides them.
const (
	// DefaultLeaseDuration is how long a freshly acquired or renewed lease
	// stays valid before other replicas may take it over.
	DefaultLeaseDuration time.Duration = 30 * time.Second

	// DefaultRenewInterval is how often the current leader renews its lease.
	// It is a third of the lease duration so that a couple of missed renewals
	// can be absorbed before the lease lapses.
	DefaultRenewInterval time.Duration = DefaultLeaseDuration / 3

	// DefaultAcquireRetryInterval is how often a non-leader retries acquiring
	// the lease.
	DefaultAcquireRetryInterval time.Duration = 5 * time.Second
)
// LeaderElection coordinates leadership of a single named component among
// replicas that share one database.
//
// Exclusivity relies on optimistic locking of the lease row:
//   - a takeover or a renewal increments the row's version column;
//   - a fresh insert starts at version 1;
//   - Renew succeeds only while the stored version still equals the one this
//     instance last wrote.
//
// It embeds an atomic.Bool, so it must not be copied after construction; use
// the pointer returned by NewLeaderElection.
type LeaderElection struct {
	// Storage and identity.
	db        *gorm.DB
	component string // value matched against the "component" column
	holderID  string // value written to / matched against the "holder_id" column

	// Timing configuration: defaults are set by NewLeaderElection and may be
	// overridden through Options.
	leaseDuration time.Duration // added to "now" to compute expires_at
	renewInterval time.Duration // period between Renew calls while leading
	acquireRetry  time.Duration // period between TryAcquire calls while not leading

	// NOTE(review): initialized by NewLeaderElection but not read anywhere in
	// this file; RunLeaderElection takes its own stop channel instead.
	stopCh chan struct{}

	// Election state.
	isLeader       atomic.Bool // whether this instance believes it holds the lease
	currentVersion int64       // lease version last written by this instance
}
// Option customizes a LeaderElection. NewLeaderElection applies options in
// the order given, after it has filled in the default timings.
type Option func(*LeaderElection)
// WithLeaseDuration returns an Option that overrides how long an acquired or
// renewed lease stays valid (DefaultLeaseDuration when not set).
func WithLeaseDuration(d time.Duration) Option {
	return func(e *LeaderElection) { e.leaseDuration = d }
}
// WithRenewInterval returns an Option that overrides how often the leader
// renews its lease (DefaultRenewInterval when not set). It should be
// comfortably shorter than the lease duration.
func WithRenewInterval(d time.Duration) Option {
	return func(e *LeaderElection) { e.renewInterval = d }
}
// WithAcquireRetryInterval returns an Option that overrides how often a
// non-leader retries acquiring the lease (DefaultAcquireRetryInterval when
// not set).
func WithAcquireRetryInterval(d time.Duration) Option {
	return func(e *LeaderElection) { e.acquireRetry = d }
}
// NewLeaderElection creates a LeaderElection for component that competes
// under holderID (see GenerateHolderID).
//
// Construction proceeds in three steps:
//  1. The default timings (DefaultLeaseDuration, DefaultRenewInterval,
//     DefaultAcquireRetryInterval) are filled in.
//  2. opts are applied in order; nil options are skipped.
//  3. Any non-positive duration left after the options is replaced by its
//     default. RunLeaderElection drives its loops with time.Ticker, which
//     panics on non-positive intervals.
//
// A renew interval that is not shorter than the lease duration is kept, but a
// warning is logged: the lease could then expire between renewals and be
// taken over by another replica.
func NewLeaderElection(db *gorm.DB, component, holderID string, opts ...Option) *LeaderElection {
	le := &LeaderElection{
		db:            db,
		component:     component,
		holderID:      holderID,
		leaseDuration: DefaultLeaseDuration,
		renewInterval: DefaultRenewInterval,
		acquireRetry:  DefaultAcquireRetryInterval,
		stopCh:        make(chan struct{}),
	}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(le)
	}

	// Guard against configurations that would make time.NewTicker /
	// Ticker.Reset panic later in RunLeaderElection.
	if le.leaseDuration <= 0 {
		le.leaseDuration = DefaultLeaseDuration
	}
	if le.renewInterval <= 0 {
		le.renewInterval = DefaultRenewInterval
	}
	if le.acquireRetry <= 0 {
		le.acquireRetry = DefaultAcquireRetryInterval
	}

	if le.renewInterval >= le.leaseDuration {
		logger.Warnf("leader election: renew interval %s is not shorter than lease duration %s for component %s; the lease may lapse between renewals",
			le.renewInterval, le.leaseDuration, le.component)
	}
	return le
}
// EnsureTable runs GORM's AutoMigrate for the LeaderLease model. This creates
// the backing table, and any missing columns or indexes, when they are absent.
// Running it against an already up-to-date schema is intended to be harmless,
// so it can be called on every startup.
func (le *LeaderElection) EnsureTable() error {
	model := &LeaderLease{}
	return le.db.AutoMigrate(model)
}
// TryAcquire attempts to acquire the leader lease for le.component.
//
// It works in two steps:
//  1. Take over an existing lease row whose expires_at is already in the past.
//     An unexpired lease is never pre-empted, and an unexpired lease already
//     held by this holder is NOT renewed here — use Renew for that.
//  2. If no expired row was updated, insert a fresh lease row at version 1.
//     The insert is expected to fail when another replica already has a row
//     for the component.
//
// Expiry is judged with this process's clock (time.Now()), so replicas are
// assumed to have reasonably synchronized clocks.
//
// On success it remembers the lease version for later optimistic renewals,
// marks this instance as leader and returns true. On any failure it marks this
// instance as not leader and returns false.
func (le *LeaderElection) TryAcquire(ctx context.Context) bool {
	now := time.Now()
	expiresAt := now.Add(le.leaseDuration)

	// Only take over an expired lease; never pre-empt an active holder.
	result := le.db.WithContext(ctx).Model(&LeaderLease{}).
		Where("component = ? AND expires_at < ?", le.component, now).
		Updates(map[string]interface{}{
			"holder_id":   le.holderID,
			"acquired_at": now,
			"expires_at":  expiresAt,
			"version":     gorm.Expr("version + 1"),
		})
	if result.Error != nil {
		logger.Warnf("leader election: failed to update lease for component %s: %v", le.component, result.Error)
		le.isLeader.Store(false)
		return false
	}

	if result.RowsAffected > 0 {
		// Read back the version we just wrote. Filtering on holder_id too
		// ensures we never record another holder's version as ours if the row
		// changed hands between the update and this read (e.g. a very short
		// lease or clock skew); in that case First fails and we do not lead.
		var lease LeaderLease
		err := le.db.WithContext(ctx).
			Where("component = ? AND holder_id = ?", le.component, le.holderID).
			First(&lease).Error
		if err != nil {
			logger.Warnf("leader election: failed to read back updated lease for component %s: %v", le.component, err)
			le.isLeader.Store(false)
			return false
		}
		le.currentVersion = lease.Version
		le.isLeader.Store(true)
		return true
	}

	// No expired row was updated: either no lease exists yet, or someone holds
	// an unexpired one. Try to create it; failure is the normal outcome when a
	// row already exists.
	result = le.db.WithContext(ctx).Create(&LeaderLease{
		Component:  le.component,
		HolderID:   le.holderID,
		AcquiredAt: now,
		ExpiresAt:  expiresAt,
		Version:    1,
	})
	if result.Error != nil {
		logger.Debugf("leader election: failed to insert lease for component %s (probably created by another replica): %v", le.component, result.Error)
		le.isLeader.Store(false)
		return false
	}
	le.currentVersion = 1
	le.isLeader.Store(true)
	return true
}
// Renew extends the lease by another lease duration, starting now.
//
// It returns false immediately when this instance does not believe it is
// leader. Otherwise it updates the row only if component, holder_id and
// version all still match what this instance last wrote, and increments the
// version (optimistic locking).
//
// On success it tracks the new version and returns true. On a database error,
// or when no row matched (the lease was taken over), it clears the leader flag
// and returns false.
func (le *LeaderElection) Renew(ctx context.Context) bool {
	if !le.isLeader.Load() {
		return false
	}

	renewedAt := time.Now()
	res := le.db.WithContext(ctx).Model(&LeaderLease{}).
		Where("component = ? AND holder_id = ? AND version = ?", le.component, le.holderID, le.currentVersion).
		Updates(map[string]any{
			"acquired_at": renewedAt,
			"expires_at":  renewedAt.Add(le.leaseDuration),
			"version":     gorm.Expr("version + 1"),
		})

	switch {
	case res.Error != nil:
		logger.Warnf("leader election: failed to renew lease for component %s: %v", le.component, res.Error)
	case res.RowsAffected > 0:
		// The SQL bumped version by exactly one; mirror it locally.
		le.currentVersion++
		return true
	default:
		// Lease was lost (likely held by another replica now).
		logger.Warnf("leader election: lost leader lease for component %s (renewal failed, version mismatch)", le.component)
	}

	le.isLeader.Store(false)
	return false
}
// Release voluntarily gives up leadership.
//
// It clears the local leader flag first, then backdates expires_at, by one
// second, on the row held by this holder. The lease is then already expired,
// and any replica's TryAcquire (which claims rows with expires_at < now) can
// take it. Database errors are logged and otherwise ignored.
func (le *LeaderElection) Release(ctx context.Context) {
	le.isLeader.Store(false)

	alreadyExpired := time.Now().Add(-time.Second)
	err := le.db.WithContext(ctx).Model(&LeaderLease{}).
		Where("component = ? AND holder_id = ?", le.component, le.holderID).
		Update("expires_at", alreadyExpired).Error
	if err != nil {
		logger.Warnf("leader election: failed to release lease for component %s: %v", le.component, err)
	}
}
// IsLeader reports whether this instance currently believes it holds the
// leader lease. The flag is maintained by TryAcquire, Renew and Release and is
// read atomically; it is not re-checked against the database here.
func (le *LeaderElection) IsLeader() bool {
	leading := le.isLeader.Load()
	return leading
}
// RunLeaderElection runs the leader election loop and blocks until ctx is
// cancelled or stopCh yields a value (or is closed). It is meant to run in its
// own goroutine.
//
// Loop behavior:
//   - While not leading, it calls TryAcquire immediately and then every
//     acquire-retry interval.
//   - While leading, it calls Renew every renew interval.
//   - When Renew fails, onStopLeading is called and acquisition attempts
//     resume.
//   - On shutdown while leading, the lease is released and then onStopLeading
//     is called.
//
// The release on shutdown uses a fresh context, because ctx may already be
// cancelled. That context is bounded by the lease duration, so an unreachable
// database cannot hang shutdown; the lease lapses on its own within that time
// anyway.
//
// Callbacks run synchronously on the loop goroutine, so a slow callback delays
// renewal. Nil callbacks are treated as no-ops.
func (le *LeaderElection) RunLeaderElection(ctx context.Context, stopCh <-chan struct{},
	onStartLeading func(), onStopLeading func()) {
	if onStartLeading == nil {
		onStartLeading = func() {}
	}
	if onStopLeading == nil {
		onStopLeading = func() {}
	}

	acquireTicker := time.NewTicker(le.acquireRetry)
	defer acquireTicker.Stop()
	renewTicker := time.NewTicker(le.renewInterval)
	defer renewTicker.Stop()
	renewTicker.Stop() // renewal only runs while leading

	leading := false

	// acquire makes one acquisition attempt and, on success, switches the loop
	// from acquiring to renewing.
	acquire := func() {
		if !le.TryAcquire(ctx) {
			return
		}
		logger.Infof("leader election: component %s acquired leadership (holder: %s)", le.component, le.holderID)
		leading = true
		acquireTicker.Stop()
		renewTicker.Reset(le.renewInterval)
		onStartLeading()
	}

	// stepDown releases a held lease on shutdown and notifies the caller.
	stepDown := func() {
		if !leading {
			return
		}
		releaseCtx, cancel := context.WithTimeout(context.Background(), le.leaseDuration)
		defer cancel()
		le.Release(releaseCtx)
		leading = false
		onStopLeading()
	}

	// Try right away rather than waiting a full retry interval at startup.
	if ctx.Err() == nil {
		acquire()
	}

	for {
		select {
		case <-ctx.Done():
			stepDown()
			return
		case <-stopCh:
			stepDown()
			return
		case <-acquireTicker.C:
			// A tick may already be buffered when the ticker was stopped, so
			// re-check before attempting.
			if !leading {
				acquire()
			}
		case <-renewTicker.C:
			// If shutdown has begun, let the ctx.Done case release the lease
			// instead of treating a cancelled renewal as a lost lease.
			if !leading || ctx.Err() != nil {
				continue
			}
			if !le.Renew(ctx) {
				logger.Warnf("leader election: component %s lost leadership (holder: %s)", le.component, le.holderID)
				leading = false
				renewTicker.Stop()
				acquireTicker.Reset(le.acquireRetry)
				onStopLeading()
			}
		}
	}
}
// GenerateHolderID builds a holder ID of the form "<hostname>-<random UUID>".
// If the hostname cannot be determined, "unknown" is used instead. The error
// result is always nil; it is kept for API compatibility.
func GenerateHolderID() (string, error) {
	host := "unknown"
	if name, err := os.Hostname(); err == nil {
		host = name
	}
	return fmt.Sprintf("%s-%s", host, uuid.New().String()), nil
}