blob: d2148b2b181165173cdf6511c260f0e9d5d4dce1 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gateway
import (
"fmt"
"sync"
"time"
)
import (
"go.uber.org/atomic"
istiolog "istio.io/pkg/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
k8s "sigs.k8s.io/gateway-api/apis/v1alpha2"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/kube/crdclient"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/credentials"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model/kstatus"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/status"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collections"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var log = istiolog.RegisterScope("gateway", "gateway-api controller", 0)
var (
errUnsupportedOp = fmt.Errorf("unsupported operation: the gateway config store is a read-only view")
errUnsupportedType = fmt.Errorf("unsupported type: this operation only supports gateway and virtual service resource type")
)
// Controller defines the controller for the gateway-api. The controller acts a bit different from most.
// Rather than watching the CRs directly, we depend on the existing model.ConfigStoreController which
// already watches all CRs. When there are updates, a new PushContext will be computed, which will eventually
// call Controller.Recompute(). Once this happens, we will inspect the current state of the world, and transform
// gateway-api types into Istio types (Gateway/VirtualService). Future calls to Get/List will return these
// Istio types. These are not stored in the cluster at all, and are purely internal; they can be seen on /debug/configz.
// During Recompute(), the status on all gateway-api types is also tracked. Once completed, if the status
// has changed at all, it is queued to asynchronously update the status of the object in Kubernetes.
type Controller struct {
// client for accessing Kubernetes
client kube.Client
// cache provides access to the underlying gateway-configs
cache model.ConfigStoreController
// Gateway-api types reference namespace labels directly, so we need access to these
namespaceLister listerv1.NamespaceLister
namespaceInformer cache.SharedIndexInformer
namespaceHandler model.EventHandler
// domain stores the cluster domain, typically cluster.local
domain string
// state is our computed Istio resources. Access is guarded by stateMu. This is updated from Recompute().
state OutputResources
stateMu sync.RWMutex
// statusController controls the status working queue. Status will only be written if statusEnabled is true, which
// is only the case when we are the leader.
statusController *status.Controller
statusEnabled *atomic.Bool
}
var _ model.GatewayController = &Controller{}
func NewController(client kube.Client, c model.ConfigStoreController, options controller.Options) *Controller {
var ctl *status.Controller
nsInformer := client.KubeInformer().Core().V1().Namespaces().Informer()
gatewayController := &Controller{
client: client,
cache: c,
namespaceLister: client.KubeInformer().Core().V1().Namespaces().Lister(),
namespaceInformer: nsInformer,
domain: options.DomainSuffix,
statusController: ctl,
// Disabled by default, we will enable only if we win the leader election
statusEnabled: atomic.NewBool(false),
}
nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
gatewayController.namespaceEvent(nil, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
gatewayController.namespaceEvent(oldObj, newObj)
},
})
return gatewayController
}
func (c *Controller) Schemas() collection.Schemas {
return collection.SchemasFor(
collections.IstioNetworkingV1Alpha3Virtualservices,
collections.IstioNetworkingV1Alpha3Gateways,
)
}
func (c *Controller) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
return nil
}
func (c *Controller) List(typ config.GroupVersionKind, namespace string) ([]config.Config, error) {
if typ != gvk.Gateway && typ != gvk.VirtualService {
return nil, errUnsupportedType
}
c.stateMu.RLock()
defer c.stateMu.RUnlock()
switch typ {
case gvk.Gateway:
return filterNamespace(c.state.Gateway, namespace), nil
case gvk.VirtualService:
return filterNamespace(c.state.VirtualService, namespace), nil
default:
return nil, errUnsupportedType
}
}
func (c *Controller) SetStatusWrite(enabled bool, statusManager *status.Manager) {
c.statusEnabled.Store(enabled)
if enabled && features.EnableGatewayAPIStatus && statusManager != nil {
c.statusController = statusManager.CreateGenericController(func(status interface{}, context interface{}) status.GenerationProvider {
return &gatewayGeneration{context}
})
} else {
c.statusController = nil
}
}
// Recompute takes in a current snapshot of the gateway-api configs, and regenerates our internal state.
// Any status updates required will be enqueued as well.
func (c *Controller) Recompute(context model.GatewayContext) error {
t0 := time.Now()
defer func() {
log.Debugf("recompute complete in %v", time.Since(t0))
}()
gatewayClass, err := c.cache.List(gvk.GatewayClass, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type GatewayClass: %v", err)
}
gateway, err := c.cache.List(gvk.KubernetesGateway, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type Gateway: %v", err)
}
httpRoute, err := c.cache.List(gvk.HTTPRoute, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type HTTPRoute: %v", err)
}
tcpRoute, err := c.cache.List(gvk.TCPRoute, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type TCPRoute: %v", err)
}
tlsRoute, err := c.cache.List(gvk.TLSRoute, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type TLSRoute: %v", err)
}
referencePolicy, err := c.cache.List(gvk.ReferencePolicy, metav1.NamespaceAll)
if err != nil {
return fmt.Errorf("failed to list type BackendPolicy: %v", err)
}
input := &KubernetesResources{
GatewayClass: deepCopyStatus(gatewayClass),
Gateway: deepCopyStatus(gateway),
HTTPRoute: deepCopyStatus(httpRoute),
TCPRoute: deepCopyStatus(tcpRoute),
TLSRoute: deepCopyStatus(tlsRoute),
ReferencePolicy: referencePolicy,
Domain: c.domain,
Context: context,
}
if !anyApisUsed(input) {
// Early exit for common case of no gateway-api used.
c.stateMu.Lock()
defer c.stateMu.Unlock()
// make sure we clear out the state, to handle the last gateway-api resource being removed
c.state = OutputResources{}
return nil
}
nsl, err := c.namespaceLister.List(klabels.Everything())
if err != nil {
return fmt.Errorf("failed to list type Namespaces: %v", err)
}
namespaces := map[string]*corev1.Namespace{}
for _, ns := range nsl {
namespaces[ns.Name] = ns
}
input.Namespaces = namespaces
output := convertResources(input)
// Handle all status updates
c.QueueStatusUpdates(input)
c.stateMu.Lock()
defer c.stateMu.Unlock()
c.state = output
return nil
}
func (c *Controller) QueueStatusUpdates(r *KubernetesResources) {
c.handleStatusUpdates(r.GatewayClass)
c.handleStatusUpdates(r.Gateway)
c.handleStatusUpdates(r.HTTPRoute)
c.handleStatusUpdates(r.TCPRoute)
c.handleStatusUpdates(r.TLSRoute)
}
func (c *Controller) handleStatusUpdates(configs []config.Config) {
if c.statusController == nil || !c.statusEnabled.Load() {
return
}
for _, cfg := range configs {
ws := cfg.Status.(*kstatus.WrappedStatus)
if ws.Dirty {
res := status.ResourceFromModelConfig(cfg)
c.statusController.EnqueueStatusUpdateResource(ws.Unwrap(), res)
}
}
}
func (c *Controller) Create(config config.Config) (revision string, err error) {
return "", errUnsupportedOp
}
func (c *Controller) Update(config config.Config) (newRevision string, err error) {
return "", errUnsupportedOp
}
func (c *Controller) UpdateStatus(config config.Config) (newRevision string, err error) {
return "", errUnsupportedOp
}
func (c *Controller) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
return "", errUnsupportedOp
}
func (c *Controller) Delete(typ config.GroupVersionKind, name, namespace string, _ *string) error {
return errUnsupportedOp
}
func (c *Controller) RegisterEventHandler(typ config.GroupVersionKind, handler model.EventHandler) {
switch typ {
case gvk.Namespace:
c.namespaceHandler = handler
}
// For all other types, do nothing as c.cache has been registered
}
func (c *Controller) Run(stop <-chan struct{}) {
go func() {
if crdclient.WaitForCRD(gvk.GatewayClass, stop) {
gcc := NewClassController(c.client)
c.client.RunAndWait(stop)
gcc.Run(stop)
}
}()
cache.WaitForCacheSync(stop, c.namespaceInformer.HasSynced)
}
func (c *Controller) SetWatchErrorHandler(handler func(r *cache.Reflector, err error)) error {
return c.cache.SetWatchErrorHandler(handler)
}
func (c *Controller) HasSynced() bool {
return c.cache.HasSynced()
}
func (c *Controller) SecretAllowed(resourceName string, namespace string) bool {
p, err := credentials.ParseResourceName(resourceName, "", "", "")
if err != nil {
log.Warnf("failed to parse resource name %q: %v", resourceName, err)
return false
}
from := Reference{Kind: gvk.KubernetesGateway, Namespace: k8s.Namespace(namespace)}
to := Reference{Kind: gvk.Secret, Namespace: k8s.Namespace(p.Namespace)}
allow := c.state.AllowedReferences[from][to]
if allow == nil {
return false
}
return allow.AllowAll || allow.AllowedNames.Contains(p.Name)
}
// namespaceEvent handles a namespace add/update. Gateway's can select routes by label, so we need to handle
// when the labels change.
// Note: we don't handle delete as a delete would also clean up any relevant gateway-api types which will
// trigger its own event.
func (c *Controller) namespaceEvent(oldObj interface{}, newObj interface{}) {
// First, find all the label keys on the old/new namespace. We include NamespaceNameLabel
// since we have special logic to always allow this on namespace.
touchedNamespaceLabels := sets.New(NamespaceNameLabel)
touchedNamespaceLabels.InsertAll(getLabelKeys(oldObj)...)
touchedNamespaceLabels.InsertAll(getLabelKeys(newObj)...)
// Next, we find all keys our Gateways actually reference.
c.stateMu.RLock()
intersection := touchedNamespaceLabels.Intersection(c.state.ReferencedNamespaceKeys)
c.stateMu.RUnlock()
// If there was any overlap, then a relevant namespace label may have changed, and we trigger a
// push A more exact check could actually determine if the label selection result actually changed.
// However, this is a much simpler approach that is likely to scale well enough for now.
if !intersection.IsEmpty() && c.namespaceHandler != nil {
log.Debugf("namespace labels changed, triggering namespace handler: %v", intersection.UnsortedList())
c.namespaceHandler(config.Config{}, config.Config{}, model.EventUpdate)
}
}
// getLabelKeys extracts all label keys from a namespace object.
func getLabelKeys(obj interface{}) []string {
if obj == nil {
return nil
}
ns, ok := obj.(*corev1.Namespace)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil
}
ns, ok = tombstone.Obj.(*corev1.Namespace)
if !ok {
return nil
}
}
keys := make([]string, 0, len(ns.Labels))
for k := range ns.Labels {
keys = append(keys, k)
}
return keys
}
// deepCopyStatus creates a copy of all configs, with a copy of the status field that we can mutate.
// This allows our functions to call Status.Mutate, and then we can later persist all changes into the
// API server.
func deepCopyStatus(configs []config.Config) []config.Config {
res := make([]config.Config, 0, len(configs))
for _, c := range configs {
nc := config.Config{
Meta: c.Meta,
Spec: c.Spec,
Status: kstatus.Wrap(c.Status),
}
res = append(res, nc)
}
return res
}
// filterNamespace allows filtering out configs to only a specific namespace. This allows implementing the
// List call which can specify a specific namespace.
func filterNamespace(cfgs []config.Config, namespace string) []config.Config {
if namespace == metav1.NamespaceAll {
return cfgs
}
filtered := make([]config.Config, 0, len(cfgs))
for _, c := range cfgs {
if c.Namespace == namespace {
filtered = append(filtered, c)
}
}
return filtered
}
// anyApisUsed determines if there are any gateway-api resources created at all. If not, we can
// short circuit all processing to avoid excessive work.
func anyApisUsed(input *KubernetesResources) bool {
return len(input.GatewayClass) > 0 ||
len(input.Gateway) > 0 ||
len(input.HTTPRoute) > 0 ||
len(input.TCPRoute) > 0 ||
len(input.TLSRoute) > 0 ||
len(input.ReferencePolicy) > 0
}