blob: 336f17e64295c6d8995e3f4730659d0aad01921a [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package webhooks
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
)
import (
"istio.io/api/label"
"istio.io/pkg/log"
v1 "k8s.io/api/admissionregistration/v1"
kubeErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
admissioninformer "k8s.io/client-go/informers/admissionregistration/v1"
"k8s.io/client-go/kubernetes"
admissionregistrationv1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/keycertbundle"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/kube/controllers"
"github.com/apache/dubbo-go-pixiu/pkg/webhooks/util"
)
// Sentinel errors returned by patchMutatingWebhookConfig. webhookPatchTask
// treats all of them as terminal: the task is not retried when one occurs.
var (
// errWrongRevision is returned when the webhook configuration's revision
// label is missing or differs from the patcher's revision.
errWrongRevision = errors.New("webhook does not belong to target revision")
// errNotFound is returned when the webhook configuration cannot be read
// from the informer's indexer.
errNotFound = errors.New("webhook not found")
// errNoWebhookWithName is returned when no webhook entry in the
// configuration has a name ending in the patcher's webhookName.
errNoWebhookWithName = errors.New("webhook configuration did not contain webhook with target name")
)
// WebhookCertPatcher listens for webhooks on specified revision and patches their CA bundles
type WebhookCertPatcher struct {
// client is used to update MutatingWebhookConfigurations.
client kubernetes.Interface
// revision to patch webhooks for
revision string
// webhookName is matched as a suffix against the name of each webhook
// entry inside a configuration; only matching entries are patched.
webhookName string
// queue runs webhookPatchTask for each enqueued webhook configuration.
queue controllers.Queue
// CABundleWatcher supplies the CA bundle that is patched into the webhook
// config and signals when that bundle changes.
CABundleWatcher *keycertbundle.Watcher
// informer watches MutatingWebhookConfigurations selected by the revision label.
informer cache.SharedIndexInformer
}
// NewWebhookCertPatcher builds a WebhookCertPatcher for the given revision.
//
// It wires up a work queue whose reconciler is webhookPatchTask (at most 5
// attempts per item) and an informer over MutatingWebhookConfigurations that
// only lists/watches objects labelled with the given revision. Every informer
// event enqueues the affected object. The returned error is currently always nil.
func NewWebhookCertPatcher(
	client kubelib.Client,
	revision, webhookName string, caBundleWatcher *keycertbundle.Watcher) (*WebhookCertPatcher, error) {
	patcher := &WebhookCertPatcher{
		client:          client,
		revision:        revision,
		webhookName:     webhookName,
		CABundleWatcher: caBundleWatcher,
	}

	// The reconciler is a method value, so the patcher must exist before the queue.
	patcher.queue = controllers.NewQueue(
		"webhook patcher",
		controllers.WithReconciler(patcher.webhookPatchTask),
		controllers.WithMaxAttempts(5),
	)

	// Restrict list/watch calls to webhook configurations of this revision.
	onlyThisRevision := func(options *metav1.ListOptions) {
		options.LabelSelector = fmt.Sprintf("%s=%s", label.IoIstioRev.Name, revision)
	}
	patcher.informer = admissioninformer.NewFilteredMutatingWebhookConfigurationInformer(
		client, 0, cache.Indexers{}, onlyThisRevision)
	patcher.informer.AddEventHandler(controllers.ObjectHandler(patcher.queue.AddObject))

	return patcher, nil
}
// Run runs the WebhookCertPatcher.
//
// The informer and the CA bundle watcher are each started in their own
// goroutine; the work queue is then run on the calling goroutine. All three
// receive stopChan.
// NOTE(review): presumably queue.Run blocks until stopChan is closed — confirm
// against controllers.Queue.
func (w *WebhookCertPatcher) Run(stopChan <-chan struct{}) {
go w.informer.Run(stopChan)
go w.startCaBundleWatcher(stopChan)
w.queue.Run(stopChan)
}
// HasSynced reports whether both the informer and the work queue have synced.
func (w *WebhookCertPatcher) HasSynced() bool {
	if !w.informer.HasSynced() {
		return false
	}
	return w.queue.HasSynced()
}
// webhookPatchTask is the queue reconciler. It patches the webhook
// configuration named by o and translates the outcome for the queue:
// nil means "done, do not retry", a non-nil error is handed back to the queue.
func (w *WebhookCertPatcher) webhookPatchTask(o types.NamespacedName) error {
	reportWebhookPatchAttempts(o.Name)

	configs := w.client.AdmissionregistrationV1().MutatingWebhookConfigurations()
	patchErr := w.patchMutatingWebhookConfig(configs, o.Name)

	switch {
	case patchErr == nil:
		return nil
	case kubeErrors.IsNotFound(patchErr),
		errors.Is(patchErr, errWrongRevision),
		errors.Is(patchErr, errNoWebhookWithName),
		errors.Is(patchErr, errNotFound):
		// These outcomes mean this webhook should no longer be patched by us,
		// so the error is swallowed rather than returned.
		return nil
	}

	log.Errorf("patching webhook %s failed: %v", o.Name, patchErr)
	reportWebhookPatchRetry(o.Name)
	return patchErr
}
// patchMutatingWebhookConfig patches the CA bundle of the webhook configuration
// named webhookConfigName.
//
// The configuration is read from the informer's indexer. Every webhook entry
// whose name ends in w.webhookName gets the CA bundle loaded from
// w.CABundleWatcher; the configuration is written back through client only if
// at least one entry's bundle actually differed.
//
// Returns:
//   - errNotFound if the configuration is not in the informer's indexer (or the
//     stored object is not a MutatingWebhookConfiguration),
//   - errWrongRevision if its revision label is missing or differs from w.revision,
//   - the error from util.LoadCABundle if the CA bundle cannot be loaded,
//   - errNoWebhookWithName if no webhook entry matches w.webhookName,
//   - the unwrapped error from client.Update if the update fails,
//   - nil otherwise.
//
// Each failure path also records a patch-failure metric with a matching reason.
func (w *WebhookCertPatcher) patchMutatingWebhookConfig(
	client admissionregistrationv1client.MutatingWebhookConfigurationInterface,
	webhookConfigName string) error {
	raw, _, err := w.informer.GetIndexer().GetByKey(webhookConfigName)
	if raw == nil || err != nil {
		reportWebhookPatchFailure(webhookConfigName, reasonWebhookConfigNotFound)
		return errNotFound
	}
	// Checked assertion: an unexpected or typed-nil object is treated as
	// "not found" instead of panicking the queue worker.
	cached, ok := raw.(*v1.MutatingWebhookConfiguration)
	if !ok || cached == nil {
		reportWebhookPatchFailure(webhookConfigName, reasonWebhookConfigNotFound)
		return errNotFound
	}

	// prevents a race condition between multiple istiods when the revision is changed or modified
	if rev, hasRev := cached.Labels[label.IoIstioRev.Name]; !hasRev || rev != w.revision {
		reportWebhookPatchFailure(webhookConfigName, reasonWrongRevision)
		return errWrongRevision
	}

	caCertPem, err := util.LoadCABundle(w.CABundleWatcher)
	if err != nil {
		log.Errorf("Failed to load CA bundle: %v", err)
		reportWebhookPatchFailure(webhookConfigName, reasonLoadCABundleFailure)
		return err
	}

	// Objects held by the informer are shared and must not be modified. Work on
	// a private copy: mutating the cached object in place would make a retry
	// after a failed Update see the bundle as already patched and skip the update.
	config := cached.DeepCopy()

	found, updated := false, false
	for i := range config.Webhooks {
		wh := &config.Webhooks[i]
		if !strings.HasSuffix(wh.Name, w.webhookName) {
			continue
		}
		found = true
		if !bytes.Equal(caCertPem, wh.ClientConfig.CABundle) {
			wh.ClientConfig.CABundle = caCertPem
			updated = true
		}
	}
	if !found {
		reportWebhookPatchFailure(webhookConfigName, reasonWebhookEntryNotFound)
		return errNoWebhookWithName
	}
	if !updated {
		// Every matching entry already carries the current bundle.
		return nil
	}

	// The error is returned unwrapped so callers can keep classifying it with
	// the apimachinery error helpers (e.g. IsNotFound).
	if _, err := client.Update(context.Background(), config, metav1.UpdateOptions{}); err != nil {
		reportWebhookPatchFailure(webhookConfigName, reasonWebhookUpdateFailure)
		return err
	}
	return nil
}
// startCaBundleWatcher registers a watcher on the CA bundle and, each time the
// watcher channel fires, enqueues every webhook configuration currently in the
// informer's store so it gets re-patched. It returns when stop is signalled,
// removing the registered watcher on the way out.
func (w *WebhookCertPatcher) startCaBundleWatcher(stop <-chan struct{}) {
	watcherID, bundleChanged := w.CABundleWatcher.AddWatcher()
	defer w.CABundleWatcher.RemoveWatcher(watcherID)

	for {
		select {
		case <-stop:
			return
		case <-bundleChanged:
			for _, item := range w.informer.GetStore().List() {
				cfg := item.(*v1.MutatingWebhookConfiguration)
				if cfg != nil {
					log.Debugf("updating caBundle for webhook %q", cfg.Name)
					w.queue.Add(types.NamespacedName{Name: cfg.Name})
				}
			}
		}
	}
}