blob: 342e81a6f302384ecf23462a435f2c624a0b1f60 [file] [log] [blame]
// Copyright 2023 Red Hat, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package profiles
import (
"context"
"path"
"time"
"k8s.io/klog/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/kiegroup/kogito-serverless-operator/workflowproj"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"github.com/kiegroup/kogito-serverless-operator/api/metadata"
"github.com/kiegroup/kogito-serverless-operator/controllers/workflowdef"
"github.com/kiegroup/kogito-serverless-operator/log"
"github.com/kiegroup/kogito-serverless-operator/utils"
"github.com/kiegroup/kogito-serverless-operator/controllers/platform"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"github.com/kiegroup/kogito-serverless-operator/api"
operatorapi "github.com/kiegroup/kogito-serverless-operator/api/v1alpha08"
kubeutil "github.com/kiegroup/kogito-serverless-operator/utils/kubernetes"
)
// Mount locations and requeue intervals used by the dev profile reconciler.
const (
	// configMapWorkflowDefVolumeName is the volume name for the ConfigMap holding the workflow definition
	configMapWorkflowDefVolumeName = "workflow-definition"
	// configMapWorkflowDefMountPath is where the workflow definition is mounted inside the dev container
	configMapWorkflowDefMountPath = "/home/kogito/serverless-workflow-project/src/main/resources/workflows"
	// configMapResourcesVolumeName names the projected volume mounted at the Quarkus resources root
	configMapResourcesVolumeName = "resources"
	// configMapExternalResourcesVolumeNamePrefix prefixes the per-path volumes created for external resource ConfigMaps
	configMapExternalResourcesVolumeNamePrefix = configMapResourcesVolumeName + "-"
	// quarkusDevConfigMountPath mount path for application properties file in the Workflow Quarkus Application
	// See: https://quarkus.io/guides/config-reference#application-properties-file
	quarkusDevConfigMountPath = "/home/kogito/serverless-workflow-project/src/main/resources"
	// requeueAfterFailure how long to wait before requeueing after a reconciliation failure
	requeueAfterFailure = 3 * time.Minute
	// requeueAfterFollowDeployment how long to wait between checks while following a Deployment rollout
	requeueAfterFollowDeployment = 5 * time.Second
	// requeueAfterIsRunning how long to wait between checks while the workflow is running normally
	requeueAfterIsRunning = 1 * time.Minute
	// recoverDeploymentErrorRetries how many times the operator should try to recover from a failure before giving up
	recoverDeploymentErrorRetries = 3
	// requeueRecoverDeploymentErrorInterval interval between recovering from failures
	requeueRecoverDeploymentErrorInterval = recoverDeploymentErrorInterval * time.Minute
	// recoverDeploymentErrorInterval recovery attempt interval expressed in minutes (untyped so it can
	// scale time.Minute above and compare against Duration.Minutes())
	recoverDeploymentErrorInterval = 10
)
// Compile-time assertion that developmentProfile implements ProfileReconciler.
var _ ProfileReconciler = &developmentProfile{}

// developmentProfile is the ProfileReconciler used for workflows deployed with the dev profile.
type developmentProfile struct {
	baseReconciler
}

// GetProfile returns the profile type handled by this reconciler (dev).
func (d developmentProfile) GetProfile() metadata.ProfileType {
	return metadata.DevProfile
}
// newDevProfileReconciler wires the dev-profile reconciliation state machine,
// selecting platform-specific object ensurers and status enrichers depending on
// whether the operator runs on OpenShift or plain Kubernetes.
func newDevProfileReconciler(client client.Client, config *rest.Config) ProfileReconciler {
	support := &stateSupport{client: client}

	var (
		ensurers  *devProfileObjectEnsurers
		enrichers *devProfileObjectEnrichers
	)
	if utils.IsOpenShift() {
		ensurers = newDevelopmentObjectEnsurersForOpenShift(support)
		enrichers = newDevelopmentObjectEnrichersForOpenShift(support)
	} else {
		ensurers = newDevelopmentObjectEnsurers(support)
		enrichers = newDevelopmentObjectEnrichers(support)
	}

	// The state machine is evaluated in declaration order: ensure the objects are
	// running, follow an in-progress deployment, then try to recover from failures.
	states := newReconciliationStateMachine(
		&ensureRunningDevWorkflowReconciliationState{stateSupport: support, ensurers: ensurers},
		&followDeployDevWorkflowReconciliationState{stateSupport: support, enrichers: enrichers},
		&recoverFromFailureDevReconciliationState{stateSupport: support})

	reconciler := &developmentProfile{baseReconciler: newBaseProfileReconciler(support, states)}
	klog.V(log.I).InfoS("Reconciling in", "profile", reconciler.GetProfile())
	return reconciler
}
// newDevelopmentObjectEnsurers creates the set of object ensurers used on plain
// Kubernetes clusters. The network ensurer is a no-op here (compare with the
// OpenShift variant).
func newDevelopmentObjectEnsurers(support *stateSupport) *devProfileObjectEnsurers {
	c := support.client
	return &devProfileObjectEnsurers{
		deployment:          newDefaultObjectEnsurer(c, devDeploymentCreator),
		service:             newDefaultObjectEnsurer(c, devServiceCreator),
		network:             newDummyObjectEnsurer(),
		definitionConfigMap: newDefaultObjectEnsurer(c, workflowDefConfigMapCreator),
		propertiesConfigMap: newDefaultObjectEnsurer(c, workflowPropsConfigMapCreator),
	}
}
// newDevelopmentObjectEnsurersForOpenShift creates the set of object ensurers
// used on OpenShift, where the default service and network creators apply.
func newDevelopmentObjectEnsurersForOpenShift(support *stateSupport) *devProfileObjectEnsurers {
	c := support.client
	return &devProfileObjectEnsurers{
		deployment:          newDefaultObjectEnsurer(c, devDeploymentCreator),
		service:             newDefaultObjectEnsurer(c, defaultServiceCreator),
		network:             newDefaultObjectEnsurer(c, defaultNetworkCreator),
		definitionConfigMap: newDefaultObjectEnsurer(c, workflowDefConfigMapCreator),
		propertiesConfigMap: newDefaultObjectEnsurer(c, workflowPropsConfigMapCreator),
	}
}
// newDevelopmentObjectEnrichers creates the status enrichers used on plain Kubernetes clusters.
func newDevelopmentObjectEnrichers(support *stateSupport) *devProfileObjectEnrichers {
	enrichers := &devProfileObjectEnrichers{
		networkInfo: newStatusEnricher(support.client, defaultDevStatusEnricher),
	}
	return enrichers
}
// newDevelopmentObjectEnrichersForOpenShift creates the status enrichers used on OpenShift.
func newDevelopmentObjectEnrichersForOpenShift(support *stateSupport) *devProfileObjectEnrichers {
	enrichers := &devProfileObjectEnrichers{
		networkInfo: newStatusEnricher(support.client, devStatusEnricherForOpenShift),
	}
	return enrichers
}
// devProfileObjectEnsurers groups the ensurers for every child object created
// for a dev-profile workflow.
type devProfileObjectEnsurers struct {
	deployment          ObjectEnsurer // workflow Deployment
	service             ObjectEnsurer // Service exposing the workflow
	network             ObjectEnsurer // network object; a no-op dummy on plain Kubernetes (see newDevelopmentObjectEnsurers)
	definitionConfigMap ObjectEnsurer // ConfigMap holding the workflow definition
	propertiesConfigMap ObjectEnsurer // ConfigMap holding the application properties
}
// devProfileObjectEnrichers groups the status enrichers applied after reconciling
// a dev-profile workflow.
type devProfileObjectEnrichers struct {
	// networkInfo enriches the workflow status with network access information
	networkInfo *statusEnricher
	//Here we can add more enrichers if we need in future to enrich objects with more info coming from reconciliation
}
// ensureRunningDevWorkflowReconciliationState keeps a dev workflow running by
// ensuring all of its child objects (ConfigMaps, Deployment, Service, network)
// exist and are up to date.
type ensureRunningDevWorkflowReconciliationState struct {
	*stateSupport
	ensurers *devProfileObjectEnsurers
}
// CanReconcile reports whether this state applies: the workflow is ready, has not
// been reconciled yet (top-level condition unknown), or has a child-object problem.
func (e *ensureRunningDevWorkflowReconciliationState) CanReconcile(workflow *operatorapi.SonataFlow) bool {
	status := &workflow.Status
	if status.IsReady() || status.IsChildObjectsProblem() {
		return true
	}
	return status.GetTopLevelCondition().IsUnknown()
}
// Do ensures every child object required by a dev-profile workflow (definition and
// properties ConfigMaps, Deployment, Service and network object) and keeps the
// workflow status in sync with the Deployment availability. It returns the objects
// it touched so the caller can track ownership.
func (e *ensureRunningDevWorkflowReconciliationState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
	var objs []client.Object

	// ConfigMap holding the workflow definition file
	flowDefCM, _, err := e.ensurers.definitionConfigMap.ensure(ctx, workflow, ensureWorkflowDefConfigMapMutator(workflow))
	if err != nil {
		return ctrl.Result{Requeue: false}, objs, err
	}
	objs = append(objs, flowDefCM)

	// ConfigMap holding the application properties
	propsCM, _, err := e.ensurers.propertiesConfigMap.ensure(ctx, workflow, ensureWorkflowDevPropertiesConfigMapMutator(workflow))
	if err != nil {
		return ctrl.Result{Requeue: false}, objs, err
	}
	objs = append(objs, propsCM)

	// External resource ConfigMaps referenced by the workflow; a missing reference
	// marks the workflow not-running and requeues instead of erroring out.
	externalCM, err := workflowdef.FetchExternalResourcesConfigMapsRef(e.client, workflow)
	if err != nil {
		workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, "External Resources ConfigMap not found: %s", err.Error())
		if _, err = e.performStatusUpdate(ctx, workflow); err != nil {
			return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
		}
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, nil
	}

	// Base image for dev mode; a platform-level override takes precedence when set.
	devBaseContainerImage := workflowdef.GetDefaultWorkflowDevModeImageTag()
	pl, errPl := platform.GetActivePlatform(ctx, e.client, workflow.Namespace)
	// check if the Platform available
	if errPl == nil && len(pl.Spec.DevMode.BaseImage) > 0 {
		devBaseContainerImage = pl.Spec.DevMode.BaseImage
	}

	// Deployment running the workflow, with the ConfigMaps above mounted into it
	deployment, _, err := e.ensurers.deployment.ensure(ctx, workflow,
		devDeploymentMutateVisitor(workflow),
		naiveApplyImageDeploymentMutateVisitor(devBaseContainerImage),
		mountDevConfigMapsMutateVisitor(flowDefCM.(*corev1.ConfigMap), propsCM.(*corev1.ConfigMap), externalCM))
	if err != nil {
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
	}
	objs = append(objs, deployment)

	service, _, err := e.ensurers.service.ensure(ctx, workflow, defaultServiceMutateVisitor(workflow))
	if err != nil {
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
	}
	objs = append(objs, service)

	// network object (no-op ensurer on plain Kubernetes)
	route, _, err := e.ensurers.network.ensure(ctx, workflow)
	if err != nil {
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
	}
	objs = append(objs, route)

	// First time reconciling this object, mark as wait for deployment
	if workflow.Status.GetTopLevelCondition().IsUnknown() {
		klog.V(log.I).InfoS("Workflow is in WaitingForDeployment Condition")
		workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "")
		if _, err = e.performStatusUpdate(ctx, workflow); err != nil {
			return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
		}
		return ctrl.Result{RequeueAfter: requeueAfterIsRunning}, objs, nil
	}

	// Is the deployment still available?
	convertedDeployment := deployment.(*appsv1.Deployment)
	if !kubeutil.IsDeploymentAvailable(convertedDeployment) {
		klog.V(log.I).InfoS("Workflow is not running due to a problem in the Deployment. Attempt to recover.")
		// Marking the condition false hands the workflow over to the recover state on the next pass.
		workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.DeploymentUnavailableReason, getDeploymentUnavailabilityMessage(convertedDeployment))
		if _, err = e.performStatusUpdate(ctx, workflow); err != nil {
			return ctrl.Result{RequeueAfter: requeueAfterFailure}, objs, err
		}
	}
	return ctrl.Result{RequeueAfter: requeueAfterIsRunning}, objs, nil
}
// followDeployDevWorkflowReconciliationState tracks the rollout of a workflow's
// Deployment while the workflow is in the WaitingForDeployment condition.
type followDeployDevWorkflowReconciliationState struct {
	*stateSupport
	enrichers *devProfileObjectEnrichers
}
// CanReconcile reports whether this state applies: the workflow is waiting for
// its Deployment to become available.
func (f *followDeployDevWorkflowReconciliationState) CanReconcile(workflow *operatorapi.SonataFlow) bool {
	waiting := workflow.Status.IsWaitingForDeployment()
	return waiting
}
// Do synchronizes the workflow status with the underlying Deployment rollout and
// persists the updated status, requeueing on failure.
func (f *followDeployDevWorkflowReconciliationState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
	handler := DeploymentHandler(f.client)
	result, err := handler.SyncDeploymentStatus(ctx, workflow)
	if err != nil {
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, nil, err
	}

	if _, err = f.performStatusUpdate(ctx, workflow); err != nil {
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, nil, err
	}
	return result, nil, nil
}
// PostReconcile enriches the workflow status with network information once the
// Deployment backing the workflow becomes available, then persists the status.
// It is a no-op while the Deployment is still unavailable.
func (f *followDeployDevWorkflowReconciliationState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
	deployment := &appsv1.Deployment{}
	if err := f.client.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
		return err
	}
	// deployment is always non-nil here (allocated above); only availability needs checking.
	if kubeutil.IsDeploymentAvailable(deployment) {
		// Enriching Workflow CR status with needed network info
		if _, err := f.enrichers.networkInfo.Enrich(ctx, workflow); err != nil {
			return err
		}
		if _, err := f.performStatusUpdate(ctx, workflow); err != nil {
			return err
		}
	}
	return nil
}
// recoverFromFailureDevReconciliationState attempts to recover a workflow whose
// Running condition is false, currently by rolling out its Deployment.
type recoverFromFailureDevReconciliationState struct {
	*stateSupport
}
// CanReconcile reports whether this state applies: the workflow's Running
// condition is explicitly false.
func (r *recoverFromFailureDevReconciliationState) CanReconcile(workflow *operatorapi.SonataFlow) bool {
	running := workflow.Status.GetCondition(api.RunningConditionType)
	return running.IsFalse()
}
// Do attempts to recover the workflow from a failed state by rolling out its
// Deployment. It resets the workflow conditions when the Deployment no longer
// exists (letting regular reconciliation recreate the objects), gives up after
// recoverDeploymentErrorRetries attempts, and rate-limits rollouts to one per
// recoverDeploymentErrorInterval minutes.
func (r *recoverFromFailureDevReconciliationState) Do(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
	// for now, a very basic attempt to recover by rolling out the deployment
	deployment := &appsv1.Deployment{}
	if err := r.client.Get(ctx, client.ObjectKeyFromObject(workflow), deployment); err != nil {
		// if the deployment is not there, let's try to reset the status condition and make the reconciliation fix the objects
		if errors.IsNotFound(err) {
			klog.V(log.I).InfoS("Tried to recover from failed state, no deployment found, trying to reset the workflow conditions")
			workflow.Status.RecoverFailureAttempts = 0
			workflow.Status.Manager().MarkUnknown(api.RunningConditionType, "", "")
			if _, updateErr := r.performStatusUpdate(ctx, workflow); updateErr != nil {
				return ctrl.Result{Requeue: false}, nil, updateErr
			}
			return ctrl.Result{RequeueAfter: requeueAfterFailure}, nil, nil
		}
		return ctrl.Result{Requeue: false}, nil, err
	}
	// if the deployment is progressing we might have good news
	if kubeutil.IsDeploymentAvailable(deployment) {
		workflow.Status.RecoverFailureAttempts = 0
		workflow.Status.Manager().MarkTrue(api.RunningConditionType)
		if _, updateErr := r.performStatusUpdate(ctx, workflow); updateErr != nil {
			return ctrl.Result{Requeue: false}, nil, updateErr
		}
		return ctrl.Result{RequeueAfter: requeueAfterFailure}, nil, nil
	}
	if workflow.Status.RecoverFailureAttempts >= recoverDeploymentErrorRetries {
		workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.RedeploymentExhaustedReason,
			"Can't recover workflow from failure after maximum attempts: %d", workflow.Status.RecoverFailureAttempts)
		if _, updateErr := r.performStatusUpdate(ctx, workflow); updateErr != nil {
			return ctrl.Result{}, nil, updateErr
		}
		return ctrl.Result{RequeueAfter: requeueRecoverDeploymentErrorInterval}, nil, nil
	}
	// TODO: we can improve deployment failures https://issues.redhat.com/browse/KOGITO-8812
	// Guard to avoid consecutive reconciliations to mess with the recover interval:
	// skip the rollout when the previous attempt happened less than
	// recoverDeploymentErrorInterval minutes ago.
	// BUGFIX: the comparison was inverted (`> 10`), which allowed back-to-back
	// rollouts on rapid reconciliations and skipped recovery once enough time had passed.
	if !workflow.Status.LastTimeRecoverAttempt.IsZero() &&
		metav1.Now().Sub(workflow.Status.LastTimeRecoverAttempt.Time).Minutes() < recoverDeploymentErrorInterval {
		return ctrl.Result{RequeueAfter: requeueRecoverDeploymentErrorInterval}, nil, nil
	}
	// let's try rolling out the deployment
	if err := kubeutil.MarkDeploymentToRollout(deployment); err != nil {
		return ctrl.Result{}, nil, err
	}
	// Retry on optimistic-lock conflicts since other actors may update the Deployment concurrently.
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		return r.client.Update(ctx, deployment)
	})
	if retryErr != nil {
		klog.V(log.E).ErrorS(retryErr, "Error during Deployment rollout")
		return ctrl.Result{RequeueAfter: requeueRecoverDeploymentErrorInterval}, nil, nil
	}
	workflow.Status.RecoverFailureAttempts += 1
	workflow.Status.LastTimeRecoverAttempt = metav1.Now()
	if _, err := r.performStatusUpdate(ctx, workflow); err != nil {
		return ctrl.Result{Requeue: false}, nil, err
	}
	return ctrl.Result{RequeueAfter: requeueRecoverDeploymentErrorInterval}, nil, nil
}
// mountDevConfigMapsMutateVisitor mounts the required configMaps in the Workflow Dev Deployment:
// the workflow definition, the application properties and every external resource ConfigMap.
// It replaces (not merges) the pod's volumes and the first container's volume mounts.
func mountDevConfigMapsMutateVisitor(flowDefCM, propsCM *corev1.ConfigMap, workflowResCMs []operatorapi.ConfigMapWorkflowResource) mutateVisitor {
	return func(object client.Object) controllerutil.MutateFn {
		return func() error {
			deployment := object.(*appsv1.Deployment)
			volumeMounts := []corev1.VolumeMount{
				kubeutil.VolumeMount(configMapWorkflowDefVolumeName, true, configMapWorkflowDefMountPath),
				kubeutil.VolumeMount(configMapResourcesVolumeName, true, quarkusDevConfigMountPath),
			}
			// defaultResourcesVolume holds every ConfigMap mount required on src/main/resources
			defaultResourcesVolume := corev1.Volume{Name: configMapResourcesVolumeName, VolumeSource: corev1.VolumeSource{Projected: &corev1.ProjectedVolumeSource{}}}
			kubeutil.VolumeProjectionAddConfigMap(defaultResourcesVolume.Projected, propsCM.Name, corev1.KeyToPath{Key: workflowproj.ApplicationPropertiesFileName, Path: workflowproj.ApplicationPropertiesFileName})
			// resourceVolumes holds every resource that needs to be mounted on src/main/resources/<specific_dir>
			resourceVolumes := make([]corev1.Volume, 0, len(workflowResCMs))
			for _, workflowResCM := range workflowResCMs {
				// if we need to mount at the root dir, we use the defaultResourcesVolume
				if len(workflowResCM.WorkflowPath) == 0 {
					kubeutil.VolumeProjectionAddConfigMap(defaultResourcesVolume.Projected, workflowResCM.ConfigMap.Name)
					continue
				}
				// the resource configMap needs a specific dir, inside the src/main/resources
				// to avoid clashing with other configMaps trying to mount on the same dir, we create one projected per path
				volumeMountName := configMapExternalResourcesVolumeNamePrefix + utils.PathToString(workflowResCM.WorkflowPath)
				volumeMounts = kubeutil.VolumeMountAdd(volumeMounts, volumeMountName, path.Join(quarkusDevConfigMountPath, workflowResCM.WorkflowPath))
				resourceVolumes = kubeutil.VolumeAddVolumeProjectionConfigMap(resourceVolumes, workflowResCM.ConfigMap.Name, volumeMountName)
			}
			volumes := make([]corev1.Volume, 0, len(resourceVolumes)+2)
			volumes = append(volumes, defaultResourcesVolume, kubeutil.VolumeConfigMap(configMapWorkflowDefVolumeName, flowDefCM.Name))
			volumes = append(volumes, resourceVolumes...)
			// Overwrite directly; the previous version first assigned fresh empty slices
			// and immediately replaced them (dead stores).
			deployment.Spec.Template.Spec.Volumes = volumes
			deployment.Spec.Template.Spec.Containers[0].VolumeMounts = volumeMounts
			return nil
		}
	}
}