This example contains a workflow service that demonstrates the correlation feature using callback states and events. Each callback state within the workflow publishes an event and waits for a response event. When an incoming event arrives, it is matched with the proper workflow instance using the correlation attribute, in this case the userid. So for every incoming event, the userid is used to find and trigger the proper workflow instance. The correlation is defined in the workflow definition file, which is described in JSON format as defined in the CNCF Serverless Workflow specification.
"correlation": [ { "contextAttributeName": "userid" } ]
Events should be in CloudEvent format, and the correlation attribute should be defined as an extension attribute, in this case userid.
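For illustration, a start event for user 12345 might look like the following CloudEvent. The `id`, `source`, and `data` values here are illustrative (the `data` payload mirrors the one visible in the example log output); the important part is the `userid` extension attribute at the top level, which carries the correlation value:

```json
{
  "specversion": "1.0",
  "id": "abc-123",
  "source": "/example/source",
  "type": "newAccountEventType",
  "userid": "12345",
  "data": {
    "email": "test@test.com",
    "userId": "12345"
  }
}
```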
The workflow example is started by events as well, so a start event should be published with the same correlation attribute `userid`, which will be used to match correlations for the started workflow instance.
In the example, the event broker used to publish and receive the events is Kafka, and the topics used are the same as the event types declared in the workflow definition.
{ "name": "newAccountEvent", "source": "", "type": "newAccountEventType", "correlation": [ { "contextAttributeName": "userid" } ] }
For simplicity, the events are published and consumed in the same application running the workflow, but in a real use case they would come from different services interacting with the workflow; see EventsService.
To start the workflow, as mentioned, an event needs to be published, which is then consumed by the workflow service, starting a new instance. A helper REST endpoint was created to simplify this step: once a POST request is received, it publishes the start event to the broker; see WorkflowResource.
All eventing configuration and the broker parameters are set in application.properties.
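As a sketch of what that configuration looks like, the snippet below uses the standard SmallRye Reactive Messaging Kafka connector properties; the channel and topic names are illustrative, not necessarily the exact ones used by this example:

```properties
# Incoming channel consuming start events from a Kafka topic
# (channel/topic names here are assumptions for illustration)
mp.messaging.incoming.newAccountEvent.connector=smallrye-kafka
mp.messaging.incoming.newAccountEvent.topic=newAccountEventType
mp.messaging.incoming.newAccountEvent.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Kafka broker location (matches the docker-compose default)
kafka.bootstrap.servers=localhost:9092
```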
This quickstart requires an Apache Kafka instance to be available; by default it is expected at localhost on the default port (9092).
https://kafka.apache.org/quickstart
Optionally, for convenience, a docker-compose configuration file is provided in the docker-compose/ directory; from there you can just run:
docker-compose up
This starts a Kafka container on port 9092.
Alternatively, you can run this example using persistence with a MongoDB server.
Configuration for setting up the connection can be found in the application.properties file, which follows the Quarkus MongoDB client settings; for more information please check the MongoDB Client Configuration Reference.
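For instance, pointing the client at a local MongoDB uses the standard Quarkus MongoDB client properties; the host, port, and database name below are assumptions for illustration:

```properties
# Quarkus MongoDB client connection (host/port/database are illustrative)
quarkus.mongodb.connection-string=mongodb://localhost:27017
quarkus.mongodb.database=kogito
```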
Optionally, for convenience, a docker-compose configuration file is provided in the docker-compose/ directory; from there you can just run:
docker-compose up
You will need:
When using native image compilation, you will also need:
mvn clean package quarkus:dev
mvn clean package
java -jar target/quarkus-app/quarkus-run.jar
or on Windows
mvn clean package
java -jar target\quarkus-app\quarkus-run.jar
To enable persistence, please append -Ppersistence to your Maven command. That will ensure the correct dependencies are in place, and automatically set the required properties to connect with the MongoDB instance from the provided docker-compose.
mvn clean package -Ppersistence
Note that this requires GRAALVM_HOME to point to a valid GraalVM installation.
mvn clean package -Pnative
To run the generated native executable, located in target/, execute:
./target/serverless-workflow-correlation-quarkus-{version}-runner
The service based on the JSON workflow definition can be accessed by sending a request to http://localhost:8080/account/{userid}
The complete curl command can be found below:
curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/12345
After a while (note that you need to give time for the events to be consumed), you should see the log messages printed in the console, and the workflow completes.
2022-05-12 11:02:15,891 INFO [org.kie.kog.ser.eve.imp.ProcessEventDispatcher] (kogito-event-executor-0) Starting new process instance with signal 'newAccountEventType'
2022-05-12 11:02:18,909 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-9) SRMSG18256: Initialize record store for topic-partition 'validateAccountEmail-0' at position 16.
2022-05-12 11:02:18,919 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Validate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
2022-05-12 11:02:19,931 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18256: Initialize record store for topic-partition 'validatedAccountEmail-0' at position 16.
2022-05-12 11:02:20,962 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-8) SRMSG18256: Initialize record store for topic-partition 'activateAccount-0' at position 16.
2022-05-12 11:02:20,971 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Activate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}}
2022-05-12 11:02:21,994 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-6) SRMSG18256: Initialize record store for topic-partition 'activatedAccount-0' at position 7.
2022-05-12 11:02:22,006 INFO [org.kie.kog.exa.EventsService] (kogito-event-executor-0) Complete Account Creation received. Workflow data {"email":"test@test.com","userId":"12345"}, KogitoProcessInstanceId 0cef0eef-06c8-4433-baea-505fa8d45f68