// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ingress

import (
"context"
"fmt"
"os"
"sync"
"time"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"github.com/apache/apisix-ingress-controller/pkg/api"
"github.com/apache/apisix-ingress-controller/pkg/apisix"
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/kube/translation"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

const (
	// _component is the component name reported in recorded events.
	_component = "ApisixIngress"
	// _resourceSynced is the event reason used when a resource is synced successfully.
	_resourceSynced = "ResourcesSynced"
	// _messageResourceSynced is the message format for successful syncs,
	// e.g. "ApisixIngress synced successfully".
	_messageResourceSynced = "%s synced successfully"
	// _resourceSyncAborted is the event reason used when syncing a resource fails.
	_resourceSyncAborted = "ResourceSyncAborted"
	// _messageResourceFailed is the message format for failed syncs.
	_messageResourceFailed = "%s sync failed, with error: %s"
)

// Controller is the ingress apisix controller object.
type Controller struct {
name string
namespace string
cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apisix apisix.APISIX
podCache types.PodCache
translator translation.Translator
apiServer *api.Server
metricsCollector metrics.Collector
kubeClient *kube.KubeClient
	// recorder records Kubernetes events for the watched resources.
	recorder record.EventRecorder
	// secretSSLMap records which ApisixTls objects reference each Kubernetes
	// Secret object.
	secretSSLMap *sync.Map
// leaderContextCancelFunc will be called when apisix-ingress-controller
// decides to give up its leader role.
leaderContextCancelFunc context.CancelFunc
// common informers and listers
podInformer cache.SharedIndexInformer
podLister listerscorev1.PodLister
epInformer cache.SharedIndexInformer
epLister kube.EndpointLister
svcInformer cache.SharedIndexInformer
svcLister listerscorev1.ServiceLister
ingressLister kube.IngressLister
ingressInformer cache.SharedIndexInformer
secretInformer cache.SharedIndexInformer
secretLister listerscorev1.SecretLister
apisixUpstreamInformer cache.SharedIndexInformer
apisixUpstreamLister listersv1.ApisixUpstreamLister
apisixRouteLister kube.ApisixRouteLister
apisixRouteInformer cache.SharedIndexInformer
apisixTlsLister listersv1.ApisixTlsLister
apisixTlsInformer cache.SharedIndexInformer
apisixClusterConfigLister listersv2alpha1.ApisixClusterConfigLister
apisixClusterConfigInformer cache.SharedIndexInformer
apisixConsumerInformer cache.SharedIndexInformer
apisixConsumerLister listersv2alpha1.ApisixConsumerLister
// resource controllers
podController *podController
endpointsController *endpointsController
endpointSliceController *endpointSliceController
ingressController *ingressController
secretController *secretController
apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
apisixTlsController *apisixTlsController
apisixClusterConfigController *apisixClusterConfigController
apisixConsumerController *apisixConsumerController
}

// NewController creates an ingress apisix controller object.
func NewController(cfg *config.Config) (*Controller, error) {
podName := os.Getenv("POD_NAME")
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
}
client, err := apisix.NewClient()
if err != nil {
return nil, err
}
kubeClient, err := kube.NewKubeClient(cfg)
if err != nil {
return nil, err
}
apiSrv, err := api.NewServer(cfg)
if err != nil {
return nil, err
}
	var watchingNamespace map[string]struct{}
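	// cfg.Kubernetes.AppNamespaces is assumed to be non-empty here; the
	// config layer presumably defaults it to v1.NamespaceAll when unset.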
if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
for _, ns := range cfg.Kubernetes.AppNamespaces {
watchingNamespace[ns] = struct{}{}
}
}
	// set up event broadcasting and the event recorder
utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")})
c := &Controller{
name: podName,
namespace: podNamespace,
cfg: cfg,
apiServer: apiSrv,
apisix: client,
metricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
podCache: types.NewPodCache(),
}
return c, nil
}
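
// initWhenStartLeading builds the informers, listers, the translator and the
// resource controllers. It is invoked from run, i.e. only after this instance
// has won the leader election.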
func (c *Controller) initWhenStartLeading() {
var (
ingressInformer cache.SharedIndexInformer
apisixRouteInformer cache.SharedIndexInformer
)
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
c.podLister = kubeFactory.Core().V1().Pods().Lister()
c.epLister, c.epInformer = kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
c.svcLister = kubeFactory.Core().V1().Services().Lister()
c.ingressLister = kube.NewIngressLister(
kubeFactory.Networking().V1().Ingresses().Lister(),
kubeFactory.Networking().V1beta1().Ingresses().Lister(),
kubeFactory.Extensions().V1beta1().Ingresses().Lister(),
)
c.secretLister = kubeFactory.Core().V1().Secrets().Lister()
c.apisixRouteLister = kube.NewApisixRouteLister(
apisixFactory.Apisix().V1().ApisixRoutes().Lister(),
apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(),
apisixFactory.Apisix().V2beta1().ApisixRoutes().Lister(),
)
c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister()
c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister()
c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister()
c.apisixConsumerLister = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Lister()
c.translator = translation.NewTranslator(&translation.TranslatorOptions{
PodCache: c.podCache,
PodLister: c.podLister,
EndpointLister: c.epLister,
ServiceLister: c.svcLister,
ApisixUpstreamLister: c.apisixUpstreamLister,
SecretLister: c.secretLister,
UseEndpointSlices: c.cfg.Kubernetes.WatchEndpointSlices,
})
if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
} else if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
} else {
ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
}
switch c.cfg.Kubernetes.ApisixRouteVersion {
case config.ApisixRouteV1:
apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
case config.ApisixRouteV2alpha1:
apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
case config.ApisixRouteV2beta1:
apisixRouteInformer = apisixFactory.Apisix().V2beta1().ApisixRoutes().Informer()
}
c.podInformer = kubeFactory.Core().V1().Pods().Informer()
c.svcInformer = kubeFactory.Core().V1().Services().Informer()
c.ingressInformer = ingressInformer
c.apisixRouteInformer = apisixRouteInformer
c.apisixUpstreamInformer = apisixFactory.Apisix().V1().ApisixUpstreams().Informer()
c.apisixClusterConfigInformer = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer()
c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
c.apisixConsumerInformer = apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()
if c.cfg.Kubernetes.WatchEndpointSlices {
c.endpointSliceController = c.newEndpointSliceController()
} else {
c.endpointsController = c.newEndpointsController()
}
c.podController = c.newPodController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
c.apisixRouteController = c.newApisixRouteController()
c.apisixClusterConfigController = c.newApisixClusterConfigController()
c.apisixTlsController = c.newApisixTlsController()
c.secretController = c.newSecretController()
c.apisixConsumerController = c.newApisixConsumerController()
}

// recorderEvent records an event for the given resource, composing a failure
// message when err is non-nil and a success message otherwise.
func (c *Controller) recorderEvent(object runtime.Object, eventtype, reason string, err error) {
if err != nil {
message := fmt.Sprintf(_messageResourceFailed, _component, err.Error())
c.recorder.Event(object, eventtype, reason, message)
} else {
message := fmt.Sprintf(_messageResourceSynced, _component)
c.recorder.Event(object, eventtype, reason, message)
}
}

// recorderEventS records an event for the given resource with a
// caller-supplied message.
func (c *Controller) recorderEventS(object runtime.Object, eventtype, reason string, msg string) {
c.recorder.Event(object, eventtype, reason, msg)
}
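
// goAttach runs handler in its own goroutine and tracks it with the
// controller's WaitGroup, so that run can block in c.wg.Wait() until all
// attached goroutines exit, e.g.:
//
//	c.goAttach(func() {
//		c.podInformer.Run(ctx.Done())
//	})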
func (c *Controller) goAttach(handler func()) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
handler()
}()
}

// Eventf implements the resourcelock.EventRecorder interface.
func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) {
log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType))
}

// Run launches the controller; it blocks until the stop channel is closed.
func (c *Controller) Run(stop chan struct{}) error {
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
go func() {
<-stop
rootCancel()
}()
c.metricsCollector.ResetLeader(false)
go func() {
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
}()
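	// The election lock is a coordination.k8s.io/v1 Lease object named after
	// the configured election ID, stored in the controller's own namespace.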
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
Client: c.kubeClient.Client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
},
}
cfg := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: c.run,
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
}
},
OnStoppedLeading: func() {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.metricsCollector.ResetLeader(false)
},
},
		// Set this to false: the current leaderelection implementation reports
		// "Failed to release lock: resource name may not be empty" when
		// ReleaseOnCancel is true and the Run context is cancelled.
ReleaseOnCancel: false,
Name: "ingress-apisix",
}
elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
return err
}
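	// Campaign repeatedly: elector.Run returns once leadership is lost or the
	// context is cancelled, so jump back to the election label and run again
	// unless the root context is done.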
election:
curCtx, cancel := context.WithCancel(rootCtx)
c.leaderContextCancelFunc = cancel
elector.Run(curCtx)
select {
case <-rootCtx.Done():
return nil
default:
goto election
}
}
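
// run is the OnStartedLeading callback: it registers the default APISIX
// cluster, waits for its cache to sync, then launches the informers and
// resource controllers, blocking until the leader context is cancelled.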
func (c *Controller) run(ctx context.Context) {
log.Infow("controller tries to leading ...",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithCancel(ctx)
defer cancelFunc()
	// give up the leader role when run returns
defer c.leaderContextCancelFunc()
clusterOpts := &apisix.ClusterOptions{
Name: c.cfg.APISIX.DefaultClusterName,
AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
}
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
return
}
if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
// TODO give up the leader role
log.Errorf("failed to wait the default cluster to be ready: %s", err)
		// re-create the APISIX cluster client so the next call to c.run
		// starts from a clean state
if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
log.Errorf("failed to update default cluster: %s", err)
return
}
return
}
c.initWhenStartLeading()
c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
c.goAttach(func() {
c.podInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.epInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.svcInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.apisixRouteInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.apisixUpstreamInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.apisixClusterConfigInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.secretInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.apisixTlsInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.apisixConsumerInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.podController.run(ctx)
})
c.goAttach(func() {
if c.cfg.Kubernetes.WatchEndpointSlices {
c.endpointSliceController.run(ctx)
} else {
c.endpointsController.run(ctx)
}
})
c.goAttach(func() {
c.apisixUpstreamController.run(ctx)
})
c.goAttach(func() {
c.ingressController.run(ctx)
})
c.goAttach(func() {
c.apisixRouteController.run(ctx)
})
c.goAttach(func() {
c.apisixClusterConfigController.run(ctx)
})
c.goAttach(func() {
c.apisixTlsController.run(ctx)
})
c.goAttach(func() {
c.secretController.run(ctx)
})
c.goAttach(func() {
c.apisixConsumerController.run(ctx)
})
c.metricsCollector.ResetLeader(true)
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
<-ctx.Done()
c.wg.Wait()
}

// namespaceWatching accepts a resource key, extracts the namespace part,
// and checks whether that namespace is being watched.
func (c *Controller) namespaceWatching(key string) (ok bool) {
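	// A nil map means no namespace filter was configured, so every namespace
	// is watched (see NewController).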
if c.watchingNamespace == nil {
ok = true
return
}
ns, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// Ignore resource with invalid key.
ok = false
log.Warnf("resource %s was ignored since: %s", key, err)
return
}
_, ok = c.watchingNamespace[ns]
return
}
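
// syncSSL creates, updates or deletes the SSL object in the default APISIX
// cluster, depending on the event type.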
func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types.EventType) error {
	var err error
clusterName := c.cfg.APISIX.DefaultClusterName
if event == types.EventDelete {
err = c.apisix.Cluster(clusterName).SSL().Delete(ctx, ssl)
} else if event == types.EventUpdate {
_, err = c.apisix.Cluster(clusterName).SSL().Update(ctx, ssl)
} else {
_, err = c.apisix.Cluster(clusterName).SSL().Create(ctx, ssl)
}
return err
}
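
// syncConsumer creates, updates or deletes the consumer object in the default
// APISIX cluster, depending on the event type.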
func (c *Controller) syncConsumer(ctx context.Context, consumer *apisixv1.Consumer, event types.EventType) (err error) {
clusterName := c.cfg.APISIX.DefaultClusterName
if event == types.EventDelete {
err = c.apisix.Cluster(clusterName).Consumer().Delete(ctx, consumer)
} else if event == types.EventUpdate {
_, err = c.apisix.Cluster(clusterName).Consumer().Update(ctx, consumer)
} else {
_, err = c.apisix.Cluster(clusterName).Consumer().Create(ctx, consumer)
}
return
}
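
// syncEndpoint translates an endpoints object into APISIX upstream nodes and
// pushes the result to every known cluster. One upstream is maintained per
// (service port, subset) pair; the leading empty subset covers endpoints that
// match no label restriction.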
func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
namespace := ep.Namespace()
svcName := ep.ServiceName()
svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
if err != nil {
if k8serrors.IsNotFound(err) {
log.Infof("service %s/%s not found", ep.Namespace(), svcName)
return nil
}
log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
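	// Start with the default, label-less subset so the plain per-port upstream
	// is always refreshed, then add any subsets declared on the ApisixUpstream.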
	subsets := []configv1.ApisixUpstreamSubset{{}}
au, err := c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", ep.Namespace(), svcName, err)
return err
}
} else if len(au.Spec.Subsets) > 0 {
subsets = append(subsets, au.Spec.Subsets...)
}
clusters := c.apisix.ListClusters()
for _, port := range svc.Spec.Ports {
for _, subset := range subsets {
nodes, err := c.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", port.Port),
)
}
name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port)
for _, cluster := range clusters {
if err := c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
}
}
return nil
}
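
// syncUpstreamNodesChangeToCluster replaces the node list of the named
// upstream in the given cluster. Upstreams missing from the cluster cache are
// skipped, as nothing references them yet.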
func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
upstream, err := cluster.Upstream().Get(ctx, upsName)
	if err != nil {
		if err == apisixcache.ErrNotFound {
			log.Warnw("upstream is not referenced",
				zap.String("cluster", cluster.String()),
				zap.String("upstream", upsName),
			)
			return nil
		}
		log.Errorw("failed to get upstream",
			zap.String("upstream", upsName),
			zap.String("cluster", cluster.String()),
			zap.Error(err),
		)
		return err
	}
upstream.Nodes = nodes
log.Debugw("upstream binds new nodes",
zap.Any("upstream", upstream),
zap.String("cluster", cluster.String()),
)
updated := &manifest{
upstreams: []*apisixv1.Upstream{upstream},
}
return c.syncManifests(ctx, nil, updated, nil)
}
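
// checkClusterHealth probes the default APISIX cluster every 5 seconds and
// cancels the leader context on the first failure, forcing this instance to
// give up leadership and re-enter the election.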
func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
defer cancelFunc()
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx)
if err != nil {
			// The health check ultimately failed; give up the leader role.
log.Warnf("failed to check health for default cluster: %s, give up leader", err)
return
}
log.Debugf("success check health for default cluster")
c.metricsCollector.IncrCheckClusterHealth(c.name)
}
}