# Camel Integration "Postman Demo".
#
# Flows defined below:
#   1. REST endpoint POST /parcels  -> direct:post
#   2. route "post"       : direct:post -> Kafka topic `parcels` + PostgreSQL insert
#   3. route "payment"    : JMS queue `payments` -> XML to JSON -> Kafka topic `payments`
#   4. route "aggregator" : Kafka topics `parcels,payments` -> pair messages by `id`
#                           -> MQTT topic `deliveries` or PostgreSQL status update
#   5. route "delivery"   : MQTT topic `deliveries` -> log
#   6. bean "aggregator"  : aggregation strategy used by route "aggregator"
#
# Required properties: kafka-brokers, postgres-server, jms-broker, mqtt-broker.
# Optional properties: postgres-username, postgres-password (both default to `postgres`).
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  # NOTE(review): this name contains a space and uppercase letters. Kubernetes
  # object names must be lowercase DNS-1123 subdomains, so rename it (e.g.
  # `postman-demo`) if this file is ever applied as a cluster resource — confirm
  # how the file is consumed before changing it.
  name: Postman Demo
spec:
  flows:
    # REST entry point: POST /parcels with a JSON body is handed to direct:post.
    - rest:
        post:
          - to: direct:post
        path: /parcels
        consumes: application/json
        produces: application/json

    # Fan the received parcel out to Kafka and PostgreSQL.
    - route:
        from:
          uri: direct:post
          steps:
            - log:
                message: 'Received: ${body}'
            - multicast:
                steps:
                  - to:
                      uri: kamelet:kafka-not-secured-sink
                      parameters:
                        topic: parcels
                        bootstrapServers: '{{kafka-brokers}}'
                  - to:
                      uri: kamelet:postgresql-sink
                      parameters:
                        serverName: '{{postgres-server}}'
                        serverPort: '5432'
                        # Credentials come from properties; the defaults keep the
                        # previously hard-coded values so existing setups still work.
                        username: '{{postgres-username:postgres}}'
                        password: '{{postgres-password:postgres}}'
                        databaseName: demo
                        # ON CONFLICT DO NOTHING: an already-stored parcel id is ignored.
                        query: >-
                          INSERT INTO parcels (id,address) VALUES
                          (:#id,:#address) ON CONFLICT (id) DO NOTHING
                aggregationStrategy: >-
                  #class:org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy
                parallelProcessing: true
                streaming: true
        id: post

    # Bridge payment messages from the JMS queue to Kafka, converting XML to JSON.
    - route:
        from:
          uri: kamelet:jms-apache-artemis-source
          steps:
            - to:
                uri: xj:identity
                parameters:
                  transformDirection: XML2JSON
            - to:
                uri: kamelet:kafka-not-secured-sink
                parameters:
                  topic: payments
                  bootstrapServers: '{{kafka-brokers}}'
          parameters:
            destinationType: queue
            destinationName: payments
            brokerURL: '{{jms-broker}}'
        id: payment

    # Correlate parcel and payment messages by `id` and act on the payment status.
    - route:
        from:
          uri: kamelet:kafka-not-secured-source
          steps:
            - log:
                message: 'Aggregating: ${body}'
            - unmarshal:
                json:
                  library: jackson
            - aggregate:
                steps:
                  - choice:
                      when:
                        # NOTE(review): assumes at least one grouped element has a
                        # `status` key; `find` returns null otherwise and `.status`
                        # would throw — confirm that two messages with the same id
                        # are always one parcel and one payment.
                        - expression:
                            groovy:
                              expression: >-
                                body.find { it.containsKey('status') }.status ==
                                'confirmed'
                          steps:
                            - marshal:
                                json:
                                  library: jackson
                            - log:
                                message: 'Send to MQTT : ${body}'
                            - to:
                                uri: kamelet:mqtt-sink
                                parameters:
                                  topic: deliveries
                                  brokerUrl: '{{mqtt-broker}}'
                      otherwise:
                        steps:
                          # Keep only the element that carries `status`; its `id`
                          # feeds the :#id parameter of the UPDATE below.
                          - setBody:
                              expression:
                                groovy:
                                  expression: 'body.find { it.containsKey(''status'') } '
                          - marshal:
                              json:
                                library: jackson
                          - log:
                              message: 'Send to database: ${body}'
                          - to:
                              uri: kamelet:postgresql-sink
                              parameters:
                                serverName: '{{postgres-server}}'
                                serverPort: '5432'
                                # Same property-backed credentials as in route "post".
                                username: '{{postgres-username:postgres}}'
                                password: '{{postgres-password:postgres}}'
                                databaseName: demo
                                query: >-
                                  UPDATE parcels set status = 'CANCELED' WHERE
                                  id = :#id
                # Refers to the bean named `aggregator` declared at the end of this file.
                aggregationStrategy: aggregator
                # A group completes once two messages share the same `id`.
                completionSize: 2
                correlationExpression:
                  groovy:
                    expression: body.get('id')
          parameters:
            topic: parcels,payments
            bootstrapServers: '{{kafka-brokers}}'
            autoCommitEnable: true
            consumerGroup: postman
        id: aggregator

    # Log every message published to the MQTT `deliveries` topic.
    - route:
        from:
          uri: kamelet:mqtt-source
          steps:
            - log:
                message: 'Delivery: ${body}'
          parameters:
            topic: deliveries
            brokerUrl: '{{mqtt-broker}}'
        # Explicit id added so every route in this file is named.
        id: delivery

    # Aggregation strategy bean referenced by route "aggregator".
    # Presumably groups the message bodies into a list (the Groovy `body.find`
    # calls above treat the aggregated body as a collection) — verify.
    - beans:
        - name: aggregator
          type: org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy