blob: 3d429d494625829d7145982b9e5cdb72bced5ef4 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workloadentry
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
)
import (
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"istio.io/api/meta/v1alpha1"
"istio.io/api/networking/v1alpha3"
istiolog "istio.io/pkg/log"
"istio.io/pkg/monitoring"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/kube/controllers"
"github.com/apache/dubbo-go-pixiu/pkg/queue"
)
// init registers the auto-registration metrics with the monitoring subsystem.
func init() {
	monitoring.MustRegister(
		autoRegistrationSuccess,
		autoRegistrationUpdates,
		autoRegistrationUnregistrations,
		autoRegistrationDeletes,
		autoRegistrationErrors,
	)
}
// Monotonic counters describing the lifecycle of auto-registered
// WorkloadEntries; all are registered in init above.
var (
	// autoRegistrationSuccess counts WorkloadEntries successfully created on proxy connect.
	autoRegistrationSuccess = monitoring.NewSum(
		"auto_registration_success_total",
		"Total number of successful auto registrations.",
	)
	// autoRegistrationUpdates counts connect-metadata patches to existing WorkloadEntries.
	autoRegistrationUpdates = monitoring.NewSum(
		"auto_registration_updates_total",
		"Total number of auto registration updates.",
	)
	// autoRegistrationUnregistrations counts disconnect updates written on proxy disconnect.
	autoRegistrationUnregistrations = monitoring.NewSum(
		"auto_registration_unregister_total",
		"Total number of unregistrations.",
	)
	// autoRegistrationDeletes counts WorkloadEntries removed by the cleanup paths.
	autoRegistrationDeletes = monitoring.NewSum(
		"auto_registration_deletes_total",
		"Total number of auto registration cleaned up by periodic timer.",
	)
	// autoRegistrationErrors counts failures across registration, unregistration and cleanup.
	autoRegistrationErrors = monitoring.NewSum(
		"auto_registration_errors_total",
		"Total number of auto registration errors.",
	)
)
const (
	// TODO use status or another proper API instead of annotations
	// AutoRegistrationGroupAnnotation on a WorkloadEntry stores the associated WorkloadGroup.
	AutoRegistrationGroupAnnotation = "istio.io/autoRegistrationGroup"
	// WorkloadControllerAnnotation on a WorkloadEntry should store the current/last pilot instance connected to the workload for XDS.
	WorkloadControllerAnnotation = "istio.io/workloadController"
	// ConnectedAtAnnotation on a WorkloadEntry stores the time in nanoseconds when the associated workload connected to a Pilot instance.
	ConnectedAtAnnotation = "istio.io/connectedAt"
	// DisconnectedAtAnnotation on a WorkloadEntry stores the time in nanoseconds when the associated workload disconnected from a Pilot instance.
	DisconnectedAtAnnotation = "istio.io/disconnectedAt"
	// timeFormat is the layout used to serialize the (dis)connect timestamps
	// into the annotations above; RFC3339Nano keeps full precision.
	timeFormat = time.RFC3339Nano
	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
	// sequence of delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15
)
// HealthEvent is a workload health-probe report from the istio-agent; the
// struct tags below define its JSON wire form.
type HealthEvent struct {
	// whether or not the agent thought the target is healthy
	Healthy bool `json:"healthy,omitempty"`
	// error message propagated
	Message string `json:"errMessage,omitempty"`
}
// HealthCondition pairs a proxy and its WorkloadEntry name with the
// IstioCondition to write; instances flow through the healthCondition queue
// to updateWorkloadEntryHealth.
type HealthCondition struct {
	proxy     *model.Proxy               // proxy the condition applies to
	entryName string                     // target WorkloadEntry name
	condition *v1alpha1.IstioCondition   // condition to merge into the entry's status
}
var log = istiolog.RegisterScope("wle", "wle controller debugging", 0)
// Controller manages the lifecycle (auto-registration on connect,
// unregistration on disconnect, and delayed/periodic cleanup) and the
// health-status updates of auto-registered WorkloadEntries.
// NewController may return nil when the relevant features are disabled;
// the exported methods treat a nil receiver as a no-op.
type Controller struct {
	instanceID string
	// TODO move WorkloadEntry related tasks into their own object and give InternalGen a reference.
	// store should either be k8s (for running pilot) or in-memory (for tests). MCP and other config store implementations
	// do not support writing. We only use it here for reading WorkloadEntry/WorkloadGroup.
	store model.ConfigStoreController
	// Note: unregister is to update the workload entry status: like setting `DisconnectedAtAnnotation`
	// and make the workload entry enqueue `cleanupQueue`
	// cleanup is to delete the workload entry
	// queue contains workloadEntry that need to be unregistered
	queue controllers.Queue
	// cleanupLimit rate limit's auto registered WorkloadEntry cleanup calls to k8s
	cleanupLimit *rate.Limiter
	// cleanupQueue delays the cleanup of auto registered WorkloadEntries to allow for grace period
	cleanupQueue queue.Delayed
	// mutex guards adsConnections below.
	mutex sync.Mutex
	// record the current adsConnections number
	// note: this is to handle reconnect to the same istiod, but in rare case the disconnect event is later than the connect event
	// keyed by proxy network+ip
	adsConnections map[string]uint8
	// maxConnectionAge is a duration that workload entry should be cleaned up if it does not reconnect.
	maxConnectionAge time.Duration
	// healthCondition is a fifo queue used for updating health check status
	healthCondition controllers.Queue
}
type HealthStatus = v1alpha1.IstioCondition
// NewController create a controller which manages workload lifecycle and health status.
// It returns nil when neither auto-registration nor health checks are enabled.
func NewController(store model.ConfigStoreController, instanceID string, maxConnAge time.Duration) *Controller {
	if !features.WorkloadEntryAutoRegistration && !features.WorkloadEntryHealthChecks {
		return nil
	}
	// Allow 1.5x the configured max connection age before an entry is
	// considered abandoned; clamp to MaxInt64 if the addition overflowed.
	paddedConnAge := maxConnAge + maxConnAge/2
	if paddedConnAge < 0 {
		paddedConnAge = time.Duration(math.MaxInt64)
	}
	ctl := &Controller{
		instanceID:       instanceID,
		store:            store,
		cleanupLimit:     rate.NewLimiter(rate.Limit(20), 1),
		cleanupQueue:     queue.NewDelayed(),
		adsConnections:   map[string]uint8{},
		maxConnectionAge: paddedConnAge,
	}
	ctl.queue = controllers.NewQueue("unregister_workloadentry",
		controllers.WithMaxAttempts(maxRetries),
		controllers.WithGenericReconciler(ctl.unregisterWorkload))
	ctl.healthCondition = controllers.NewQueue("healthcheck",
		controllers.WithMaxAttempts(maxRetries),
		controllers.WithGenericReconciler(ctl.updateWorkloadEntryHealth))
	return ctl
}
// Run starts the controller's background workers and blocks until stop is
// closed. Calling Run on a nil controller (features disabled) is a no-op.
func (c *Controller) Run(stop <-chan struct{}) {
	if c == nil {
		return
	}
	// Cleanup machinery only runs when a writable store is available.
	canCleanup := c.store != nil && c.cleanupQueue != nil
	if canCleanup {
		go c.periodicWorkloadEntryCleanup(stop)
		go c.cleanupQueue.Run(stop)
	}
	go c.queue.Run(stop)
	go c.healthCondition.Run(stop)
	<-stop
}
// workItem contains the state of a "disconnect" event used to unregister a workload.
type workItem struct {
	entryName   string       // name of the WorkloadEntry to unregister
	proxy       *model.Proxy // the disconnecting proxy
	disConTime  time.Time    // when the disconnect was observed
	origConTime time.Time    // connect time of the connection that is disconnecting
}
// setConnectMeta stamps cfg with the controlling pilot instance and the
// connect time, and removes any stale disconnect marker.
func setConnectMeta(c *config.Config, controller string, conTime time.Time) {
	annots := c.Annotations
	if annots == nil {
		annots = map[string]string{}
		c.Annotations = annots
	}
	annots[WorkloadControllerAnnotation] = controller
	annots[ConnectedAtAnnotation] = conTime.Format(timeFormat)
	delete(annots, DisconnectedAtAnnotation)
}
// RegisterWorkload records a new ADS connection for proxy and creates or
// updates its auto-registered WorkloadEntry. It is a no-op when
// auto-registration is disabled, the controller is nil, or the proxy is not
// eligible for auto-registration.
func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) error {
	if c == nil || !features.WorkloadEntryAutoRegistration {
		return nil
	}
	entryName := autoregisteredWorkloadEntryName(proxy)
	if entryName == "" {
		// Not an auto-registering proxy; nothing to track.
		return nil
	}
	proxy.AutoregisteredWorkloadEntryName = entryName

	// Track the connection count so a later disconnect can tell whether the
	// workload has reconnected to this istiod in the meantime.
	c.mutex.Lock()
	c.adsConnections[makeProxyKey(proxy)]++
	c.mutex.Unlock()

	err := c.registerWorkload(entryName, proxy, conTime)
	if err != nil {
		log.Errorf(err)
	}
	return err
}
// registerWorkload creates or updates the WorkloadEntry for an auto-registered
// proxy. If an entry already exists, its connect metadata is patched (unless a
// newer connection owns it); otherwise a new entry is built from the proxy's
// WorkloadGroup template. Returns an error suitable for retry by the caller.
func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conTime time.Time) error {
	wle := c.store.Get(gvk.WorkloadEntry, entryName, proxy.Metadata.Namespace)
	if wle != nil {
		lastConTime, _ := time.Parse(timeFormat, wle.Annotations[ConnectedAtAnnotation])
		// the proxy has reconnected to another pilot, not belong to this one.
		if conTime.Before(lastConTime) {
			return nil
		}
		// Try to patch, if it fails then try to create
		_, err := c.store.Patch(*wle, func(cfg config.Config) (config.Config, kubetypes.PatchType) {
			setConnectMeta(&cfg, c.instanceID, conTime)
			return cfg, kubetypes.MergePatchType
		})
		if err != nil {
			return fmt.Errorf("failed updating WorkloadEntry %s/%s err: %v", proxy.Metadata.Namespace, entryName, err)
		}
		autoRegistrationUpdates.Increment()
		log.Infof("updated auto-registered WorkloadEntry %s/%s", proxy.Metadata.Namespace, entryName)
		return nil
	}
	// No WorkloadEntry, create one using fields from the associated WorkloadGroup
	groupCfg := c.store.Get(gvk.WorkloadGroup, proxy.Metadata.AutoRegisterGroup, proxy.Metadata.Namespace)
	if groupCfg == nil {
		autoRegistrationErrors.Increment()
		// FailedPrecondition signals the agent that the WorkloadGroup must exist first.
		return grpcstatus.Errorf(codes.FailedPrecondition, "auto-registration WorkloadEntry of %v failed: cannot find WorkloadGroup %s/%s",
			proxy.ID, proxy.Metadata.Namespace, proxy.Metadata.AutoRegisterGroup)
	}
	entry := workloadEntryFromGroup(entryName, proxy, groupCfg)
	setConnectMeta(entry, c.instanceID, conTime)
	_, err := c.store.Create(*entry)
	if err != nil {
		autoRegistrationErrors.Increment()
		return fmt.Errorf("auto-registration WorkloadEntry of %v failed: error creating WorkloadEntry: %v", proxy.ID, err)
	}
	hcMessage := ""
	if _, f := entry.Annotations[status.WorkloadEntryHealthCheckAnnotation]; f {
		hcMessage = " with health checking enabled"
	}
	autoRegistrationSuccess.Increment()
	log.Infof("auto-registered WorkloadEntry %s/%s%s", proxy.Metadata.Namespace, entryName, hcMessage)
	return nil
}
// QueueUnregisterWorkload handles a proxy disconnect: it decrements the ADS
// connection count for the workload, and once the last connection is gone it
// marks the WorkloadEntry as disconnected (queuing a retry with backoff on
// failure).
func (c *Controller) QueueUnregisterWorkload(proxy *model.Proxy, origConnect time.Time) {
	if !features.WorkloadEntryAutoRegistration || c == nil {
		return
	}
	// check if the WE already exists, update the status
	entryName := proxy.AutoregisteredWorkloadEntryName
	if entryName == "" {
		return
	}
	c.mutex.Lock()
	num := c.adsConnections[makeProxyKey(proxy)]
	// if there is still ads connection, do not unregister.
	if num > 1 {
		c.adsConnections[makeProxyKey(proxy)] = num - 1
		c.mutex.Unlock()
		return
	}
	delete(c.adsConnections, makeProxyKey(proxy))
	c.mutex.Unlock()
	workload := &workItem{
		entryName:   entryName,
		proxy:       proxy,
		disConTime:  time.Now(),
		origConTime: origConnect,
	}
	// Attempt the unregister inline first; on failure, hand the item to the
	// retry queue, which backs off up to maxRetries times.
	if err := c.unregisterWorkload(workload); err != nil {
		log.Errorf(err)
		c.queue.Add(workload)
	}
}
// unregisterWorkload processes a queued disconnect workItem: it stamps the
// WorkloadEntry with DisconnectedAtAnnotation (removing ConnectedAtAnnotation)
// and schedules a delayed cleanup, unless the workload has since reconnected
// to this or another istiod. A non-nil error triggers a backoff retry by the
// owning queue.
func (c *Controller) unregisterWorkload(item interface{}) error {
	workItem, ok := item.(*workItem)
	if !ok {
		// Defensive: ignore unexpected queue item types.
		return nil
	}
	// unset controller, set disconnect time
	cfg := c.store.Get(gvk.WorkloadEntry, workItem.entryName, workItem.proxy.Metadata.Namespace)
	if cfg == nil {
		// return error and backoff retry to prevent workloadentry leak
		// TODO(@hzxuzhonghu): update the Get interface, fallback to calling apiserver.
		return fmt.Errorf("workloadentry %s/%s is not found, maybe deleted or because of propagate latency",
			workItem.proxy.Metadata.Namespace, workItem.entryName)
	}
	// only queue a delete if this disconnect event is associated with the last connect event written to the workload entry
	if mostRecentConn, err := time.Parse(timeFormat, cfg.Annotations[ConnectedAtAnnotation]); err == nil {
		if mostRecentConn.After(workItem.origConTime) {
			// this disconnect event wasn't processed until after we successfully reconnected
			return nil
		}
	}
	// The wle has reconnected to another istiod and controlled by it.
	if cfg.Annotations[WorkloadControllerAnnotation] != c.instanceID {
		return nil
	}
	conTime, _ := time.Parse(timeFormat, cfg.Annotations[ConnectedAtAnnotation])
	// The wle has reconnected to this istiod,
	// this may happen when the unregister fails retry
	if workItem.disConTime.Before(conTime) {
		return nil
	}
	wle := cfg.DeepCopy()
	delete(wle.Annotations, ConnectedAtAnnotation)
	wle.Annotations[DisconnectedAtAnnotation] = workItem.disConTime.Format(timeFormat)
	// use update instead of patch to prevent race condition
	if _, err := c.store.Update(wle); err != nil {
		autoRegistrationErrors.Increment()
		return fmt.Errorf("disconnect: failed updating WorkloadEntry %s/%s: %v", workItem.proxy.Metadata.Namespace, workItem.entryName, err)
	}
	autoRegistrationUnregistrations.Increment()
	// after grace period, check if the workload ever reconnected
	ns := workItem.proxy.Metadata.Namespace
	c.cleanupQueue.PushDelayed(func() error {
		wle := c.store.Get(gvk.WorkloadEntry, workItem.entryName, ns)
		if wle == nil {
			// already deleted; nothing to clean up
			return nil
		}
		if c.shouldCleanupEntry(*wle) {
			c.cleanupEntry(*wle)
		}
		return nil
	}, features.WorkloadEntryCleanupGracePeriod)
	return nil
}
// QueueWorkloadEntryHealth enqueues the associated WorkloadEntries health status.
// A nil controller (features disabled, see NewController) is a no-op, matching
// the guard in the other exported methods (Run, RegisterWorkload, ...).
func (c *Controller) QueueWorkloadEntryHealth(proxy *model.Proxy, event HealthEvent) {
	if c == nil {
		return
	}
	// we assume that the workload entry exists
	// if auto registration does not exist, try looking
	// up in NodeMetadata
	entryName := proxy.AutoregisteredWorkloadEntryName
	if entryName == "" {
		log.Errorf("unable to derive WorkloadEntry for health update for %v", proxy.ID)
		return
	}
	condition := transformHealthEvent(proxy, entryName, event)
	c.healthCondition.Add(condition)
}
// updateWorkloadEntryHealth updates the associated WorkloadEntries health status
// based on the corresponding health check performed by istio-agent.
// A non-nil error triggers a backoff retry by the healthCondition queue.
func (c *Controller) updateWorkloadEntryHealth(obj interface{}) error {
	condition, ok := obj.(HealthCondition)
	if !ok {
		// Defensive: the queue should only contain HealthCondition items (see
		// QueueWorkloadEntryHealth); drop anything else instead of panicking,
		// mirroring the checked assertion in unregisterWorkload.
		return nil
	}
	// get previous status
	cfg := c.store.Get(gvk.WorkloadEntry, condition.entryName, condition.proxy.Metadata.Namespace)
	if cfg == nil {
		return fmt.Errorf("failed to update health status for %v: WorkloadEntry %v not found", condition.proxy.ID, condition.entryName)
	}
	// The workloadentry has reconnected to the other istiod
	if cfg.Annotations[WorkloadControllerAnnotation] != c.instanceID {
		return nil
	}
	// check if the existing health status is newer than this one
	wleStatus, ok := cfg.Status.(*v1alpha1.IstioStatus)
	if ok {
		healthCondition := status.GetCondition(wleStatus.Conditions, status.ConditionHealthy)
		if healthCondition != nil {
			if healthCondition.LastProbeTime.AsTime().After(condition.condition.LastProbeTime.AsTime()) {
				return nil
			}
		}
	}
	// replace the updated status
	wle := status.UpdateConfigCondition(*cfg, condition.condition)
	// update the status
	_, err := c.store.UpdateStatus(wle)
	if err != nil {
		return fmt.Errorf("error while updating WorkloadEntry health status for %s: %v", condition.proxy.ID, err)
	}
	log.Debugf("updated health status of %v to %v", condition.proxy.ID, condition.condition)
	return nil
}
// periodicWorkloadEntryCleanup periodically lists all WorkloadEntries and
// queues deletion of stale auto-registered ones. This backstops the normal
// delayed cleanup, e.g. when the owning pilot went down before it could
// schedule one. Runs until stopCh is closed.
func (c *Controller) periodicWorkloadEntryCleanup(stopCh <-chan struct{}) {
	if !features.WorkloadEntryAutoRegistration {
		return
	}
	ticker := time.NewTicker(10 * features.WorkloadEntryCleanupGracePeriod)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			wles, err := c.store.List(gvk.WorkloadEntry, metav1.NamespaceAll)
			if err != nil {
				log.Warnf("error listing WorkloadEntry for cleanup: %v", err)
				continue
			}
			for _, wle := range wles {
				wle := wle // per-iteration copy captured by the closure below
				if c.shouldCleanupEntry(wle) {
					c.cleanupQueue.Push(func() error {
						c.cleanupEntry(wle)
						return nil
					})
				}
			}
		case <-stopCh:
			return
		}
	}
}
// shouldCleanupEntry reports whether an auto-registered WorkloadEntry is stale
// enough to delete: either still "connected" but past 1.5x maxConnectionAge,
// or disconnected for longer than the cleanup grace period.
func (c *Controller) shouldCleanupEntry(wle config.Config) bool {
	// don't clean-up if connected or non-autoregistered WorkloadEntries
	if wle.Annotations[AutoRegistrationGroupAnnotation] == "" {
		return false
	}
	// If there is ConnectedAtAnnotation set, don't cleanup this workload entry.
	// This may happen when the workload fast reconnects to the same istiod.
	// 1. disconnect: the workload entry has been updated
	// 2. connect: but the patch is based on the old workloadentry because of the propagation latency.
	// So in this case the `DisconnectedAtAnnotation` is still there and the cleanup procedure will go on.
	connTime := wle.Annotations[ConnectedAtAnnotation]
	if connTime != "" {
		// handle workload leak when both workload/pilot down at the same time before pilot has a chance to set disconnTime
		connAt, err := time.Parse(timeFormat, connTime)
		// if it has been 1.5*maxConnectionAge since workload connected, should delete it.
		// Note: the uint64 conversions make a negative elapsed duration (clock
		// skew) wrap to a huge value and therefore count as expired.
		if err == nil && uint64(time.Since(connAt)) > uint64(c.maxConnectionAge)+uint64(c.maxConnectionAge/2) {
			return true
		}
		return false
	}
	disconnTime := wle.Annotations[DisconnectedAtAnnotation]
	if disconnTime == "" {
		return false
	}
	disconnAt, err := time.Parse(timeFormat, disconnTime)
	// if we haven't passed the grace period, don't cleanup
	if err == nil && time.Since(disconnAt) < features.WorkloadEntryCleanupGracePeriod {
		return false
	}
	return true
}
// cleanupEntry deletes an auto-registered WorkloadEntry, respecting the global
// cleanup rate limit. A not-found response counts as a successful cleanup;
// other failures increment the error metric.
func (c *Controller) cleanupEntry(wle config.Config) {
	if err := c.cleanupLimit.Wait(context.TODO()); err != nil {
		log.Errorf("error in WorkloadEntry cleanup rate limiter: %v", err)
		return
	}
	err := c.store.Delete(gvk.WorkloadEntry, wle.Name, wle.Namespace, &wle.ResourceVersion)
	switch {
	case err == nil, errors.IsNotFound(err):
		autoRegistrationDeletes.Increment()
		log.Infof("cleaned up auto-registered WorkloadEntry %s/%s", wle.Namespace, wle.Name)
	default:
		log.Warnf("failed cleaning up auto-registered WorkloadEntry %s/%s: %v", wle.Namespace, wle.Name, err)
		autoRegistrationErrors.Increment()
	}
}
// autoregisteredWorkloadEntryName derives the WorkloadEntry name for an
// auto-registering proxy as "<group>-<ip>[-<network>]", truncated from the
// front to the 253-character Kubernetes name limit. It returns "" when the
// proxy is not eligible (no group, no IP address, or no namespace).
func autoregisteredWorkloadEntryName(proxy *model.Proxy) string {
	switch {
	case proxy.Metadata.AutoRegisterGroup == "":
		return ""
	case len(proxy.IPAddresses) == 0:
		log.Errorf("auto-registration of %v failed: missing IP addresses", proxy.ID)
		return ""
	case len(proxy.Metadata.Namespace) == 0:
		log.Errorf("auto-registration of %v failed: missing namespace", proxy.ID)
		return ""
	}
	parts := []string{proxy.Metadata.AutoRegisterGroup, sanitizeIP(proxy.IPAddresses[0])}
	if network := string(proxy.Metadata.Network); network != "" {
		parts = append(parts, network)
	}
	name := strings.Join(parts, "-")
	const maxNameLen = 253 // Kubernetes resource name limit
	if len(name) > maxNameLen {
		name = name[len(name)-maxNameLen:]
		log.Warnf("generated WorkloadEntry name is too long, consider making the WorkloadGroup name shorter. Shortening from beginning to: %s", name)
	}
	return name
}
// sanitizeIP makes an IP address usable in a Kubernetes resource name by
// replacing the colons of IPv6 addresses with dashes.
func sanitizeIP(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	for _, r := range s {
		if r == ':' {
			b.WriteByte('-')
		} else {
			b.WriteRune(r)
		}
	}
	return b.String()
}
// transformHealthEvent converts an agent HealthEvent into a HealthCondition
// carrying the IstioCondition to write to the WorkloadEntry status.
func transformHealthEvent(proxy *model.Proxy, entryName string, event HealthEvent) HealthCondition {
	// last probe and transition are the same because we only send on
	// transition in the agent; take a single timestamp so the two fields
	// cannot differ by a few nanoseconds between calls.
	now := timestamppb.Now()
	cond := &v1alpha1.IstioCondition{
		Type:               status.ConditionHealthy,
		LastProbeTime:      now,
		LastTransitionTime: now,
	}
	out := HealthCondition{
		proxy:     proxy,
		entryName: entryName,
		condition: cond,
	}
	if event.Healthy {
		cond.Status = status.StatusTrue
		return out
	}
	cond.Status = status.StatusFalse
	cond.Message = event.Message
	return out
}
// mergeLabels flattens the given label maps into one new map. Later maps win
// on key conflicts; the result is never nil.
func mergeLabels(labels ...map[string]string) map[string]string {
	if len(labels) == 0 {
		return map[string]string{}
	}
	merged := make(map[string]string, len(labels)*len(labels[0]))
	for _, m := range labels {
		for key, value := range m {
			merged[key] = value
		}
	}
	return merged
}
var workloadGroupIsController = true
// workloadEntryFromGroup builds a new WorkloadEntry config for proxy from its
// WorkloadGroup template, with the group set as owner. Label/annotation
// precedence: node metadata > WorkloadGroup.Metadata > WorkloadGroup.Template.
func workloadEntryFromGroup(name string, proxy *model.Proxy, groupCfg *config.Config) *config.Config {
	group := groupCfg.Spec.(*v1alpha3.WorkloadGroup)
	entry := group.Template.DeepCopy()
	entry.Address = proxy.IPAddresses[0]
	// TODO move labels out of entry
	// node metadata > WorkloadGroup.Metadata > WorkloadGroup.Template
	if group.Metadata != nil && group.Metadata.Labels != nil {
		entry.Labels = mergeLabels(entry.Labels, group.Metadata.Labels)
	}
	if proxy.Metadata != nil && proxy.Metadata.Labels != nil {
		entry.Labels = mergeLabels(entry.Labels, proxy.Metadata.Labels)
	}
	// AutoRegistrationGroupAnnotation marks the entry as auto-registered so the
	// cleanup paths (shouldCleanupEntry) recognize it.
	annotations := map[string]string{AutoRegistrationGroupAnnotation: groupCfg.Name}
	if group.Metadata != nil && group.Metadata.Annotations != nil {
		annotations = mergeLabels(annotations, group.Metadata.Annotations)
	}
	if proxy.Metadata.Network != "" {
		entry.Network = string(proxy.Metadata.Network)
	}
	// proxy.Locality is unset when auto registration takes place, because its
	// state is not fully initialized. Therefore, we check the bootstrap node.
	if proxy.XdsNode.Locality != nil {
		entry.Locality = util.LocalityToString(proxy.XdsNode.Locality)
	}
	// A configured readiness probe enables health checking for this entry.
	if proxy.Metadata.ProxyConfig != nil && proxy.Metadata.ProxyConfig.ReadinessProbe != nil {
		annotations[status.WorkloadEntryHealthCheckAnnotation] = "true"
	}
	return &config.Config{
		Meta: config.Meta{
			GroupVersionKind: gvk.WorkloadEntry,
			Name:             name,
			Namespace:        proxy.Metadata.Namespace,
			Labels:           entry.Labels,
			Annotations:      annotations,
			// The WorkloadGroup is recorded as the controlling owner of the entry.
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: groupCfg.GroupVersionKind.GroupVersion(),
				Kind:       groupCfg.GroupVersionKind.Kind,
				Name:       groupCfg.Name,
				UID:        kubetypes.UID(groupCfg.UID),
				Controller: &workloadGroupIsController,
			}},
		},
		Spec: entry,
		// TODO status fields used for garbage collection
		Status: nil,
	}
}
// makeProxyKey keys a proxy by its network plus first IP address; used to
// count concurrent ADS connections from the same workload in adsConnections.
func makeProxyKey(proxy *model.Proxy) string {
	network := string(proxy.Metadata.Network)
	return network + proxy.IPAddresses[0]
}