blob: 6ebe8c9655e2eb79bd1f54a1798c75f5c5d5e40e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package controller
import (
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
"github.com/apache/apisix-ingress-controller/internal/controller/config"
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/manager/readiness"
"github.com/apache/apisix-ingress-controller/internal/provider"
internaltypes "github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
)
// ConsumerReconciler reconciles v1alpha1.Consumer objects.
//
// Besides Consumers themselves, the controller built by SetupWithManager
// watches Gateway, Secret and GatewayProxy objects and maps their events back
// to the Consumers that reference them.
type ConsumerReconciler struct { //nolint:revive
	// Client is the embedded Kubernetes client used for Get/List calls.
	client.Client
	// Scheme is the runtime scheme supplied by the caller; it is not read by
	// the methods defined in this file.
	Scheme *runtime.Scheme
	// Log is the logger used by the reconciler and its map functions.
	Log logr.Logger
	// Provider receives Update calls for live Consumers and Delete calls for
	// Consumers that no longer exist.
	Provider provider.Provider
	// Updater receives the status updates produced by updateStatus.
	Updater status.Updater
	// Readier is notified through Done at the end of every Reconcile call.
	Readier readiness.ReadinessManager
}
// SetupWithManager registers the reconciler with mgr and wires up its watches.
//
// Consumers are the primary resource and are admitted only when
// checkGatewayRef accepts them. Gateway, Secret and GatewayProxy objects are
// watched as secondary resources and mapped back to Consumers through the
// listConsumersFor* map functions. It returns the error reported by the
// controller builder, if any.
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Predicate applied to the primary Consumer resource only.
	consumerPredicates := builder.WithPredicates(
		predicate.NewPredicateFuncs(r.checkGatewayRef),
	)

	// Controller-wide filter: an event passes when either the generation
	// predicate or the Secret type predicate accepts it.
	eventFilter := predicate.Or(
		predicate.GenerationChangedPredicate{},
		predicate.NewPredicateFuncs(TypePredicate[*corev1.Secret]()),
	)

	// Gateway events: react to creates and updates, ignore deletes and
	// generic events.
	gatewayPredicates := builder.WithPredicates(
		predicate.Funcs{
			CreateFunc:  func(event.CreateEvent) bool { return true },
			UpdateFunc:  func(event.UpdateEvent) bool { return true },
			DeleteFunc:  func(event.DeleteEvent) bool { return false },
			GenericFunc: func(event.GenericEvent) bool { return false },
		},
	)

	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.Consumer{}, consumerPredicates).
		WithEventFilter(eventFilter).
		Watches(
			&gatewayv1.Gateway{},
			handler.EnqueueRequestsFromMapFunc(r.listConsumersForGateway),
			gatewayPredicates,
		).
		Watches(
			&corev1.Secret{},
			handler.EnqueueRequestsFromMapFunc(r.listConsumersForSecret),
		).
		Watches(
			&v1alpha1.GatewayProxy{},
			handler.EnqueueRequestsFromMapFunc(r.listConsumersForGatewayProxy),
		).
		Complete(r)
}
// listConsumersForSecret maps a Secret event to reconcile requests.
//
// It looks up Consumers through the SecretIndexRef field index, keyed by the
// Secret's namespace and name, and returns whatever ListRequests produces for
// that query. If obj is not a *corev1.Secret the problem is logged and nil is
// returned.
func (r *ConsumerReconciler) listConsumersForSecret(ctx context.Context, obj client.Object) []reconcile.Request {
	secret, isSecret := obj.(*corev1.Secret)
	if !isSecret {
		r.Log.Error(nil, "failed to convert to Secret", "object", obj)
		return nil
	}

	selector := client.MatchingFields{
		indexer.SecretIndexRef: indexer.GenIndexKey(secret.GetNamespace(), secret.GetName()),
	}
	return ListRequests(ctx, r.Client, r.Log, &v1alpha1.ConsumerList{}, selector)
}
// listConsumersForGateway maps a Gateway event to one reconcile request per
// Consumer found through the ConsumerGatewayRef field index for that Gateway.
//
// It returns nil (after logging) when obj is not a *gatewayv1.Gateway or when
// listing Consumers fails; otherwise it returns a non-nil slice with one
// entry per matching Consumer.
func (r *ConsumerReconciler) listConsumersForGateway(ctx context.Context, obj client.Object) []reconcile.Request {
	gw, isGateway := obj.(*gatewayv1.Gateway)
	if !isGateway {
		r.Log.Error(nil, "failed to convert to Gateway", "object", obj)
		return nil
	}

	var consumers v1alpha1.ConsumerList
	selector := client.MatchingFields{
		indexer.ConsumerGatewayRef: indexer.GenIndexKey(gw.GetNamespace(), gw.GetName()),
	}
	if err := r.List(ctx, &consumers, selector); err != nil {
		r.Log.Error(err, "failed to list consumers")
		return nil
	}

	// One request per Consumer, filled in place by index.
	requests := make([]reconcile.Request, len(consumers.Items))
	for i := range consumers.Items {
		item := &consumers.Items[i]
		requests[i].NamespacedName = types.NamespacedName{
			Namespace: item.Namespace,
			Name:      item.Name,
		}
	}
	return requests
}
// listConsumersForGatewayProxy maps a GatewayProxy event to reconcile
// requests in two steps: it lists the Gateways found through the
// ParametersRef field index for the proxy, then, for each of those Gateways,
// the Consumers found through the ConsumerGatewayRef field index.
//
// A failed Gateway listing is logged and yields nil. A failed Consumer
// listing for one Gateway is logged and skipped, so the remaining Gateways
// are still processed. The result is nil when nothing matched.
func (r *ConsumerReconciler) listConsumersForGatewayProxy(ctx context.Context, obj client.Object) []reconcile.Request {
	proxy, isProxy := obj.(*v1alpha1.GatewayProxy)
	if !isProxy {
		r.Log.Error(nil, "failed to convert to GatewayProxy", "object", obj)
		return nil
	}

	// Step 1: Gateways indexed under this GatewayProxy.
	var gateways gatewayv1.GatewayList
	proxyKey := indexer.GenIndexKey(proxy.GetNamespace(), proxy.GetName())
	if err := r.List(ctx, &gateways, client.MatchingFields{indexer.ParametersRef: proxyKey}); err != nil {
		r.Log.Error(err, "failed to list gateways for gateway proxy", "gatewayproxy", proxy.GetName())
		return nil
	}

	// Step 2: Consumers indexed under each of those Gateways.
	var requests []reconcile.Request
	for i := range gateways.Items {
		gw := &gateways.Items[i]

		var consumers v1alpha1.ConsumerList
		gatewayKey := indexer.GenIndexKey(gw.Namespace, gw.Name)
		if err := r.List(ctx, &consumers, client.MatchingFields{indexer.ConsumerGatewayRef: gatewayKey}); err != nil {
			r.Log.Error(err, "failed to list consumers for gateway", "gateway", gw.Name)
			continue
		}

		for j := range consumers.Items {
			item := &consumers.Items[j]
			requests = append(requests, reconcile.Request{
				NamespacedName: types.NamespacedName{
					Namespace: item.Namespace,
					Name:      item.Name,
				},
			})
		}
	}
	return requests
}
// Reconcile handles a single Consumer request.
//
// If the Consumer no longer exists, a stub carrying only its name, namespace
// and type metadata is passed to Provider.Delete, and any error from that
// call is returned. Any other Get error is returned as is.
//
// For an existing Consumer, the referenced Gateway is resolved first; when
// that fails the request is dropped with a verbose log entry and no status
// change. Otherwise the gateway proxy is processed, referenced Secrets are
// collected, and the Consumer is pushed to the Provider. Each of these three
// steps runs even if an earlier one failed; failures are logged and the most
// recent one is written to the Consumer status instead of being returned.
//
// Readier.Done is invoked for the request on every exit path.
func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	defer r.Readier.Done(&v1alpha1.Consumer{}, req.NamespacedName)

	consumer := &v1alpha1.Consumer{}
	if getErr := r.Get(ctx, req.NamespacedName, consumer); getErr != nil {
		if client.IgnoreNotFound(getErr) != nil {
			return ctrl.Result{}, getErr
		}
		// The object is gone: rebuild just enough identity for the provider
		// to remove it.
		consumer.Namespace = req.Namespace
		consumer.Name = req.Name
		consumer.TypeMeta = metav1.TypeMeta{
			APIVersion: v1alpha1.GroupVersion.String(),
			Kind:       internaltypes.KindConsumer,
		}
		return ctrl.Result{}, r.Provider.Delete(ctx, consumer)
	}

	translateCtx := provider.NewDefaultTranslateContext(ctx)

	gateway, gwErr := r.getGateway(ctx, consumer)
	if gwErr != nil {
		r.Log.V(1).Info("no matching Gateway available",
			"gatewayRef", consumer.Spec.GatewayRef,
			"error", gwErr.Error())
		return ctrl.Result{}, nil
	}

	// lastErr holds the most recent failure among the steps below; it is what
	// ends up in the Consumer status.
	var lastErr error

	kindKey := utils.NamespacedNameKind(consumer)
	if proxyErr := ProcessGatewayProxy(r.Client, r.Log, translateCtx, gateway, kindKey); proxyErr != nil {
		r.Log.Error(proxyErr, "failed to process gateway proxy", "gateway", gateway)
		lastErr = proxyErr
	}

	if specErr := r.processSpec(ctx, translateCtx, consumer); specErr != nil {
		r.Log.Error(specErr, "failed to process consumer spec", "consumer", consumer)
		lastErr = specErr
	}

	if updateErr := r.Provider.Update(ctx, translateCtx, consumer); updateErr != nil {
		r.Log.Error(updateErr, "failed to update consumer", "consumer", consumer)
		lastErr = updateErr
	}

	r.updateStatus(consumer, lastErr)
	return ctrl.Result{}, nil
}
// processSpec loads the Secrets referenced by the Consumer's credentials into
// tctx.Secrets, keyed by namespace and name.
//
// Credentials without a SecretRef are skipped. A SecretRef without an
// explicit namespace is resolved in the Consumer's own namespace. Secrets
// that do not exist are skipped silently; any other lookup error is logged
// and returned, which stops processing of the remaining credentials.
func (r *ConsumerReconciler) processSpec(ctx context.Context, tctx *provider.TranslateContext, consumer *v1alpha1.Consumer) error {
	for _, credential := range consumer.Spec.Credentials {
		ref := credential.SecretRef
		if ref == nil {
			continue
		}

		// The same key is used for the lookup and for the map entry.
		key := types.NamespacedName{
			Namespace: consumer.GetNamespace(),
			Name:      ref.Name,
		}
		if ref.Namespace != nil {
			key.Namespace = *ref.Namespace
		}

		secret := new(corev1.Secret)
		err := r.Get(ctx, key, secret)
		switch {
		case err == nil:
			tctx.Secrets[key] = secret
		case client.IgnoreNotFound(err) == nil:
			// Missing Secret: nothing to record for this credential.
		default:
			r.Log.Error(err, "failed to get secret", "secret", ref.Name)
			return err
		}
	}
	return nil
}
// updateStatus derives a condition from err and queues a status update for
// the Consumer.
//
// A nil err produces a true condition with the message "Successfully"; a
// non-nil err produces a false condition carrying err's message. If
// VerifyConditions returns false for the new condition, nothing is queued.
// Otherwise the condition is set on the in-memory Consumer and an update is
// handed to r.Updater whose mutator copies the Consumer's status onto the
// object it is given.
func (r *ConsumerReconciler) updateStatus(consumer *v1alpha1.Consumer, err error) {
	cond := NewCondition(consumer.Generation, true, "Successfully")
	if err != nil {
		cond = NewCondition(consumer.Generation, false, err.Error())
	}

	if ok := VerifyConditions(&consumer.Status.Conditions, cond); !ok {
		return
	}
	meta.SetStatusCondition(&consumer.Status.Conditions, cond)

	pending := status.Update{
		NamespacedName: utils.NamespacedName(consumer),
		Resource:       consumer.DeepCopy(),
		Mutator: status.MutatorFunc(func(obj client.Object) client.Object {
			// Work on a copy of the supplied object and overwrite its status.
			target := obj.(*v1alpha1.Consumer).DeepCopy()
			target.Status = consumer.Status
			return target
		}),
	}
	r.Updater.Update(pending)
}
// getGateway resolves the Gateway named by the Consumer's GatewayRef and
// checks that it belongs to this controller.
//
// The Gateway is looked up in the GatewayRef namespace when one is set, and
// in the Consumer's namespace otherwise. Its GatewayClass is then fetched by
// name, and the class's controller name must equal
// config.ControllerConfig.ControllerName. An error is returned when either
// lookup fails or the controller name does not match.
func (r *ConsumerReconciler) getGateway(ctx context.Context, consumer *v1alpha1.Consumer) (*gatewayv1.Gateway, error) {
	gatewayKey := types.NamespacedName{
		Namespace: consumer.GetNamespace(),
		Name:      consumer.Spec.GatewayRef.Name,
	}
	if override := consumer.Spec.GatewayRef.Namespace; override != nil {
		gatewayKey.Namespace = *override
	}

	var gateway gatewayv1.Gateway
	if err := r.Get(ctx, gatewayKey, &gateway); err != nil {
		return nil, fmt.Errorf("failed to get gateway %s/%s: %w", gatewayKey.Namespace, gatewayKey.Name, err)
	}

	// The GatewayClass lookup key carries a name only, no namespace.
	var class gatewayv1.GatewayClass
	classKey := types.NamespacedName{Name: string(gateway.Spec.GatewayClassName)}
	if err := r.Get(ctx, classKey, &class); err != nil {
		return nil, fmt.Errorf("failed to retrieve gatewayclass for gateway: %w", err)
	}

	if owner := string(class.Spec.ControllerName); owner != config.ControllerConfig.ControllerName {
		return nil, fmt.Errorf("gateway %s/%s is not managed by this controller", gateway.Namespace, gateway.Name)
	}
	return &gateway, nil
}
// checkGatewayRef is the predicate applied to Consumer events. It reports
// false for any object that is not a *v1alpha1.Consumer and otherwise
// returns the result of MatchConsumerGatewayRef, called with a background
// context.
func (r *ConsumerReconciler) checkGatewayRef(object client.Object) bool {
	if consumer, isConsumer := object.(*v1alpha1.Consumer); isConsumer {
		return MatchConsumerGatewayRef(context.Background(), r.Client, r.Log, consumer)
	}
	return false
}