blob: 175b1b99c4e24bdab9ca42d3de68fb07226981bc [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"context"
"time"
)
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
serviceRegistryKube "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/mcs"
"github.com/apache/dubbo-go-pixiu/pkg/queue"
)
type autoServiceExportController struct {
autoServiceExportOptions
client kube.Client
queue queue.Instance
serviceInformer cache.SharedInformer
// We use this flag to short-circuit the logic and stop the controller
// if the CRD does not exist (or is deleted)
mcsSupported bool
}
// autoServiceExportOptions provide options for creating a autoServiceExportController.
type autoServiceExportOptions struct {
Client kube.Client
ClusterID cluster.ID
DomainSuffix string
ClusterLocal model.ClusterLocalProvider
}
// newAutoServiceExportController creates a new autoServiceExportController.
func newAutoServiceExportController(opts autoServiceExportOptions) *autoServiceExportController {
c := &autoServiceExportController{
autoServiceExportOptions: opts,
client: opts.Client,
queue: queue.NewQueue(time.Second),
mcsSupported: true,
}
log.Infof("%s starting controller", c.logPrefix())
c.serviceInformer = opts.Client.KubeInformer().Core().V1().Services().Informer()
c.serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.onServiceAdd(obj) },
// Do nothing on update. The controller only acts on parts of the service
// that are immutable (e.g. name).
// Do nothing on delete. When we create ServiceExport, we bind its
// lifecycle to the Service so that when the Service is deleted,
// k8s automatically deletes the ServiceExport.
})
return c
}
func (c *autoServiceExportController) onServiceAdd(obj interface{}) {
c.queue.Push(func() error {
if !c.mcsSupported {
// Don't create ServiceExport if MCS is not supported on the cluster.
log.Debugf("%s ignoring added Service, since !mcsSupported", c.logPrefix())
return nil
}
svc, err := convertToService(obj)
if err != nil {
log.Warnf("%s failed converting service: %v", c.logPrefix(), err)
return err
}
if c.isClusterLocalService(svc) {
// Don't create ServiceExport if the service is configured to be
// local to the cluster (i.e. non-exported).
log.Debugf("%s ignoring cluster-local service %s/%s", c.logPrefix(), svc.Namespace, svc.Name)
return nil
}
return c.createServiceExportIfNotPresent(svc)
})
}
func (c *autoServiceExportController) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.serviceInformer.HasSynced) {
log.Errorf("%s failed to sync cache", c.logPrefix())
return
}
log.Infof("%s started", c.logPrefix())
go c.queue.Run(stopCh)
}
func (c *autoServiceExportController) logPrefix() string {
return "AutoServiceExport (cluster=" + c.ClusterID.String() + ") "
}
func (c *autoServiceExportController) createServiceExportIfNotPresent(svc *v1.Service) error {
serviceExport := mcsapi.ServiceExport{
TypeMeta: metav1.TypeMeta{
Kind: "ServiceExport",
APIVersion: mcs.MCSSchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Namespace: svc.Namespace,
Name: svc.Name,
// Bind the lifecycle of the ServiceExport to the Service. We do this by making the Service
// the "owner" of the ServiceExport resource.
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: "Service",
Name: svc.Name,
UID: svc.UID,
},
},
},
}
// Convert to unstructured.
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&serviceExport)
if err != nil {
log.Warnf("%s failed converting ServiceExport %s/%s to Unstructured: %v", c.logPrefix(),
svc.Namespace, svc.Name, err)
return err
}
if _, err = c.client.Dynamic().Resource(mcs.ServiceExportGVR).Namespace(serviceExport.Namespace).Create(
context.TODO(), &unstructured.Unstructured{Object: u}, metav1.CreateOptions{}); err != nil {
switch {
case errors.IsAlreadyExists(err):
// The ServiceExport already exists. Nothing to do.
return nil
case errors.IsNotFound(err):
log.Warnf("%s ServiceExport CRD Not found. Shutting down MCS ServiceExport sync. "+
"Please add the CRD then restart the istiod deployment", c.logPrefix())
c.mcsSupported = false
// Do not return the error, so that the queue does not attempt a retry.
return nil
}
}
if err != nil {
log.Warnf("%s failed creating ServiceExport %s/%s: %v", c.logPrefix(), svc.Namespace, svc.Name, err)
return err
}
log.Debugf("%s created ServiceExport %s/%s", c.logPrefix(), svc.Namespace, svc.Name)
return nil
}
func (c *autoServiceExportController) isClusterLocalService(svc *v1.Service) bool {
hostname := serviceRegistryKube.ServiceHostname(svc.Name, svc.Namespace, c.DomainSuffix)
return c.ClusterLocal.GetClusterLocalHosts().IsClusterLocal(hostname)
}