tree: 5cac105edf0d895e95a473c3a9e082cd6ff81138 [path history] [tgz]
  1. flow-binding.yaml
  2. README.md
kamelets/kafka-to-s3-streaming-upload/README.md

Kafka to LOG

  • Use the quickstart for https://strimzi.io/quickstarts/ and follow the minikube guide.

  • Install camel-k on the kafka namespace

  • Add the correct credentials in the flow binding file for AWS S3 service. Don't forget to create the kamelets-demo bucket in the region you select.

  • Run the following commands

    kubectl apply -f flow-binding.yaml -n kafka
    
  • Check logs

    kamel logs kafka-to-s3-streaming-upload -n kafka
    
    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup summary (total:3 started:3)
    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started route1 (kamelet://kafka-not-secured-source/source)
    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started source (kafka://test-topic)
    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started sink (kamelet://source)
    [1] 2021-07-30 05:45:28,278 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.11.0 (camel-1) started in 1s751ms (build:0ms init:75ms start:1s676ms)
    [1] 2021-07-30 05:45:28,281 INFO  [io.quarkus] (main) camel-k-integration 1.5.0 on JVM (powered by Quarkus 2.0.0.Final) started in 7.710s.
    [1] 2021-07-30 05:45:28,281 INFO  [io.quarkus] (main) Profile prod activated.
    [1] 2021-07-30 05:45:28,282 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.ticket.renew.window.factor' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,284 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,285 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'specific.avro.reader' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,284 INFO  [io.quarkus] (main) Installed features: [camel-aws2-commons, camel-aws2-s3, camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-support-common, camel-support-commons-logging, camel-support-httpclient, camel-yaml-dsl, cdi, kafka-client]
    [1] 2021-07-30 05:45:28,287 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.ticket.renew.jitter' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,288 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.trustmanager.algorithm' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,288 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.keystore.type' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.min.time.before.relogin' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.endpoint.identification.algorithm' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.protocol' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,290 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.enabled.protocols' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,291 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.truststore.type' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,291 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.keymanager.algorithm' was supplied but isn't a known config.
    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka version: 2.8.0
    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka commitId: ebb1d6e21cc92130
    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka startTimeMs: 1627623928292
    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Subscribing test-topic-Thread 0 to topic test-topic
    [1] 2021-07-30 05:45:28,293 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Subscribed to topic(s): test-topic
    [1] 2021-07-30 05:45:28,582 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
    [1] 2021-07-30 05:45:28,583 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Cluster ID: r6q2BGnHT7awUAK0diO1hA
    [1] 2021-07-30 05:45:28,585 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Discovered group coordinator my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2147483647 rack: null)
    [1] 2021-07-30 05:45:28,635 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] (Re-)joining group
    [1] 2021-07-30 05:45:28,755 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] (Re-)joining group
    [1] 2021-07-30 05:45:28,759 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 6 : {test-topic=LEADER_NOT_AVAILABLE}
    [1] 2021-07-30 05:45:28,868 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 8 : {test-topic=LEADER_NOT_AVAILABLE}
    [1] 2021-07-30 05:45:31,766 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Successfully joined group with generation Generation{generationId=1, memberId='consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d', protocol='range'}
    [1] 2021-07-30 05:45:31,772 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Finished assignment for group at generation 1: {consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d=Assignment(partitions=[test-topic-0])}
    [1] 2021-07-30 05:45:31,790 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Successfully synced group in generation Generation{generationId=1, memberId='consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d', protocol='range'}
    [1] 2021-07-30 05:45:31,791 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Notifying assignor about the new Assignment(partitions=[test-topic-0])
    [1] 2021-07-30 05:45:31,797 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Adding newly assigned partitions: test-topic-0
    [1] 2021-07-30 05:45:31,808 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Found no committed offset for partition test-topic-0
    [1] 2021-07-30 05:45:31,818 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null)], epoch=0}}.
    
  • Send some data to Kafka topic. Run the following command

    kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.24.0-kafka-2.8.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic test-topic
    If you don't see a command prompt, try pressing enter.
    >l1
    >l2
    >l3
    >l4
    >l5
    
  • In the integration logs you should now read

    [1] 2021-07-30 05:46:12,156 INFO  [org.apa.cam.com.aws.s3.str.AWS2S3StreamUploadProducer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Completed upload for the part 1 with etag "03204a1ad503637fc7b6cef6ac290fd4-1" at index 5
    
  • If you go into the kamelets-demo bucket of your account, you should see only one file KafkaTestFile.txt, containing the file messages concatenated.