blob: 592d49cbe4de6cd2b27ef30040f494333e2425b2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package providers
import (
"context"
"fmt"
"os"
"time"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
extensionsv1beta1 "k8s.io/client-go/listers/extensions/v1beta1"
networkingv1 "k8s.io/client-go/listers/networking/v1"
networkingv1beta1 "k8s.io/client-go/listers/networking/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"github.com/apache/apisix-ingress-controller/pkg/api"
"github.com/apache/apisix-ingress-controller/pkg/apisix"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2beta3"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
apisixprovider "github.com/apache/apisix-ingress-controller/pkg/providers/apisix"
apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/gateway"
ingressprovider "github.com/apache/apisix-ingress-controller/pkg/providers/ingress"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/pod"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
)
const (
// _component is used for event component
_component = "ApisixIngress"
// minimum interval for ingress sync to APISIX
_mininumApisixResourceSyncInterval = 60 * time.Second
)
// Controller is the ingress apisix controller object.
type Controller struct {
name string
namespace string
cfg *config.Config
apisix apisix.APISIX
apiServer *api.Server
MetricsCollector metrics.Collector
kubeClient *kube.KubeClient
// recorder event
recorder record.EventRecorder
// leaderContextCancelFunc will be called when apisix-ingress-controller
// decides to give up its leader role.
leaderContextCancelFunc context.CancelFunc
translator translation.Translator
apisixTranslator apisixtranslation.ApisixTranslator
informers *providertypes.ListerInformer
namespaceProvider namespace.WatchingNamespaceProvider
podProvider pod.Provider
kubeProvider k8s.Provider
gatewayProvider *gateway.Provider
apisixProvider apisixprovider.Provider
ingressProvider ingressprovider.Provider
}
// NewController creates an ingress apisix controller object.
func NewController(cfg *config.Config) (*Controller, error) {
podName := os.Getenv("POD_NAME")
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
}
client, err := apisix.NewClient(cfg.APISIX.AdminAPIVersion)
if err != nil {
return nil, err
}
kubeClient, err := kube.NewKubeClient(cfg)
if err != nil {
return nil, err
}
apiSrv, err := api.NewServer(cfg)
if err != nil {
return nil, err
}
// recorder
utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")})
c := &Controller{
name: podName,
namespace: podNamespace,
cfg: cfg,
apiServer: apiSrv,
apisix: client,
MetricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
}
return c, nil
}
// Eventf implements the resourcelock.EventRecorder interface.
func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string, message string, _ ...interface{}) {
log.Infow(reason, zap.String("message", message), zap.String("event_type", eventType))
}
// Run launches the controller.
func (c *Controller) Run(stop chan struct{}) error {
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
go func() {
<-stop
rootCancel()
}()
c.MetricsCollector.ResetLeader(false)
go func() {
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
}()
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
Client: c.kubeClient.Client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
},
}
cfg := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: c.run,
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
}
},
OnStoppedLeading: func() {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
},
},
ReleaseOnCancel: true,
Name: "ingress-apisix",
}
elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
return err
}
election:
curCtx, cancel := context.WithCancel(rootCtx)
c.leaderContextCancelFunc = cancel
elector.Run(curCtx)
select {
case <-rootCtx.Done():
return nil
default:
goto election
}
}
func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
var (
ingressInformer cache.SharedIndexInformer
ingressListerV1 networkingv1.IngressLister
ingressListerV1beta1 networkingv1beta1.IngressLister
ingressListerExtensionsV1beta1 extensionsv1beta1.IngressLister
)
var (
apisixUpstreamInformer cache.SharedIndexInformer
apisixRouteInformer cache.SharedIndexInformer
apisixPluginConfigInformer cache.SharedIndexInformer
apisixConsumerInformer cache.SharedIndexInformer
apisixTlsInformer cache.SharedIndexInformer
apisixClusterConfigInformer cache.SharedIndexInformer
apisixRouteListerV2beta3 v2beta3.ApisixRouteLister
apisixUpstreamListerV2beta3 v2beta3.ApisixUpstreamLister
apisixTlsListerV2beta3 v2beta3.ApisixTlsLister
apisixClusterConfigListerV2beta3 v2beta3.ApisixClusterConfigLister
apisixConsumerListerV2beta3 v2beta3.ApisixConsumerLister
apisixPluginConfigListerV2beta3 v2beta3.ApisixPluginConfigLister
apisixRouteListerV2 v2.ApisixRouteLister
apisixUpstreamListerV2 v2.ApisixUpstreamLister
apisixTlsListerV2 v2.ApisixTlsLister
apisixClusterConfigListerV2 v2.ApisixClusterConfigLister
apisixConsumerListerV2 v2.ApisixConsumerLister
apisixPluginConfigListerV2 v2.ApisixPluginConfigLister
)
switch c.cfg.Kubernetes.APIVersion {
case config.ApisixV2beta3:
apisixRouteInformer = apisixFactory.Apisix().V2beta3().ApisixRoutes().Informer()
apisixTlsInformer = apisixFactory.Apisix().V2beta3().ApisixTlses().Informer()
apisixClusterConfigInformer = apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Informer()
apisixConsumerInformer = apisixFactory.Apisix().V2beta3().ApisixConsumers().Informer()
apisixPluginConfigInformer = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Informer()
apisixUpstreamInformer = apisixFactory.Apisix().V2beta3().ApisixUpstreams().Informer()
apisixRouteListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixRoutes().Lister()
apisixUpstreamListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixUpstreams().Lister()
apisixTlsListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixTlses().Lister()
apisixClusterConfigListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Lister()
apisixConsumerListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixConsumers().Lister()
apisixPluginConfigListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Lister()
case config.ApisixV2:
apisixRouteInformer = apisixFactory.Apisix().V2().ApisixRoutes().Informer()
apisixTlsInformer = apisixFactory.Apisix().V2().ApisixTlses().Informer()
apisixClusterConfigInformer = apisixFactory.Apisix().V2().ApisixClusterConfigs().Informer()
apisixConsumerInformer = apisixFactory.Apisix().V2().ApisixConsumers().Informer()
apisixPluginConfigInformer = apisixFactory.Apisix().V2().ApisixPluginConfigs().Informer()
apisixUpstreamInformer = apisixFactory.Apisix().V2().ApisixUpstreams().Informer()
apisixRouteListerV2 = apisixFactory.Apisix().V2().ApisixRoutes().Lister()
apisixUpstreamListerV2 = apisixFactory.Apisix().V2().ApisixUpstreams().Lister()
apisixTlsListerV2 = apisixFactory.Apisix().V2().ApisixTlses().Lister()
apisixClusterConfigListerV2 = apisixFactory.Apisix().V2().ApisixClusterConfigs().Lister()
apisixConsumerListerV2 = apisixFactory.Apisix().V2().ApisixConsumers().Lister()
apisixPluginConfigListerV2 = apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister()
default:
panic(fmt.Errorf("unsupported API version %v", c.cfg.Kubernetes.APIVersion))
}
apisixUpstreamLister := kube.NewApisixUpstreamLister(apisixUpstreamListerV2beta3, apisixUpstreamListerV2)
apisixRouteLister := kube.NewApisixRouteLister(apisixRouteListerV2beta3, apisixRouteListerV2)
apisixTlsLister := kube.NewApisixTlsLister(apisixTlsListerV2beta3, apisixTlsListerV2)
apisixClusterConfigLister := kube.NewApisixClusterConfigLister(apisixClusterConfigListerV2beta3, apisixClusterConfigListerV2)
apisixConsumerLister := kube.NewApisixConsumerLister(apisixConsumerListerV2beta3, apisixConsumerListerV2)
apisixPluginConfigLister := kube.NewApisixPluginConfigLister(apisixPluginConfigListerV2beta3, apisixPluginConfigListerV2)
epLister, epInformer := kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
svcInformer := kubeFactory.Core().V1().Services().Informer()
svcLister := kubeFactory.Core().V1().Services().Lister()
podInformer := kubeFactory.Core().V1().Pods().Informer()
podLister := kubeFactory.Core().V1().Pods().Lister()
secretInformer := kubeFactory.Core().V1().Secrets().Informer()
secretLister := kubeFactory.Core().V1().Secrets().Lister()
configmapInformer := kubeFactory.Core().V1().ConfigMaps().Informer()
configmapLister := kubeFactory.Core().V1().ConfigMaps().Lister()
switch c.cfg.Kubernetes.IngressVersion {
case config.IngressNetworkingV1:
ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
ingressListerV1 = kubeFactory.Networking().V1().Ingresses().Lister()
case config.IngressNetworkingV1beta1:
ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
ingressListerV1beta1 = kubeFactory.Networking().V1beta1().Ingresses().Lister()
default:
ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
ingressListerExtensionsV1beta1 = kubeFactory.Extensions().V1beta1().Ingresses().Lister()
}
ingressLister := kube.NewIngressLister(ingressListerV1, ingressListerV1beta1, ingressListerExtensionsV1beta1)
listerInformer := &providertypes.ListerInformer{
ApisixFactory: apisixFactory,
KubeFactory: kubeFactory,
EpLister: epLister,
EpInformer: epInformer,
SvcLister: svcLister,
SvcInformer: svcInformer,
SecretLister: secretLister,
SecretInformer: secretInformer,
PodLister: podLister,
PodInformer: podInformer,
ConfigMapInformer: configmapInformer,
ConfigMapLister: configmapLister,
IngressInformer: ingressInformer,
IngressLister: ingressLister,
ApisixUpstreamLister: apisixUpstreamLister,
ApisixRouteLister: apisixRouteLister,
ApisixConsumerLister: apisixConsumerLister,
ApisixTlsLister: apisixTlsLister,
ApisixPluginConfigLister: apisixPluginConfigLister,
ApisixClusterConfigLister: apisixClusterConfigLister,
ApisixUpstreamInformer: apisixUpstreamInformer,
ApisixPluginConfigInformer: apisixPluginConfigInformer,
ApisixRouteInformer: apisixRouteInformer,
ApisixClusterConfigInformer: apisixClusterConfigInformer,
ApisixConsumerInformer: apisixConsumerInformer,
ApisixTlsInformer: apisixTlsInformer,
}
return listerInformer
}
func (c *Controller) run(ctx context.Context) {
log.Infow("controller tries to leading ...",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithCancel(ctx)
defer cancelFunc()
// give up leader
defer c.leaderContextCancelFunc()
clusterOpts := &apisix.ClusterOptions{
AdminAPIVersion: c.cfg.APISIX.AdminAPIVersion,
Name: c.cfg.APISIX.DefaultClusterName,
AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
MetricsCollector: c.MetricsCollector,
}
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
return
}
if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
// TODO give up the leader role
log.Errorf("failed to wait the default cluster to be ready: %s", err)
// re-create apisix cluster, used in next c.run
if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
log.Errorf("failed to update default cluster: %s", err)
return
}
return
}
// Creation Phase
c.informers = c.initSharedInformers()
common := &providertypes.Common{
ControllerNamespace: c.namespace,
ListerInformer: c.informers,
Config: c.cfg,
APISIX: c.apisix,
KubeClient: c.kubeClient,
MetricsCollector: c.MetricsCollector,
Recorder: c.recorder,
}
c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)
if err != nil {
ctx.Done()
return
}
c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
if err != nil {
ctx.Done()
return
}
c.translator = translation.NewTranslator(&translation.TranslatorOptions{
APIVersion: c.cfg.Kubernetes.APIVersion,
EndpointLister: c.informers.EpLister,
ServiceLister: c.informers.SvcLister,
SecretLister: c.informers.SecretLister,
PodLister: c.informers.PodLister,
ApisixUpstreamLister: c.informers.ApisixUpstreamLister,
PodProvider: c.podProvider,
})
c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
if err != nil {
ctx.Done()
return
}
c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
if err != nil {
ctx.Done()
return
}
c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
if err != nil {
ctx.Done()
return
}
if c.cfg.Kubernetes.EnableGatewayAPI {
c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
Cfg: c.cfg,
APISIX: c.apisix,
APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
KubeTranslator: c.translator,
RestConfig: nil,
KubeClient: c.kubeClient.Client,
MetricsCollector: c.MetricsCollector,
NamespaceProvider: c.namespaceProvider,
})
if err != nil {
ctx.Done()
return
}
}
// Init Phase
if err = c.namespaceProvider.Init(ctx); err != nil {
ctx.Done()
return
}
// Wait Resouce sync
if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
ctx.Done()
return
}
// Compare resource
if err = c.apisixProvider.Init(ctx); err != nil {
ctx.Done()
return
}
// Run Phase
e := utils.ParallelExecutor{}
e.Add(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
e.Add(func() {
c.namespaceProvider.Run(ctx)
})
e.Add(func() {
c.kubeProvider.Run(ctx)
})
e.Add(func() {
c.apisixProvider.Run(ctx)
})
e.Add(func() {
c.ingressProvider.Run(ctx)
})
if c.cfg.Kubernetes.EnableGatewayAPI {
e.Add(func() {
c.gatewayProvider.Run(ctx)
})
}
e.Add(func() {
c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
})
c.MetricsCollector.ResetLeader(true)
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
<-ctx.Done()
e.Wait()
for _, execErr := range e.Errors() {
log.Error(execErr.Error())
}
if len(e.Errors()) > 0 {
log.Error("Start failed, abort...")
cancelFunc()
}
}
func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
defer cancelFunc()
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx)
if err != nil {
// Finally failed health check, then give up leader.
log.Warnf("failed to check health for default cluster: %s, give up leader", err)
c.apiServer.HealthState.Lock()
defer c.apiServer.HealthState.Unlock()
c.apiServer.HealthState.Err = err
return
}
log.Debugf("success check health for default cluster")
c.MetricsCollector.IncrCheckClusterHealth(c.name)
}
}
func (c *Controller) syncAllResources() {
e := utils.ParallelExecutor{}
e.Add(c.ingressProvider.ResourceSync)
e.Add(c.apisixProvider.ResourceSync)
e.Wait()
}
func (c *Controller) resourceSyncLoop(ctx context.Context, interval time.Duration) {
// The interval shall not be less than 60 seconds.
if interval < _mininumApisixResourceSyncInterval {
log.Warnw("The apisix-resource-sync-interval shall not be less than 60 seconds.",
zap.String("apisix-resource-sync-interval", interval.String()),
)
interval = _mininumApisixResourceSyncInterval
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.syncAllResources()
continue
case <-ctx.Done():
return
}
}
}