| == Kafka to AWS Bedrock Text Sink Kamelet |
| |
| In this sample you'll use the Kafka Source Kamelet and the AWS Bedrock Text Sink Kamelet to send prompts from a Kafka topic to an AWS Bedrock text model. |
| |
| === Install JBang |
| |
| First, install JBang by following the instructions at https://www.jbang.dev. |
| |
| Once JBang is installed, you should be able to run the following from a shell: |
| |
| [source,sh] |
| ---- |
| $ jbang --version |
| ---- |
| |
| This will output the version of JBang. |
| |
| To run this example, you can install Camel on JBang via: |
| |
| [source,sh] |
| ---- |
| $ jbang app install camel@apache/camel |
| ---- |
| |
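| This lets you run Camel JBang with the `camel` command. The run command further below invokes it through `jbang` directly, which also works without this install step. |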
| |
| === Setup Kafka instance |
| |
| You'll need a running Kafka cluster to point to. One option is an Ansible role such as https://github.com/oscerd/kafka-ansible-role. |
| |
| Create a file named `deploy.yaml` with the following content: |
| |
| ```yaml |
| - name: role kafka |
|   hosts: localhost |
|   remote_user: user |
| |
|   roles: |
|     - role: kafka-ansible-role |
|       kafka_version: 3.6.1 |
|       path_dir: /home/user/ |
|       unarchive_dest_dir: /home/user/kafka/demo/ |
|       start_kafka: true |
| ``` |
| |
| Then run: |
| |
| ```shell |
| ansible-playbook -v deploy.yaml |
| ``` |
| |
| This should start a Kafka instance on your local machine. |
| |
| === Set up AWS Bedrock |
| |
| The Kamelet expects the AWS credentials to be placed in `/home/<user>/.aws/credentials`. |
| |
| That is why the `useDefaultCredentialsProvider` option is set to `true` in the AWS Bedrock Text Sink configuration. |
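
Before starting the route, it can help to confirm that the shared credentials file is in place. The snippet below is a minimal sketch, not part of the example itself: the function name `check_aws_credentials` is hypothetical, and the check for a `[default]` profile assumes you have not selected another profile (for example via `AWS_PROFILE`).

```shell
# Hypothetical helper: verify that a shared AWS credentials file exists
# and contains a [default] profile (an assumption; other profiles are possible).
check_aws_credentials() {
  creds_file="${1:-$HOME/.aws/credentials}"
  if [ ! -f "$creds_file" ]; then
    echo "missing: $creds_file"
    return 1
  fi
  if ! grep -q '^\[default\]' "$creds_file"; then
    echo "no [default] profile in: $creds_file"
    return 1
  fi
  echo "ok: $creds_file"
}
```

Call `check_aws_credentials` with no arguments to inspect `$HOME/.aws/credentials`, or pass a different path as the first argument.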
| |
| === How to run |
| |
| Then you can run this example using: |
| |
| [source,sh] |
| ---- |
| $ jbang -Dcamel.jbang.version=4.7.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<local_path_to_kamelets> kafka-aws-bedrock.camel.yaml |
| ---- |
| |
| |
| === Sending two messages |
| |
| In the log you should see the Kafka consumer start and subscribe to `bedrock-topic`: |
| |
| [source,sh] |
| ---- |
| 2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:2) |
| 2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Started route1 (kamelet://kafka-not-secured-source) |
| 2024-03-07 06:50:53.517 INFO 11498 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.7.0-SNAPSHOT (kafka-aws-bedrock) started in 817ms (build:0ms init:0ms start:817ms) |
| 2024-03-07 06:50:53.621 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1 |
| 2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5 |
| 2024-03-07 06:50:53.622 INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1709790653615 |
| 2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy |
| 2024-03-07 06:50:53.634 INFO 11498 --- [[bedrock-topic]] l.component.kafka.KafkaFetchRecords : Subscribing bedrock-topic-Thread 0 to topic bedrock-topic |
| 2024-03-07 06:50:53.636 INFO 11498 --- [[bedrock-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Subscribed to topic(s): bedrock-topic |
| 2024-03-07 06:50:54.023 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 2 : {bedrock-topic=LEADER_NOT_AVAILABLE} |
| 2024-03-07 06:50:54.025 INFO 11498 --- [[bedrock-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Cluster ID: Wq0puR18So6u4dXsKJnMNg |
| 2024-03-07 06:50:54.140 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 4 : {bedrock-topic=LEADER_NOT_AVAILABLE} |
| 2024-03-07 06:50:54.263 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 6 : {bedrock-topic=LEADER_NOT_AVAILABLE} |
| 2024-03-07 06:50:54.371 WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 8 : {bedrock-topic=LEADER_NOT_AVAILABLE} |
| 2024-03-07 06:50:54.784 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null) |
| 2024-03-07 06:50:54.789 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group |
| 2024-03-07 06:50:54.821 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: need to re-join with the given member-id: consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276 |
| 2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) |
| 2024-03-07 06:50:54.823 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group |
| 2024-03-07 06:50:54.846 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully joined group with generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'} |
| 2024-03-07 06:50:54.857 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Finished assignment for group at generation 1: {consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276=Assignment(partitions=[bedrock-topic-0])} |
| 2024-03-07 06:50:54.934 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully synced group in generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'} |
| 2024-03-07 06:50:54.935 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Notifying assignor about the new Assignment(partitions=[bedrock-topic-0]) |
| 2024-03-07 06:50:54.937 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Adding newly assigned partitions: bedrock-topic-0 |
| 2024-03-07 06:50:54.962 INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Found no committed offset for partition bedrock-topic-0 |
| 2024-03-07 06:50:54.989 INFO 11498 --- [[bedrock-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Resetting offset for partition bedrock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}. |
| ---- |
| |
| The example folder contains two prompt files, `prompt1.txt` and `prompt2.txt`. The first requests the generation of JSON records and the second XML records. In both cases, each record must have an id field, a product name, and a price. |
| |
| We use kcat to send the messages to Kafka, where the running route consumes them. Each file is sent as a single message: |
| |
| [source,sh] |
| ---- |
| $ kcat -P -b localhost:9092 -t bedrock-topic prompt1.txt |
| $ kcat -P -b localhost:9092 -t bedrock-topic prompt2.txt |
| ---- |
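
To confirm that both prompts reached the topic, you can read it back with kcat in consumer mode. This is an optional check, sketched under the assumption that the broker is at `localhost:9092` as above; the function name `consume_bedrock_topic` is hypothetical.

```shell
# Hypothetical helper: read the topic from the first offset and exit once the
# end of the partition is reached.
#   -C            consumer mode
#   -o beginning  start from the earliest offset
#   -e            exit after the last message
consume_bedrock_topic() {
  kcat -C -b "${1:-localhost:9092}" -t "${2:-bedrock-topic}" -o beginning -e
}
```

Calling `consume_bedrock_topic` should print the contents of `prompt1.txt` and `prompt2.txt`. Used this way, kcat reads without joining a consumer group, so it should not affect the offsets of the route's own consumer.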
| |
| The example folder should now contain a `results` folder: |
| |
| [source,sh] |
| ---- |
| $ cd results/ |
| $ cat * |
| ---- |
| |
| It contains two files, one per prompt. Their content should look similar to the following (the generated text will vary): |
| |
| [source,sh] |
| ---- |
| ```tabular-data-json |
| { |
| "rows": [ |
| { |
| "id": 1, |
| "product_name": "Phone 12", |
| "price": "$999" |
| }, |
| { |
| "id": 2, |
| "product_name": "Phone 11", |
| "price": "$899" |
| }, |
| { |
| "id": 3, |
| "product_name": "Phone X", |
| "price": "$799" |
| }, |
| { |
| "id": 4, |
| "product_name": "Phone XS", |
| "price": "$699" |
| }, |
| { |
| "id": 5, |
| "product_name": "Phone XR", |
| "price": "$599" |
| } |
| ] |
| } |
| ``` |
| ```tabular-data-xml |
| <record id="1"> |
| <product name="Apple iPhone 12 Pro Max">$1,099</product> |
| </record> |
| <record id="2"> |
| <product name="Samsung Galaxy S21 Ultra 5G">$1,199</product> |
| </record> |
| <record id="3"> |
| <product name="Google Pixel 6 Pro">$899</product> |
| </record> |
| <record id="4"> |
| <product name="Apple iPhone 11 Pro">$999</product> |
| </record> |
| <record id="5"> |
| <product name="Samsung Galaxy Note 20 Ultra 5G">$1,299</product> |
| </record> |
| ``` |
| ---- |
| |
| === Help and contributions |
| |
| If you hit any problem using Camel or have some feedback, then please |
| https://camel.apache.org/community/support/[let us know]. |
| |
| We also love contributors, so |
| https://camel.apache.org/community/contributing/[get involved] :-) |
| |
| The Camel riders! |