# blob: acbe3ad876251e31c305150efb1edcc8023085c4 (source-viewer header; not part of the Integration)
# Camel K Integration with a REST entry point, four routes and one bean.
# Broker and database hosts are supplied through the property placeholders
# {{kafka-brokers}}, {{postgres-server}}, {{jms-broker}} and {{mqtt-broker}}.
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  # NOTE(review): this name contains a space and capitals. If the resource is
  # applied to a cluster it would need to be a valid Kubernetes object name;
  # confirm whether it is only used as a display title.
  name: Postman Demo
spec:
  flows:
    # REST endpoint: POST /parcels (JSON in, JSON out) forwards to direct:post.
    - rest:
        post:
          - to: direct:post
            path: /parcels
            consumes: application/json
            produces: application/json

    # Route "post": logs the received body, then multicasts it to the Kafka
    # topic "parcels" and to an INSERT into the "parcels" table.
    - route:
        from:
          uri: direct:post
          steps:
            - log:
                message: 'Received: ${body}'
            - multicast:
                steps:
                  - to:
                      uri: kamelet:kafka-not-secured-sink
                      parameters:
                        topic: parcels
                        bootstrapServers: '{{kafka-brokers}}'
                  - to:
                      uri: kamelet:postgresql-sink
                      parameters:
                        serverName: '{{postgres-server}}'
                        serverPort: '5432'
                        # NOTE(review): credentials are hard-coded here;
                        # consider sourcing them from properties or a secret.
                        username: postgres
                        password: postgres
                        databaseName: demo
                        # ON CONFLICT ... DO NOTHING leaves an existing row
                        # with the same id untouched.
                        query: >-
                          INSERT INTO parcels (id,address) VALUES
                          (:#id,:#address) ON CONFLICT (id) DO NOTHING
                # Strategy class is instantiated via the "#class:" syntax.
                aggregationStrategy: >-
                  #class:org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy
                parallelProcessing: true
                streaming: true
        id: post

    # Route "payment": reads from the JMS queue "payments", applies the xj
    # component with transformDirection XML2JSON, and publishes the result to
    # the Kafka topic "payments".
    - route:
        from:
          uri: kamelet:jms-apache-artemis-source
          steps:
            - to:
                uri: xj:identity
                parameters:
                  transformDirection: XML2JSON
            - to:
                uri: kamelet:kafka-not-secured-sink
                parameters:
                  topic: payments
                  bootstrapServers: '{{kafka-brokers}}'
          parameters:
            destinationType: queue
            destinationName: payments
            brokerURL: '{{jms-broker}}'
        id: payment

    # Route "aggregator": consumes the Kafka topics "parcels" and "payments",
    # unmarshals each message from JSON, and aggregates messages by body id
    # with completionSize 2 using the "aggregator" bean declared at the end
    # of this file.
    - route:
        from:
          uri: kamelet:kafka-not-secured-source
          steps:
            - log:
                message: 'Aggregating: ${body}'
            - unmarshal:
                json:
                  library: jackson
            - aggregate:
                steps:
                  - choice:
                      when:
                        # Taken when the aggregated entry that has a "status"
                        # key carries the value 'confirmed'.
                        - expression:
                            groovy:
                              expression: >-
                                body.find { it.containsKey('status') }.status ==
                                'confirmed'
                          steps:
                            - marshal:
                                json:
                                  library: jackson
                            - log:
                                message: 'Send to MQTT : ${body}'
                            - to:
                                uri: kamelet:mqtt-sink
                                parameters:
                                  topic: deliveries
                                  brokerUrl: '{{mqtt-broker}}'
                      otherwise:
                        steps:
                          # Replace the body with the entry that has a
                          # "status" key before it is marshalled.
                          - setBody:
                              expression:
                                groovy:
                                  expression: 'body.find { it.containsKey(''status'') } '
                          - marshal:
                              json:
                                library: jackson
                          - log:
                              message: 'Send to database: ${body}'
                          - to:
                              uri: kamelet:postgresql-sink
                              parameters:
                                serverName: '{{postgres-server}}'
                                serverPort: '5432'
                                # NOTE(review): hard-coded credentials, as in
                                # the "post" route.
                                username: postgres
                                password: postgres
                                databaseName: demo
                                query: >-
                                  UPDATE parcels set status = 'CANCELED' WHERE
                                  id = :#id
                aggregationStrategy: aggregator
                completionSize: 2
                correlationExpression:
                  groovy:
                    expression: body.get('id')
          parameters:
            topic: parcels,payments
            bootstrapServers: '{{kafka-brokers}}'
            autoCommitEnable: true
            consumerGroup: postman
        id: aggregator

    # Unnamed route: logs every message received on the MQTT topic
    # "deliveries".
    - route:
        from:
          uri: kamelet:mqtt-source
          steps:
            - log:
                message: 'Delivery: ${body}'
          parameters:
            topic: deliveries
            brokerUrl: '{{mqtt-broker}}'

    # Bean referenced by name from the aggregate step in route "aggregator".
    - beans:
        - name: aggregator
          type: org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy