/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controller
import (
"fmt"
"strings"
"sync"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/hashicorp/go-multierror"
"istio.io/api/annotation"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
var (
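// endpointSliceRequirement filters out EndpointSlices that carry the MCS service-name label
// (mcs.LabelServiceName); slices carrying that label do not match the selector and are skipped by the event handler.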
endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil)
endpointSliceSelector = klabels.NewSelector().Add(*endpointSliceRequirement)
)
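// endpointSliceController watches EndpointSlice objects and converts them into DubboEndpoints,
// keeping the endpoint cache up to date and triggering EDS (and, for headless services, full) pushes.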
type endpointSliceController struct {
endpointCache *endpointSliceCache
slices kclient.Client[*v1.EndpointSlice]
c *Controller
}
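// endpointSliceCache stores the DubboEndpoints derived from each EndpointSlice, indexed by service
// hostname and then by slice name, so a service's full endpoint set can be rebuilt without re-listing slices.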
type endpointSliceCache struct {
mu sync.RWMutex
endpointsByServiceAndSlice map[host.Name]map[string][]*model.DubboEndpoint
}
func newEndpointSliceCache() *endpointSliceCache {
out := &endpointSliceCache{
endpointsByServiceAndSlice: make(map[host.Name]map[string][]*model.DubboEndpoint),
}
return out
}
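// newEndpointSliceController wires a filtered EndpointSlice client to the controller and registers
// onEvent as the handler for add, update, and delete events.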
func newEndpointSliceController(c *Controller) *endpointSliceController {
slices := kclient.NewFiltered[*v1.EndpointSlice](c.client, kclient.Filter{ObjectFilter: c.client.ObjectFilter()})
out := &endpointSliceController{
c: c,
slices: slices,
endpointCache: newEndpointSliceCache(),
}
registerHandlers[*v1.EndpointSlice](c, slices, "EndpointSlice", out.onEvent, nil)
return out
}
func (esc *endpointSliceController) onEvent(_, ep *v1.EndpointSlice, event model.Event) error {
esc.onEventInternal(nil, ep, event)
return nil
}
func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, event model.Event) {
esLabels := ep.GetLabels()
if !endpointSliceSelector.Matches(klabels.Set(esLabels)) {
return
}
if event == model.EventDelete {
esc.deleteEndpointSlice(ep)
} else {
esc.updateEndpointSlice(ep)
}
// Now check if we need to do a full push for the service.
// If the service is headless, we need a full push whenever the service exposes TCP ports,
// so that IP-based listeners can be created. For pure HTTP headless services, only an NDS push is needed.
name := serviceNameForEndpointSlice(esLabels)
namespace := ep.GetNamespace()
svc := esc.c.services.Get(name, namespace)
if svc != nil && !serviceNeedsPush(svc) {
return
}
namespacedName := getServiceNamespacedName(ep)
hostnames := esc.c.hostNamesForNamespacedName(namespacedName)
esc.pushEDS(hostnames, namespacedName.Namespace)
if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName {
return
}
configsUpdated := sets.New[model.ConfigKey]()
supportsOnlyHTTP := true
for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) {
for _, p := range modelSvc.Ports {
if !p.Protocol.IsHTTP() {
supportsOnlyHTTP = false
break
}
}
if supportsOnlyHTTP {
configsUpdated.Insert(model.ConfigKey{Kind: kind.ServiceEntry, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace})
}
}
if len(configsUpdated) > 0 {
esc.c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
Full: true,
ConfigsUpdated: configsUpdated,
Reason: model.NewReasonStats(model.HeadlessEndpointUpdate),
})
}
}
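// deleteEndpointSlice removes the slice's addresses from the pod cache and drops its entries
// from the endpoint cache for every hostname the owning service maps to.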
func (esc *endpointSliceController) deleteEndpointSlice(slice *v1.EndpointSlice) {
key := config.NamespacedName(slice)
for _, e := range slice.Endpoints {
for _, a := range e.Addresses {
esc.c.pods.endpointDeleted(key, a)
}
}
esc.endpointCache.mu.Lock()
defer esc.endpointCache.mu.Unlock()
for _, hostName := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
// endpointSlice cache update
if esc.endpointCache.has(hostName) {
esc.endpointCache.delete(hostName, slice.Name)
}
}
}
func (e *endpointSliceCache) has(hostname host.Name) bool {
_, found := e.endpointsByServiceAndSlice[hostname]
return found
}
func (e *endpointSliceCache) delete(hostname host.Name, slice string) {
delete(e.endpointsByServiceAndSlice[hostname], slice)
if len(e.endpointsByServiceAndSlice[hostname]) == 0 {
delete(e.endpointsByServiceAndSlice, hostname)
}
}
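// updateEndpointSlice rebuilds the cached endpoints for every hostname the slice's service maps to.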
func (esc *endpointSliceController) updateEndpointSlice(slice *v1.EndpointSlice) {
for _, hostname := range esc.c.hostNamesForNamespacedName(getServiceNamespacedName(slice)) {
esc.updateEndpointCacheForSlice(hostname, slice)
}
}
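// initializeNamespace replays an Add event for every EndpointSlice in the namespace; when filtered is
// false the unfiltered lister is used, so slices outside the object filter are included as well.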
func (esc *endpointSliceController) initializeNamespace(ns string, filtered bool) error {
var err *multierror.Error
var endpoints []*v1.EndpointSlice
if filtered {
endpoints = esc.slices.List(ns, klabels.Everything())
} else {
endpoints = esc.slices.ListUnfiltered(ns, klabels.Everything())
}
for _, s := range endpoints {
err = multierror.Append(err, esc.onEvent(nil, s, model.EventAdd))
}
return err.ErrorOrNil()
}
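// updateEndpointCacheForSlice converts the endpoints of a single EndpointSlice into DubboEndpoints
// for the given hostname and stores them in the cache. FQDN slices and slices without an owning
// service are ignored.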
func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, epSlice *v1.EndpointSlice) {
var endpoints []*model.DubboEndpoint
if epSlice.AddressType == v1.AddressTypeFQDN {
return
}
svc := esc.c.GetService(hostName)
svcNamespacedName := getServiceNamespacedName(epSlice)
// This is not an EndpointSlice for a Service; ignore it.
if svcNamespacedName.Name == "" {
return
}
for _, e := range epSlice.Endpoints {
// Draining tracking is only enabled if the persistent sessions feature is enabled.
// If we start using them for other features, this can be adjusted.
healthStatus := endpointHealthStatus(svc, e)
// Log the endpoint's health conditions for debugging.
if len(e.Addresses) > 0 {
ready := "nil"
terminating := "nil"
if e.Conditions.Ready != nil {
ready = fmt.Sprintf("%v", *e.Conditions.Ready)
}
if e.Conditions.Terminating != nil {
terminating = fmt.Sprintf("%v", *e.Conditions.Terminating)
}
log.Debugf("endpointHealthStatus: address=%s, Ready=%s, Terminating=%s, HealthStatus=%v, svc=%v",
e.Addresses[0], ready, terminating, healthStatus, svc != nil)
}
for _, a := range e.Addresses {
pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: epSlice.Name, Namespace: epSlice.Namespace}, e.TargetRef, hostName)
if pod == nil && expectedPod {
continue
}
var overrideAddresses []string
builder := esc.c.NewEndpointBuilder(pod)
// EDS and ServiceEntry use the name for the service port - ADS will need to map it to numbers.
// The Service is treated as the source of truth for port naming:
// - resolve the matching ServicePort for each EndpointSlice port
// - use the resolved targetPort (container port) as the EndpointPort
// - use Service.Port.Name as the ServicePortName so it matches svcPort.Name in BuildClusterLoadAssignment
kubeSvc := esc.c.services.Get(svcNamespacedName.Name, svcNamespacedName.Namespace)
for _, port := range epSlice.Ports {
var epSlicePortNum int32
if port.Port != nil {
epSlicePortNum = *port.Port
}
var epSlicePortName string
if port.Name != nil {
epSlicePortName = *port.Name
}
var servicePortNum int32
var targetPortNum int32
var portName string
// EndpointSlice.Port.Port may carry either the targetPort or the service port, so find the
// matching ServicePort and resolve:
// 1. servicePortNum (Service.Port) - used for matching in BuildClusterLoadAssignment
// 2. targetPortNum (Service.TargetPort) - used as EndpointPort
// 3. portName (Service.Port.Name) - used as ServicePortName for filtering
if kubeSvc != nil {
matched := false
for _, kubePort := range kubeSvc.Spec.Ports {
// Try matching by port number (service port or targetPort)
portMatches := int32(kubePort.Port) == epSlicePortNum
// Also try matching by targetPort
var kubeTargetPort int32
if kubePort.TargetPort.Type == intstr.Int {
kubeTargetPort = kubePort.TargetPort.IntVal
} else if kubePort.TargetPort.Type == intstr.String && pod != nil {
// Resolve targetPort name from Pod
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if containerPort.Name == kubePort.TargetPort.StrVal {
kubeTargetPort = containerPort.ContainerPort
break
}
}
if kubeTargetPort != 0 {
break
}
}
}
targetPortMatches := kubeTargetPort == epSlicePortNum
// Match by port name if available
nameMatches := epSlicePortName == "" || kubePort.Name == epSlicePortName
if (portMatches || targetPortMatches) && nameMatches {
// Found matching ServicePort
servicePortNum = int32(kubePort.Port)
portName = kubePort.Name
// Resolve targetPortNum
if kubePort.TargetPort.Type == intstr.Int {
targetPortNum = kubePort.TargetPort.IntVal
} else if kubePort.TargetPort.Type == intstr.String && pod != nil {
// Resolve targetPort name from Pod container ports
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if containerPort.Name == kubePort.TargetPort.StrVal {
targetPortNum = containerPort.ContainerPort
break
}
}
if targetPortNum != 0 {
break
}
}
} else {
// If targetPort is string but pod not found, use service port as fallback
targetPortNum = servicePortNum
}
// If targetPortNum is still 0, use service port
if targetPortNum == 0 {
targetPortNum = servicePortNum
}
matched = true
log.Debugf("updateEndpointCacheForSlice: matched ServicePort (servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port (portNum=%d, portName='%s')",
servicePortNum, targetPortNum, portName, epSlicePortNum, epSlicePortName)
break
}
}
if !matched {
// No exact match was found; fall back to matching by port number only. This handles cases where
// EndpointSlice.Port.Name does not match the Service but the port number does, and keeps portName
// aligned with Service.Port.Name.
for _, kubePort := range kubeSvc.Spec.Ports {
if int32(kubePort.Port) == epSlicePortNum {
// Found by port number, use Service.Port.Name
servicePortNum = int32(kubePort.Port)
portName = kubePort.Name
// Resolve targetPortNum
if kubePort.TargetPort.Type == intstr.Int {
targetPortNum = kubePort.TargetPort.IntVal
} else if kubePort.TargetPort.Type == intstr.String && pod != nil {
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if containerPort.Name == kubePort.TargetPort.StrVal {
targetPortNum = containerPort.ContainerPort
break
}
}
if targetPortNum != 0 {
break
}
}
}
if targetPortNum == 0 {
targetPortNum = servicePortNum
}
matched = true
log.Infof("updateEndpointCacheForSlice: matched ServicePort by port number only (servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port (portNum=%d, portName='%s')",
servicePortNum, targetPortNum, portName, epSlicePortNum, epSlicePortName)
break
}
}
if !matched {
log.Warnf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using EndpointSlice values (WARNING: portName may not match Service.Port.Name)",
epSlicePortNum, epSlicePortName, svcNamespacedName.Name)
// Fall back to the EndpointSlice values. This should rarely happen, but when it does portName
// may not match Service.Port.Name, which will cause endpoints to be filtered out in
// BuildClusterLoadAssignment.
servicePortNum = epSlicePortNum
targetPortNum = epSlicePortNum
if epSlicePortName != "" {
portName = epSlicePortName
} else {
// If EndpointSlice.Port.Name is also empty, try to get port name from Service by port number
for _, kubePort := range kubeSvc.Spec.Ports {
if int32(kubePort.Port) == epSlicePortNum {
portName = kubePort.Name
log.Debugf("updateEndpointCacheForSlice: resolved portName='%s' from Service by port number %d", portName, epSlicePortNum)
break
}
}
}
}
}
} else {
// Service not found, use EndpointSlice values
servicePortNum = epSlicePortNum
targetPortNum = epSlicePortNum
if epSlicePortName != "" {
portName = epSlicePortName
}
log.Debugf("updateEndpointCacheForSlice: Service not found for %s, using EndpointSlice values (portNum=%d, portName='%s')",
svcNamespacedName.Name, epSlicePortNum, epSlicePortName)
}
// Log endpoint creation with the resolved values for debugging.
log.Debugf("updateEndpointCacheForSlice: creating endpoint for service %s (address=%s, servicePortNum=%d, targetPortNum=%d, portName='%s', hostname=%s, kubeSvc=%v)",
svcNamespacedName.Name, a, servicePortNum, targetPortNum, portName, hostName, kubeSvc != nil)
// In the EndpointSlice API, Port.Port is the port the endpoint itself listens on (normally the
// resolved targetPort), not the Service port. DubboEndpoint.EndpointPort must therefore be the
// targetPort (container port), while ServicePortName must match Service.Port.Name so that the
// endpoint is not filtered out in BuildClusterLoadAssignment.
//
// We use targetPortNum as the EndpointPort because that is what the container actually listens on;
// servicePortNum and portName are used for matching in BuildClusterLoadAssignment.
//
// svc.SupportsUnhealthyEndpoints() returns true if the Service has publishNotReadyAddresses=true.
// This allows endpoints with Ready=false to be included in EDS, which is useful for services that
// need to receive traffic before they are fully ready (e.g. during startup).
// Guard against a nil svc and default to not supporting unhealthy endpoints.
supportsUnhealthy := svc != nil && svc.SupportsUnhealthyEndpoints()
dubboEndpoint := builder.buildDubboEndpoint(a, targetPortNum, portName, nil, healthStatus, supportsUnhealthy)
// Log when an endpoint is unhealthy and the service does not accept unhealthy endpoints.
if healthStatus == model.UnHealthy && !supportsUnhealthy {
if svc != nil {
log.Debugf("updateEndpointCacheForSlice: endpoint %s is unhealthy (HealthStatus=%v) but service %s does not support unhealthy endpoints (PublishNotReadyAddresses=%v). Endpoint will be filtered in EDS.",
a, healthStatus, svcNamespacedName.Name, svc.Attributes.PublishNotReadyAddresses)
} else {
log.Debugf("updateEndpointCacheForSlice: endpoint %s is unhealthy (HealthStatus=%v) but service %s is nil. Endpoint will be filtered in EDS.",
a, healthStatus, svcNamespacedName.Name)
}
}
// Skip nil endpoints so they never reach the cache or EDS; otherwise log what was built.
if dubboEndpoint == nil {
log.Debugf("updateEndpointCacheForSlice: buildDubboEndpoint returned nil for address=%s, targetPortNum=%d, portName='%s'",
a, targetPortNum, portName)
continue
}
log.Debugf("updateEndpointCacheForSlice: created endpoint with ServicePortName='%s', EndpointPort=%d, address=%s",
dubboEndpoint.ServicePortName, dubboEndpoint.EndpointPort, dubboEndpoint.FirstAddressOrNil())
if len(overrideAddresses) > 1 {
dubboEndpoint.Addresses = overrideAddresses
}
endpoints = append(endpoints, dubboEndpoint)
}
}
}
esc.endpointCache.Update(hostName, epSlice.Name, endpoints)
}
func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.DubboEndpoint) {
e.mu.Lock()
defer e.mu.Unlock()
e.update(hostname, slice, endpoints)
}
func (e *endpointSliceCache) update(hostname host.Name, slice string, endpoints []*model.DubboEndpoint) {
if len(endpoints) == 0 {
delete(e.endpointsByServiceAndSlice[hostname], slice)
}
if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.DubboEndpoint)
}
// We will always overwrite. A conflict here means an endpoint is transitioning
// from one slice to another. See
// https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints
// In this case, we can always assume an update is fresh, although older slices
// we have not gotten updates for may be stale; therefore we always take the new
// update.
e.endpointsByServiceAndSlice[hostname][slice] = endpoints
}
func endpointHealthStatus(svc *model.Service, e v1.Endpoint) model.HealthStatus {
// Health status logic:
// 1. If Ready is nil or true, the endpoint is healthy.
// 2. If Ready is false and the endpoint is terminating, mark it Terminating.
// 3. Otherwise, mark it UnHealthy.
// Check the Ready condition first.
if e.Conditions.Ready == nil || *e.Conditions.Ready {
return model.Healthy
}
// Ready is false; check whether the endpoint is terminating.
// Terminating only counts when the condition is non-nil and true; a nil Terminating means the
// endpoint is simply not ready, not that it is shutting down.
if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
return model.Terminating
}
// Ready is false and not terminating, mark as unhealthy
return model.UnHealthy
}
func serviceNameForEndpointSlice(labels map[string]string) string {
return labels[v1.LabelServiceName]
}
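// getPod returns the pod backing an endpoint address. The second return value reports whether a pod
// is expected (the endpoint's targetRef is a Pod); if one is expected but not found yet, a resync is
// queued so the endpoint is reprocessed when the pod arrives.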
func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *corev1.ObjectReference, host host.Name) (*corev1.Pod, bool) {
var expectPod bool
pod := c.getPod(ip, ep.Namespace, targetRef)
if targetRef != nil && targetRef.Kind == kind.Pod.String() {
expectPod = true
if pod == nil {
c.registerEndpointResync(ep, ip, host)
}
}
return pod, expectPod
}
func (c *Controller) registerEndpointResync(ep *metav1.ObjectMeta, ip string, host host.Name) {
// Tell pod cache we want to queue the endpoint event when this pod arrives.
c.pods.queueEndpointEventOnPodArrival(config.NamespacedName(ep), ip)
}
func (c *Controller) getPod(ip string, namespace string, targetRef *corev1.ObjectReference) *corev1.Pod {
if targetRef != nil && targetRef.Kind == kind.Pod.String() {
key := types.NamespacedName{Name: targetRef.Name, Namespace: targetRef.Namespace}
pod := c.pods.getPodByKey(key)
return pod
}
// This means the endpoint is manually controlled.
// We will want to look up a pod to find metadata like the service account, labels, etc. But for hostNetwork we just get a raw IP,
// and the IP may be shared by many pods. The best we can do is guess.
pods := c.pods.getPodsByIP(ip)
for _, p := range pods {
if p.Namespace == namespace {
// Might not be right, but best we can do.
return p
}
}
return nil
}
func getServiceNamespacedName(slice *v1.EndpointSlice) types.NamespacedName {
return types.NamespacedName{
Namespace: slice.GetNamespace(),
Name: serviceNameForEndpointSlice(slice.GetLabels()),
}
}
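// serviceNeedsPush returns false when the service's exportTo annotation hides it from all namespaces
// (contains visibility.None), in which case its endpoint updates are skipped.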
func serviceNeedsPush(svc *corev1.Service) bool {
if svc.Annotations[annotation.NetworkingExportTo.Name] != "" {
namespaces := strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",")
for _, ns := range namespaces {
ns = strings.TrimSpace(ns)
if ns == string(visibility.None) {
return false
}
}
}
return true
}
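// pushEDS sends the cached endpoints for each hostname to the XDS updater as an EDS update for
// this registry's shard.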
func (esc *endpointSliceController) pushEDS(hostnames []host.Name, namespace string) {
shard := model.ShardKeyFromRegistry(esc.c)
esc.endpointCache.mu.Lock()
defer esc.endpointCache.mu.Unlock()
for _, hostname := range hostnames {
endpoints := esc.endpointCache.get(hostname)
// Log endpoint registration for debugging.
log.Infof("pushEDS: registering %d endpoints for service %s in namespace %s (shard=%v)",
len(endpoints), string(hostname), namespace, shard)
// Log details for the first few endpoints only.
for i, ep := range endpoints {
if i >= 3 {
break
}
log.Infof("pushEDS: endpoint[%d] address=%s, port=%d, ServicePortName='%s', HealthStatus=%v",
i, ep.FirstAddressOrNil(), ep.EndpointPort, ep.ServicePortName, ep.HealthStatus)
}
esc.c.opts.XDSUpdater.EDSUpdate(shard, string(hostname), namespace, endpoints)
}
}
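// endpointKey identifies an endpoint by address and service port name; it is used to de-duplicate
// endpoints that appear in more than one slice.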
type endpointKey struct {
ip string
port string
}
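// get merges the endpoints of all slices for the hostname, skipping duplicates. Callers must hold e.mu.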
func (e *endpointSliceCache) get(hostname host.Name) []*model.DubboEndpoint {
var endpoints []*model.DubboEndpoint
found := sets.New[endpointKey]()
for _, eps := range e.endpointsByServiceAndSlice[hostname] {
for _, ep := range eps {
key := endpointKey{ep.FirstAddressOrNil(), ep.ServicePortName}
if found.InsertContains(key) {
// This is a duplicate. Update() already handles conflict resolution, so we don't
// need to pick the "right" one here.
continue
}
endpoints = append(endpoints, ep)
}
}
return endpoints
}
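// podArrived replays an Add event for the named EndpointSlice once a pod we were waiting for shows up.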
func (esc *endpointSliceController) podArrived(name, ns string) error {
ep := esc.slices.Get(name, ns)
if ep == nil {
return nil
}
return esc.onEvent(nil, ep, model.EventAdd)
}
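// buildDubboEndpointsWithService returns the cached endpoints for hostName based on the EndpointSlices
// that back the named service, optionally rebuilding the cache from the live slices first.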
func (esc *endpointSliceController) buildDubboEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.DubboEndpoint {
esLabelSelector := endpointSliceSelectorForService(name)
slices := esc.slices.List(namespace, esLabelSelector)
if len(slices) == 0 {
return nil
}
if updateCache {
// A cache update was requested. Rebuild the endpoints for these slices.
for _, slice := range slices {
esc.updateEndpointCacheForSlice(hostName, slice)
}
}
return esc.endpointCache.Get(hostName)
}
func (e *endpointSliceCache) Get(hostname host.Name) []*model.DubboEndpoint {
e.mu.RLock()
defer e.mu.RUnlock()
return e.get(hostname)
}
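// endpointSliceSelectorForService selects the EndpointSlices owned by the named service via the
// kubernetes.io/service-name label, excluding MCS-managed slices.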
func endpointSliceSelectorForService(name string) klabels.Selector {
return klabels.Set(map[string]string{
v1.LabelServiceName: name,
}).AsSelectorPreValidated().Add(*endpointSliceRequirement)
}