blob: 8b72a23433183b57a41e89f085dccec5113d810f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package common
import (
"fmt"
"strings"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracker"
"sigs.k8s.io/controller-runtime/pkg/client"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/persistence"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/properties"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/variables"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/openshift"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)
// ObjectCreator is the func that creates the initial reference object, if the object doesn't exist in the cluster, this one is created.
// Can be used as a reference to keep the object immutable
type ObjectCreator func(workflow *operatorapi.SonataFlow) (client.Object, error)
// ObjectCreatorWithPlatform is the func equivalent to ObjectCreator to use when the resource being created needs a reference to the
// SonataFlowPlatform
type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error)
// ObjectsCreator creates multiple resources
type ObjectsCreator func(workflow *operatorapi.SonataFlow) ([]client.Object, error)
const (
defaultHTTPServicePort = 80
// Default deployment health check configuration
// See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
healthTimeoutSeconds = 3
healthStartedFailureThreshold = 5
healthStartedPeriodSeconds = 15
healthStartedInitialDelaySeconds = 10
)
// DeploymentCreator is an objectCreator for a base Kubernetes Deployments for profiles that need to deploy the workflow on a vanilla deployment.
// It serves as a basis for a basic Quarkus Java application, expected to listen on http 8080.
func DeploymentCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: appsv1.DeploymentSpec{
Replicas: getReplicasOrDefault(workflow),
Selector: &metav1.LabelSelector{
MatchLabels: workflowproj.GetSelectorLabels(workflow),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: lbl,
},
Spec: corev1.PodSpec{},
},
},
}
if err := mergo.Merge(&deployment.Spec.Template.Spec, workflow.Spec.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride); err != nil {
return nil, err
}
flowContainer, err := defaultContainer(workflow, plf)
if err != nil {
return nil, err
}
kubeutil.AddOrReplaceContainer(operatorapi.DefaultContainerName, *flowContainer, &deployment.Spec.Template.Spec)
return deployment, nil
}
// KServiceCreator creates the default Knative Service object for SonataFlow instances. It's based on the default DeploymentCreator.
func KServiceCreator(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)
ksvc := &servingv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: servingv1.ServiceSpec{
ConfigurationSpec: servingv1.ConfigurationSpec{
Template: servingv1.RevisionTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: lbl,
},
Spec: servingv1.RevisionSpec{
PodSpec: corev1.PodSpec{},
},
},
},
},
}
if err := mergo.Merge(&ksvc.Spec.Template.Spec.PodSpec, workflow.Spec.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride); err != nil {
return nil, err
}
flowContainer, err := defaultContainer(workflow, plf)
if err != nil {
return nil, err
}
kubeutil.AddOrReplaceContainer(operatorapi.DefaultContainerName, *flowContainer, &ksvc.Spec.Template.Spec.PodSpec)
return ksvc, nil
}
func getReplicasOrDefault(workflow *operatorapi.SonataFlow) *int32 {
var dReplicas int32 = 1
if workflow.Spec.PodTemplate.Replicas == nil {
return &dReplicas
}
return workflow.Spec.PodTemplate.Replicas
}
func defaultContainer(workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform) (*corev1.Container, error) {
defaultContainerPort := corev1.ContainerPort{
ContainerPort: variables.DefaultHTTPWorkflowPortIntStr.IntVal,
Name: utils.DefaultServicePortName,
Protocol: corev1.ProtocolTCP,
}
defaultFlowContainer := &corev1.Container{
Name: operatorapi.DefaultContainerName,
Ports: []corev1.ContainerPort{defaultContainerPort},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathLive,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
TimeoutSeconds: healthTimeoutSeconds,
PeriodSeconds: healthStartedPeriodSeconds,
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathReady,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
TimeoutSeconds: healthTimeoutSeconds,
PeriodSeconds: healthStartedPeriodSeconds,
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: constants.QuarkusHealthPathStarted,
Port: variables.DefaultHTTPWorkflowPortIntStr,
},
},
InitialDelaySeconds: healthStartedInitialDelaySeconds,
TimeoutSeconds: healthTimeoutSeconds,
FailureThreshold: healthStartedFailureThreshold,
PeriodSeconds: healthStartedPeriodSeconds,
},
SecurityContext: kubeutil.SecurityDefaults(),
}
// Merge with flowContainer
if err := mergo.Merge(defaultFlowContainer, workflow.Spec.PodTemplate.Container.ToContainer(), mergo.WithOverride); err != nil {
return nil, err
}
var pper *operatorapi.PlatformPersistenceOptionsSpec
if plf != nil && plf.Spec.Persistence != nil {
pper = plf.Spec.Persistence
}
if p := persistence.RetrieveConfiguration(workflow.Spec.Persistence, pper, workflow.Name); p != nil {
defaultFlowContainer = persistence.ConfigurePersistence(defaultFlowContainer, p, workflow.Name, workflow.Namespace)
}
// immutable
defaultFlowContainer.Name = operatorapi.DefaultContainerName
portIdx := -1
for i := range defaultFlowContainer.Ports {
if defaultFlowContainer.Ports[i].Name == utils.DefaultServicePortName ||
defaultFlowContainer.Ports[i].ContainerPort == variables.DefaultHTTPWorkflowPortIntStr.IntVal {
portIdx = i
break
}
}
if portIdx < 0 {
defaultFlowContainer.Ports = append(defaultFlowContainer.Ports, defaultContainerPort)
} else {
defaultFlowContainer.Ports[portIdx] = defaultContainerPort
}
return defaultFlowContainer, nil
}
// ServiceCreator is an objectCreator for a basic Service aiming a vanilla Kubernetes Deployment.
// It maps the default HTTP port (80) to the target Java application webserver on port 8080.
func ServiceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: corev1.ServiceSpec{
Selector: lbl,
Ports: []corev1.ServicePort{{
Protocol: corev1.ProtocolTCP,
Port: defaultHTTPServicePort,
TargetPort: variables.DefaultHTTPWorkflowPortIntStr,
}},
},
}
return service, nil
}
// SinkBindingCreator is an ObjectsCreator for SinkBinding.
// It will create v1.SinkBinding based on events defined in workflow.
func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)
// skip if no produced event is found
if workflow.Spec.Sink == nil || !workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) {
return nil, nil
}
sink := workflow.Spec.Sink
// subject must be deployment to inject K_SINK, service won't work
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: metav1.ObjectMeta{
Name: strings.ToLower(fmt.Sprintf("%s-sb", workflow.Name)),
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: sourcesv1.SinkBindingSpec{
SourceSpec: duckv1.SourceSpec{
Sink: *sink,
},
BindingSpec: duckv1.BindingSpec{
Subject: tracker.Reference{
Name: workflow.Name,
Namespace: workflow.Namespace,
APIVersion: "apps/v1",
Kind: "Deployment",
},
},
},
}
return sinkBinding, nil
}
// TriggersCreator is an ObjectsCreator for Triggers.
// It will create a list of eventingv1.Trigger based on events defined in workflow.
func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) {
var resultObjects []client.Object
lbl := workflowproj.GetMergedLabels(workflow)
//consumed
events := workflow.Spec.Flow.Events
for _, event := range events {
// filter out produce events
if event.Kind == cncfmodel.EventKindProduced {
continue
}
// construct eventingv1.Trigger
trigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: strings.ToLower(fmt.Sprintf("%s-%s-trigger", workflow.Name, event.Name)),
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: eventingv1.TriggerSpec{
Broker: constants.KnativeEventingBrokerDefault,
Filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": event.Type,
},
},
Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{
Name: workflow.Name,
Namespace: workflow.Namespace,
APIVersion: "v1",
Kind: "Service",
},
},
},
}
resultObjects = append(resultObjects, trigger)
}
return resultObjects, nil
}
// OpenShiftRouteCreator is an ObjectCreator for a basic Route for a workflow running on OpenShift.
// It enables the exposition of the service using an OpenShift Route.
// See: https://github.com/openshift/api/blob/d170fcdc0fa638b664e4f35f2daf753cb4afe36b/route/v1/route.crd.yaml
func OpenShiftRouteCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
route, err := openshift.RouteForWorkflow(workflow)
return route, err
}
// UserPropsConfigMapCreator creates an empty ConfigMap to hold the user application properties
func UserPropsConfigMapCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
return workflowproj.CreateNewUserPropsConfigMap(workflow), nil
}
// ManagedPropsConfigMapCreator creates an empty ConfigMap to hold the external application properties
func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error) {
props, err := properties.ApplicationManagedProperties(workflow, platform)
if err != nil {
return nil, err
}
return workflowproj.CreateNewManagedPropsConfigMap(workflow, props), nil
}