blob: d7ea00b009663719180bff85a6ebd0470070fe52 [file] [log] [blame]
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
/*
Original Source:
https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
*/
import (
"fmt"
"net"
"path"
"sync"
"time"
"k8s.io/klog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
)
// Leases is an interface which assists in managing the set of active masters
type Leases interface {
// ListLeases retrieves a list of the current master IPs
ListLeases() ([]string, error)
// UpdateLease adds or refreshes a master's lease
UpdateLease(ip string) error
// RemoveLease removes a master's lease
RemoveLease(ip string) error
}
type storageLeases struct {
storage storage.Interface
baseKey string
leaseTime time.Duration
}
var _ Leases = &storageLeases{}
// ListLeases retrieves a list of the current master IPs from storage
func (s *storageLeases) ListLeases() ([]string, error) {
ipInfoList := &corev1.EndpointsList{}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
return nil, err
}
ipList := make([]string, len(ipInfoList.Items))
for i, ip := range ipInfoList.Items {
ipList[i] = ip.Subsets[0].Addresses[0].IP
}
klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
return ipList, nil
}
// UpdateLease resets the TTL on a master IP in storage
func (s *storageLeases) UpdateLease(ip string) error {
key := path.Join(s.baseKey, ip)
return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
// just make sure we've got the right IP set, and then refresh the TTL
existing := input.(*corev1.Endpoints)
existing.Subsets = []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{{IP: ip}},
},
}
// leaseTime needs to be in seconds
leaseTime := uint64(s.leaseTime / time.Second)
// NB: GuaranteedUpdate does not perform the store operation unless
// something changed between load and store (not including resource
// version), meaning we can't refresh the TTL without actually
// changing a field.
existing.Generation++
klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
return existing, &leaseTime, nil
})
}
// RemoveLease removes the lease on a master IP in storage
func (s *storageLeases) RemoveLease(ip string) error {
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil)
}
// NewLeases creates a new etcd-based Leases implementation.
func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases {
return &storageLeases{
storage: storage,
baseKey: baseKey,
leaseTime: leaseTime,
}
}
type leaseEndpointReconciler struct {
endpointClient corev1client.EndpointsGetter
masterLeases Leases
stopReconcilingCalled bool
reconcilingLock sync.Mutex
}
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
return &leaseEndpointReconciler{
endpointClient: endpointClient,
masterLeases: masterLeases,
stopReconcilingCalled: false,
}
}
// ReconcileEndpoints lists keys in a special etcd directory.
// Each key is expected to have a TTL of R+n, where R is the refresh interval
// at which this function is called, and n is some small value. If an
// apiserver goes down, it will fail to refresh its key's TTL and the key will
// expire. ReconcileEndpoints will notice that the endpoints object is
// different from the directory listing, and update the endpoints object
// accordingly.
func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled {
return nil
}
// Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of
// the masters will add our endpoint.
if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
return err
}
return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
}
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
shouldCreate := false
if err != nil {
if !errors.IsNotFound(err) {
return err
}
shouldCreate = true
e = &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: corev1.NamespaceDefault,
},
}
}
// ... and the list of master IP keys from etcd
masterIPs, err := r.masterLeases.ListLeases()
if err != nil {
return err
}
// Since we just refreshed our own key, assume that zero endpoints
// returned from storage indicates an issue or invalid state, and thus do
// not update the endpoints list based on the result.
if len(masterIPs) == 0 {
return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service")
}
// Next, we compare the current list of endpoints with the list of master IP keys
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
if formatCorrect && ipCorrect && portsCorrect {
return nil
}
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{},
Ports: endpointPorts,
}}
}
if !formatCorrect || !ipCorrect {
// repopulate the addresses according to the expected IPs from etcd
e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
for ind, ip := range masterIPs {
e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
}
// Lexicographic order is retained by this step.
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
}
if !portsCorrect {
// Reset ports.
e.Subsets[0].Ports = endpointPorts
}
klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
if shouldCreate {
if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
err = nil
}
} else {
_, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
}
return err
}
// checkEndpointSubsetFormatWithLease determines if the endpoint is in the
// format ReconcileEndpoints expects when the controller is using leases.
//
// Return values:
// * formatCorrect is true if exactly one subset is found.
// * ipsCorrect when the addresses in the endpoints match the expected addresses list
// * portsCorrect is true when endpoint ports exactly match provided ports.
// portsCorrect is only evaluated when reconcilePorts is set to true.
func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false, false
}
sub := &e.Subsets[0]
portsCorrect = true
if reconcilePorts {
if len(sub.Ports) != len(ports) {
portsCorrect = false
} else {
for i, port := range ports {
if port != sub.Ports[i] {
portsCorrect = false
break
}
}
}
}
ipsCorrect = true
if len(sub.Addresses) != len(expectedIPs) {
ipsCorrect = false
} else {
// check the actual content of the addresses
// present addrs is used as a set (the keys) and to indicate if a
// value was already found (the values)
presentAddrs := make(map[string]bool, len(expectedIPs))
for _, ip := range expectedIPs {
presentAddrs[ip] = false
}
// uniqueness is assumed amongst all Addresses.
for _, addr := range sub.Addresses {
if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
ipsCorrect = false
break
}
presentAddrs[addr.IP] = true
}
}
return true, ipsCorrect, portsCorrect
}
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
r.reconcilingLock.Lock()
defer r.reconcilingLock.Unlock()
r.stopReconcilingCalled = true
if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
return err
}
return r.doReconcile(serviceName, endpointPorts, true)
}