blob: 9c8380ba78b453aab68fe0a8b87dc38d191ae90a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package trait
import (
"context"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/discovery/cached/memory"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
util "github.com/apache/camel-k/pkg/util/controller"
)
var (
toFileName = regexp.MustCompile(`[^(\w/\.)]`)
diskCachedDiscoveryClient discovery.CachedDiscoveryInterface
memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
discoveryClientLock sync.Mutex
)
type discoveryCacheType string
const (
disabledDiscoveryCache discoveryCacheType = "disabled"
diskDiscoveryCache discoveryCacheType = "disk"
memoryDiscoveryCache discoveryCacheType = "memory"
)
// The GC Trait garbage-collects all resources that are no longer necessary upon integration updates.
//
// +camel-k:trait=gc
type garbageCollectorTrait struct {
BaseTrait `property:",squash"`
// Discovery client cache to be used, either `disabled`, `disk` or `memory` (default `memory`)
DiscoveryCache *discoveryCacheType `property:"discovery-cache" json:"discoveryCache,omitempty"`
}
func newGarbageCollectorTrait() Trait {
return &garbageCollectorTrait{
BaseTrait: NewBaseTrait("gc", 1200),
}
}
func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
if t.Enabled != nil && !*t.Enabled {
return false, nil
}
if t.DiscoveryCache == nil {
s := memoryDiscoveryCache
t.DiscoveryCache = &s
}
return e.IntegrationInPhase(
v1.IntegrationPhaseInitialization,
v1.IntegrationPhaseDeploying,
v1.IntegrationPhaseRunning),
nil
}
func (t *garbageCollectorTrait) Apply(e *Environment) error {
switch e.Integration.Status.Phase {
case v1.IntegrationPhaseRunning:
// Register a post action that deletes the existing resources that are labelled
// with the previous integration generations.
// TODO: this should be refined so that it's run when all the replicas for the newer generation
// are ready. This is to be added when the integration scale status is refined with ready replicas
e.PostActions = append(e.PostActions, func(env *Environment) error {
// The collection and deletion are performed asynchronously to avoid blocking
// the reconcile loop.
go t.garbageCollectResources(env)
return nil
})
fallthrough
default:
// Register a post processor that adds the required labels to the new resources
e.PostProcessors = append(e.PostProcessors, func(env *Environment) error {
generation := strconv.FormatInt(env.Integration.GetGeneration(), 10)
env.Resources.VisitMetaObject(func(resource metav1.Object) {
labels := resource.GetLabels()
// Label the resource with the current integration generation
labels["camel.apache.org/generation"] = generation
// Make sure the integration label is set
labels[v1.IntegrationLabel] = env.Integration.Name
resource.SetLabels(labels)
})
return nil
})
}
return nil
}
func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) {
integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name})
generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)})
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot determine generation requirement")
return
}
selector := labels.NewSelector().
Add(*integration).
Add(*generation)
deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types")
return
}
t.deleteEachOf(deletableGVKs, e, selector)
}
func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
for gvk := range gvks {
resources := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": gvk.GroupVersion().String(),
"kind": gvk.Kind,
},
}
options := []client.ListOption{
client.InNamespace(e.Integration.Namespace),
util.MatchingSelector{Selector: selector},
}
if err := t.Client.List(context.TODO(), &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk)
}
continue
}
for _, resource := range resources.Items {
r := resource
if !t.canBeDeleted(e, r) {
continue
}
err := t.Client.Delete(context.TODO(), &r, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName())
}
} else {
t.L.ForIntegration(e.Integration).Debugf("child resource deleted: %s/%s", resource.GetKind(), resource.GetName())
}
}
}
}
func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unstructured) bool {
// Only delete direct children of the integration, otherwise we can affect the behavior of external controllers (i.e. Knative)
for _, o := range u.GetOwnerReferences() {
if o.Kind == v1.IntegrationKind && strings.HasPrefix(o.APIVersion, v1.SchemeGroupVersion.Group) && o.Name == e.Integration.Name {
return true
}
}
return false
}
func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
// We rely on the discovery API to retrieve all the resources GVK,
// that results in an unbounded set that can impact garbage collection latency when scaling up.
discoveryClient, err := t.discoveryClient(e)
if err != nil {
return nil, err
}
resources, err := discoveryClient.ServerPreferredNamespacedResources()
// Swallow group discovery errors, e.g., Knative serving exposes
// an aggregated API for custom.metrics.k8s.io that requires special
// authentication scheme while discovering preferred resources
if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
return nil, err
}
// We only take types that support the "delete" verb,
// to prevents from performing queries that we know are going to return "MethodNotAllowed".
return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)),
nil
}
func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} {
GVKs := map[schema.GroupVersionKind]struct{}{}
for _, rl := range rls {
for _, r := range rl.APIResources {
GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{}
}
}
return GVKs
}
func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) {
discoveryClientLock.Lock()
defer discoveryClientLock.Unlock()
switch *t.DiscoveryCache {
case diskDiscoveryCache:
if diskCachedDiscoveryClient != nil {
return diskCachedDiscoveryClient, nil
}
config := t.Client.GetConfig()
httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
var err error
diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
return diskCachedDiscoveryClient, err
case memoryDiscoveryCache:
if memoryCachedDiscoveryClient != nil {
return memoryCachedDiscoveryClient, nil
}
memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery())
return memoryCachedDiscoveryClient, nil
case disabledDiscoveryCache, "":
return t.Client.Discovery(), nil
default:
t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache)
return t.Client.Discovery(), nil
}
}