// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package controller implements a k8s controller for managing the lifecycle of a validating webhook.
package controller

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"reflect"
	"strings"
	"time"

	"github.com/hashicorp/go-multierror"
	"istio.io/api/label"
	networking "istio.io/api/networking/v1alpha3"
	"istio.io/client-go/pkg/apis/networking/v1alpha3"
	"istio.io/pkg/log"
	kubeApiAdmission "k8s.io/api/admissionregistration/v1"
	kubeErrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/runtime/serializer/json"
	"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"github.com/apache/dubbo-go-pixiu/pilot/pkg/keycertbundle"
	"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
	"github.com/apache/dubbo-go-pixiu/pkg/kube"
	"github.com/apache/dubbo-go-pixiu/pkg/webhooks/util"
)
var scope = log.RegisterScope("validationController", "validation webhook controller", 0)
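
// Options holds the configuration needed to patch the validating webhooks of a
// single control-plane revision.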
type Options struct {
// Istio system namespace where istiod resides.
WatchedNamespace string
	// Watcher for the x509 certificate bundle used by the webhook server
	// and patched into the webhook config.
	CABundleWatcher *keycertbundle.Watcher
	// Revision of the control plane performing the patching of the validating webhook.
	Revision string
// Name of the service running the webhook server.
ServiceName string
}
// Validate validates the options that are exposed to end users.
func (o Options) Validate() error {
var errs *multierror.Error
if o.WatchedNamespace == "" || !labels.IsDNS1123Label(o.WatchedNamespace) {
errs = multierror.Append(errs, fmt.Errorf("invalid namespace: %q", o.WatchedNamespace))
}
if o.ServiceName == "" || !labels.IsDNS1123Label(o.ServiceName) {
errs = multierror.Append(errs, fmt.Errorf("invalid service name: %q", o.ServiceName))
}
if o.CABundleWatcher == nil {
errs = multierror.Append(errs, errors.New("CA bundle watcher not specified"))
}
return errs.ErrorOrNil()
}
// String produces a human-readable version of the options for debugging.
func (o Options) String() string {
buf := &bytes.Buffer{}
_, _ = fmt.Fprintf(buf, "WatchedNamespace: %v\n", o.WatchedNamespace)
_, _ = fmt.Fprintf(buf, "Revision: %v\n", o.Revision)
_, _ = fmt.Fprintf(buf, "ServiceName: %v\n", o.ServiceName)
return buf.String()
}
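
// Controller keeps the revision's ValidatingWebhookConfigurations in sync with
// the CA bundle and flips the failure policy to Fail once the webhook endpoint
// is proven ready.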
type Controller struct {
	o      Options
	client kube.Client

	webhookInformer cache.SharedInformer
	queue           workqueue.RateLimitingInterface

	// dryRunOfInvalidConfigRejected records whether the webhook endpoint has
	// already rejected a known-invalid config via a dry-run request, i.e.
	// whether it is safe to fail close.
	dryRunOfInvalidConfigRejected bool
}
// NewValidatingWebhookController creates a new Controller.
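//
// A minimal usage sketch (assumption: client and caBundleWatcher are built
// elsewhere; "default" and "istio-system" are example values):
//
//	c := NewValidatingWebhookController(client, "default", "istio-system", caBundleWatcher)
//	go c.Run(stop)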
func NewValidatingWebhookController(client kube.Client,
revision, ns string, caBundleWatcher *keycertbundle.Watcher) *Controller {
o := Options{
WatchedNamespace: ns,
CABundleWatcher: caBundleWatcher,
Revision: revision,
ServiceName: "istiod",
}
return newController(o, client)
}
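
// eventType records why a reconcileRequest was enqueued: a watch/update event
// or a rate-limited retry of a previously failed reconciliation.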
type eventType string
const (
retryEvent eventType = "retryEvent"
updateEvent eventType = "updateEvent"
)
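
// reconcileRequest is the unit of work carried on the queue: the event that
// triggered it, a human-readable description, and the name of the webhook
// configuration to patch.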
type reconcileRequest struct {
event eventType
description string
webhookName string // if empty, ALL webhooks should be updated in response
}
func (rr reconcileRequest) String() string {
return fmt.Sprintf("[description] %s, [eventType] %s, [name] %s", rr.description, rr.event, rr.webhookName)
}
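
// filterWatchedObject derives the cache key for obj, asking the caller to skip
// objects whose key cannot be computed.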
func filterWatchedObject(obj metav1.Object) (skip bool, key string) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return true, ""
}
return false, key
}
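
// makeHandler builds informer event handlers that enqueue a reconcileRequest
// for every add and (resource-version-changing) update of a watched
// ValidatingWebhookConfiguration.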
func makeHandler(queue workqueue.Interface, gvk schema.GroupVersionKind) *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(curr interface{}) {
obj, err := meta.Accessor(curr)
if err != nil {
return
}
skip, key := filterWatchedObject(obj)
scope.Debugf("HandlerAdd: key=%v skip=%v", key, skip)
if skip {
return
}
req := reconcileRequest{
event: updateEvent,
webhookName: obj.GetName(),
description: fmt.Sprintf("add event (%v, Kind=%v) %v", gvk.GroupVersion(), gvk.Kind, key),
}
queue.Add(req)
},
UpdateFunc: func(prev, curr interface{}) {
currObj, err := meta.Accessor(curr)
if err != nil {
return
}
prevObj, err := meta.Accessor(prev)
if err != nil {
return
}
if currObj.GetResourceVersion() == prevObj.GetResourceVersion() {
return
}
skip, key := filterWatchedObject(currObj)
scope.Debugf("HandlerUpdate: key=%v skip=%v", key, skip)
if skip {
return
}
req := reconcileRequest{
event: updateEvent,
webhookName: currObj.GetName(),
description: fmt.Sprintf("update event (%v, Kind=%v) %v", gvk.GroupVersion(), gvk.Kind, key),
}
queue.Add(req)
},
}
}
// precompute GVK for known types.
var (
configGVK = kubeApiAdmission.SchemeGroupVersion.WithKind(reflect.TypeOf(kubeApiAdmission.ValidatingWebhookConfiguration{}).Name())
)
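
// newController wires up a rate-limited work queue and a shared informer that
// lists and watches ValidatingWebhookConfigurations carrying this revision's
// istio.io/rev label.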
func newController(
o Options,
client kube.Client,
) *Controller {
c := &Controller{
o: o,
client: client,
queue: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)),
}
webhookInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = fmt.Sprintf("%s=%s", label.IoIstioRev.Name, o.Revision)
return client.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.LabelSelector = fmt.Sprintf("%s=%s", label.IoIstioRev.Name, o.Revision)
return client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.TODO(), opts)
},
},
&kubeApiAdmission.ValidatingWebhookConfiguration{}, 0, cache.Indexers{},
)
webhookInformer.AddEventHandler(makeHandler(c.queue, configGVK))
c.webhookInformer = webhookInformer
return c
}
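
// Run starts the informer, waits for its cache to sync, then launches the CA
// bundle watcher and the worker loop, blocking until stop is closed.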
func (c *Controller) Run(stop <-chan struct{}) {
defer c.queue.ShutDown()
go c.webhookInformer.Run(stop)
if !cache.WaitForCacheSync(stop, c.webhookInformer.HasSynced) {
return
}
go c.startCaBundleWatcher(stop)
go c.runWorker()
<-stop
}
// startCaBundleWatcher listens for updates to the CA bundle and patches the webhooks.
// shouldn't we be doing this for both validating and mutating webhooks...?
func (c *Controller) startCaBundleWatcher(stop <-chan struct{}) {
if c.o.CABundleWatcher == nil {
return
}
id, watchCh := c.o.CABundleWatcher.AddWatcher()
defer c.o.CABundleWatcher.RemoveWatcher(id)
// trigger initial update
watchCh <- struct{}{}
for {
select {
case <-watchCh:
c.queue.AddRateLimited(reconcileRequest{
updateEvent,
"CA bundle update",
"",
})
case <-stop:
return
}
}
}
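
// runWorker processes queue items until the queue is shut down.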
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
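
// processNextWorkItem handles one queue item, re-queuing it with rate limiting
// when reconciliation requests a retry or returns an error. It returns false
// only when the queue has been shut down.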
func (c *Controller) processNextWorkItem() (cont bool) {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(obj)
req, ok := obj.(reconcileRequest)
if !ok {
		// don't retry an invalid reconcileRequest item
		c.queue.Forget(obj)
return true
}
	// an empty webhook name means we must patch all webhooks
if req.webhookName == "" {
c.updateAll()
return true
}
if retry, err := c.reconcileRequest(req); retry || err != nil {
c.queue.AddRateLimited(reconcileRequest{
event: retryEvent,
description: "retry reconcile request",
webhookName: req.webhookName,
})
utilruntime.HandleError(err)
} else {
c.queue.Forget(obj)
}
return true
}
// updateAll updates all webhooks matching the controller's revision, generally in
// response to a CA bundle update. Webhooks are listed from the informer cache; a
// failure to update an individual webhook re-queues only that webhook for retry,
// so one failing webhook does not block the others.
func (c *Controller) updateAll() {
whs := c.webhookInformer.GetStore().List()
for _, item := range whs {
wh := item.(*kubeApiAdmission.ValidatingWebhookConfiguration)
if retry, err := c.reconcileRequest(reconcileRequest{
event: updateEvent,
description: "CA bundle update",
webhookName: wh.Name,
}); retry || err != nil {
c.queue.AddRateLimited(reconcileRequest{
event: retryEvent,
description: "retry reconcile request",
webhookName: wh.Name,
})
}
}
}
// reconcileRequest reconciles the desired state with the kube-apiserver.
// The returned values indicate whether the reconciliation should be retried
// and whether an error occurred.
func (c *Controller) reconcileRequest(req reconcileRequest) (bool, error) {
	// Stop early if the webhook is not present, rather than attempting (and failing) to reconcile permanently.
	// If the webhook is later added, a new reconciliation request will trigger it to update.
configuration, err := c.client.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.Background(), req.webhookName, metav1.GetOptions{})
if err != nil {
if kubeErrors.IsNotFound(err) {
scope.Infof("Skip patching webhook, webhook %q not found", req.webhookName)
return false, nil
}
return false, err
}
scope.Debugf("Reconcile(enter): %v", req)
defer func() { scope.Debugf("Reconcile(exit)") }()
caBundle, err := util.LoadCABundle(c.o.CABundleWatcher)
if err != nil {
scope.Errorf("Failed to load CA bundle: %v", err)
reportValidationConfigLoadError(err.(*util.ConfigError).Reason())
// no point in retrying unless cert file changes.
return false, nil
}
failurePolicy := kubeApiAdmission.Ignore
ready := c.readyForFailClose()
if ready {
failurePolicy = kubeApiAdmission.Fail
}
return !ready, c.updateValidatingWebhookConfiguration(configuration, caBundle, failurePolicy)
}
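
// readyForFailClose reports whether it is safe to set the webhook failure
// policy to Fail: it becomes (and stays) true once a dry-run of known-invalid
// config has been rejected by the webhook endpoint.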
func (c *Controller) readyForFailClose() bool {
if !c.dryRunOfInvalidConfigRejected {
if rejected, reason := c.isDryRunOfInvalidConfigRejected(); !rejected {
scope.Infof("Not ready to switch validation to fail-closed: %v", reason)
return false
}
scope.Info("Endpoint successfully rejected invalid config. Switching to fail-close.")
c.dryRunOfInvalidConfigRejected = true
}
return true
}
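
// Fragments of kube-apiserver error messages, used to classify the outcome of
// the dry-run probe in isDryRunOfInvalidConfigRejected.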
const (
deniedRequestMessageFragment = `denied the request`
missingResourceMessageFragment = `the server could not find the requested resource`
unsupportedDryRunMessageFragment = `does not support dry run`
)
// isDryRunOfInvalidConfigRejected confirms that invalid configuration is successfully rejected before switching to fail-close.
func (c *Controller) isDryRunOfInvalidConfigRejected() (rejected bool, reason string) {
invalidGateway := &v1alpha3.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid-gateway",
Namespace: c.o.WatchedNamespace,
// Must ensure that this is the revision validating the known-bad config
Labels: map[string]string{
label.IoIstioRev.Name: c.o.Revision,
},
},
Spec: networking.Gateway{},
}
createOptions := metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}
istioClient := c.client.Istio().NetworkingV1alpha3()
_, err := istioClient.Gateways(c.o.WatchedNamespace).Create(context.TODO(), invalidGateway, createOptions)
if kubeErrors.IsAlreadyExists(err) {
updateOptions := metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}}
_, err = istioClient.Gateways(c.o.WatchedNamespace).Update(context.TODO(), invalidGateway, updateOptions)
}
if err == nil {
return false, "dummy invalid config not rejected"
}
// We expect to get deniedRequestMessageFragment (the config was rejected, as expected)
if strings.Contains(err.Error(), deniedRequestMessageFragment) {
return true, ""
}
// If the CRD does not exist, we will get this error. This is to handle when Pilot is run
// without CRDs - in this case, this check will not be possible.
if strings.Contains(err.Error(), missingResourceMessageFragment) {
scope.Warnf("Missing Gateway CRD, cannot perform validation check. Assuming validation is ready")
return true, ""
}
	// If one of the validating webhooks does not support dry run (sideEffects=Unknown or Some), we will get this error.
	// We should assume validation is ready, because there is no point in retrying this request.
if strings.Contains(err.Error(), unsupportedDryRunMessageFragment) {
scope.Warnf("One of the validating webhooks does not support DryRun, cannot perform validation check. Assuming validation is ready. Details: %v", err)
return true, ""
}
	return false, fmt.Sprintf("dummy invalid config rejected for the wrong reason: %v", err)
}
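
// updateValidatingWebhookConfiguration patches the CA bundle and failure
// policy into every webhook of the given configuration, skipping the API call
// when nothing would change.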
func (c *Controller) updateValidatingWebhookConfiguration(current *kubeApiAdmission.ValidatingWebhookConfiguration,
caBundle []byte, failurePolicy kubeApiAdmission.FailurePolicyType) error {
dirty := false
for i := range current.Webhooks {
if !bytes.Equal(current.Webhooks[i].ClientConfig.CABundle, caBundle) ||
(current.Webhooks[i].FailurePolicy != nil && *current.Webhooks[i].FailurePolicy != failurePolicy) {
dirty = true
break
}
}
if !dirty {
scope.Infof("validatingwebhookconfiguration %v (failurePolicy=%v, resourceVersion=%v) is up-to-date. No change required.",
current.Name, failurePolicy, current.ResourceVersion)
return nil
}
updated := current.DeepCopy()
for i := range updated.Webhooks {
updated.Webhooks[i].ClientConfig.CABundle = caBundle
updated.Webhooks[i].FailurePolicy = &failurePolicy
}
latest, err := c.client.AdmissionregistrationV1().
ValidatingWebhookConfigurations().Update(context.TODO(), updated, metav1.UpdateOptions{})
if err != nil {
scope.Errorf("Failed to update validatingwebhookconfiguration %v (failurePolicy=%v, resourceVersion=%v): %v",
updated.Name, failurePolicy, updated.ResourceVersion, err)
reportValidationConfigUpdateError(kubeErrors.ReasonForError(err))
return err
}
scope.Infof("Successfully updated validatingwebhookconfiguration %v (failurePolicy=%v,resourceVersion=%v)",
updated.Name, failurePolicy, latest.ResourceVersion)
reportValidationConfigUpdate()
return nil
}
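
// codec and scheme round-trip ValidatingWebhookConfigurations as YAML,
// defaulting them against the admissionregistration/v1 scheme. A minimal
// encoding sketch (hypothetical usage; nothing in this file calls it directly):
//
//	var buf bytes.Buffer
//	err := codec.Encode(&kubeApiAdmission.ValidatingWebhookConfiguration{}, &buf)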
var (
codec runtime.Codec
scheme *runtime.Scheme
)
func init() {
scheme = runtime.NewScheme()
utilruntime.Must(kubeApiAdmission.AddToScheme(scheme))
opt := json.SerializerOptions{Yaml: true}
yamlSerializer := json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme, scheme, opt)
codec = versioning.NewDefaultingCodecForScheme(
scheme,
yamlSerializer,
yamlSerializer,
kubeApiAdmission.SchemeGroupVersion,
runtime.InternalGroupVersioner,
)
}