blob: 8fed318106fa0c3e89323d9fd780eee72cc22f4c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controller
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"github.com/api7/gopkg/pkg/log"
"github.com/go-logr/logr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils"
)
// ApisixRouteReconciler reconciles an ApisixRoute object.
type ApisixRouteReconciler struct {
// Client is the embedded Kubernetes client used for the Get and List calls
// made while resolving the resources an ApisixRoute refers to.
client.Client
// NOTE(review): Scheme is not referenced by the reconciler methods shown in
// this file; confirm whether other code depends on it.
Scheme *runtime.Scheme
// Log is the logger used by the reconciler and handed to shared helpers.
Log logr.Logger
// Provider receives the Update and Delete calls issued by Reconcile.
Provider provider.Provider
// Updater receives the status updates queued by updateStatus.
Updater status.Updater
// Readier is told, via Done, when a reconcile pass for a route has returned.
Readier readiness.ReadinessManager
}
// SetupWithManager registers the ApisixRoute controller with mgr.
//
// ApisixRoute is the primary resource. IngressClass, GatewayProxy,
// EndpointSlice, Secret, ApisixUpstream and ApisixPluginConfig are watched as
// secondary resources, each through a map function that turns an event into
// reconcile requests for the routes concerned.
func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Primary resource, restricted by the ingress-class predicate.
	b := ctrl.NewControllerManagedBy(mgr).
		For(&apiv2.ApisixRoute{}, builder.WithPredicates(MatchesIngressClassPredicate(r.Client, r.Log)))

	// Controller-wide event filter: an event passes when any of the three
	// predicates accepts it.
	eventFilter := predicate.Or(
		predicate.GenerationChangedPredicate{},
		predicate.AnnotationChangedPredicate{},
		predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
	)
	b = b.WithEventFilter(eventFilter)

	// IngressClass events are additionally limited by matchesIngressController.
	b = b.Watches(
		&networkingv1.IngressClass{},
		handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForIngressClass),
		builder.WithPredicates(predicate.NewPredicateFuncs(matchesIngressController)),
	)

	// Remaining secondary resources, in registration order.
	b = b.Watches(&v1alpha1.GatewayProxy{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForGatewayProxy))
	b = b.Watches(&discoveryv1.EndpointSlice{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForService))
	b = b.Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForSecret))
	b = b.Watches(&apiv2.ApisixUpstream{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForApisixUpstream))
	b = b.Watches(&apiv2.ApisixPluginConfig{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForPluginConfig))

	return b.Named("apisixroute").Complete(r)
}
// Reconcile processes the ApisixRoute identified by req.
//
// A route that no longer exists, or for which no matching IngressClass can be
// found, is deleted from the provider and no status is written. Otherwise the
// route's references are resolved into a translate context and pushed to the
// provider; whatever error the pass ends with (possibly nil) is recorded on
// the route's status by a deferred call.
func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	defer r.Readier.Done(&apiv2.ApisixRoute{}, req.NamespacedName)

	var route apiv2.ApisixRoute
	if getErr := r.Get(ctx, req.NamespacedName, &route); getErr != nil {
		if client.IgnoreNotFound(getErr) != nil {
			return ctrl.Result{}, getErr
		}
		// The object is gone: fill in the identity fields on the empty value
		// before asking the provider to delete it.
		route.Namespace, route.Name = req.Namespace, req.Name
		route.TypeMeta = metav1.TypeMeta{
			Kind:       KindApisixRoute,
			APIVersion: apiv2.GroupVersion.String(),
		}
		if delErr := r.Provider.Delete(ctx, &route); delErr != nil {
			r.Log.Error(delErr, "failed to delete apisixroute", "apisixroute", route)
			return ctrl.Result{}, delErr
		}
		return ctrl.Result{}, nil
	}

	tctx := provider.NewDefaultTranslateContext(ctx)

	// err is shared with the deferred status update below, so every later
	// step assigns to it instead of declaring a new variable.
	var (
		ingressClass *networkingv1.IngressClass
		err          error
	)
	ingressClass, err = FindMatchingIngressClass(tctx, r.Client, r.Log, &route)
	if err != nil {
		r.Log.V(1).Info("no matching IngressClass available",
			"ingressClassName", route.Spec.IngressClassName,
			"error", err.Error())
		if delErr := r.Provider.Delete(ctx, &route); delErr != nil {
			r.Log.Error(delErr, "failed to delete apisixroute", "apisixroute", route)
			return ctrl.Result{}, delErr
		}
		return ctrl.Result{}, nil
	}

	// From here on every exit path records err on the route status.
	defer func() { r.updateStatus(&route, err) }()

	err = ProcessIngressClassParameters(tctx, r.Client, r.Log, &route, ingressClass)
	if err != nil {
		r.Log.Error(err, "failed to process IngressClass parameters")
		// A NotFound error is still recorded in the status but is not
		// returned to the caller.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	err = r.processApisixRoute(tctx, &route)
	if err != nil {
		return ctrl.Result{}, err
	}

	err = r.Provider.Update(ctx, tctx, &route)
	if err == nil {
		return ctrl.Result{}, nil
	}
	// Wrap the provider failure so the status carries the SyncFailed reason.
	err = types.ReasonError{
		Reason:  string(apiv2.ConditionReasonSyncFailed),
		Message: err.Error(),
	}
	r.Log.Error(err, "failed to process", "apisixroute", route)
	return ctrl.Result{}, err
}
// processApisixRoute validates every HTTP and stream rule of in and records
// the resources they reference on tctx.
//
// Rule names must be unique across HTTP and stream rules together. The first
// failure stops processing and is returned.
func (r *ApisixRouteReconciler) processApisixRoute(tctx *provider.TranslateContext, in *apiv2.ApisixRoute) error {
	// invalidSpec builds the error returned for a malformed spec.
	invalidSpec := func(message string) error {
		return types.ReasonError{
			Reason:  string(apiv2.ConditionReasonInvalidSpec),
			Message: message,
		}
	}
	seen := make(map[string]struct{})

	for i := range in.Spec.HTTP {
		rule := in.Spec.HTTP[i]

		// Rule names must not repeat.
		if _, dup := seen[rule.Name]; dup {
			return invalidSpec("duplicate route rule name")
		}
		seen[rule.Name] = struct{}{}

		// Secrets referenced by inline plugins.
		if err := r.validatePlugins(tctx, in, rule.Plugins); err != nil {
			return err
		}

		// Optional plugin config reference.
		if rule.PluginConfigName != "" {
			if err := r.validatePluginConfig(tctx, in, rule); err != nil {
				return err
			}
		}

		// Match expressions must convert to vars.
		if _, err := rule.Match.NginxVars.ToVars(); err != nil {
			return invalidSpec(fmt.Sprintf(".spec.http[%d].match.exprs: %s", i, err.Error()))
		}

		// Remote addresses must validate.
		if err := utils.ValidateRemoteAddrs(rule.Match.RemoteAddrs); err != nil {
			return invalidSpec(fmt.Sprintf(".spec.http[%d].match.remoteAddrs: %s", i, err.Error()))
		}

		// Backends first, then upstream references.
		if err := r.validateHTTPBackends(tctx, in, rule); err != nil {
			return err
		}
		if err := r.validateUpstreams(tctx, in, rule); err != nil {
			return err
		}
	}

	for i := range in.Spec.Stream {
		rule := in.Spec.Stream[i]

		// Stream rules share the name set with HTTP rules.
		if _, dup := seen[rule.Name]; dup {
			return invalidSpec("duplicate route rule name")
		}
		seen[rule.Name] = struct{}{}

		if err := r.validatePlugins(tctx, in, rule.Plugins); err != nil {
			return err
		}
		if err := r.validateStreamBackend(tctx, in, rule); err != nil {
			return err
		}
	}
	return nil
}
// validatePlugins checks the secret reference of every enabled plugin in
// plugins; disabled plugins are ignored. The first failing lookup is returned.
func (r *ApisixRouteReconciler) validatePlugins(tctx *provider.TranslateContext, in *apiv2.ApisixRoute, plugins []apiv2.ApisixRoutePlugin) error {
	for i := range plugins {
		if plugins[i].Enable {
			if err := r.validateSecrets(tctx, in, plugins[i].SecretRef); err != nil {
				return err
			}
		}
	}
	return nil
}
// validateStreamBackend validates the backend of a stream rule by copying its
// fields into an HTTP backend value and reusing validateHTTPBackend, with the
// route's namespace as the lookup namespace.
func (r *ApisixRouteReconciler) validateStreamBackend(tctx *provider.TranslateContext, in *apiv2.ApisixRoute, stream apiv2.ApisixRouteStream) error {
	asHTTP := apiv2.ApisixRouteHTTPBackend{
		ServiceName:        stream.Backend.ServiceName,
		ServicePort:        stream.Backend.ServicePort,
		Subset:             stream.Backend.Subset,
		ResolveGranularity: stream.Backend.ResolveGranularity,
	}
	return r.validateHTTPBackend(tctx, asHTTP, in.GetNamespace())
}
// validatePluginConfig resolves the ApisixPluginConfig referenced by an HTTP
// rule and records it on tctx.
//
// The plugin config is looked up in http.PluginConfigNamespace, falling back
// to the route's namespace when that field is empty. If the plugin config
// names an IngressClass different from the route's, that class must exist and
// satisfy matchesController. Secrets referenced by the plugin config's
// plugins are validated as well.
func (r *ApisixRouteReconciler) validatePluginConfig(tctx *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error {
	// invalidSpec builds the error returned for an unusable reference.
	invalidSpec := func(message string) error {
		return types.ReasonError{
			Reason:  string(apiv2.ConditionReasonInvalidSpec),
			Message: message,
		}
	}

	namespace := http.PluginConfigNamespace
	if namespace == "" {
		namespace = in.Namespace
	}

	pluginConfig := apiv2.ApisixPluginConfig{
		ObjectMeta: metav1.ObjectMeta{
			Name:      http.PluginConfigName,
			Namespace: namespace,
		},
	}
	key := utils.NamespacedName(&pluginConfig)
	if err := r.Get(tctx, key, &pluginConfig); err != nil {
		return invalidSpec(fmt.Sprintf("failed to get ApisixPluginConfig: %s", key))
	}

	// Only a non-empty class that differs from the route's needs checking.
	if className := pluginConfig.Spec.IngressClassName; className != "" && className != in.Spec.IngressClassName {
		var ingressClass networkingv1.IngressClass
		if err := r.Get(tctx, client.ObjectKey{Name: className}, &ingressClass); err != nil {
			return invalidSpec(fmt.Sprintf("failed to get IngressClass %s for ApisixPluginConfig %s: %v", className, key, err))
		}
		if !matchesController(ingressClass.Spec.Controller) {
			return invalidSpec(fmt.Sprintf("ApisixPluginConfig %s references IngressClass %s with non-matching controller", key, className))
		}
	}

	tctx.ApisixPluginConfigs[key] = &pluginConfig

	// Secrets referenced by the plugin config's own plugins.
	return r.validatePlugins(tctx, in, pluginConfig.Spec.Plugins)
}
// validateSecrets fetches the Secret named secretRef from the route's
// namespace and records it on tctx. An empty secretRef is a no-op; any lookup
// failure is reported as an InvalidSpec error.
func (r *ApisixRouteReconciler) validateSecrets(tctx *provider.TranslateContext, in *apiv2.ApisixRoute, secretRef string) error {
	if secretRef == "" {
		return nil
	}

	found := corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      secretRef,
			Namespace: in.Namespace,
		},
	}
	key := utils.NamespacedName(&found)

	if err := r.Get(tctx, key, &found); err != nil {
		return types.ReasonError{
			Reason:  string(apiv2.ConditionReasonInvalidSpec),
			Message: fmt.Sprintf("failed to get Secret: %s", key),
		}
	}

	// The map key is derived from the fetched object, not from the lookup key.
	tctx.Secrets[utils.NamespacedName(&found)] = &found
	return nil
}
// processExternalNodes records on tctx the Service behind every external node
// of type ExternalTypeService in ups. Services are looked up in the upstream's
// namespace. A missing Service is logged and skipped; any other lookup error
// is returned.
func (r *ApisixRouteReconciler) processExternalNodes(tctx *provider.TranslateContext, ups apiv2.ApisixUpstream) error {
	for _, node := range ups.Spec.ExternalNodes {
		if node.Type != apiv2.ExternalTypeService {
			continue
		}

		var svc corev1.Service
		key := k8stypes.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name}

		err := r.Get(tctx, key, &svc)
		if err == nil {
			tctx.Services[utils.NamespacedName(&svc)] = &svc
			continue
		}

		r.Log.Error(err, "failed to get service in ApisixUpstream", "ApisixUpstream", ups.Name, "Service", key)
		if client.IgnoreNotFound(err) != nil {
			return err
		}
	}
	return nil
}
// processTLSSecret records on tctx the Secret referenced by ups.Spec.TLSSecret.
//
// Nothing happens when the reference is nil or has an empty name. The Secret
// is looked up in the reference's namespace, or in the upstream's namespace
// when the reference leaves it empty. Lookup errors other than NotFound are
// returned.
func (r *ApisixRouteReconciler) processTLSSecret(tctx *provider.TranslateContext, ups apiv2.ApisixUpstream) error {
	ref := ups.Spec.TLSSecret
	if ref == nil || ref.Name == "" {
		return nil
	}

	var tlsSecret corev1.Secret
	key := k8stypes.NamespacedName{
		Namespace: cmp.Or(ref.Namespace, ups.Namespace),
		Name:      ref.Name,
	}

	if err := r.Get(tctx, key, &tlsSecret); err != nil {
		r.Log.Error(err, "failed to get secret in ApisixUpstream", "ApisixUpstream", ups.Name, "Secret", key)
		if client.IgnoreNotFound(err) != nil {
			return err
		}
	}

	// NOTE(review): on NotFound execution reaches this line with tlsSecret
	// still at its zero value, so an empty Secret is registered under key.
	// processExternalNodes skips missing objects instead; confirm that
	// consumers of tctx.Secrets expect this entry.
	tctx.Secrets[key] = &tlsSecret
	return nil
}
// validateHTTPBackends validates each backend of an HTTP rule in order. A
// service name that appears more than once within the rule is rejected as an
// InvalidSpec error.
func (r *ApisixRouteReconciler) validateHTTPBackends(tctx *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error {
	seen := map[k8stypes.NamespacedName]struct{}{}

	for _, backend := range http.Backends {
		key := k8stypes.NamespacedName{Namespace: in.GetNamespace(), Name: backend.ServiceName}

		if _, dup := seen[key]; dup {
			return types.ReasonError{
				Reason:  string(apiv2.ConditionReasonInvalidSpec),
				Message: fmt.Sprintf("duplicate backend service: %s", key),
			}
		}
		seen[key] = struct{}{}

		if err := r.validateHTTPBackend(tctx, backend, in.GetNamespace()); err != nil {
			return err
		}
	}
	return nil
}
// validateHTTPBackend resolves one backend reference in namespace ns and
// records what it finds on tctx.
//
// Steps, in order:
//   - The backend Service is fetched. A missing Service is logged and the
//     backend is skipped (nil is returned); other errors are returned.
//   - An ApisixUpstream with the same namespace/name as the Service is looked
//     up. When present it is stored in tctx.Upstreams and its TLS secret is
//     processed; when absent this is only logged at debug level.
//   - ExternalName Services are stored in tctx.Services with no further checks.
//   - With ResolveGranularity "service" the Service must have a ClusterIP, and
//     the referenced port (number or name) must exist on the Service;
//     otherwise the problem is logged and the backend is skipped.
//   - The Service is stored in tctx.Services, and its EndpointSlices, filtered
//     by the backend's subset labels if any, are stored in tctx.EndpointSlices.
//
// Skipped backends return nil, so they never fail the reconcile.
func (r *ApisixRouteReconciler) validateHTTPBackend(tctx *provider.TranslateContext, backend apiv2.ApisixRouteHTTPBackend, ns string) error {
	var (
		au        apiv2.ApisixUpstream
		service   corev1.Service
		serviceNN = k8stypes.NamespacedName{
			Namespace: ns,
			Name:      backend.ServiceName,
		}
	)

	if err := r.Get(tctx, serviceNN, &service); err != nil {
		if client.IgnoreNotFound(err) != nil {
			return err
		}
		// logr's Error takes (err, msg, keysAndValues...): the message must
		// be given explicitly so that "Service" is parsed as a key.
		r.Log.Error(errors.New("service not found"), "skipping backend", "Service", serviceNN)
		return nil
	}

	// try to get apisixupstream with the same name as the backend service
	log.Debugw("try to get apisixupstream with the same name as the backend service", zap.Stringer("Service", serviceNN))
	if err := r.Get(tctx, serviceNN, &au); err != nil {
		log.Debugw("no ApisixUpstream with the same name as the backend service found", zap.Stringer("Service", serviceNN), zap.Error(err))
		if err = client.IgnoreNotFound(err); err != nil {
			return err
		}
	} else {
		tctx.Upstreams[serviceNN] = &au
		if err := r.processTLSSecret(tctx, au); err != nil {
			return err
		}
	}

	// ExternalName services skip the ClusterIP, port and endpoint checks.
	if service.Spec.Type == corev1.ServiceTypeExternalName {
		tctx.Services[serviceNN] = &service
		return nil
	}

	if backend.ResolveGranularity == "service" && service.Spec.ClusterIP == "" {
		r.Log.Error(errors.New("service has no ClusterIP"), "skipping backend",
			"Service", serviceNN, "ResolveGranularity", backend.ResolveGranularity)
		return nil
	}

	// matchesPort reports whether port is the one the backend refers to,
	// by number or by name depending on how the reference is written.
	matchesPort := func(port corev1.ServicePort) bool {
		switch backend.ServicePort.Type {
		case intstr.Int:
			return port.Port == int32(backend.ServicePort.IntValue())
		case intstr.String:
			return port.Name == backend.ServicePort.StrVal
		default:
			return false
		}
	}
	if !slices.ContainsFunc(service.Spec.Ports, matchesPort) {
		if backend.ServicePort.Type == intstr.Int {
			r.Log.Error(errors.New("port not found in service"), "skipping backend",
				"Service", serviceNN, "port", backend.ServicePort.IntValue())
		} else {
			r.Log.Error(errors.New("named port not found in service"), "skipping backend",
				"Service", serviceNN, "port", backend.ServicePort.StrVal)
		}
		return nil
	}

	tctx.Services[serviceNN] = &service

	var endpoints discoveryv1.EndpointSliceList
	if err := r.List(tctx, &endpoints,
		client.InNamespace(service.Namespace),
		client.MatchingLabels{
			discoveryv1.LabelServiceName: service.Name,
		},
	); err != nil {
		return types.ReasonError{
			Reason:  string(apiv2.ConditionReasonInvalidSpec),
			Message: fmt.Sprintf("failed to list endpoint slices: %v", err),
		}
	}

	// backend.subset specifies a subset of upstream nodes.
	// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
	subsetLabels := r.getSubsetLabels(tctx, serviceNN, backend.Subset)
	tctx.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(tctx, endpoints.Items, subsetLabels)
	return nil
}
// validateUpstreams resolves the ApisixUpstream objects named by an HTTP rule,
// looking them up in the route's namespace, and records each one on tctx
// together with its external-node Services and TLS secret.
//
// Entries with an empty name are ignored. A missing ApisixUpstream is logged
// and skipped; any other lookup error is returned.
func (r *ApisixRouteReconciler) validateUpstreams(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error {
	for _, ref := range http.Upstreams {
		if ref.Name == "" {
			continue
		}

		var found apiv2.ApisixUpstream
		key := k8stypes.NamespacedName{Namespace: ar.GetNamespace(), Name: ref.Name}

		if err := r.Get(tctx, key, &found); err != nil {
			r.Log.Error(err, "failed to get ApisixUpstream", "ApisixUpstream", key)
			if client.IgnoreNotFound(err) != nil {
				return err
			}
			continue
		}

		tctx.Upstreams[key] = &found

		if err := r.processExternalNodes(tctx, found); err != nil {
			return err
		}
		if err := r.processTLSSecret(tctx, found); err != nil {
			return err
		}
	}
	return nil
}
// listApisixRoutesForService maps an EndpointSlice event to reconcile requests
// for the ApisixRoutes indexed under the slice's owning Service. The Service
// name is read from the slice's service-name label. Objects of any other type,
// and list failures, yield nil.
func (r *ApisixRouteReconciler) listApisixRoutesForService(ctx context.Context, obj client.Object) []reconcile.Request {
	slice, isSlice := obj.(*discoveryv1.EndpointSlice)
	if !isSlice {
		return nil
	}

	ns := slice.GetNamespace()
	svcName := slice.Labels[discoveryv1.LabelServiceName]

	var routes apiv2.ApisixRouteList
	selector := client.MatchingFields{
		indexer.ServiceIndexRef: indexer.GenIndexKey(ns, svcName),
	}
	if err := r.List(ctx, &routes, selector); err != nil {
		r.Log.Error(err, "failed to list apisixroutes by service", "service", svcName)
		return nil
	}

	out := make([]reconcile.Request, 0, len(routes.Items))
	for i := range routes.Items {
		out = append(out, reconcile.Request{NamespacedName: utils.NamespacedName(&routes.Items[i])})
	}
	return pkgutils.DedupComparable(out)
}
// listApisixRoutesForSecret maps a Secret event to reconcile requests for
// every ApisixRoute that depends on the Secret, either directly or through an
// ApisixPluginConfig that references it.
//
// A failure of either secret-index listing yields nil. A failure while
// listing the routes of one plugin config is logged and that plugin config is
// skipped.
func (r *ApisixRouteReconciler) listApisixRoutesForSecret(ctx context.Context, obj client.Object) []reconcile.Request {
	secret, isSecret := obj.(*corev1.Secret)
	if !isSecret {
		return nil
	}

	out := []reconcile.Request{}
	bySecret := client.MatchingFields{
		indexer.SecretIndexRef: indexer.GenIndexKey(secret.GetNamespace(), secret.GetName()),
	}

	// Routes that reference the secret directly.
	var direct apiv2.ApisixRouteList
	if err := r.List(ctx, &direct, bySecret); err != nil {
		r.Log.Error(err, "failed to list apisixroutes by secret", "secret", secret.Name)
		return nil
	}
	for i := range direct.Items {
		out = append(out, reconcile.Request{NamespacedName: utils.NamespacedName(&direct.Items[i])})
	}

	// Plugin configs that reference the secret.
	var pluginConfigs apiv2.ApisixPluginConfigList
	if err := r.List(ctx, &pluginConfigs, bySecret); err != nil {
		r.Log.Error(err, "failed to list apisixpluginconfigs by secret", "secret", secret.Name)
		return nil
	}

	// Routes that reference those plugin configs.
	for i := range pluginConfigs.Items {
		pc := &pluginConfigs.Items[i]

		var viaPC apiv2.ApisixRouteList
		byPluginConfig := client.MatchingFields{
			indexer.PluginConfigIndexRef: indexer.GenIndexKey(pc.GetNamespace(), pc.GetName()),
		}
		if err := r.List(ctx, &viaPC, byPluginConfig); err != nil {
			r.Log.Error(err, "failed to list apisixroutes by plugin config", "pluginconfig", pc.Name)
			continue
		}
		for j := range viaPC.Items {
			out = append(out, reconcile.Request{NamespacedName: utils.NamespacedName(&viaPC.Items[j])})
		}
	}
	return pkgutils.DedupComparable(out)
}
// listApisixRouteForIngressClass maps an IngressClass event to reconcile
// requests for the ApisixRoutes bound to that class. A route is selected when
// it names the class explicitly, or when it names no class and
// IsDefaultIngressClass accepts the class. Objects of any other type yield nil.
func (r *ApisixRouteReconciler) listApisixRouteForIngressClass(ctx context.Context, object client.Object) (requests []reconcile.Request) {
	ingressClass, isClass := object.(*networkingv1.IngressClass)
	if !isClass {
		return nil
	}

	// selects reports whether a listed object is a route bound to ingressClass.
	selects := func(obj client.Object) bool {
		route, isRoute := obj.(*apiv2.ApisixRoute)
		if !isRoute {
			r.Log.Error(fmt.Errorf("expected ApisixRoute, got %T", obj), "failed to match object type")
			return false
		}
		if IsDefaultIngressClass(ingressClass) && route.Spec.IngressClassName == "" {
			return true
		}
		return route.Spec.IngressClassName == ingressClass.Name
	}

	return ListMatchingRequests(ctx, r.Client, r.Log, &apiv2.ApisixRouteList{}, selects)
}
// listApisixRouteForGatewayProxy is the map function registered for
// GatewayProxy events. It delegates to listIngressClassRequestsForGatewayProxy,
// passing listApisixRouteForIngressClass as the per-IngressClass lister.
// NOTE(review): presumably the helper finds the IngressClasses that reference
// the GatewayProxy and fans out through that lister; confirm in the helper.
func (r *ApisixRouteReconciler) listApisixRouteForGatewayProxy(ctx context.Context, object client.Object) (requests []reconcile.Request) {
return listIngressClassRequestsForGatewayProxy(ctx, r.Client, object, r.Log, r.listApisixRouteForIngressClass)
}
// listApisixRouteForApisixUpstream maps an ApisixUpstream event to reconcile
// requests for the ApisixRoutes indexed under that upstream. Objects of any
// other type, and list failures, yield nil.
func (r *ApisixRouteReconciler) listApisixRouteForApisixUpstream(ctx context.Context, object client.Object) (requests []reconcile.Request) {
	upstream, isUpstream := object.(*apiv2.ApisixUpstream)
	if !isUpstream {
		return nil
	}

	var routes apiv2.ApisixRouteList
	byUpstream := client.MatchingFields{
		indexer.ApisixUpstreamRef: indexer.GenIndexKey(upstream.GetNamespace(), upstream.GetName()),
	}
	if err := r.List(ctx, &routes, byUpstream); err != nil {
		r.Log.Error(err, "failed to list ApisixRoutes")
		return nil
	}

	for i := range routes.Items {
		requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&routes.Items[i])})
	}
	return pkgutils.DedupComparable(requests)
}
// updateStatus sets the Accepted condition on ar from err and the route's
// generation, then hands a status update for the route to the Updater. The
// mutator copies ar.Status onto a deep copy of whatever object it is given.
func (r *ApisixRouteReconciler) updateStatus(ar *apiv2.ApisixRoute, err error) {
	SetApisixCRDConditionAccepted(&ar.Status, ar.GetGeneration(), err)

	// applyStatus reads ar.Status when it is invoked, not when it is created.
	applyStatus := func(obj client.Object) client.Object {
		updated := obj.(*apiv2.ApisixRoute).DeepCopy()
		updated.Status = ar.Status
		return updated
	}

	r.Updater.Update(status.Update{
		NamespacedName: utils.NamespacedName(ar),
		Resource:       &apiv2.ApisixRoute{},
		Mutator:        status.MutatorFunc(applyStatus),
	})
}
// listApisixRoutesForPluginConfig maps an ApisixPluginConfig event to
// reconcile requests for the ApisixRoutes indexed under that plugin config.
//
// When the plugin config names an IngressClass, the class must exist and
// satisfy matchesController; otherwise nil is returned. A lookup error other
// than NotFound is logged first.
func (r *ApisixRouteReconciler) listApisixRoutesForPluginConfig(ctx context.Context, obj client.Object) []reconcile.Request {
	pluginConfig, isPluginConfig := obj.(*apiv2.ApisixPluginConfig)
	if !isPluginConfig {
		return nil
	}

	// An explicit IngressClass must belong to this controller.
	if className := pluginConfig.Spec.IngressClassName; className != "" {
		var ingressClass networkingv1.IngressClass
		switch err := r.Get(ctx, client.ObjectKey{Name: className}, &ingressClass); {
		case err != nil && client.IgnoreNotFound(err) != nil:
			r.Log.Error(err, "failed to get IngressClass for ApisixPluginConfig", "pluginconfig", pluginConfig.Name)
			return nil
		case err != nil:
			return nil
		case !matchesController(ingressClass.Spec.Controller):
			return nil
		}
	}

	var routes apiv2.ApisixRouteList
	byPluginConfig := client.MatchingFields{
		indexer.PluginConfigIndexRef: indexer.GenIndexKey(pluginConfig.GetNamespace(), pluginConfig.GetName()),
	}
	if err := r.List(ctx, &routes, byPluginConfig); err != nil {
		r.Log.Error(err, "failed to list apisixroutes by plugin config", "pluginconfig", pluginConfig.Name)
		return nil
	}

	out := make([]reconcile.Request, 0, len(routes.Items))
	for i := range routes.Items {
		out = append(out, reconcile.Request{NamespacedName: utils.NamespacedName(&routes.Items[i])})
	}
	return pkgutils.DedupComparable(out)
}
// getSubsetLabels returns the labels of the subset named subset on the
// ApisixUpstream stored in tctx.Upstreams under auNN. It returns nil when
// subset is empty, when no upstream is stored under that key, or when the
// upstream has no subset with that name.
func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext, auNN k8stypes.NamespacedName, subset string) map[string]string {
	if subset == "" {
		return nil
	}
	if upstream, found := tctx.Upstreams[auNN]; found {
		// First subset with a matching name wins.
		for i := range upstream.Spec.Subsets {
			if upstream.Spec.Subsets[i].Name == subset {
				return upstream.Spec.Subsets[i].Labels
			}
		}
	}
	return nil
}
// filterEndpointSlicesBySubsetLabels narrows the endpoints of every slice in
// in with filterEndpointSliceByTargetPod and drops slices left without
// endpoints. With no labels the input is returned untouched. The elements of
// in are overwritten in place with their filtered versions.
func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.Context, in []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
	if len(labels) == 0 {
		return in
	}

	for i, slice := range in {
		in[i] = r.filterEndpointSliceByTargetPod(ctx, slice, labels)
	}

	hasEndpoints := func(slice discoveryv1.EndpointSlice) bool {
		return len(slice.Endpoints) > 0
	}
	return utils.Filter(in, hasEndpoints)
}
// filterEndpointSliceByTargetPod returns item with its Endpoints narrowed to
// those whose target pod's labels are a superset of labels. Endpoints without
// a target reference, or whose target is not a Pod, are kept unchanged. An
// endpoint whose Pod cannot be fetched is dropped.
func (r *ApisixRouteReconciler) filterEndpointSliceByTargetPod(ctx context.Context, item discoveryv1.EndpointSlice, labels map[string]string) discoveryv1.EndpointSlice {
	keep := func(endpoint discoveryv1.Endpoint) bool {
		target := endpoint.TargetRef
		if target == nil || target.Kind != KindPod {
			return true
		}

		var targetPod corev1.Pod
		key := k8stypes.NamespacedName{Namespace: target.Namespace, Name: target.Name}
		if err := r.Get(ctx, key, &targetPod); err != nil {
			return false
		}
		return utils.IsSubsetOf(labels, targetPod.GetLabels())
	}

	item.Endpoints = utils.Filter(item.Endpoints, keep)
	return item
}