blob: e2ef06edf3b68bbe1cf6396024deea9d05d77833 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package status
import (
"context"
"sync"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
v2 "github.com/apache/apisix-ingress-controller/api/v2"
types "github.com/apache/apisix-ingress-controller/internal/types"
pkgmetrics "github.com/apache/apisix-ingress-controller/pkg/metrics"
)
const UpdateChannelBufferSize = 10000
type Update struct {
NamespacedName k8stypes.NamespacedName
Resource client.Object
Mutator Mutator
}
type Mutator interface {
Mutate(obj client.Object) client.Object
}
type MutatorFunc func(client.Object) client.Object
func (m MutatorFunc) Mutate(obj client.Object) client.Object {
if m == nil {
return nil
}
return m(obj)
}
var cmpIgnoreLastTT = cmp.Options{
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreMapEntries(func(k string, _ any) bool {
return k == "lastTransitionTime"
}),
}
type UpdateHandler struct {
log logr.Logger
client client.Client
updateChannel chan Update
wg *sync.WaitGroup
}
func NewStatusUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
u := &UpdateHandler{
log: log,
client: client,
updateChannel: make(chan Update, UpdateChannelBufferSize),
wg: new(sync.WaitGroup),
}
u.wg.Add(1)
return u
}
func (u *UpdateHandler) apply(ctx context.Context, update Update) {
if err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
return k8serrors.IsConflict(err)
}, func() error {
return u.updateStatus(ctx, update)
}); err != nil {
u.log.Error(err, "unable to update status", "name", update.NamespacedName.Name,
"namespace", update.NamespacedName.Namespace)
}
}
func (u *UpdateHandler) updateStatus(ctx context.Context, update Update) error {
var obj = update.Resource
if err := u.client.Get(ctx, update.NamespacedName, obj); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
newObj := update.Mutator.Mutate(obj)
if newObj == nil {
return nil
}
if statusEqual(obj, newObj, cmpIgnoreLastTT) {
u.log.V(1).Info("status is equal, skipping update", "name", update.NamespacedName.Name,
"namespace", update.NamespacedName.Namespace,
"kind", types.KindOf(obj))
return nil
}
newObj.SetUID(obj.GetUID())
u.log.V(1).Info("updating status", "name", update.NamespacedName.Name,
"namespace", update.NamespacedName.Namespace,
"kind", types.KindOf(newObj),
)
return u.client.Status().Update(ctx, newObj)
}
func (u *UpdateHandler) Start(ctx context.Context) error {
u.log.Info("started status update handler")
defer u.log.Info("stopped status update handler")
u.wg.Done()
for {
select {
case <-ctx.Done():
return nil
case update := <-u.updateChannel:
// Decrement queue length after removing item from queue
pkgmetrics.DecStatusQueueLength()
u.log.Info("received a status update", "namespace", update.NamespacedName.Namespace,
"name", update.NamespacedName.Name,
"kind", types.KindOf(update.Resource),
)
u.apply(ctx, update)
}
}
}
func (u *UpdateHandler) NeedsLeaderElection() bool {
return true
}
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
updateChannel: u.updateChannel,
wg: u.wg,
}
}
type Updater interface {
Update(u Update)
}
type UpdateWriter struct {
updateChannel chan<- Update
wg *sync.WaitGroup
}
func (u *UpdateWriter) Update(update Update) {
u.wg.Wait()
u.updateChannel <- update
// Increment queue length after adding new item
pkgmetrics.IncStatusQueueLength()
}
func statusEqual(a, b any, opts ...cmp.Option) bool {
var statusA, statusB any
switch a := a.(type) {
case *gatewayv1.GatewayClass:
b, ok := b.(*gatewayv1.GatewayClass)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *gatewayv1.Gateway:
b, ok := b.(*gatewayv1.Gateway)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *gatewayv1.HTTPRoute:
b, ok := b.(*gatewayv1.HTTPRoute)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v2.ApisixRoute:
b, ok := b.(*v2.ApisixRoute)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v2.ApisixGlobalRule:
b, ok := b.(*v2.ApisixGlobalRule)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v2.ApisixPluginConfig:
b, ok := b.(*v2.ApisixPluginConfig)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v2.ApisixTls:
b, ok := b.(*v2.ApisixTls)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v2.ApisixConsumer:
b, ok := b.(*v2.ApisixConsumer)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v1alpha1.HTTPRoutePolicy:
b, ok := b.(*v1alpha1.HTTPRoutePolicy)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v1alpha1.BackendTrafficPolicy:
b, ok := b.(*v1alpha1.BackendTrafficPolicy)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
case *v1alpha1.Consumer:
b, ok := b.(*v1alpha1.Consumer)
if !ok {
return false
}
statusA, statusB = a.Status, b.Status
default:
return false
}
return cmp.Equal(statusA, statusB, opts...)
}