[KOGITO-7754] create Knative eventing resources (#350)

diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go
index 04aa544..ac32fe3 100644
--- a/api/v1alpha08/sonataflow_types.go
+++ b/api/v1alpha08/sonataflow_types.go
@@ -659,6 +659,9 @@
 	PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
 	// Persistence defines the database persistence configuration for the workflow
 	Persistence *PersistenceOptionsSpec `json:"persistence,omitempty"`
+	// Sink describes the sinkBinding details of this SonataFlow instance.
+	//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
+	Sink *duckv1.Destination `json:"sink,omitempty"`
 }
 
 // SonataFlowStatus defines the observed state of SonataFlow
diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go
index 75e8bf3..fb2db18 100644
--- a/api/v1alpha08/zz_generated.deepcopy.go
+++ b/api/v1alpha08/zz_generated.deepcopy.go
@@ -26,6 +26,7 @@
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"knative.dev/pkg/apis"
+	duckv1 "knative.dev/pkg/apis/duck/v1"
 )
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -1174,6 +1175,11 @@
 		*out = new(PersistenceOptionsSpec)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.Sink != nil {
+		in, out := &in.Sink, &out.Sink
+		*out = new(duckv1.Destination)
+		(*in).DeepCopyInto(*out)
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowSpec.
diff --git a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
index cffce25..7597683 100644
--- a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
+++ b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
@@ -312,6 +312,9 @@
           definition. For example, a collection of OpenAPI specification files.
         displayName: resources
         path: resources
+      - description: Sink describes the sinkBinding details of this SonataFlow instance.
+        displayName: sink
+        path: sink
       statusDescriptors:
       - description: Address is used as a part of Addressable interface (status.address.url)
           for knative
@@ -401,6 +404,36 @@
           - update
           - watch
         - apiGroups:
+          - eventing.knative.dev
+          resources:
+          - triggers
+          - triggers/status
+          - triggers/finalizers
+          verbs:
+          - create
+          - delete
+          - deletecollection
+          - get
+          - list
+          - patch
+          - update
+          - watch
+        - apiGroups:
+          - sources.knative.dev
+          resources:
+          - sinkbindings
+          - sinkbindings/status
+          - sinkbindings/finalizers
+          verbs:
+          - create
+          - delete
+          - deletecollection
+          - get
+          - list
+          - patch
+          - update
+          - watch
+        - apiGroups:
           - coordination.k8s.io
           resources:
           - leases
diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml
index 76c704f..6836cd4 100644
--- a/bundle/manifests/sonataflow.org_sonataflows.yaml
+++ b/bundle/manifests/sonataflow.org_sonataflows.yaml
@@ -9346,6 +9346,53 @@
                       type: object
                     type: array
                 type: object
+              sink:
+                description: Sink describes the sinkBinding details of this SonataFlow
+                  instance.
+                properties:
+                  CACerts:
+                    description: CACerts are Certification Authority (CA) certificates
+                      in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+                      If set, these CAs are appended to the set of CAs provided by
+                      the Addressable target, if any.
+                    type: string
+                  ref:
+                    description: Ref points to an Addressable.
+                    properties:
+                      address:
+                        description: Address points to a specific Address Name.
+                        type: string
+                      apiVersion:
+                        description: API version of the referent.
+                        type: string
+                      group:
+                        description: 'Group of the API, without the version of the
+                          group. This can be used as an alternative to the APIVersion,
+                          and then resolved using ResolveGroup. Note: This API is
+                          EXPERIMENTAL and might break anytime. For more details:
+                          https://github.com/knative/eventing/issues/5086'
+                        type: string
+                      kind:
+                        description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+                        type: string
+                      name:
+                        description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+                        type: string
+                      namespace:
+                        description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+                          This is optional field, it gets defaulted to the object
+                          holding it if left out.'
+                        type: string
+                    required:
+                    - kind
+                    - name
+                    type: object
+                  uri:
+                    description: URI can be an absolute URL(non-empty scheme and non-empty
+                      host) pointing to the target or a relative URI. Relative URIs
+                      will be resolved using the base URI retrieved from Ref.
+                    type: string
+                type: object
             required:
             - flow
             type: object
diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml
index e521432..a9df118 100644
--- a/config/crd/bases/sonataflow.org_sonataflows.yaml
+++ b/config/crd/bases/sonataflow.org_sonataflows.yaml
@@ -9347,6 +9347,53 @@
                       type: object
                     type: array
                 type: object
+              sink:
+                description: Sink describes the sinkBinding details of this SonataFlow
+                  instance.
+                properties:
+                  CACerts:
+                    description: CACerts are Certification Authority (CA) certificates
+                      in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+                      If set, these CAs are appended to the set of CAs provided by
+                      the Addressable target, if any.
+                    type: string
+                  ref:
+                    description: Ref points to an Addressable.
+                    properties:
+                      address:
+                        description: Address points to a specific Address Name.
+                        type: string
+                      apiVersion:
+                        description: API version of the referent.
+                        type: string
+                      group:
+                        description: 'Group of the API, without the version of the
+                          group. This can be used as an alternative to the APIVersion,
+                          and then resolved using ResolveGroup. Note: This API is
+                          EXPERIMENTAL and might break anytime. For more details:
+                          https://github.com/knative/eventing/issues/5086'
+                        type: string
+                      kind:
+                        description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+                        type: string
+                      name:
+                        description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+                        type: string
+                      namespace:
+                        description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+                          This is optional field, it gets defaulted to the object
+                          holding it if left out.'
+                        type: string
+                    required:
+                    - kind
+                    - name
+                    type: object
+                  uri:
+                    description: URI can be an absolute URL(non-empty scheme and non-empty
+                      host) pointing to the target or a relative URI. Relative URIs
+                      will be resolved using the base URI retrieved from Ref.
+                    type: string
+                type: object
             required:
             - flow
             type: object
diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
index 1f65bbe..089ae24 100644
--- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
+++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
@@ -196,6 +196,9 @@
           definition. For example, a collection of OpenAPI specification files.
         displayName: resources
         path: resources
+      - description: Sink describes the sinkBinding details of this SonataFlow instance.
+        displayName: sink
+        path: sink
       statusDescriptors:
       - description: Address is used as a part of Addressable interface (status.address.url)
           for knative
diff --git a/config/rbac/builder_role.yaml b/config/rbac/builder_role.yaml
index 9d00da3..70b2ab5 100644
--- a/config/rbac/builder_role.yaml
+++ b/config/rbac/builder_role.yaml
@@ -65,4 +65,34 @@
     - list
     - patch
     - update
+    - watch
+- apiGroups:
+    - eventing.knative.dev
+  resources:
+    - triggers
+    - triggers/status
+    - triggers/finalizers
+  verbs:
+    - create
+    - delete
+    - deletecollection
+    - get
+    - list
+    - patch
+    - update
+    - watch
+- apiGroups:
+    - sources.knative.dev
+  resources:
+    - sinkbindings
+    - sinkbindings/status
+    - sinkbindings/finalizers
+  verbs:
+    - create
+    - delete
+    - deletecollection
+    - get
+    - list
+    - patch
+    - update
     - watch
\ No newline at end of file
diff --git a/controllers/profiles/common/constants/workflows.go b/controllers/profiles/common/constants/workflows.go
index a64002a..8087f96 100644
--- a/controllers/profiles/common/constants/workflows.go
+++ b/controllers/profiles/common/constants/workflows.go
@@ -16,4 +16,11 @@
 
 const (
 	MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
+	KogitoOutgoingEventsURL                  = "mp.messaging.outgoing.kogito_outgoing_stream.url"
+	KogitoOutgoingEventsConnector            = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
+	KogitoIncomingEventsConnector            = "mp.messaging.incoming.kogito_incoming_stream.connector"
+	KogitoIncomingEventsPath                 = "mp.messaging.incoming.kogito_incoming_stream.path"
+	KnativeHealthEnabled                     = "org.kie.kogito.addons.knative.eventing.health-enabled"
+	KnativeInjectedEnvVar                    = "${K_SINK}"
+	KnativeEventingBrokerDefault             = "default"
 )
diff --git a/controllers/profiles/common/ensurer.go b/controllers/profiles/common/ensurer.go
index 980d5cb..a1b8c92 100644
--- a/controllers/profiles/common/ensurer.go
+++ b/controllers/profiles/common/ensurer.go
@@ -32,6 +32,7 @@
 
 var _ ObjectEnsurer = &defaultObjectEnsurer{}
 var _ ObjectEnsurer = &noopObjectEnsurer{}
+var _ ObjectsEnsurer = &defaultObjectsEnsurer{}
 
 type ObjectEnsurer interface {
 	Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
@@ -77,22 +78,10 @@
 	result := controllerutil.OperationResultNone
 
 	object, err := d.creator(workflow)
-	if err != nil {
+	if err != nil || object == nil {
 		return nil, result, err
 	}
-	if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
-		func() error {
-			for _, v := range visitors {
-				if visitorErr := v(object)(); visitorErr != nil {
-					return visitorErr
-				}
-			}
-			return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
-		}); err != nil {
-		return nil, result, err
-	}
-	klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
-	return object, result, nil
+	return ensureObject(ctx, workflow, visitors, result, d.c, object)
 }
 
 // defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform
@@ -136,3 +125,61 @@
 	result := controllerutil.OperationResultNone
 	return nil, result, nil
 }
+
+// ObjectsEnsurer is an ensurer to apply multiple objects
+type ObjectsEnsurer interface {
+	Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
+}
+
+type ObjectEnsurerResult struct {
+	client.Object
+	Result controllerutil.OperationResult
+	Error  error
+}
+
+func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
+	return &defaultObjectsEnsurer{
+		c:       client,
+		creator: creator,
+	}
+}
+
+type defaultObjectsEnsurer struct {
+	ObjectsEnsurer
+	c       client.Client
+	creator ObjectsCreator
+}
+
+func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
+	result := controllerutil.OperationResultNone
+
+	objects, err := d.creator(workflow)
+	if err != nil {
+		return []ObjectEnsurerResult{{nil, result, err}}
+	}
+	var ensureResult []ObjectEnsurerResult
+	for _, object := range objects {
+		ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object)
+		ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
+		if err != nil {
+			return ensureResult
+		}
+	}
+	return ensureResult
+}
+
+func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
+	if result, err := controllerutil.CreateOrPatch(ctx, c, object,
+		func() error {
+			for _, v := range visitors {
+				if visitorErr := v(object)(); visitorErr != nil {
+					return visitorErr
+				}
+			}
+			return controllerutil.SetControllerReference(workflow, object, c.Scheme())
+		}); err != nil {
+		return nil, result, err
+	}
+	klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
+	return object, result, nil
+}
diff --git a/controllers/profiles/common/knative.go b/controllers/profiles/common/knative.go
new file mode 100644
index 0000000..13f5bc4
--- /dev/null
+++ b/controllers/profiles/common/knative.go
@@ -0,0 +1,76 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"context"
+
+	"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
+
+	operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+	"github.com/apache/incubator-kie-kogito-serverless-operator/log"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ KnativeEventingHandler = &knativeObjectManager{}
+
+type knativeObjectManager struct {
+	sinkBinding ObjectEnsurer
+	trigger     ObjectsEnsurer
+	*StateSupport
+}
+
+func NewKnativeEventingHandler(support *StateSupport) KnativeEventingHandler {
+	return &knativeObjectManager{
+		sinkBinding:  NewObjectEnsurer(support.C, SinkBindingCreator),
+		trigger:      NewObjectsEnsurer(support.C, TriggersCreator),
+		StateSupport: support,
+	}
+}
+
+type KnativeEventingHandler interface {
+	Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error)
+}
+
+func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) {
+	var objs []client.Object
+
+	if workflow.Spec.Flow.Events == nil {
+		// skip if no event is found
+		klog.V(log.I).InfoS("skip knative resource creation as no event is found")
+	} else if workflow.Spec.Sink == nil {
+		klog.V(log.I).InfoS("Spec.Sink is not provided")
+	} else if knativeAvail, err := knative.GetKnativeAvailability(k.Cfg); err != nil || knativeAvail == nil || !knativeAvail.Eventing {
+		klog.V(log.I).InfoS("Knative Eventing is not installed")
+	} else {
+		// create sinkBinding and trigger
+		sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow)
+		if err != nil {
+			return objs, err
+		} else if sinkBinding != nil {
+			objs = append(objs, sinkBinding)
+		}
+
+		triggers := k.trigger.Ensure(ctx, workflow)
+		for _, trigger := range triggers {
+			if trigger.Error != nil {
+				return objs, trigger.Error
+			}
+			objs = append(objs, trigger.Object)
+		}
+	}
+	return objs, nil
+}
diff --git a/controllers/profiles/common/object_creators.go b/controllers/profiles/common/object_creators.go
index 7e08366..4b28e98 100644
--- a/controllers/profiles/common/object_creators.go
+++ b/controllers/profiles/common/object_creators.go
@@ -20,10 +20,21 @@
 package common
 
 import (
+	"fmt"
+	"strings"
+
+	"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
+
+	cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+
 	"github.com/imdario/mergo"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+	sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+	duckv1 "knative.dev/pkg/apis/duck/v1"
+	"knative.dev/pkg/tracker"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
@@ -45,6 +56,9 @@
 // SonataFlowPlatform
 type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error)
 
+// ObjectsCreator creates multiple resources
+type ObjectsCreator func(workflow *operatorapi.SonataFlow) ([]client.Object, error)
+
 const (
 	defaultHTTPServicePort = 80
 
@@ -199,6 +213,85 @@
 	return service, nil
 }
 
+// SinkBindingCreator is an ObjectsCreator for SinkBinding.
+// It will create v1.SinkBinding based on events defined in workflow.
+func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
+	lbl := workflowproj.GetDefaultLabels(workflow)
+
+	// skip if no produced event is found
+	if workflow.Spec.Sink == nil || !workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) {
+		return nil, nil
+	}
+
+	sink := workflow.Spec.Sink
+
+	// subject must be deployment to inject K_SINK, service won't work
+	sinkBinding := &sourcesv1.SinkBinding{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      strings.ToLower(fmt.Sprintf("%s-sb", workflow.Name)),
+			Namespace: workflow.Namespace,
+			Labels:    lbl,
+		},
+		Spec: sourcesv1.SinkBindingSpec{
+			SourceSpec: duckv1.SourceSpec{
+				Sink: *sink,
+			},
+			BindingSpec: duckv1.BindingSpec{
+				Subject: tracker.Reference{
+					Name:       workflow.Name,
+					Namespace:  workflow.Namespace,
+					APIVersion: "apps/v1",
+					Kind:       "Deployment",
+				},
+			},
+		},
+	}
+	return sinkBinding, nil
+}
+
+// TriggersCreator is an ObjectsCreator for Triggers.
+// It will create a list of eventingv1.Trigger based on events defined in workflow.
+func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) {
+	var resultObjects []client.Object
+	lbl := workflowproj.GetDefaultLabels(workflow)
+
+	//consumed
+	events := workflow.Spec.Flow.Events
+	for _, event := range events {
+		// filter out produce events
+		if event.Kind == cncfmodel.EventKindProduced {
+			continue
+		}
+
+		// construct eventingv1.Trigger
+		trigger := &eventingv1.Trigger{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      strings.ToLower(fmt.Sprintf("%s-%s-trigger", workflow.Name, event.Name)),
+				Namespace: workflow.Namespace,
+				Labels:    lbl,
+			},
+			Spec: eventingv1.TriggerSpec{
+				Broker: constants.KnativeEventingBrokerDefault,
+				Filter: &eventingv1.TriggerFilter{
+					Attributes: eventingv1.TriggerFilterAttributes{
+						"type": event.Type,
+					},
+				},
+				Subscriber: duckv1.Destination{
+					Ref: &duckv1.KReference{
+						Name:       workflow.Name,
+						Namespace:  workflow.Namespace,
+						APIVersion: "v1",
+						Kind:       "Service",
+					},
+				},
+			},
+		}
+		resultObjects = append(resultObjects, trigger)
+	}
+	return resultObjects, nil
+}
+
 // OpenShiftRouteCreator is an ObjectCreator for a basic Route for a workflow running on OpenShift.
 // It enables the exposition of the service using an OpenShift Route.
 // See: https://github.com/openshift/api/blob/d170fcdc0fa638b664e4f35f2daf753cb4afe36b/route/v1/route.crd.yaml
diff --git a/controllers/profiles/common/object_creators_test.go b/controllers/profiles/common/object_creators_test.go
index beb0436..236aa02 100644
--- a/controllers/profiles/common/object_creators_test.go
+++ b/controllers/profiles/common/object_creators_test.go
@@ -23,6 +23,8 @@
 	"context"
 	"testing"
 
+	sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+
 	"github.com/magiconair/properties"
 	"github.com/stretchr/testify/assert"
 	appsv1 "k8s.io/api/apps/v1"
@@ -176,6 +178,35 @@
 	assert.Empty(t, flowContainer.Env)
 }
 
+func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) {
+	workflow := test.GetVetEventSonataFlow(t.Name())
+
+	//On Kubernetes we want the service exposed in Dev with NodePort
+	sinkBinding, _ := SinkBindingCreator(workflow)
+	sinkBinding.SetUID("1")
+	sinkBinding.SetResourceVersion("1")
+
+	reflectSinkBinding := sinkBinding.(*sourcesv1.SinkBinding)
+
+	assert.NotNil(t, reflectSinkBinding)
+	assert.NotNil(t, reflectSinkBinding.Spec)
+	assert.NotEmpty(t, reflectSinkBinding.Spec.Sink)
+	assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Kind, "Broker")
+}
+
+func Test_ensureWorkflowTriggersAreCreated(t *testing.T) {
+	workflow := test.GetVetEventSonataFlow(t.Name())
+
+	//On Kubernetes we want the service exposed in Dev with NodePort
+	triggers, _ := TriggersCreator(workflow)
+
+	assert.NotEmpty(t, triggers)
+	assert.Len(t, triggers, 2)
+	for _, trigger := range triggers {
+		assert.Contains(t, []string{"vet-vetappointmentrequestreceived-trigger", "vet-vetappointmentinfo-trigger"}, trigger.GetName())
+	}
+}
+
 func TestMergePodSpec_WithPostgreSQL_and_JDBC_URL_field(t *testing.T) {
 	workflow := test.GetBaseSonataFlow(t.Name())
 	workflow.Spec = v1alpha08.SonataFlowSpec{
diff --git a/controllers/profiles/common/properties/application.go b/controllers/profiles/common/properties/application.go
index 4148914..9b59260 100644
--- a/controllers/profiles/common/properties/application.go
+++ b/controllers/profiles/common/properties/application.go
@@ -40,9 +40,6 @@
 var (
 	immutableApplicationProperties = fmt.Sprintf("quarkus.http.port=%d\n"+
 		"quarkus.http.host=0.0.0.0\n"+
-		// We disable the Knative health checks to not block the pod to run if Knative objects are not available
-		// See: https://kiegroup.github.io/kogito-docs/serverlessworkflow/latest/eventing/consume-produce-events-with-knative-eventing.html#ref-knative-eventing-add-on-source-configuration
-		"org.kie.kogito.addons.knative.eventing.health-enabled=false\n"+
 		"quarkus.devservices.enabled=false\n"+
 		"quarkus.kogito.devservices.enabled=false\n", constants.DefaultHTTPWorkflowPortInt)
 	_ AppPropertyHandler = &appPropertyHandler{}
@@ -160,6 +157,11 @@
 			return nil, err
 		}
 		props.Merge(p)
+		p, err = generateKnativeEventingWorkflowProperties(workflow)
+		if err != nil {
+			return nil, err
+		}
+		props.Merge(p)
 		props.Sort()
 	}
 	handler.defaultMutableProperties = props
diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go
index e686aaa..7133db1 100644
--- a/controllers/profiles/common/properties/application_test.go
+++ b/controllers/profiles/common/properties/application_test.go
@@ -124,13 +124,12 @@
 	assert.NoError(t, err)
 	generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build())
 	assert.NoError(t, propsErr)
-	assert.Equal(t, 7, len(generatedProps.Keys()))
+	assert.Equal(t, 6, len(generatedProps.Keys()))
 	assert.NotContains(t, "property1", generatedProps.Keys())
 	assert.NotContains(t, "property2", generatedProps.Keys())
 	assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", ""))
 	assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", ""))
 	assert.Equal(t, "0.0.0.0", generatedProps.GetString("quarkus.http.host", ""))
-	assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", ""))
 	assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", ""))
 	assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", ""))
 	assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, ""))
@@ -157,7 +156,7 @@
 		Build())
 	generatedProps.DisableExpansion = true
 	assert.NoError(t, propsErr)
-	assert.Equal(t, 21, len(generatedProps.Keys()))
+	assert.Equal(t, 20, len(generatedProps.Keys()))
 	assert.NotContains(t, "property1", generatedProps.Keys())
 	assert.NotContains(t, "property2", generatedProps.Keys())
 	assertHasProperty(t, generatedProps, "service1", myService1Address)
@@ -181,7 +180,6 @@
 	assertHasProperty(t, generatedProps, "kogito.service.url", fmt.Sprintf("http://greeting.%s", defaultNamespace))
 	assertHasProperty(t, generatedProps, "quarkus.http.port", "8080")
 	assertHasProperty(t, generatedProps, "quarkus.http.host", "0.0.0.0")
-	assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false")
 	assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false")
 	assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false")
 	assertHasProperty(t, generatedProps, constants.KogitoUserTasksEventsEnabled, "false")
@@ -276,7 +274,6 @@
 	assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
 	assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
 	assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
-	assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))
 
 	// disabling job service bypasses config of outgoing events url
 	platform.Spec.Services.JobService.Enabled = nil
diff --git a/controllers/profiles/common/properties/properties.go b/controllers/profiles/common/properties/properties.go
new file mode 100644
index 0000000..ec4c3b0
--- /dev/null
+++ b/controllers/profiles/common/properties/properties.go
@@ -0,0 +1,49 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package properties
+
+import (
+	operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+	"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
+	"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
+	"github.com/magiconair/properties"
+	cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+// GenerateKnativeEventingWorkflowProperties returns the set of application properties required for the workflow to produce or consume
+// Knative Events.
+// Never nil.
+func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow) (*properties.Properties, error) {
+	props := properties.NewProperties()
+	if workflow == nil || workflow.Spec.Sink == nil {
+		props.Set(constants.KnativeHealthEnabled, "false")
+		return props, nil
+	}
+	// verify ${K_SINK}
+	props.Set(constants.KnativeHealthEnabled, "true")
+	if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) {
+		props.Set(constants.KogitoOutgoingEventsConnector, constants.QuarkusHTTP)
+		props.Set(constants.KogitoOutgoingEventsURL, constants.KnativeInjectedEnvVar)
+	}
+	if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindConsumed) {
+		props.Set(constants.KogitoIncomingEventsConnector, constants.QuarkusHTTP)
+		var path = "/"
+		if workflow.Spec.Sink.URI != nil {
+			path = workflow.Spec.Sink.URI.Path
+		}
+		props.Set(constants.KogitoIncomingEventsPath, path)
+	}
+	return props, nil
+}
diff --git a/controllers/profiles/common/reconciler.go b/controllers/profiles/common/reconciler.go
index da29a10..b280000 100644
--- a/controllers/profiles/common/reconciler.go
+++ b/controllers/profiles/common/reconciler.go
@@ -23,6 +23,8 @@
 	"context"
 	"fmt"
 
+	"k8s.io/client-go/rest"
+
 	"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
 
 	"k8s.io/client-go/tools/record"
@@ -39,12 +41,13 @@
 // StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles
 type StateSupport struct {
 	C        client.Client
+	Cfg      *rest.Config
 	Catalog  discovery.ServiceCatalog
 	Recorder record.EventRecorder
 }
 
 // PerformStatusUpdate updates the SonataFlow Status conditions
-func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) {
+func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) {
 	var err error
 	workflow.Status.ObservedGeneration = workflow.Generation
 	if err = s.C.Status().Update(ctx, workflow); err != nil {
diff --git a/controllers/profiles/dev/profile_dev.go b/controllers/profiles/dev/profile_dev.go
index 14dc188..0bd5a39 100644
--- a/controllers/profiles/dev/profile_dev.go
+++ b/controllers/profiles/dev/profile_dev.go
@@ -46,6 +46,7 @@
 func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
 	support := &common.StateSupport{
 		C:        client,
+		Cfg:      cfg,
 		Catalog:  discovery.NewServiceCatalogForConfig(client, cfg),
 		Recorder: recorder,
 	}
diff --git a/controllers/profiles/dev/states_dev.go b/controllers/profiles/dev/states_dev.go
index e186a35..17bd5a8 100644
--- a/controllers/profiles/dev/states_dev.go
+++ b/controllers/profiles/dev/states_dev.go
@@ -117,6 +117,12 @@
 	}
 	objs = append(objs, route)
 
+	if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err != nil {
+		return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err
+	} else {
+		objs = append(objs, knativeObjs...)
+	}
+
 	// First time reconciling this object, mark as wait for deployment
 	if workflow.Status.GetTopLevelCondition().IsUnknown() {
 		klog.V(log.I).InfoS("Workflow is in WaitingForDeployment Condition")
diff --git a/controllers/profiles/prod/deployment_handler.go b/controllers/profiles/prod/deployment_handler.go
index 4581f2c..af999e5 100644
--- a/controllers/profiles/prod/deployment_handler.go
+++ b/controllers/profiles/prod/deployment_handler.go
@@ -83,7 +83,12 @@
 		return reconcile.Result{}, nil, err
 	}
 
+	knativeObjs, err := common.NewKnativeEventingHandler(d.StateSupport).Ensure(ctx, workflow)
+	if err != nil {
+		return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err
+	}
 	objs := []client.Object{deployment, service, managedPropsCM}
+	objs = append(objs, knativeObjs...)
 
 	if deploymentOp == controllerutil.OperationResultCreated {
 		workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "")
diff --git a/controllers/profiles/prod/profile_prod.go b/controllers/profiles/prod/profile_prod.go
index 33932d2..b335326 100644
--- a/controllers/profiles/prod/profile_prod.go
+++ b/controllers/profiles/prod/profile_prod.go
@@ -72,6 +72,7 @@
 func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
 	support := &common.StateSupport{
 		C:        client,
+		Cfg:      cfg,
 		Catalog:  discovery.NewServiceCatalogForConfig(client, cfg),
 		Recorder: recorder,
 	}
@@ -93,6 +94,7 @@
 func NewProfileForOpsReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler {
 	support := &common.StateSupport{
 		C:        client,
+		Cfg:      cfg,
 		Catalog:  discovery.NewServiceCatalogForConfig(client, cfg),
 		Recorder: recorder,
 	}
diff --git a/controllers/workflowdef/services.go b/controllers/workflowdef/services.go
new file mode 100644
index 0000000..03db8ef
--- /dev/null
+++ b/controllers/workflowdef/services.go
@@ -0,0 +1,29 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workflowdef
+
+import (
+	operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+	cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+func ContainsEventKind(workflow *operatorapi.SonataFlow, eventKind cncfmodel.EventKind) bool {
+	for _, event := range workflow.Spec.Flow.Events {
+		if event.Kind == eventKind {
+			return true
+		}
+	}
+	return false
+}
diff --git a/main.go b/main.go
index a1040f7..240a1d2 100644
--- a/main.go
+++ b/main.go
@@ -23,6 +23,9 @@
 	"flag"
 	"os"
 
+	eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+	sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
+
 	"k8s.io/klog/v2/klogr"
 
 	"k8s.io/klog/v2"
@@ -54,6 +57,8 @@
 func init() {
 	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
 	utilruntime.Must(operatorapi.AddToScheme(scheme))
+	utilruntime.Must(sourcesv1.AddToScheme(scheme))
+	utilruntime.Must(eventingv1.AddToScheme(scheme))
 	//+kubebuilder:scaffold:scheme
 }
 
diff --git a/operator.yaml b/operator.yaml
index a4693bd..fb73c10 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -26173,6 +26173,53 @@
                       type: object
                     type: array
                 type: object
+              sink:
+                description: Sink describes the sinkBinding details of this SonataFlow
+                  instance.
+                properties:
+                  CACerts:
+                    description: CACerts are Certification Authority (CA) certificates
+                      in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
+                      If set, these CAs are appended to the set of CAs provided by
+                      the Addressable target, if any.
+                    type: string
+                  ref:
+                    description: Ref points to an Addressable.
+                    properties:
+                      address:
+                        description: Address points to a specific Address Name.
+                        type: string
+                      apiVersion:
+                        description: API version of the referent.
+                        type: string
+                      group:
+                        description: 'Group of the API, without the version of the
+                          group. This can be used as an alternative to the APIVersion,
+                          and then resolved using ResolveGroup. Note: This API is
+                          EXPERIMENTAL and might break anytime. For more details:
+                          https://github.com/knative/eventing/issues/5086'
+                        type: string
+                      kind:
+                        description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+                        type: string
+                      name:
+                        description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
+                        type: string
+                      namespace:
+                        description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
+                          This is optional field, it gets defaulted to the object
+                          holding it if left out.'
+                        type: string
+                    required:
+                    - kind
+                    - name
+                    type: object
+                  uri:
+                    description: URI can be an absolute URL(non-empty scheme and non-empty
+                      host) pointing to the target or a relative URI. Relative URIs
+                      will be resolved using the base URI retrieved from Ref.
+                    type: string
+                type: object
             required:
             - flow
             type: object
@@ -26354,6 +26401,36 @@
   - patch
   - update
   - watch
+- apiGroups:
+  - eventing.knative.dev
+  resources:
+  - triggers
+  - triggers/status
+  - triggers/finalizers
+  verbs:
+  - create
+  - delete
+  - deletecollection
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - sources.knative.dev
+  resources:
+  - sinkbindings
+  - sinkbindings/status
+  - sinkbindings/finalizers
+  verbs:
+  - create
+  - delete
+  - deletecollection
+  - get
+  - list
+  - patch
+  - update
+  - watch
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRole
diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml
new file mode 100644
index 0000000..05e3a5b
--- /dev/null
+++ b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlow
+metadata:
+  name: vet
+  annotations:
+    sonataflow.org/description: Vet service call via events
+    sonataflow.org/version: 0.0.1
+spec:
+  sink:
+    ref:
+      name: default
+      namespace: default
+      apiVersion: eventing.knative.dev/v1
+      kind: Broker
+  flow:
+    events:
+      - name: MakeVetAppointment
+        source: VetServiceSource
+        type: events.vet.appointments
+        kind: produced
+      - name: VetAppointmentInfo
+        source: VetServiceSource
+        type: events.vet.appointments
+      - name: VetAppointmentRequestReceived
+        source: checkAccountInfo
+        type: events.vet.appointments.request
+    functions:
+      - name: StoreNewPatientInfo
+        operation: specs/services.yaml#checkAccountInfo
+    states:
+      - name: AppointmentRequestReceived
+        type: event
+        onEvents:
+          - eventRefs:
+              - VetAppointmentRequestReceived
+            actions:
+              - name: checkAccount
+                functionRef:
+                  refName: checkAccountInfo
+                  arguments:
+                    account: "${ .accountId }"
+        transition: MakeVetAppointmentState
+      - name: MakeVetAppointmentState
+        type: callback
+        action:
+          name: MakeAppointmentAction
+          eventRef:
+            triggerEventRef: MakeVetAppointment
+            data: "${ .patientInfo }"
+        eventRef: VetAppointmentInfo
+        timeouts:
+          stateExecTimeout: PT15M
+        eventDataFilter:
+          toStateData: .test
+        end: true
diff --git a/test/yaml.go b/test/yaml.go
index 5638e53..8c3692a 100644
--- a/test/yaml.go
+++ b/test/yaml.go
@@ -44,6 +44,7 @@
 	SonataFlowGreetingsWithDataInputSchemaCR  = "sonataflow.org_v1alpha08_sonataflow_greetings_datainput.yaml"
 	SonataFlowGreetingsWithStaticResourcesCR  = "sonataflow.org_v1alpha08_sonataflow-metainf.yaml"
 	SonataFlowSimpleOpsYamlCR                 = "sonataflow.org_v1alpha08_sonataflow-simpleops.yaml"
+	SonataFlowVetWithEventCR                  = "sonataflow.org_v1alpha08_sonataflow_vet_event.yaml"
 	SonataFlowGreetingsDataInputSchemaConfig  = "v1_configmap_greetings_datainput.yaml"
 	SonataFlowGreetingsStaticFilesConfig      = "v1_configmap_greetings_staticfiles.yaml"
 	sonataFlowPlatformYamlCR                  = "sonataflow.org_v1alpha08_sonataflowplatform.yaml"
@@ -202,6 +203,10 @@
 	return NewSonataFlow(sonataFlowSampleYamlCR, namespace)
 }
 
+func GetVetEventSonataFlow(namespace string) *operatorapi.SonataFlow {
+	return GetSonataFlow(SonataFlowVetWithEventCR, namespace)
+}
+
 func GetBaseSonataFlowWithDevProfile(namespace string) *operatorapi.SonataFlow {
 	return NewSonataFlow(sonataFlowSampleYamlCR, namespace, SetDevProfile)
 }