blob: 16f4b82759866adb27005557ce0e639d0b15c0c5 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gateway
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"text/template"
"time"
)
import (
istiolog "istio.io/pkg/log"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
appsinformersv1 "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
gateway "sigs.k8s.io/gateway-api/apis/v1alpha2"
"sigs.k8s.io/gateway-api/pkg/client/listers/apis/v1alpha2"
"sigs.k8s.io/yaml"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/controllers"
)
// DeploymentController implements a controller that materializes a Gateway into an in cluster gateway proxy
// to serve requests from. This is implemented with a Deployment and Service today.
// The implementation makes a few non-obvious choices - namely using Server Side Apply from go templates
// and not using controller-runtime.
//
// controller-runtime has a number of constraints that make it inappropriate for usage here, despite this
// seeming to be the bread and butter of the library:
// * It is not readily possible to bring existing Informers, which would require extra watches (#1668)
// * Goroutine leaks (#1655)
// * Excessive API-server calls at startup which have no benefit to us (#1603)
// * Hard to use with SSA (#1669)
// While these can be worked around, at some point it isn't worth the effort.
//
// Server Side Apply with go templates is an odd choice (no one likes YAML templating...) but is one of the few
// remaining options after all others are ruled out.
// - Merge patch/Update cannot be used. If we always enforce that our object is *exactly* the same as
// the in-cluster object we will get in endless loops due to other controllers that like to add annotations, etc.
// If we chose to allow any unknown fields, then we would never be able to remove fields we added, as
// we cannot tell if we created it or someone else did. SSA fixes these issues
// - SSA using client-go Apply libraries is almost a good choice, but most third-party clients (Istio, MCS, and gateway-api)
// do not provide these libraries.
// - SSA using standard API types doesn't work well either: https://github.com/kubernetes-sigs/controller-runtime/issues/1669
// - This leaves YAML templates, converted to unstructured types and Applied with the dynamic client.
type DeploymentController struct {
client kube.Client
queue controllers.Queue
templates *template.Template
patcher patcher
gatewayLister v1alpha2.GatewayLister
gatewayClassLister v1alpha2.GatewayClassLister
}
// Patcher is a function that abstracts patching logic. This is largely because client-go fakes do not handle patching
type patcher func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error
// NewDeploymentController constructs a DeploymentController and registers required informers.
// The controller will not start until Run() is called.
func NewDeploymentController(client kube.Client) *DeploymentController {
gw := client.GatewayAPIInformer().Gateway().V1alpha2().Gateways()
gwc := client.GatewayAPIInformer().Gateway().V1alpha2().GatewayClasses()
dc := &DeploymentController{
client: client,
templates: processTemplates(),
patcher: func(gvr schema.GroupVersionResource, name string, namespace string, data []byte, subresources ...string) error {
c := client.Dynamic().Resource(gvr).Namespace(namespace)
t := true
_, err := c.Patch(context.Background(), name, types.ApplyPatchType, data, metav1.PatchOptions{
Force: &t,
FieldManager: ControllerName,
}, subresources...)
return err
},
gatewayLister: gw.Lister(),
gatewayClassLister: gwc.Lister(),
}
dc.queue = controllers.NewQueue("gateway deployment",
controllers.WithReconciler(dc.Reconcile),
controllers.WithMaxAttempts(5))
// Set up a handler that will add the parent Gateway object onto the queue.
// The queue will only handle Gateway objects; if child resources (Service, etc) are updated we re-add
// the Gateway to the queue and reconcile the state of the world.
handler := controllers.ObjectHandler(controllers.EnqueueForParentHandler(dc.queue, gvk.KubernetesGateway))
// Use the full informer, since we are already fetching all Services for other purposes
// If we somehow stop watching Services in the future we can add a label selector like below.
client.KubeInformer().Core().V1().Services().Informer().
AddEventHandler(handler)
// For Deployments, this is the only controller watching. We can filter to just the deployments we care about
client.KubeInformer().InformerFor(&appsv1.Deployment{}, func(k kubernetes.Interface, resync time.Duration) cache.SharedIndexInformer {
return appsinformersv1.NewFilteredDeploymentInformer(
k, metav1.NamespaceAll, resync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(options *metav1.ListOptions) {
options.LabelSelector = "gateway.istio.io/managed=istio.io-gateway-controller"
},
)
}).AddEventHandler(handler)
// Use the full informer; we are already watching all Gateways for the core Istiod logic
gw.Informer().AddEventHandler(controllers.ObjectHandler(dc.queue.AddObject))
gwc.Informer().AddEventHandler(controllers.ObjectHandler(func(o controllers.Object) {
o.GetName()
gws, _ := dc.gatewayLister.List(klabels.Everything())
for _, g := range gws {
if string(g.Spec.GatewayClassName) == o.GetName() {
dc.queue.AddObject(g)
}
}
}))
return dc
}
func (d *DeploymentController) Run(stop <-chan struct{}) {
d.queue.Run(stop)
}
// Reconcile takes in the name of a Gateway and ensures the cluster is in the desired state
func (d *DeploymentController) Reconcile(req types.NamespacedName) error {
log := log.WithLabels("gateway", req)
gw, err := d.gatewayLister.Gateways(req.Namespace).Get(req.Name)
if err != nil || gw == nil {
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
if err := controllers.IgnoreNotFound(err); err != nil {
log.Errorf("unable to fetch Gateway: %v", err)
return err
}
return nil
}
gc, _ := d.gatewayClassLister.Get(string(gw.Spec.GatewayClassName))
if gc != nil {
// We found the gateway class, but we do not implement it. Skip
if gc.Spec.ControllerName != ControllerName {
return nil
}
} else {
// Didn't find gateway class... it must use implicit Istio one.
if gw.Spec.GatewayClassName != DefaultClassName {
return nil
}
}
// Matched class, reconcile it
return d.configureIstioGateway(log, *gw)
}
func (d *DeploymentController) configureIstioGateway(log *istiolog.Scope, gw gateway.Gateway) error {
// If user explicitly sets addresses, we are assuming they are pointing to an existing deployment.
// We will not manage it in this case
if !isManaged(&gw.Spec) {
log.Debug("skip unmanaged gateway")
return nil
}
log.Info("reconciling")
svc := serviceInput{Gateway: &gw, Ports: extractServicePorts(gw)}
if err := d.ApplyTemplate("service.yaml", svc); err != nil {
return fmt.Errorf("update service: %v", err)
}
log.Info("service updated")
dep := deploymentInput{Gateway: &gw, KubeVersion122: kube.IsAtLeastVersion(d.client, 22)}
if err := d.ApplyTemplate("deployment.yaml", dep); err != nil {
return fmt.Errorf("update deployment: %v", err)
}
log.Info("deployment updated")
gws := &gateway.Gateway{
TypeMeta: metav1.TypeMeta{
Kind: gvk.KubernetesGateway.Kind,
APIVersion: gvk.KubernetesGateway.Group + "/" + gvk.KubernetesGateway.Version,
},
ObjectMeta: metav1.ObjectMeta{
Name: gw.Name,
Namespace: gw.Namespace,
},
Status: gateway.GatewayStatus{
Conditions: setConditions(gw.Generation, nil, map[string]*condition{
string(gateway.GatewayConditionScheduled): {
reason: "ResourcesAvailable",
message: "Deployed gateway to the cluster",
},
}),
},
}
if err := d.ApplyObject(gws, "status"); err != nil {
return fmt.Errorf("update gateway status: %v", err)
}
log.Info("gateway updated")
return nil
}
// ApplyTemplate renders a template with the given input and (server-side) applies the results to the cluster.
func (d *DeploymentController) ApplyTemplate(template string, input metav1.Object, subresources ...string) error {
var buf bytes.Buffer
if err := d.templates.ExecuteTemplate(&buf, template, input); err != nil {
return err
}
data := map[string]interface{}{}
err := yaml.Unmarshal(buf.Bytes(), &data)
if err != nil {
return err
}
us := unstructured.Unstructured{Object: data}
gvr, err := controllers.UnstructuredToGVR(us)
if err != nil {
return err
}
j, err := json.Marshal(us.Object)
if err != nil {
return err
}
log.Debugf("applying %v", string(j))
return d.patcher(gvr, us.GetName(), input.GetNamespace(), j, subresources...)
}
// ApplyObject renders an object with the given input and (server-side) applies the results to the cluster.
func (d *DeploymentController) ApplyObject(obj controllers.Object, subresources ...string) error {
j, err := config.ToJSON(obj)
if err != nil {
return err
}
gvr, err := controllers.ObjectToGVR(obj)
if err != nil {
return err
}
log.Debugf("applying %v", string(j))
return d.patcher(gvr, obj.GetName(), obj.GetNamespace(), j, subresources...)
}
// Merge maps merges multiple maps. Latter maps take precedence over previous maps on overlapping fields
func mergeMaps(maps ...map[string]string) map[string]string {
if len(maps) == 0 {
return nil
}
res := make(map[string]string, len(maps[0]))
for _, m := range maps {
for k, v := range m {
res[k] = v
}
}
return res
}
type serviceInput struct {
*gateway.Gateway
Ports []corev1.ServicePort
}
type deploymentInput struct {
*gateway.Gateway
KubeVersion122 bool
}
func extractServicePorts(gw gateway.Gateway) []corev1.ServicePort {
svcPorts := make([]corev1.ServicePort, 0, len(gw.Spec.Listeners)+1)
svcPorts = append(svcPorts, corev1.ServicePort{
Name: "status-port",
Port: int32(15021),
})
portNums := map[int32]struct{}{}
for i, l := range gw.Spec.Listeners {
if _, f := portNums[int32(l.Port)]; f {
continue
}
portNums[int32(l.Port)] = struct{}{}
name := string(l.Name)
if name == "" {
// Should not happen since name is required, but in case an invalid resource gets in...
name = fmt.Sprintf("%s-%d", strings.ToLower(string(l.Protocol)), i)
}
svcPorts = append(svcPorts, corev1.ServicePort{
Name: name,
Port: int32(l.Port),
})
}
return svcPorts
}