[KOGITO-7754] create Knative eventing resources (#350)
diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go
index 04aa544..ac32fe3 100644
--- a/api/v1alpha08/sonataflow_types.go
+++ b/api/v1alpha08/sonataflow_types.go
@@ -659,6 +659,9 @@
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
// Persistence defines the database persistence configuration for the workflow
Persistence *PersistenceOptionsSpec `json:"persistence,omitempty"`
+ // Sink describes the sinkBinding details of this SonataFlow instance.
+ //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
+ Sink *duckv1.Destination `json:"sink,omitempty"`
}
// SonataFlowStatus defines the observed state of SonataFlow
diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go
index 75e8bf3..fb2db18 100644
--- a/api/v1alpha08/zz_generated.deepcopy.go
+++ b/api/v1alpha08/zz_generated.deepcopy.go
@@ -26,6 +26,7 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/pkg/apis"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -1174,6 +1175,11 @@
*out = new(PersistenceOptionsSpec)
(*in).DeepCopyInto(*out)
}
+ if in.Sink != nil {
+ in, out := &in.Sink, &out.Sink
+ *out = new(duckv1.Destination)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowSpec.
diff --git a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
index cffce25..7597683 100644
--- a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
+++ b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
@@ -312,6 +312,9 @@
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
+ - description: Sink describes the sinkBinding details of this SonataFlow instance.
+ displayName: sink
+ path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
@@ -401,6 +404,36 @@
- update
- watch
- apiGroups:
+ - eventing.knative.dev
+ resources:
+ - triggers
+ - triggers/status
+ - triggers/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+ - apiGroups:
+ - sources.knative.dev
+ resources:
+ - sinkbindings
+ - sinkbindings/status
+ - sinkbindings/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+ - apiGroups:
- coordination.k8s.io
resources:
- leases
diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml
index 76c704f..6836cd4 100644
--- a/bundle/manifests/sonataflow.org_sonataflows.yaml
+++ b/bundle/manifests/sonataflow.org_sonataflows.yaml
@@ -9346,6 +9346,53 @@
type: object
type: array
type: object
+ sink:
+ description: Sink describes the sinkBinding details of this SonataFlow
+ instance.
+ properties:
+ CACerts:
+ description: CACerts are Certification Authority (CA) certificates
+ in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+ If set, these CAs are appended to the set of CAs provided by
+ the Addressable target, if any.
+ type: string
+ ref:
+ description: Ref points to an Addressable.
+ properties:
+ address:
+ description: Address points to a specific Address Name.
+ type: string
+ apiVersion:
+ description: API version of the referent.
+ type: string
+ group:
+ description: 'Group of the API, without the version of the
+ group. This can be used as an alternative to the APIVersion,
+ and then resolved using ResolveGroup. Note: This API is
+ EXPERIMENTAL and might break anytime. For more details:
+ https://github.com/knative/eventing/issues/5086'
+ type: string
+ kind:
+ description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+ type: string
+ name:
+ description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+ type: string
+ namespace:
+ description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+ This is optional field, it gets defaulted to the object
+ holding it if left out.'
+ type: string
+ required:
+ - kind
+ - name
+ type: object
+ uri:
+ description: URI can be an absolute URL(non-empty scheme and non-empty
+ host) pointing to the target or a relative URI. Relative URIs
+ will be resolved using the base URI retrieved from Ref.
+ type: string
+ type: object
required:
- flow
type: object
diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml
index e521432..a9df118 100644
--- a/config/crd/bases/sonataflow.org_sonataflows.yaml
+++ b/config/crd/bases/sonataflow.org_sonataflows.yaml
@@ -9347,6 +9347,53 @@
type: object
type: array
type: object
+ sink:
+ description: Sink describes the sinkBinding details of this SonataFlow
+ instance.
+ properties:
+ CACerts:
+ description: CACerts are Certification Authority (CA) certificates
+ in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+ If set, these CAs are appended to the set of CAs provided by
+ the Addressable target, if any.
+ type: string
+ ref:
+ description: Ref points to an Addressable.
+ properties:
+ address:
+ description: Address points to a specific Address Name.
+ type: string
+ apiVersion:
+ description: API version of the referent.
+ type: string
+ group:
+ description: 'Group of the API, without the version of the
+ group. This can be used as an alternative to the APIVersion,
+ and then resolved using ResolveGroup. Note: This API is
+ EXPERIMENTAL and might break anytime. For more details:
+ https://github.com/knative/eventing/issues/5086'
+ type: string
+ kind:
+ description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+ type: string
+ name:
+ description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+ type: string
+ namespace:
+ description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+ This is optional field, it gets defaulted to the object
+ holding it if left out.'
+ type: string
+ required:
+ - kind
+ - name
+ type: object
+ uri:
+ description: URI can be an absolute URL(non-empty scheme and non-empty
+ host) pointing to the target or a relative URI. Relative URIs
+ will be resolved using the base URI retrieved from Ref.
+ type: string
+ type: object
required:
- flow
type: object
diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
index 1f65bbe..089ae24 100644
--- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
+++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
@@ -196,6 +196,9 @@
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
+ - description: Sink describes the sinkBinding details of this SonataFlow instance.
+ displayName: sink
+ path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
diff --git a/config/rbac/builder_role.yaml b/config/rbac/builder_role.yaml
index 9d00da3..70b2ab5 100644
--- a/config/rbac/builder_role.yaml
+++ b/config/rbac/builder_role.yaml
@@ -65,4 +65,34 @@
- list
- patch
- update
+ - watch
+- apiGroups:
+ - eventing.knative.dev
+ resources:
+ - triggers
+ - triggers/status
+ - triggers/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+- apiGroups:
+ - sources.knative.dev
+ resources:
+ - sinkbindings
+ - sinkbindings/status
+ - sinkbindings/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
- watch
\ No newline at end of file
diff --git a/controllers/profiles/common/constants/workflows.go b/controllers/profiles/common/constants/workflows.go
index a64002a..8087f96 100644
--- a/controllers/profiles/common/constants/workflows.go
+++ b/controllers/profiles/common/constants/workflows.go
@@ -16,4 +16,11 @@
const (
MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
+ KogitoOutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url"
+ KogitoOutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
+ KogitoIncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector"
+ KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path"
+ KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled"
+ KnativeInjectedEnvVar = "${K_SINK}"
+ KnativeEventingBrokerDefault = "default"
)
diff --git a/controllers/profiles/common/ensurer.go b/controllers/profiles/common/ensurer.go
index 980d5cb..a1b8c92 100644
--- a/controllers/profiles/common/ensurer.go
+++ b/controllers/profiles/common/ensurer.go
@@ -32,6 +32,7 @@
var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}
+var _ ObjectsEnsurer = &defaultObjectsEnsurer{}
type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
@@ -77,22 +78,10 @@
result := controllerutil.OperationResultNone
object, err := d.creator(workflow)
- if err != nil {
+ if err != nil || object == nil {
return nil, result, err
}
- if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
- func() error {
- for _, v := range visitors {
- if visitorErr := v(object)(); visitorErr != nil {
- return visitorErr
- }
- }
- return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
- }); err != nil {
- return nil, result, err
- }
- klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
- return object, result, nil
+ return ensureObject(ctx, workflow, visitors, result, d.c, object)
}
// defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform
@@ -136,3 +125,61 @@
result := controllerutil.OperationResultNone
return nil, result, nil
}
+
+// ObjectsEnsurer is an ensurer to apply multiple objects
+type ObjectsEnsurer interface {
+ Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
+}
+
+type ObjectEnsurerResult struct {
+ client.Object
+ Result controllerutil.OperationResult
+ Error error
+}
+
+func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
+ return &defaultObjectsEnsurer{
+ c: client,
+ creator: creator,
+ }
+}
+
+type defaultObjectsEnsurer struct {
+ ObjectsEnsurer
+ c client.Client
+ creator ObjectsCreator
+}
+
+func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
+ result := controllerutil.OperationResultNone
+
+ objects, err := d.creator(workflow)
+ if err != nil {
+ return []ObjectEnsurerResult{{nil, result, err}}
+ }
+ var ensureResult []ObjectEnsurerResult
+ for _, object := range objects {
+ ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object)
+ ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
+ if err != nil {
+ return ensureResult
+ }
+ }
+ return ensureResult
+}
+
+func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
+ if result, err := controllerutil.CreateOrPatch(ctx, c, object,
+ func() error {
+ for _, v := range visitors {
+ if visitorErr := v(object)(); visitorErr != nil {
+ return visitorErr
+ }
+ }
+ return controllerutil.SetControllerReference(workflow, object, c.Scheme())
+ }); err != nil {
+ return nil, result, err
+ }
+ klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
+ return object, result, nil
+}
diff --git a/controllers/profiles/common/knative.go b/controllers/profiles/common/knative.go
new file mode 100644
index 0000000..13f5bc4
--- /dev/null
+++ b/controllers/profiles/common/knative.go
@@ -0,0 +1,76 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+ "context"
+
+ "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
+
+ operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/log"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ KnativeEventingHandler = &knativeObjectManager{}
+
+type knativeObjectManager struct {
+ sinkBinding ObjectEnsurer
+ trigger ObjectsEnsurer
+ *StateSupport
+}
+
+func NewKnativeEventingHandler(support *StateSupport) KnativeEventingHandler {
+ return &knativeObjectManager{
+ sinkBinding: NewObjectEnsurer(support.C, SinkBindingCreator),
+ trigger: NewObjectsEnsurer(support.C, TriggersCreator),
+ StateSupport: support,
+ }
+}
+
+type KnativeEventingHandler interface {
+ Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error)
+}
+
+func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) {
+ var objs []client.Object
+
+ if workflow.Spec.Flow.Events == nil {
+ // skip if no event is found
+ klog.V(log.I).InfoS("skip knative resource creation as no event is found")
+ } else if workflow.Spec.Sink == nil {
+ klog.V(log.I).InfoS("Spec.Sink is not provided")
+ } else if knativeAvail, err := knative.GetKnativeAvailability(k.Cfg); err != nil || knativeAvail == nil || !knativeAvail.Eventing {
+ klog.V(log.I).InfoS("Knative Eventing is not installed")
+ } else {
+ // create sinkBinding and trigger
+ sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow)
+ if err != nil {
+ return objs, err
+ } else if sinkBinding != nil {
+ objs = append(objs, sinkBinding)
+ }
+
+ triggers := k.trigger.Ensure(ctx, workflow)
+ for _, trigger := range triggers {
+ if trigger.Error != nil {
+ return objs, trigger.Error
+ }
+ objs = append(objs, trigger.Object)
+ }
+ }
+ return objs, nil
+}
diff --git a/controllers/profiles/common/object_creators.go b/controllers/profiles/common/object_creators.go
index 7e08366..4b28e98 100644
--- a/controllers/profiles/common/object_creators.go
+++ b/controllers/profiles/common/object_creators.go
@@ -20,10 +20,21 @@
package common
import (
+ "fmt"
+ "strings"
+
+ "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
+
+ cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+ "knative.dev/pkg/tracker"
"sigs.k8s.io/controller-runtime/pkg/client"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
@@ -45,6 +56,9 @@
// SonataFlowPlatform
type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error)
+// ObjectsCreator creates multiple resources
+type ObjectsCreator func(workflow *operatorapi.SonataFlow) ([]client.Object, error)
+
const (
defaultHTTPServicePort = 80
@@ -199,6 +213,85 @@
return service, nil
}
+// SinkBindingCreator is an ObjectsCreator for SinkBinding.
+// It will create v1.SinkBinding based on events defined in workflow.
+func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
+ lbl := workflowproj.GetDefaultLabels(workflow)
+
+ // skip if no produced event is found
+ if workflow.Spec.Sink == nil || !workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) {
+ return nil, nil
+ }
+
+ sink := workflow.Spec.Sink
+
+ // subject must be deployment to inject K_SINK, service won't work
+ sinkBinding := &sourcesv1.SinkBinding{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: strings.ToLower(fmt.Sprintf("%s-sb", workflow.Name)),
+ Namespace: workflow.Namespace,
+ Labels: lbl,
+ },
+ Spec: sourcesv1.SinkBindingSpec{
+ SourceSpec: duckv1.SourceSpec{
+ Sink: *sink,
+ },
+ BindingSpec: duckv1.BindingSpec{
+ Subject: tracker.Reference{
+ Name: workflow.Name,
+ Namespace: workflow.Namespace,
+ APIVersion: "apps/v1",
+ Kind: "Deployment",
+ },
+ },
+ },
+ }
+ return sinkBinding, nil
+}
+
+// TriggersCreator is an ObjectsCreator for Triggers.
+// It will create a list of eventingv1.Trigger based on events defined in workflow.
+func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) {
+ var resultObjects []client.Object
+ lbl := workflowproj.GetDefaultLabels(workflow)
+
+ //consumed
+ events := workflow.Spec.Flow.Events
+ for _, event := range events {
+ // filter out produce events
+ if event.Kind == cncfmodel.EventKindProduced {
+ continue
+ }
+
+ // construct eventingv1.Trigger
+ trigger := &eventingv1.Trigger{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: strings.ToLower(fmt.Sprintf("%s-%s-trigger", workflow.Name, event.Name)),
+ Namespace: workflow.Namespace,
+ Labels: lbl,
+ },
+ Spec: eventingv1.TriggerSpec{
+ Broker: constants.KnativeEventingBrokerDefault,
+ Filter: &eventingv1.TriggerFilter{
+ Attributes: eventingv1.TriggerFilterAttributes{
+ "type": event.Type,
+ },
+ },
+ Subscriber: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ Name: workflow.Name,
+ Namespace: workflow.Namespace,
+ APIVersion: "v1",
+ Kind: "Service",
+ },
+ },
+ },
+ }
+ resultObjects = append(resultObjects, trigger)
+ }
+ return resultObjects, nil
+}
+
// OpenShiftRouteCreator is an ObjectCreator for a basic Route for a workflow running on OpenShift.
// It enables the exposition of the service using an OpenShift Route.
// See: https://github.com/openshift/api/blob/d170fcdc0fa638b664e4f35f2daf753cb4afe36b/route/v1/route.crd.yaml
diff --git a/controllers/profiles/common/object_creators_test.go b/controllers/profiles/common/object_creators_test.go
index beb0436..236aa02 100644
--- a/controllers/profiles/common/object_creators_test.go
+++ b/controllers/profiles/common/object_creators_test.go
@@ -23,6 +23,8 @@
"context"
"testing"
+ sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+
"github.com/magiconair/properties"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
@@ -176,6 +178,35 @@
assert.Empty(t, flowContainer.Env)
}
+func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) {
+ workflow := test.GetVetEventSonataFlow(t.Name())
+
+ //On Kubernetes we want the service exposed in Dev with NodePort
+ sinkBinding, _ := SinkBindingCreator(workflow)
+ sinkBinding.SetUID("1")
+ sinkBinding.SetResourceVersion("1")
+
+ reflectSinkBinding := sinkBinding.(*sourcesv1.SinkBinding)
+
+ assert.NotNil(t, reflectSinkBinding)
+ assert.NotNil(t, reflectSinkBinding.Spec)
+ assert.NotEmpty(t, reflectSinkBinding.Spec.Sink)
+ assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Kind, "Broker")
+}
+
+func Test_ensureWorkflowTriggersAreCreated(t *testing.T) {
+ workflow := test.GetVetEventSonataFlow(t.Name())
+
+ //On Kubernetes we want the service exposed in Dev with NodePort
+ triggers, _ := TriggersCreator(workflow)
+
+ assert.NotEmpty(t, triggers)
+ assert.Len(t, triggers, 2)
+ for _, trigger := range triggers {
+ assert.Contains(t, []string{"vet-vetappointmentrequestreceived-trigger", "vet-vetappointmentinfo-trigger"}, trigger.GetName())
+ }
+}
+
func TestMergePodSpec_WithPostgreSQL_and_JDBC_URL_field(t *testing.T) {
workflow := test.GetBaseSonataFlow(t.Name())
workflow.Spec = v1alpha08.SonataFlowSpec{
diff --git a/controllers/profiles/common/properties/application.go b/controllers/profiles/common/properties/application.go
index 4148914..9b59260 100644
--- a/controllers/profiles/common/properties/application.go
+++ b/controllers/profiles/common/properties/application.go
@@ -40,9 +40,6 @@
var (
immutableApplicationProperties = fmt.Sprintf("quarkus.http.port=%d\n"+
"quarkus.http.host=0.0.0.0\n"+
- // We disable the Knative health checks to not block the pod to run if Knative objects are not available
- // See: https://kiegroup.github.io/kogito-docs/serverlessworkflow/latest/eventing/consume-produce-events-with-knative-eventing.html#ref-knative-eventing-add-on-source-configuration
- "org.kie.kogito.addons.knative.eventing.health-enabled=false\n"+
"quarkus.devservices.enabled=false\n"+
"quarkus.kogito.devservices.enabled=false\n", constants.DefaultHTTPWorkflowPortInt)
_ AppPropertyHandler = &appPropertyHandler{}
@@ -160,6 +157,11 @@
return nil, err
}
props.Merge(p)
+ p, err = generateKnativeEventingWorkflowProperties(workflow)
+ if err != nil {
+ return nil, err
+ }
+ props.Merge(p)
props.Sort()
}
handler.defaultMutableProperties = props
diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go
index e686aaa..7133db1 100644
--- a/controllers/profiles/common/properties/application_test.go
+++ b/controllers/profiles/common/properties/application_test.go
@@ -124,13 +124,12 @@
assert.NoError(t, err)
generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
- assert.Equal(t, 7, len(generatedProps.Keys()))
+ assert.Equal(t, 6, len(generatedProps.Keys()))
assert.NotContains(t, "property1", generatedProps.Keys())
assert.NotContains(t, "property2", generatedProps.Keys())
assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", ""))
assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", ""))
assert.Equal(t, "0.0.0.0", generatedProps.GetString("quarkus.http.host", ""))
- assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", ""))
assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", ""))
assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", ""))
assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, ""))
@@ -157,7 +156,7 @@
Build())
generatedProps.DisableExpansion = true
assert.NoError(t, propsErr)
- assert.Equal(t, 21, len(generatedProps.Keys()))
+ assert.Equal(t, 20, len(generatedProps.Keys()))
assert.NotContains(t, "property1", generatedProps.Keys())
assert.NotContains(t, "property2", generatedProps.Keys())
assertHasProperty(t, generatedProps, "service1", myService1Address)
@@ -181,7 +180,6 @@
assertHasProperty(t, generatedProps, "kogito.service.url", fmt.Sprintf("http://greeting.%s", defaultNamespace))
assertHasProperty(t, generatedProps, "quarkus.http.port", "8080")
assertHasProperty(t, generatedProps, "quarkus.http.host", "0.0.0.0")
- assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false")
assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false")
assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false")
assertHasProperty(t, generatedProps, constants.KogitoUserTasksEventsEnabled, "false")
@@ -276,7 +274,6 @@
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
- assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))
// disabling job service bypasses config of outgoing events url
platform.Spec.Services.JobService.Enabled = nil
diff --git a/controllers/profiles/common/properties/properties.go b/controllers/profiles/common/properties/properties.go
new file mode 100644
index 0000000..ec4c3b0
--- /dev/null
+++ b/controllers/profiles/common/properties/properties.go
@@ -0,0 +1,49 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package properties
+
+import (
+ operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
+ "github.com/magiconair/properties"
+ cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+// GenerateKnativeEventingWorkflowProperties returns the set of application properties required for the workflow to produce or consume
+// Knative Events.
+// Never nil.
+func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow) (*properties.Properties, error) {
+ props := properties.NewProperties()
+ if workflow == nil || workflow.Spec.Sink == nil {
+ props.Set(constants.KnativeHealthEnabled, "false")
+ return props, nil
+ }
+ // verify ${K_SINK}
+ props.Set(constants.KnativeHealthEnabled, "true")
+ if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) {
+ props.Set(constants.KogitoOutgoingEventsConnector, constants.QuarkusHTTP)
+ props.Set(constants.KogitoOutgoingEventsURL, constants.KnativeInjectedEnvVar)
+ }
+ if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindConsumed) {
+ props.Set(constants.KogitoIncomingEventsConnector, constants.QuarkusHTTP)
+ var path = "/"
+ if workflow.Spec.Sink.URI != nil {
+ path = workflow.Spec.Sink.URI.Path
+ }
+ props.Set(constants.KogitoIncomingEventsPath, path)
+ }
+ return props, nil
+}
diff --git a/controllers/profiles/common/reconciler.go b/controllers/profiles/common/reconciler.go
index da29a10..b280000 100644
--- a/controllers/profiles/common/reconciler.go
+++ b/controllers/profiles/common/reconciler.go
@@ -23,6 +23,8 @@
"context"
"fmt"
+ "k8s.io/client-go/rest"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"k8s.io/client-go/tools/record"
@@ -39,12 +41,13 @@
// StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles
type StateSupport struct {
C client.Client
+ Cfg *rest.Config
Catalog discovery.ServiceCatalog
Recorder record.EventRecorder
}
// PerformStatusUpdate updates the SonataFlow Status conditions
-func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) {
+func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) {
var err error
workflow.Status.ObservedGeneration = workflow.Generation
if err = s.C.Status().Update(ctx, workflow); err != nil {
diff --git a/controllers/profiles/dev/profile_dev.go b/controllers/profiles/dev/profile_dev.go
index 14dc188..0bd5a39 100644
--- a/controllers/profiles/dev/profile_dev.go
+++ b/controllers/profiles/dev/profile_dev.go
@@ -46,6 +46,7 @@
func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
+ Cfg: cfg,
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
Recorder: recorder,
}
diff --git a/controllers/profiles/dev/states_dev.go b/controllers/profiles/dev/states_dev.go
index e186a35..17bd5a8 100644
--- a/controllers/profiles/dev/states_dev.go
+++ b/controllers/profiles/dev/states_dev.go
@@ -117,6 +117,12 @@
}
objs = append(objs, route)
+ if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err != nil {
+ return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err
+ } else {
+ objs = append(objs, knativeObjs...)
+ }
+
// First time reconciling this object, mark as wait for deployment
if workflow.Status.GetTopLevelCondition().IsUnknown() {
klog.V(log.I).InfoS("Workflow is in WaitingForDeployment Condition")
diff --git a/controllers/profiles/prod/deployment_handler.go b/controllers/profiles/prod/deployment_handler.go
index 4581f2c..af999e5 100644
--- a/controllers/profiles/prod/deployment_handler.go
+++ b/controllers/profiles/prod/deployment_handler.go
@@ -83,7 +83,12 @@
return reconcile.Result{}, nil, err
}
+ knativeObjs, err := common.NewKnativeEventingHandler(d.StateSupport).Ensure(ctx, workflow)
+ if err != nil {
+ return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err
+ }
objs := []client.Object{deployment, service, managedPropsCM}
+ objs = append(objs, knativeObjs...)
if deploymentOp == controllerutil.OperationResultCreated {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "")
diff --git a/controllers/profiles/prod/profile_prod.go b/controllers/profiles/prod/profile_prod.go
index 33932d2..b335326 100644
--- a/controllers/profiles/prod/profile_prod.go
+++ b/controllers/profiles/prod/profile_prod.go
@@ -72,6 +72,7 @@
func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
+ Cfg: cfg,
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
Recorder: recorder,
}
@@ -93,6 +94,7 @@
func NewProfileForOpsReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
C: client,
+ Cfg: cfg,
Catalog: discovery.NewServiceCatalogForConfig(client, cfg),
Recorder: recorder,
}
diff --git a/controllers/workflowdef/services.go b/controllers/workflowdef/services.go
new file mode 100644
index 0000000..03db8ef
--- /dev/null
+++ b/controllers/workflowdef/services.go
@@ -0,0 +1,29 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workflowdef
+
+import (
+ operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+func ContainsEventKind(workflow *operatorapi.SonataFlow, eventKind cncfmodel.EventKind) bool {
+ for _, event := range workflow.Spec.Flow.Events {
+ if event.Kind == eventKind {
+ return true
+ }
+ }
+ return false
+}
diff --git a/main.go b/main.go
index a1040f7..240a1d2 100644
--- a/main.go
+++ b/main.go
@@ -23,6 +23,9 @@
"flag"
"os"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+
"k8s.io/klog/v2/klogr"
"k8s.io/klog/v2"
@@ -54,6 +57,8 @@
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(operatorapi.AddToScheme(scheme))
+ utilruntime.Must(sourcesv1.AddToScheme(scheme))
+ utilruntime.Must(eventingv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
diff --git a/operator.yaml b/operator.yaml
index a4693bd..fb73c10 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -26173,6 +26173,53 @@
type: object
type: array
type: object
+ sink:
+ description: Sink describes the sinkBinding details of this SonataFlow
+ instance.
+ properties:
+ CACerts:
+ description: CACerts are Certification Authority (CA) certificates
+ in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+ If set, these CAs are appended to the set of CAs provided by
+ the Addressable target, if any.
+ type: string
+ ref:
+ description: Ref points to an Addressable.
+ properties:
+ address:
+ description: Address points to a specific Address Name.
+ type: string
+ apiVersion:
+ description: API version of the referent.
+ type: string
+ group:
+ description: 'Group of the API, without the version of the
+ group. This can be used as an alternative to the APIVersion,
+ and then resolved using ResolveGroup. Note: This API is
+ EXPERIMENTAL and might break anytime. For more details:
+ https://github.com/knative/eventing/issues/5086'
+ type: string
+ kind:
+ description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+ type: string
+ name:
+ description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+ type: string
+ namespace:
+ description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+ This is optional field, it gets defaulted to the object
+ holding it if left out.'
+ type: string
+ required:
+ - kind
+ - name
+ type: object
+ uri:
+ description: URI can be an absolute URL(non-empty scheme and non-empty
+ host) pointing to the target or a relative URI. Relative URIs
+ will be resolved using the base URI retrieved from Ref.
+ type: string
+ type: object
required:
- flow
type: object
@@ -26354,6 +26401,36 @@
- patch
- update
- watch
+- apiGroups:
+ - eventing.knative.dev
+ resources:
+ - triggers
+ - triggers/status
+ - triggers/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+- apiGroups:
+ - sources.knative.dev
+ resources:
+ - sinkbindings
+ - sinkbindings/status
+ - sinkbindings/finalizers
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml
new file mode 100644
index 0000000..05e3a5b
--- /dev/null
+++ b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlow
+metadata:
+ name: vet
+ annotations:
+ sonataflow.org/description: Vet service call via events
+ sonataflow.org/version: 0.0.1
+spec:
+ sink:
+ ref:
+ name: default
+ namespace: default
+ apiVersion: eventing.knative.dev/v1
+ kind: Broker
+ flow:
+ events:
+ - name: MakeVetAppointment
+ source: VetServiceSource
+ type: events.vet.appointments
+ kind: produced
+ - name: VetAppointmentInfo
+ source: VetServiceSource
+ type: events.vet.appointments
+ - name: VetAppointmentRequestReceived
+ source: checkAccountInfo
+ type: events.vet.appointments.request
+ functions:
+ - name: StoreNewPatientInfo
+ operation: specs/services.yaml#checkAccountInfo
+ states:
+ - name: AppointmentRequestReceived
+ type: event
+ onEvents:
+ - eventRefs:
+ - VetAppointmentRequestReceived
+ actions:
+ - name: checkAccount
+ functionRef:
+ refName: checkAccountInfo
+ arguments:
+ account: "${ .accountId }"
+ transition: MakeVetAppointmentState
+ - name: MakeVetAppointmentState
+ type: callback
+ action:
+ name: MakeAppointmentAction
+ eventRef:
+ triggerEventRef: MakeVetAppointment
+ data: "${ .patientInfo }"
+ eventRef: VetAppointmentInfo
+ timeouts:
+ stateExecTimeout: PT15M
+ eventDataFilter:
+ toStateData: .test
+ end: true
diff --git a/test/yaml.go b/test/yaml.go
index 5638e53..8c3692a 100644
--- a/test/yaml.go
+++ b/test/yaml.go
@@ -44,6 +44,7 @@
SonataFlowGreetingsWithDataInputSchemaCR = "sonataflow.org_v1alpha08_sonataflow_greetings_datainput.yaml"
SonataFlowGreetingsWithStaticResourcesCR = "sonataflow.org_v1alpha08_sonataflow-metainf.yaml"
SonataFlowSimpleOpsYamlCR = "sonataflow.org_v1alpha08_sonataflow-simpleops.yaml"
+ SonataFlowVetWithEventCR = "sonataflow.org_v1alpha08_sonataflow_vet_event.yaml"
SonataFlowGreetingsDataInputSchemaConfig = "v1_configmap_greetings_datainput.yaml"
SonataFlowGreetingsStaticFilesConfig = "v1_configmap_greetings_staticfiles.yaml"
sonataFlowPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform.yaml"
@@ -202,6 +203,10 @@
return NewSonataFlow(sonataFlowSampleYamlCR, namespace)
}
+func GetVetEventSonataFlow(namespace string) *operatorapi.SonataFlow {
+ return GetSonataFlow(SonataFlowVetWithEventCR, namespace)
+}
+
func GetBaseSonataFlowWithDevProfile(namespace string) *operatorapi.SonataFlow {
return NewSonataFlow(sonataFlowSampleYamlCR, namespace, SetDevProfile)
}