blob: 125a14a9075a101de4a8c032bc1e129b83b6c74b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controller
import (
"cmp"
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"sigs.k8s.io/gateway-api/apis/v1beta1"
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
)
// TCPRouteReconciler reconciles a TCPRoute object.
type TCPRouteReconciler struct { //nolint:revive
client.Client
Scheme *runtime.Scheme
Log logr.Logger
Provider provider.Provider
Updater status.Updater
Readier readiness.ReadinessManager
}
// SetupWithManager sets up the controller with the Manager.
func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
bdr := ctrl.NewControllerManagedBy(mgr).
For(&gatewayv1alpha2.TCPRoute{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesByServiceRef),
).
Watches(&gatewayv1.Gateway{},
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGateway),
builder.WithPredicates(
predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool {
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
},
CreateFunc: func(e event.CreateEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
return true
},
},
),
).
Watches(&v1alpha1.BackendTrafficPolicy{},
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForBackendTrafficPolicy),
).
Watches(&v1alpha1.GatewayProxy{},
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForGatewayProxy),
)
if GetEnableReferenceGrant() {
bdr.Watches(&v1beta1.ReferenceGrant{},
handler.EnqueueRequestsFromMapFunc(r.listTCPRoutesForReferenceGrant),
builder.WithPredicates(referenceGrantPredicates(KindTCPRoute)),
)
}
return bdr.Complete(r)
}
func (r *TCPRouteReconciler) listTCPRoutesForBackendTrafficPolicy(ctx context.Context, obj client.Object) []reconcile.Request {
policy, ok := obj.(*v1alpha1.BackendTrafficPolicy)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to BackendTrafficPolicy")
return nil
}
tcprouteList := []gatewayv1alpha2.TCPRoute{}
for _, targetRef := range policy.Spec.TargetRefs {
service := &corev1.Service{}
if err := r.Get(ctx, client.ObjectKey{
Namespace: policy.Namespace,
Name: string(targetRef.Name),
}, service); err != nil {
if client.IgnoreNotFound(err) != nil {
r.Log.Error(err, "failed to get service", "namespace", policy.Namespace, "name", targetRef.Name)
}
continue
}
tcprList := &gatewayv1alpha2.TCPRouteList{}
if err := r.List(ctx, tcprList, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(policy.Namespace, string(targetRef.Name)),
}); err != nil {
r.Log.Error(err, "failed to list tcproutes by service reference", "service", targetRef.Name)
return nil
}
tcprouteList = append(tcprouteList, tcprList.Items...)
}
var namespacedNameMap = make(map[k8stypes.NamespacedName]struct{})
requests := make([]reconcile.Request, 0, len(tcprouteList))
for _, tr := range tcprouteList {
key := k8stypes.NamespacedName{
Namespace: tr.Namespace,
Name: tr.Name,
}
if _, ok := namespacedNameMap[key]; !ok {
namespacedNameMap[key] = struct{}{}
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: tr.Namespace,
Name: tr.Name,
},
})
}
}
return requests
}
func (r *TCPRouteReconciler) listTCPRoutesForGateway(ctx context.Context, obj client.Object) []reconcile.Request {
gateway, ok := obj.(*gatewayv1.Gateway)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to Gateway")
}
tcprList := &gatewayv1alpha2.TCPRouteList{}
if err := r.List(ctx, tcprList, client.MatchingFields{
indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name),
}); err != nil {
r.Log.Error(err, "failed to list tcproutes by gateway", "gateway", gateway.Name)
return nil
}
requests := make([]reconcile.Request, 0, len(tcprList.Items))
for _, tcr := range tcprList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: tcr.Namespace,
Name: tcr.Name,
},
})
}
return requests
}
// listTCPRoutesForGatewayProxy list all TCPRoute resources that are affected by a given GatewayProxy
func (r *TCPRouteReconciler) listTCPRoutesForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request {
gatewayProxy, ok := obj.(*v1alpha1.GatewayProxy)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to GatewayProxy")
return nil
}
namespace := gatewayProxy.GetNamespace()
name := gatewayProxy.GetName()
// find all gateways that reference this gateway proxy
gatewayList := &gatewayv1.GatewayList{}
if err := r.List(ctx, gatewayList, client.MatchingFields{
indexer.ParametersRef: indexer.GenIndexKey(namespace, name),
}); err != nil {
r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", gatewayProxy.GetName())
return nil
}
var requests []reconcile.Request
// for each gateway, find all TCPRoute resources that reference it
for _, gateway := range gatewayList.Items {
tcpRouteList := &gatewayv1alpha2.TCPRouteList{}
if err := r.List(ctx, tcpRouteList, client.MatchingFields{
indexer.ParentRefs: indexer.GenIndexKey(gateway.Namespace, gateway.Name),
}); err != nil {
r.Log.Error(err, "failed to list tcproutes for gateway", "gateway", gateway.Name)
continue
}
for _, tcpRoute := range tcpRouteList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: tcpRoute.Namespace,
Name: tcpRoute.Name,
},
})
}
}
return requests
}
func (r *TCPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
defer r.Readier.Done(&gatewayv1alpha2.TCPRoute{}, req.NamespacedName)
tr := new(gatewayv1alpha2.TCPRoute)
if err := r.Get(ctx, req.NamespacedName, tr); err != nil {
if client.IgnoreNotFound(err) == nil {
tr.Namespace = req.Namespace
tr.Name = req.Name
tr.TypeMeta = metav1.TypeMeta{
Kind: KindTCPRoute,
APIVersion: gatewayv1alpha2.GroupVersion.String(),
}
if err := r.Provider.Delete(ctx, tr); err != nil {
r.Log.Error(err, "failed to delete tcproute", "tcproute", tr)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
type ResourceStatus struct {
status bool
msg string
}
acceptStatus := ResourceStatus{
status: true,
msg: "Route is accepted",
}
gateways, err := ParseRouteParentRefs(ctx, r.Client, r.Log, tr, tr.Spec.ParentRefs)
if err != nil {
return ctrl.Result{}, err
}
if len(gateways) == 0 {
return ctrl.Result{}, nil
}
tctx := provider.NewDefaultTranslateContext(ctx)
tctx.RouteParentRefs = tr.Spec.ParentRefs
rk := utils.NamespacedNameKind(tr)
for _, gateway := range gateways {
if err := ProcessGatewayProxy(r.Client, r.Log, tctx, gateway.Gateway, rk); err != nil {
acceptStatus.status = false
acceptStatus.msg = err.Error()
}
}
var backendRefErr error
if err := r.processTCPRoute(tctx, tr); err != nil {
// When encountering a backend reference error, it should not affect the acceptance status
if types.IsSomeReasonError(err, gatewayv1.RouteReasonInvalidKind) {
backendRefErr = err
} else {
acceptStatus.status = false
acceptStatus.msg = err.Error()
}
}
// Store the backend reference error for later use.
// If the backend reference error is because of an invalid kind, use this error first
if err := r.processTCPRouteBackendRefs(tctx, req.NamespacedName); err != nil && backendRefErr == nil {
backendRefErr = err
}
ProcessBackendTrafficPolicy(r.Client, r.Log, tctx)
tr.Status.Parents = make([]gatewayv1.RouteParentStatus, 0, len(gateways))
for _, gateway := range gateways {
parentStatus := gatewayv1.RouteParentStatus{}
SetRouteParentRef(&parentStatus, gateway.Gateway.Name, gateway.Gateway.Namespace)
for _, condition := range gateway.Conditions {
parentStatus.Conditions = MergeCondition(parentStatus.Conditions, condition)
}
SetRouteConditionAccepted(&parentStatus, tr.GetGeneration(), acceptStatus.status, acceptStatus.msg)
SetRouteConditionResolvedRefs(&parentStatus, tr.GetGeneration(), backendRefErr)
tr.Status.Parents = append(tr.Status.Parents, parentStatus)
}
r.Updater.Update(status.Update{
NamespacedName: utils.NamespacedName(tr),
Resource: &gatewayv1alpha2.TCPRoute{},
Mutator: status.MutatorFunc(func(obj client.Object) client.Object {
t, ok := obj.(*gatewayv1alpha2.TCPRoute)
if !ok {
err := fmt.Errorf("unsupported object type %T", obj)
panic(err)
}
tCopy := t.DeepCopy()
tCopy.Status = tr.Status
return tCopy
}),
})
UpdateStatus(r.Updater, r.Log, tctx)
if isRouteAccepted(gateways) {
routeToUpdate := tr
if err := r.Provider.Update(ctx, tctx, routeToUpdate); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func (r *TCPRouteReconciler) processTCPRoute(tctx *provider.TranslateContext, tcpRoute *gatewayv1alpha2.TCPRoute) error {
var terror error
for _, rule := range tcpRoute.Spec.Rules {
for _, backend := range rule.BackendRefs {
if backend.Kind != nil && *backend.Kind != KindService {
terror = types.NewInvalidKindError(*backend.Kind)
continue
}
tctx.BackendRefs = append(tctx.BackendRefs, gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Name: backend.Name,
Namespace: cmp.Or(backend.Namespace, (*gatewayv1.Namespace)(&tcpRoute.Namespace)),
Port: backend.Port,
},
})
}
}
return terror
}
func (r *TCPRouteReconciler) processTCPRouteBackendRefs(tctx *provider.TranslateContext, trNN k8stypes.NamespacedName) error {
var terr error
for _, backend := range tctx.BackendRefs {
targetNN := k8stypes.NamespacedName{
Namespace: trNN.Namespace,
Name: string(backend.Name),
}
if backend.Namespace != nil {
targetNN.Namespace = string(*backend.Namespace)
}
if backend.Kind != nil && *backend.Kind != KindService {
terr = types.NewInvalidKindError(*backend.Kind)
continue
}
if backend.Port == nil {
terr = fmt.Errorf("port is required")
continue
}
var service corev1.Service
if err := r.Get(tctx, targetNN, &service); err != nil {
terr = err
if client.IgnoreNotFound(err) == nil {
terr = types.ReasonError{
Reason: string(gatewayv1.RouteReasonBackendNotFound),
Message: fmt.Sprintf("Service %s not found", targetNN),
}
}
continue
}
// if cross namespaces between TCPRoute and referenced Service, check ReferenceGrant
if trNN.Namespace != targetNN.Namespace {
if permitted := checkReferenceGrant(tctx,
r.Client,
v1beta1.ReferenceGrantFrom{
Group: gatewayv1.GroupName,
Kind: KindTCPRoute,
Namespace: v1beta1.Namespace(trNN.Namespace),
},
gatewayv1.ObjectReference{
Group: corev1.GroupName,
Kind: KindService,
Name: gatewayv1.ObjectName(targetNN.Name),
Namespace: (*gatewayv1.Namespace)(&targetNN.Namespace),
},
); !permitted {
terr = types.ReasonError{
Reason: string(v1beta1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("%s is in a different namespace than the TCPRoute %s and no ReferenceGrant allowing reference is configured", targetNN, trNN),
}
continue
}
}
if service.Spec.Type == corev1.ServiceTypeExternalName {
tctx.Services[targetNN] = &service
continue
}
portExists := false
for _, port := range service.Spec.Ports {
if port.Port == int32(*backend.Port) {
portExists = true
break
}
}
if !portExists {
terr = fmt.Errorf("port %d not found in service %s", *backend.Port, targetNN.Name)
continue
}
tctx.Services[targetNN] = &service
endpointSliceList := new(discoveryv1.EndpointSliceList)
if err := r.List(tctx, endpointSliceList,
client.InNamespace(targetNN.Namespace),
client.MatchingLabels{
discoveryv1.LabelServiceName: targetNN.Name,
},
); err != nil {
r.Log.Error(err, "failed to list endpoint slices", "Service", targetNN)
terr = err
continue
}
tctx.EndpointSlices[targetNN] = endpointSliceList.Items
}
return terr
}
func (r *TCPRouteReconciler) listTCPRoutesForReferenceGrant(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
grant, ok := obj.(*v1beta1.ReferenceGrant)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to ReferenceGrant")
return nil
}
var tcpRouteList gatewayv1alpha2.TCPRouteList
if err := r.List(ctx, &tcpRouteList); err != nil {
r.Log.Error(err, "failed to list tcproutes for reference ReferenceGrant", "ReferenceGrant", k8stypes.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()})
return nil
}
for _, tcpRoute := range tcpRouteList.Items {
tr := v1beta1.ReferenceGrantFrom{
Group: gatewayv1.GroupName,
Kind: KindTCPRoute,
Namespace: v1beta1.Namespace(tcpRoute.GetNamespace()),
}
for _, from := range grant.Spec.From {
if from == tr {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: tcpRoute.GetNamespace(),
Name: tcpRoute.GetName(),
},
})
}
}
}
return requests
}
func (r *TCPRouteReconciler) listTCPRoutesByServiceRef(ctx context.Context, obj client.Object) []reconcile.Request {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
r.Log.Error(fmt.Errorf("unexpected object type"), "failed to convert object to EndpointSlice")
return nil
}
namespace := endpointSlice.GetNamespace()
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
trList := &gatewayv1alpha2.TCPRouteList{}
if err := r.List(ctx, trList, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(namespace, serviceName),
}); err != nil {
r.Log.Error(err, "failed to list tcproutes by service", "service", serviceName)
return nil
}
requests := make([]reconcile.Request, 0, len(trList.Items))
for _, tr := range trList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: client.ObjectKey{
Namespace: tr.Namespace,
Name: tr.Name,
},
})
}
return requests
}