== Kafka to AWS Bedrock Text Sink Kamelet
In this sample you'll use the Kafka Source Kamelet and the AWS Bedrock Text Sink Kamelet.
=== Install JBang
First install JBang according to
When JBang is installed then you should be able to run from a shell:
$ jbang --version
This will output the version of JBang.
To run this example you can install Camel on JBang via:
$ jbang app install camel@apache/camel
This lets you run Camel JBang with the `camel` command. Alternatively, you can invoke it through `jbang` directly, as in the run command below.
=== Setup Kafka instance
You'll need a running Kafka cluster to point to. In this case you could use an Ansible role like
Then set up a file `deploy.yaml` with the following content:
- name: role kafka
  hosts: localhost
  remote_user: user

  roles:
    - role: kafka-ansible-role
      kafka_version: 3.6.1
      path_dir: /home/user/
      unarchive_dest_dir: /home/user/kafka/demo/
      start_kafka: true
Then run:
```shell script
ansible-playbook -v deploy.yaml
```
This should start a Kafka instance for you, on your local machine.
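Before moving on, it can be useful to confirm that the broker is actually reachable. The following is a minimal sketch, assuming `kcat` (used later in this example) is installed and that the broker listens on `localhost:9092`. The `wait_for_kafka` helper name is hypothetical and not part of the example.

```shell
# Hypothetical helper: returns 0 once the broker answers a metadata request.
# Arguments: broker address (default localhost:9092), max attempts (default 10).
wait_for_kafka() {
  broker="${1:-localhost:9092}"
  attempts="${2:-10}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # kcat -L requests cluster metadata; it should fail if no broker responds.
    if kcat -L -b "$broker" >/dev/null 2>&1; then
      echo "broker $broker is up"
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  echo "broker $broker did not respond after $attempts attempts"
  return 1
}
```

Calling `wait_for_kafka` with no arguments checks the default address. Pass a different address or attempt count if your setup differs.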
=== Set up AWS Bedrock
The Kamelet expects the AWS credentials to be placed in `/home/<user>/.aws/credentials`.
That's the reason why you see the `useDefaultCredentialsProvider` option set to true in the AWS Bedrock Text Sink configuration.
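A quick check that the credentials file is in place can save a failed first run. This is a minimal sketch, assuming the standard shared credentials layout with a `[default]` profile. The `check_aws_credentials` helper name is hypothetical, and it only checks that the file and profile exist, not that the keys are valid.

```shell
# Hypothetical helper: verifies that a shared credentials file is readable
# and declares a [default] profile. The path defaults to ~/.aws/credentials.
check_aws_credentials() {
  creds_file="${1:-$HOME/.aws/credentials}"
  if [ ! -r "$creds_file" ]; then
    echo "missing: $creds_file"
    return 1
  fi
  # -F treats the pattern as a fixed string, so the brackets are literal.
  if ! grep -qF '[default]' "$creds_file"; then
    echo "no [default] profile in $creds_file"
    return 1
  fi
  echo "ok: $creds_file"
}
```

Run `check_aws_credentials` without arguments to check the default location, or pass a path if your credentials live elsewhere.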
=== How to run
Then you can run this example using:
$ jbang -Dcamel.jbang.version=4.7.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<local_path_to_kamelets> kafka-aws-bedrock.camel.yaml
=== Sending two messages
In the log you should see the consumer starting up and joining its group:
2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:2)
2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Started route1 (kamelet://kafka-not-secured-source)
2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.7.0-SNAPSHOT (kafka-aws-bedrock) started in 817ms (build:0ms init:0ms start:817ms)
2024-03-07 06:50:53.621 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1709790653615
2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] l.component.kafka.KafkaFetchRecords : Subscribing bedrock-topic-Thread 0 to topic bedrock-topic
2024-03-07 06:50:53.636 INFO 11498 --- [[bedrock-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Subscribed to topic(s): bedrock-topic
2024-03-07 06:50:54.023 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 2 : {bedrock-topic=LEADER_NOT_AVAILABLE}
2024-03-07 06:50:54.025 INFO 11498 --- [[bedrock-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Cluster ID: Wq0puR18So6u4dXsKJnMNg
2024-03-07 06:50:54.140 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 4 : {bedrock-topic=LEADER_NOT_AVAILABLE}
2024-03-07 06:50:54.263 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 6 : {bedrock-topic=LEADER_NOT_AVAILABLE}
2024-03-07 06:50:54.371 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 8 : {bedrock-topic=LEADER_NOT_AVAILABLE}
2024-03-07 06:50:54.784 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
2024-03-07 06:50:54.789 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
2024-03-07 06:50:54.821 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: need to re-join with the given member-id: consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276
2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
2024-03-07 06:50:54.846 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully joined group with generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
2024-03-07 06:50:54.857 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Finished assignment for group at generation 1: {consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276=Assignment(partitions=[bedrock-topic-0])}
2024-03-07 06:50:54.934 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully synced group in generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
2024-03-07 06:50:54.935 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Notifying assignor about the new Assignment(partitions=[bedrock-topic-0])
2024-03-07 06:50:54.937 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Adding newly assigned partitions: bedrock-topic-0
2024-03-07 06:50:54.962 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Found no committed offset for partition bedrock-topic-0
2024-03-07 06:50:54.989 INFO 11498 --- [[bedrock-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Resetting offset for partition bedrock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
In the example folder you have two different messages, prompt1.txt and prompt2.txt. The first one requests the generation of a JSON record and the second one an XML record. Both records need to have an id field, a product name and a price.
We use kcat to send the messages to Kafka, where the running route consumes them.
$ kcat -P -b localhost:9092 -t bedrock-topic prompt1.txt
$ kcat -P -b localhost:9092 -t bedrock-topic prompt2.txt
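If you have more prompt files, the two commands above can be wrapped in a small loop. This is a minimal sketch under the same assumptions (broker on `localhost:9092`, topic `bedrock-topic`). The `send_prompts` helper and the `KAFKA_BROKER` and `KAFKA_TOPIC` variables are hypothetical names introduced for illustration.

```shell
# Hypothetical helper: sends each given file as one Kafka message.
# Broker and topic default to the values used in this example.
send_prompts() {
  broker="${KAFKA_BROKER:-localhost:9092}"
  topic="${KAFKA_TOPIC:-bedrock-topic}"
  for prompt_file in "$@"; do
    # With -P and a file argument, kcat produces the whole file as one message.
    kcat -P -b "$broker" -t "$topic" "$prompt_file" || return 1
    echo "sent $prompt_file to $topic"
  done
}
```

For this example the call would be `send_prompts prompt1.txt prompt2.txt`.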
If you look in the example folder you should see a results folder now:
$ cd results/
$ cat *
You have two different files, and if you cat their content you should see:
{
  "rows": [
    {
      "id": 1,
      "product_name": "Phone 12",
      "price": "$999"
    },
    {
      "id": 2,
      "product_name": "Phone 11",
      "price": "$899"
    },
    {
      "id": 3,
      "product_name": "Phone X",
      "price": "$799"
    },
    {
      "id": 4,
      "product_name": "Phone XS",
      "price": "$699"
    },
    {
      "id": 5,
      "product_name": "Phone XR",
      "price": "$599"
    }
  ]
}
<record id="1">
  <product name="Apple iPhone 12 Pro Max">$1,099</product>
</record>
<record id="2">
  <product name="Samsung Galaxy S21 Ultra 5G">$1,199</product>
</record>
<record id="3">
  <product name="Google Pixel 6 Pro">$899</product>
</record>
<record id="4">
  <product name="Apple iPhone 11 Pro">$999</product>
</record>
<record id="5">
  <product name="Samsung Galaxy Note 20 Ultra 5G">$1,299</product>
</record>
=== Help and contributions
If you hit any problem using Camel or have some feedback, then please let us know.
We also love contributors, so get involved :-)
The Camel riders!