blob: 69b636dc6b957e6f5f63c99aa7b7299443596257 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package platform
import (
"context"
"fmt"
"os"
"strings"
"k8s.io/klog/v2"
coordination "k8s.io/api/coordination/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
)
const (
// DefaultPlatformName is the standard name used for the platform.
DefaultPlatformName = "kogito-serverless-platform"
OperatorWatchNamespaceEnvVariable = "WATCH_NAMESPACE"
operatorNamespaceEnvVariable = "NAMESPACE"
)
// Copied from https://github.com/kubernetes/enhancements/tree/master/keps/sig-cluster-lifecycle/generic/1755-communicating-a-local-registry
// LocalRegistryHostingV1 describes a local registry that developer tools can
// connect to. A local registry allows clients to load images into the local
// cluster by pushing to this registry.
type LocalRegistryHostingV1 struct {
// Host documents the host (hostname and port) of the registry, as seen from
// outside the cluster.
//
// This is the registry host that tools outside the cluster should push images
// to.
Host string `yaml:"host,omitempty"`
// HostFromClusterNetwork documents the host (hostname and port) of the
// registry, as seen from networking inside the container pods.
//
// This is the registry host that tools running on pods inside the cluster
// should push images to. If not set, then tools inside the cluster should
// assume the local registry is not available to them.
HostFromClusterNetwork string `yaml:"hostFromClusterNetwork,omitempty"`
// HostFromContainerRuntime documents the host (hostname and port) of the
// registry, as seen from the cluster's container runtime.
//
// When tools apply Kubernetes objects to the cluster, this host should be
// used for image name fields. If not set, users of this field should use the
// value of Host instead.
//
// Note that it doesn't make sense semantically to define this field, but not
// define Host or HostFromClusterNetwork. That would imply a way to pull
// images without a way to push images.
HostFromContainerRuntime string `yaml:"hostFromContainerRuntime,omitempty"`
// Help contains a URL pointing to documentation for users on how to set
// up and configure a local registry.
//
// Tools can use this to nudge users to enable the registry. When possible,
// the writer should use as permanent a URL as possible to prevent drift
// (e.g., a version control SHA).
//
// When image pushes to a registry host specified in one of the other fields
// fail, the tool should display this help URL to the user. The help URL
// should contain instructions on how to diagnose broken or misconfigured
// registries.
Help string `yaml:"help,omitempty"`
}
const OperatorLockName = "kogito-serverless-lock"
// IsCurrentOperatorGlobal returns true if the operator is configured to watch all namespaces.
func IsCurrentOperatorGlobal() bool {
if watchNamespace, envSet := os.LookupEnv(OperatorWatchNamespaceEnvVariable); !envSet || strings.TrimSpace(watchNamespace) == "" {
return true
}
return false
}
// GetOperatorNamespace returns the namespace where the current operator is located (if set).
func GetOperatorNamespace() string {
if podNamespace, envSet := os.LookupEnv(operatorNamespaceEnvVariable); envSet {
return podNamespace
}
return ""
}
// GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular namepsace.
func GetOperatorLockName(operatorID string) string {
return fmt.Sprintf("%s-lock", operatorID)
}
// GetActivePlatform returns the currently installed active platform in the local namespace.
func GetActivePlatform(ctx context.Context, c ctrl.Client, namespace string) (*operatorapi.SonataFlowPlatform, error) {
return getLocalPlatform(ctx, c, namespace, true)
}
// getLocalPlatform returns the currently installed platform or any platform existing in local namespace.
func getLocalPlatform(ctx context.Context, c ctrl.Client, namespace string, active bool) (*operatorapi.SonataFlowPlatform, error) {
klog.V(log.D).InfoS("Finding available platforms")
lst, err := listPrimaryPlatforms(ctx, c, namespace)
if err != nil {
return nil, err
}
for _, p := range lst.Items {
platform := p // pin
if IsActive(&platform) {
klog.V(log.D).InfoS("Found active local build platform", "platform", platform.Name)
return &platform, nil
}
}
if !active && len(lst.Items) > 0 {
// does not require the platform to be active, just return one if present
res := lst.Items[0]
klog.V(log.D).InfoS("Found local build platform", "platform", res.Name)
return &res, nil
}
klog.V(log.I).InfoS("Not found a local build platform", "Namespace", namespace)
klog.V(log.I).InfoS("Creating a default SonataFlowPlatform", "Namespace", namespace)
sfp := newDefaultSonataFlowPlatform(namespace)
if err = c.Create(ctx, sfp); err != nil {
return nil, err
}
return sfp, nil
}
// listPrimaryPlatforms returns all non-secondary platforms installed in a given namespace (only one will be active).
func listPrimaryPlatforms(ctx context.Context, c ctrl.Reader, namespace string) (*operatorapi.SonataFlowPlatformList, error) {
lst, err := listAllPlatforms(ctx, c, namespace)
if err != nil {
return nil, err
}
filtered := &operatorapi.SonataFlowPlatformList{}
for i := range lst.Items {
pl := lst.Items[i]
if !IsSecondary(&pl) {
filtered.Items = append(filtered.Items, pl)
}
}
return filtered, nil
}
// listAllPlatforms returns all platforms installed in a given namespace.
func listAllPlatforms(ctx context.Context, c ctrl.Reader, namespace string) (*operatorapi.SonataFlowPlatformList, error) {
lst := operatorapi.NewSonataFlowPlatformList()
if err := c.List(ctx, &lst, ctrl.InNamespace(namespace)); err != nil {
return nil, err
}
return &lst, nil
}
// IsActive determines if the given platform is being used.
func IsActive(p *operatorapi.SonataFlowPlatform) bool {
return !p.Status.IsDuplicated()
}
// IsSecondary determines if the given platform is marked as secondary.
func IsSecondary(p *operatorapi.SonataFlowPlatform) bool {
if l, ok := p.Annotations[metadata.SecondaryPlatformAnnotation]; ok && l == "true" {
return true
}
return false
}
// IsNamespaceLocked tells if the namespace contains a lock indicating that an operator owns it.
func IsNamespaceLocked(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
if namespace == "" {
return false, nil
}
platforms, err := listPrimaryPlatforms(ctx, c, namespace)
if err != nil {
return true, err
}
for _, platform := range platforms.Items {
lease := coordination.Lease{}
var operatorLockName string
if platform.Name != "" {
operatorLockName = GetOperatorLockName(platform.Name)
} else {
operatorLockName = OperatorLockName
}
if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: operatorLockName}, &lease); err == nil || !k8serrors.IsNotFound(err) {
return true, err
}
}
return false, nil
}
// IsOperatorAllowedOnNamespace returns true if the current operator is allowed to react on changes in the given namespace.
func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
// allow all local operators
if !IsCurrentOperatorGlobal() {
return true, nil
}
// allow global operators that use a proper operator id
if utils.OperatorID() != "" {
return true, nil
}
operatorNamespace := GetOperatorNamespace()
if operatorNamespace == namespace {
// Global operator is allowed on its own namespace
return true, nil
}
alreadyOwned, err := IsNamespaceLocked(ctx, c, namespace)
if err != nil {
return false, err
}
return !alreadyOwned, nil
}
// IsOperatorHandler Operators matching the annotation operator id are allowed to reconcile.
// For legacy resources that are missing a proper operator id annotation the default global operator or the local
// operator in this namespace are candidates for reconciliation.
func IsOperatorHandler(object ctrl.Object) bool {
if object == nil {
return true
}
resourceID := utils.GetOperatorIDAnnotation(object)
operatorID := utils.OperatorID()
// allow operator with matching id to handle the resource
if resourceID == operatorID {
return true
}
// check if we are dealing with resource that is missing a proper operator id annotation
if resourceID == "" {
// allow default global operator to handle legacy resources (missing proper operator id annotations)
if operatorID == DefaultPlatformName {
return true
}
// allow local operators to handle legacy resources (missing proper operator id annotations)
if !IsCurrentOperatorGlobal() {
return true
}
}
return false
}
// IsOperatorHandlerConsideringLock uses normal IsOperatorHandler checks and adds additional check for legacy resources
// that are missing a proper operator id annotation. In general two kind of operators race for reconcile these legacy resources.
// The local operator for this namespace and the default global operator instance. Based on the existence of a namespace
// lock the current local operator has precedence. When no lock exists the default global operator should reconcile.
func IsOperatorHandlerConsideringLock(ctx context.Context, c ctrl.Reader, namespace string, object ctrl.Object) bool {
isHandler := IsOperatorHandler(object)
if !isHandler {
return false
}
resourceID := utils.GetOperatorIDAnnotation(object)
// add additional check on resources missing an operator id
if resourceID == "" {
operatorNamespace := GetOperatorNamespace()
if operatorNamespace == namespace {
// Global operator is allowed on its own namespace
return true
}
if locked, err := IsNamespaceLocked(ctx, c, namespace); err != nil || locked {
// namespace is locked so local operators do have precedence
return !IsCurrentOperatorGlobal()
}
}
return true
}