The main workflow processes the incoming Order event and starts a parallel state that calls two subflows: Fraud Handling and Shipping Handling. The workflow ends once both subflows have ended.
Fraud Handling produces a new FraudEvaluation event if the order total is above 1000 USD. Any other system or service in the architecture can then read this event and react to it, for example by canceling the order.
In parallel, regardless of whether the order needs fraud evaluation, the workflow produces an event classifying the required shipping service: International or Domestic. For this example, domestic shipping applies to any order with an address within the US.
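The routing rules above can be sketched as a small shell function that predicts which event types to expect for a given order. This is only an illustration, not the workflow's implementation: `expected_events` is a hypothetical helper, the `fraudEvaluation` and `internationalShipping` type names are taken from the sample output later in this document, and the `domesticShipping` type name and the `US` country value are assumptions.

```shell
# Hypothetical helper: predicts the event types the workflow should emit.
# Usage: expected_events <total-in-whole-USD> <country>
expected_events() {
  # Fraud Handling only emits an event for orders above 1000 USD.
  # Note: the comparison only works for whole-number totals.
  if [ "$1" -gt 1000 ]; then
    echo "fraudEvaluation"
  fi

  # Shipping Handling always emits exactly one classification event.
  # Assumption: the country field holds "US" for domestic orders.
  if [ "$2" = "US" ]; then
    echo "domesticShipping"   # assumed type name, not shown in this document
  else
    echo "internationalShipping"
  fi
}
```

For example, `expected_events 10000 Brazil` prints `fraudEvaluation` followed by `internationalShipping`, while `expected_events 500 Brazil` prints only `internationalShipping`.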
This quickstart requires Knative Eventing to be available in your cluster:
For local testing only, you can use Podman or Docker to simulate an application receiving your events.
You will need:
When using native image compilation, you will also need:
K_SINK=http://localhost:8181 mvn clean compile quarkus:dev
K_SINK is the environment variable injected by the Knative Eventing platform once the application is deployed on a Kubernetes cluster. Instead of hardcoding the endpoint to which the produced messages are sent, we rely on Knative to inject this information at runtime.
The environment variable is assigned to the SmallRye HTTP connector at runtime:
mp.messaging.outgoing.kogito_outgoing_stream.url=${K_SINK}
For local tests, we mock the endpoint where the produced messages are supposed to be delivered.
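As a convenience for local runs, the K_SINK handling can be wrapped in a small script. The sketch below is an assumption-laden example rather than part of the quickstart: `resolve_sink` and `run_dev` are hypothetical names, and the fallback URL assumes the mock sink listens on localhost:8181 as in the command above.

```shell
# Hypothetical wrapper for local development runs.
# Assumption: the mock sink from this quickstart listens on localhost:8181.
resolve_sink() {
  # Keep an already-set K_SINK; otherwise fall back to the local mock sink.
  printf '%s\n' "${K_SINK:-http://localhost:8181}"
}

run_dev() {
  # Start Quarkus dev mode with the resolved sink URL.
  K_SINK="$(resolve_sink)" mvn clean compile quarkus:dev
}
```

Calling `run_dev` without K_SINK set behaves like the command shown earlier; on a cluster the wrapper is unnecessary because Knative injects the variable.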
mvn clean package
java -jar target/quarkus-app/quarkus-run.jar
or on Windows:
mvn clean package
java -jar target\quarkus-app\quarkus-run.jar
Note that this requires GRAALVM_HOME to point to a valid GraalVM installation
mvn clean package -Pnative
To run the native executable generated in target/, execute:
./target/serverless-workflow-order-processing-runner
You can take a look at the OpenAPI definition - automatically generated and included in this service - to determine all available operations exposed by this service.
For easy readability, you can visualize the OpenAPI definition file using a UI tool such as Swagger UI.
In addition, various clients to interact with this service can be easily generated using this OpenAPI definition.
When running in either Quarkus Development or Native mode, we also leverage the Quarkus OpenAPI extension that exposes Swagger UI that you can use to look at available REST endpoints and send test requests.
First, start the sink application using Podman or Docker:
$ podman run --rm -it -p 8181:8080 gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
This is the same image used by the Knative Eventing demos. It is published on port 8181 so that it does not clash with the example application.
Then run the application with:
$ K_SINK=http://localhost:8181 mvn clean quarkus:dev
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2021-04-13 17:39:39,488 INFO [org.kie.kog.cod.api.uti.AddonsConfigDiscovery] (build-22) Performed addonsConfig discovery, found: AddonsConfig{usePersistence=false, useTracing=false, useMonitoring=false, usePrometheusMonitoring=false, useKnativeEventing=true, useCloudEvents=true, useExplainability=false}
2021-04-13 17:39:39,612 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,676 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,677 INFO [io.ser.api.wor.BaseWorkflow] (build-22) Unable to convert as json markup, trying as yaml
2021-04-13 17:39:39,746 INFO [org.kie.kog.cod.cor.uti.ApplicationGeneratorDiscovery] (build-22) Generator discovery performed, found [openapispecs, processes]
2021-04-13 17:39:40,193 INFO [org.kie.kog.cod.pro.ProcessCodegen] (build-22) Knative Eventing addon enabled, generating CloudEvent HTTP listener
2021-04-13 17:39:41,034 WARN [io.qua.arc.pro.BeanArchives] (build-45) Failed to index byte: Class does not exist in ClassLoader QuarkusClassLoader:Deployment Class Loader
2021-04-13 17:39:41,754 INFO [org.kie.kog.ser.eve.imp.AbstractMessageConsumer] (Quarkus Main Thread) Consumer for class com.fasterxml.jackson.databind.JsonNode started.
2021-04-13 17:39:41,776 INFO [org.kie.kog.add.clo.qua.QuarkusKogitoExtensionInitializer] (Quarkus Main Thread) Registered Kogito CloudEvent extension
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) serverless-workflow-order-processing 999-SNAPSHOT on JVM (powered by Quarkus 1.11.5.Final) started in 2.815s. Listening on: http://localhost:8080
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2021-04-13 17:39:41,827 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, mutiny, rest-client, rest-client-jackson, resteasy, resteasy-jackson, servlet, smallrye-context-propagation, smallrye-health, smallrye-openapi, smallrye-reactive-messaging, swagger-ui, vertx]
Now send a message to the application on port 8080 using the CloudEvents format with curl:
$ curl -X POST \
    -H "content-type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-source: /from/localhost" \
    -H "ce-type: orderEvent" \
    -H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
    -d '{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12"}' \
    http://localhost:8080
You should see an output like this one in the running container terminal:
☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: fraudEvaluation
  source: /process/fraudhandling
  id: 2acd4329-a215-4460-9750-d7f07eb2d5e5
  time: 2021-04-13T20:40:38.394409Z
Extensions,
  kogitoparentprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
  kogitoprocid: fraudhandling
  kogitoprocinstanceid: b822962b-959d-412c-bb4b-80a0cdcdf7f0
  kogitorootprocid: orderworkflow
  kogitorootprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
  kogitousertaskist: 1
Data,
  {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12","fraudEvaluation":true}
☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: internationalShipping
  source: /process/shippinghandling
  id: d557fad8-81b5-482a-b981-5ecb267a92f9
  time: 2021-04-13T20:40:38.431677Z
Extensions,
  kogitoparentprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
  kogitoprocid: shippinghandling
  kogitoprocinstanceid: ba57743d-521f-41f2-864a-a7b3f68d35af
  kogitorootprocid: orderworkflow
  kogitorootprociid: f12e91c0-8980-40b6-a49b-3c35ce435718
  kogitousertaskist: 1
Data,
  {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":10000,"description":"iPhone 12","fraudEvaluation":true,"shipping":"international"}
You will notice that we received two events: one for Fraud Evaluation and another one for International Shipping processing.
Additionally, there are some extension attributes that start with kogito to provide some context about the execution and the event producer.
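To check quickly which events arrived without reading every attribute, the sink output can be filtered down to the event types. The following is a minimal sketch: `event_types` is a hypothetical helper, and it assumes the event_display output has one attribute per line (for example `type: fraudEvaluation`) and has been saved or piped from wherever you captured it.

```shell
# Hypothetical helper: prints the CloudEvent type of every event found on stdin.
# Assumption: the input is event_display output with one attribute per line.
event_types() {
  # Match lines whose first field is exactly "type:" and print the value.
  awk '$1 == "type:" { print $2 }'
}
```

With the output of the request above saved to a file (say, a hypothetical `events.log`), `event_types < events.log` would list `fraudEvaluation` and `internationalShipping`.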
To take the other path of the workflow, send this message to your application:
$ curl -X POST \
    -H "content-type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-source: /from/localhost" \
    -H "ce-type: orderEvent" \
    -H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
    -d '{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 7"}' \
    http://localhost:8080
Since the value is under 1000 USD, we will only receive the International Shipping event:
☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: internationalShipping
  source: /process/shippinghandling
  id: 096d1009-7566-41ac-a3f1-ab0756172adc
  time: 2021-04-13T20:44:12.391093Z
Extensions,
  kogitoparentprociid: 4ae2d088-5492-417a-b625-3a4ac0e91403
  kogitoprocid: shippinghandling
  kogitoprocinstanceid: 2e9fca25-c1f9-4e31-bc08-0ad740cfea5d
  kogitorootprocid: orderworkflow
  kogitorootprociid: 4ae2d088-5492-417a-b625-3a4ac0e91403
  kogitousertaskist: 1
Data,
  {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 12","shipping":"international"}
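The two requests in this section differ only in their payload, so they can be wrapped in a reusable function when trying other totals or countries. This is a sketch under stated assumptions: `order_json`, `send_order`, and the `APP_URL` variable are hypothetical names, and no JSON escaping is performed, so field values must not contain quotes or backslashes.

```shell
# Hypothetical helpers for sending test orders; not part of the quickstart.
# Assumption: field values contain no quotes or backslashes (no JSON escaping).
order_json() {
  # Arguments: <id> <country> <total> <description>
  printf '{"id":"%s","country":"%s","total":%s,"description":"%s"}' "$1" "$2" "$3" "$4"
}

send_order() {
  # Binary-mode CloudEvent: attributes travel as ce-* headers, the order as the body.
  # The order id is reused as the CloudEvent id, as in the examples above.
  curl -X POST \
    -H "content-type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-source: /from/localhost" \
    -H "ce-type: orderEvent" \
    -H "ce-id: $1" \
    -d "$(order_json "$@")" \
    "${APP_URL:-http://localhost:8080}"
}
```

For instance, `send_order f0643c68-609c-48aa-a820-5df423fa4fe0 Brazil 500 "iPhone 7"` sends the same request as the second curl command above.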
We have prepared a knative Maven profile that builds the service image locally and generates all the Knative resources you need to get started.
Just make sure your cluster has Knative Eventing available:
1. Install KogitoSource via the command line.
2. Run eval $(minikube docker-env) to build the image directly into the Minikube registry.
3. Run mvn clean install -Pknative -Dnamespace=<your namespace> to build the image and the Knative resources for your application to run.
4. Run kubectl apply -f target/kubernetes/knative.yml,target/kubernetes/kogito.yml. It deploys the objects from the generated knative.yml and kogito.yml files.
5. Run curl from the terminal as you did in the previous steps. To see what is going on, query one of the Knative service sinks created by the kubectl apply step. You should see something like:

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: internationalShipping
  source: /process/shippinghandling
  id: f1193deb-9c4e-4e90-8aab-27fdba4ed740
  time: 2021-04-15T16:12:04.455905Z
Extensions,
  knativearrivaltime: 2021-04-15T16:12:04.657355879Z
  kogitoparentprociid: 836fbb3b-a68f-487b-9d96-cc93ca387ab9
  kogitoprocid: shippinghandling
  kogitoprocinstanceid: 1cc0d2b9-c5fb-445c-8229-5790e7a7b254
  kogitorootprocid: orderworkflow
  kogitorootprociid: 836fbb3b-a68f-487b-9d96-cc93ca387ab9
  kogitousertaskist: 1
Data,
  {"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":500,"description":"iPhone 7","shipping":"international"}
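One way to query a sink is to read the logs of its pods. The sketch below assumes the sink is a Knative Service whose pods carry the `serving.knative.dev/service` label and run a container named `user-container`; `sink_logs` is a hypothetical helper, and the service name and namespace must be taken from your own cluster, since the generated resource names are not listed in this document.

```shell
# Hypothetical helper: shows recent log lines from a Knative Service's pods.
# Assumptions: pods are labeled serving.knative.dev/service=<name> and the
# application container is called "user-container".
sink_logs() {
  # Arguments: <knative-service-name> <namespace>
  kubectl logs -n "$2" -l "serving.knative.dev/service=$1" -c user-container --tail=50
}
```

Running `kubectl get ksvc -n <your namespace>` first lists the Knative Services available to pass as the first argument.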
Ideally, you installed Knative on Minikube via the Knative quickstart. In that case, nip.io DNS is already set up and you can access the services via their exposed Knative Routes.
Alternatively, if you installed via Knative Operators and Istio, you will need to follow this procedure in order to access the service:
Run minikube tunnel
Define the Ingress Gateway:
INGRESSGATEWAY=istio-ingressgateway
Get the Gateway IP with:
export GATEWAY_IP=`kubectl get svc $INGRESSGATEWAY --namespace istio-system \
    --output jsonpath="{.status.loadBalancer.ingress[*]['ip']}"`
Run the curl command using the Gateway URL. For example:
curl -X POST \
    -H "content-type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-source: /from/localhost" \
    -H "ce-type: orderEvent" \
    -H "ce-id: f0643c68-609c-48aa-a820-5df423fa4fe0" \
    -d '{"id":"f0643c68-609c-48aa-a820-5df423fa4fe0","country":"Brazil","total":15000,"description":"iPhone 7"}' \
    http://${GATEWAY_IP} \
    --header "Host:serverless-workflow-order-processing.kogito.example.com"
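The Host header in the command above follows the pattern service.namespace.domain. If your namespace or domain differs, the value can be derived as in this sketch, which assumes the cluster uses that naming pattern with example.com as the domain; `knative_host` is a hypothetical helper.

```shell
# Hypothetical helper: builds the Host header value for a Knative Service.
# Assumption: routes follow <service>.<namespace>.<domain>, with example.com
# as the domain unless a third argument is given.
knative_host() {
  # Arguments: <service> <namespace> [domain]
  printf '%s.%s.%s\n' "$1" "$2" "${3:-example.com}"
}
```

For the deployment shown here, `knative_host serverless-workflow-order-processing kogito` prints `serverless-workflow-order-processing.kogito.example.com`, which can be passed to curl as `--header "Host:$(knative_host serverless-workflow-order-processing kogito)"`.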