blob: 760636342b5b4c0a64c1909f18e824fa19f06bdf [file] [log] [blame]
/*
* Copyright 2022 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"github.com/kiegroup/container-builder/api"
"github.com/kiegroup/container-builder/util/log"
apiv08 "github.com/kiegroup/kogito-serverless-operator/api/v1alpha08"
"github.com/kiegroup/kogito-serverless-operator/builder"
"github.com/kiegroup/kogito-serverless-operator/platform"
"github.com/kiegroup/kogito-serverless-operator/utils"
"github.com/kiegroup/kogito-serverless-operator/utils/kubernetes"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"time"
)
// KogitoServerlessWorkflowReconciler reconciles a KogitoServerlessWorkflow object
type KogitoServerlessWorkflowReconciler struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
//+kubebuilder:rbac:groups=sw.kogito.kie.org,resources=kogitoserverlessworkflows,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=sw.kogito.kie.org,resources=kogitoserverlessworkflows/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sw.kogito.kie.org,resources=kogitoserverlessworkflows/finalizers,verbs=update
//+kubebuilder:rbac:groups=sw.kogito.kie.org,resources=pods,verbs=get;watch;list
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the KogitoServerlessWorkflow object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.2/pkg/reconcile
func (r *KogitoServerlessWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
// Make sure the operator is allowed to act on namespace
if ok, err := platform.IsOperatorAllowedOnNamespace(ctx, r.Client, req.Namespace); err != nil {
return reconcile.Result{}, err
} else if !ok {
log.Info(fmt.Sprintf("Ignoring request because the operator hasn't got the permissions to work on namespace %s", req.Namespace))
return reconcile.Result{}, nil
}
// Fetch the Workflow instance
workflow := &apiv08.KogitoServerlessWorkflow{}
err := r.Client.Get(ctx, req.NamespacedName, workflow)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get KogitoServerlessWorkflow")
return ctrl.Result{}, err
}
// Only process resources assigned to the operator
if !platform.IsOperatorHandlerConsideringLock(ctx, r.Client, req.Namespace, workflow) {
log.Info("Ignoring request because resource is not assigned to current operator")
return reconcile.Result{}, nil
}
switch workflow.Status.Condition {
// If the status condition is None or Waiting for platform let's try to start a build!
case apiv08.NoneConditionType, apiv08.WaitingForPlatformConditionType:
_, err := platform.GetActivePlatform(ctx, r.Client, req.Namespace)
if err != nil {
log.Error(err, "No active Platform for namespace %s so the workflow cannot be built. Waiting for an active platform")
workflow.Status.Condition = apiv08.WaitingForPlatformConditionType
_, err = r.performStatusUpdate(ctx, workflow)
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
// If there is an active platform we have got all the information to build but...
// ...let's check before if we have got already a build!
build := &apiv08.KogitoServerlessBuild{}
err = r.Client.Get(ctx, req.NamespacedName, build)
if err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, err
}
//If there isn't a build let's create and start the first one!
buildable := builder.NewBuildable(r.Client, ctx)
_, err = buildable.HandleWorkflowBuild(workflow.Name, req)
} else {
//If there is a build, let's ask to restart it
build.Status.BuildPhase = api.BuildPhaseNone
build.Status.Builder.Status = api.BuildStatus{}
if err = r.Client.Status().Update(ctx, build); err != nil {
log.Error(err, fmt.Sprintf("Failed to update Build status for Workflow %s", workflow.Name))
return ctrl.Result{}, err
}
}
workflow.Status.Condition = apiv08.BuildingConditionType
_, err = r.performStatusUpdate(ctx, workflow)
return ctrl.Result{}, err
// If the status condition is Building let's check if the build is finished!
case apiv08.BuildingConditionType, apiv08.RunningConditionType:
// Let's retrieve the build to check the status
build := &apiv08.KogitoServerlessBuild{}
err = r.Client.Get(ctx, req.NamespacedName, build)
if err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, err
}
log.Error(err, "Build not found for this workflow", "Workflow", req.Name)
return ctrl.Result{}, nil
}
if build.Status.Builder.Status.Phase == api.BuildPhaseSucceeded && workflow.Status.Condition != apiv08.RunningConditionType {
//If we have finished a build and the workflow is not running, we will start the provisioning phase
workflow.Status.Condition = apiv08.ProvisioningConditionType
r.performStatusUpdate(ctx, workflow)
return ctrl.Result{}, err
} else if (build.Status.Builder.Status.Phase == api.BuildPhaseFailed || build.Status.Builder.Status.Phase == api.BuildPhaseError) && workflow.Status.Condition != apiv08.RunningConditionType {
log.Info(fmt.Sprintf("Workflow %s build is failed!", workflow.Name))
workflow.Status.Condition = apiv08.FailedConditionType
r.performStatusUpdate(ctx, workflow)
} else if (build.Status.Builder.Status.Phase == api.BuildPhaseSucceeded || build.Status.Builder.Status.Phase == api.BuildPhaseFailed || build.Status.Builder.Status.Phase == api.BuildPhaseError) && workflow.Status.Condition == apiv08.RunningConditionType {
//If we have finished a build and the workflow is running, we have to rebuild it because there was a change in the workflow definition and requeue the request
if !utils.Compare(utils.GetWorkflowSpecHash(workflow.Status.Applied), utils.GetWorkflowSpecHash(workflow.Spec)) { // Let's check that the 2 workflow definition are different
workflow.Status.Condition = apiv08.NoneConditionType
r.performStatusUpdate(ctx, workflow)
return ctrl.Result{Requeue: true}, err
}
} else {
if workflow.Status.Condition != apiv08.BuildingConditionType {
workflow.Status.Condition = apiv08.BuildingConditionType
r.performStatusUpdate(ctx, workflow)
}
return ctrl.Result{}, err
}
// If the status condition is Deploying let's check if the deployment is finished!
case apiv08.ProvisioningConditionType, apiv08.DeployingConditionType:
pl, err := platform.GetActivePlatform(ctx, r.Client, req.Namespace)
if err != nil {
log.Error(err, "No active Platform for namespace %s so the workflow cannot be deployed. Waiting for an active platform")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
return r.manageBasicWorkflowDeployment(ctx, workflow, pl)
default:
log.Info(fmt.Sprintf("Workflow %s is in status %s but at the moment we are not supporting it!", workflow.Name, workflow.Status.Condition))
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
func (r *KogitoServerlessWorkflowReconciler) performStatusUpdate(ctx context.Context, workflow *apiv08.KogitoServerlessWorkflow) (bool, error) {
var err error
workflow.Status.Applied = workflow.Spec
if err = r.Client.Status().Update(ctx, workflow); err != nil {
log.Error(err, "Failed to update Workflow status")
return false, err
}
return true, err
}
func (r *KogitoServerlessWorkflowReconciler) manageBasicWorkflowDeployment(ctx context.Context, workflow *apiv08.KogitoServerlessWorkflow, platform *apiv08.KogitoServerlessPlatform) (reconcile.Result, error) {
// Check if this Deployment already exists
found := &appsv1.Deployment{}
err := r.Client.Get(ctx, types.NamespacedName{Name: workflow.Name, Namespace: workflow.Namespace}, found)
var result *reconcile.Result
result, err = kubernetes.EnsureDeployment(ctx, r.Client, r.Scheme, workflow, platform.Spec.BuildPlatform.Registry.Address)
if result != nil {
log.Error(err, "Deployment Not ready")
if workflow.Status.Condition != apiv08.DeployingConditionType {
workflow.Status.Condition = apiv08.DeployingConditionType
r.performStatusUpdate(ctx, workflow)
}
result.RequeueAfter = 5 * time.Second
return *result, err
}
// Check if this Service already exists
result, err = kubernetes.EnsureService(r.Client, r.Scheme, workflow)
if result != nil {
log.Error(err, "Service Not ready")
if workflow.Status.Condition != apiv08.DeployingConditionType {
workflow.Status.Condition = apiv08.DeployingConditionType
r.performStatusUpdate(ctx, workflow)
}
result.RequeueAfter = 5 * time.Second
return *result, err
}
// Deployment and Service already exists - don't requeue
log.Info("Skip reconcile: Deployment and service already exists",
"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
//We can now update the workflow status to running
if workflow.Status.Condition != apiv08.RunningConditionType {
workflow.Status.Condition = apiv08.RunningConditionType
r.performStatusUpdate(ctx, workflow)
return reconcile.Result{Requeue: false}, err
}
return reconcile.Result{Requeue: false}, err
}
func buildEnqueueRequestsFromMapFunc(c client.Client, build *apiv08.KogitoServerlessBuild) []reconcile.Request {
var requests []reconcile.Request
if build.Status.BuildPhase != api.BuildPhaseSucceeded && build.Status.BuildPhase != api.BuildPhaseError {
return requests
}
list := &apiv08.KogitoServerlessWorkflowList{}
// Do global search in case of global operator (it may be using a global platform)
var opts []client.ListOption
if !platform.IsCurrentOperatorGlobal() {
opts = append(opts, client.InNamespace(build.Namespace))
}
if err := c.List(context.Background(), list, opts...); err != nil {
log.Error(err, "Failed to retrieve workflow list")
return requests
}
for i := range list.Items {
workflow := &list.Items[i]
match, err := utils.SameOrMatch(build, workflow)
if err != nil {
log.Errorf(err, "Error matching workflow %q with build %q", workflow.Name, build.Name)
continue
}
if !match {
continue
}
if workflow.Status.Condition == apiv08.BuildingConditionType || workflow.Status.Condition == apiv08.RunningConditionType {
log.Infof("Build %s ready, notify workflow: %s in condition %s", build.Name, workflow.Name, workflow.Status.Condition)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: workflow.Namespace,
Name: workflow.Name,
},
})
}
}
return requests
}
func platformEnqueueRequestsFromMapFunc(c client.Client, p *apiv08.KogitoServerlessPlatform) []reconcile.Request {
var requests []reconcile.Request
if p.Status.Phase == apiv08.PlatformPhaseReady {
list := &apiv08.KogitoServerlessWorkflowList{}
// Do global search in case of global operator (it may be using a global platform)
var opts []client.ListOption
if !platform.IsCurrentOperatorGlobal() {
opts = append(opts, client.InNamespace(p.Namespace))
}
if err := c.List(context.Background(), list, opts...); err != nil {
log.Error(err, "Failed to list workflows")
return requests
}
for _, workflow := range list.Items {
if workflow.Status.Condition == apiv08.WaitingForPlatformConditionType {
log.Infof("Platform %s ready, wake-up workflow: %s", p.Name, workflow.Name)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: workflow.Namespace,
Name: workflow.Name,
},
})
}
}
}
return requests
}
// SetupWithManager sets up the controller with the Manager.
func (r *KogitoServerlessWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&apiv08.KogitoServerlessWorkflow{}).
Watches(&source.Kind{Type: &apiv08.KogitoServerlessBuild{}}, handler.EnqueueRequestsFromMapFunc(func(c client.Object) []reconcile.Request {
build, ok := c.(*apiv08.KogitoServerlessBuild)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", c), "Failed to retrieve workflow list")
return []reconcile.Request{}
}
return buildEnqueueRequestsFromMapFunc(mgr.GetClient(), build)
})).
Watches(&source.Kind{Type: &apiv08.KogitoServerlessPlatform{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
platform, ok := a.(*apiv08.KogitoServerlessPlatform)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve workflow list")
return []reconcile.Request{}
}
return platformEnqueueRequestsFromMapFunc(mgr.GetClient(), platform)
})).
Complete(r)
}