| # Camel-Kafka-connector AWS2 Kinesis Sink |
| |
| This is an example for Camel-Kafka-connector AWS2-Kinesis Sink |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - An AWS Kinesis stream |
| - The AWS CLI locally |
| - Some work on AWS console |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ### Setting up the needed bits and running the example |
| |
| You'll need to setup the plugin.path property in your kafka |
| |
| Open the `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your choosen location |
| |
| In this example we'll use `/home/oscerd/connectors/` |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-kafka-connector/0.10.1/camel-aws2-kinesis-kafka-connector-0.10.1-package.tar.gz |
| > untar.gz camel-aws2-kinesis-kafka-connector-0.10.1-package.tar.gz |
| ``` |
| |
| On AWS console create a Kinesis stream delivery stream named streamTest. |
| |
| Now it's time to setup the connectors |
| |
| Open the AWS2 Kinesis configuration file |
| |
| ``` |
| name=CamelAws2-kinesisSinkConnector |
| connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector |
| |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| |
| topics=mytopic |
| |
| camel.sink.path.streamName=streamTest |
| camel.sink.endpoint.accessKey=xxxx |
| camel.sink.endpoint.secretKey=yyyy |
| camel.sink.endpoint.region=region |
| ``` |
| |
| and add the correct credentials for AWS. |
| |
| Now you can run the example |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSinkConnector.properties |
| ``` |
| |
| Now send message to kafka topic in this way for example: |
| |
| ``` |
| > echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic |
| % Auto-selecting Producer mode (use -P or -C to override) |
| > echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic |
| % Auto-selecting Producer mode (use -P or -C to override) |
| ``` |
| |
| To verify the record are present in the streamTest stream we can the AWS CLI |
| |
| First we need to get the shardIterator |
| |
| ``` |
| > aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name streamTest |
| { |
| "ShardIterator": "AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW" |
| } |
| ``` |
| |
| After this we'll need to perform a get-records operation |
| |
| > aws kinesis get-records --shard-iterator AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW |
| { |
| "Records": [ |
| { |
| "Data": "aGVsbG8gdGhlcmU=", |
| "PartitionKey": "partition-1", |
| "ApproximateArrivalTimestamp": 1610729857.904, |
| "SequenceNumber": "4961458467700449568901978308705626930.10.1414429070721026" |
| }, |
| { |
| "Data": "aGVsbG8gdGhlcmU=", |
| "PartitionKey": "partition-1", |
| "ApproximateArrivalTimestamp": 1610729861.765, |
| "SequenceNumber": "49614584677004495689019783087057478230601029333123334146" |
| } |
| ], |
| "NextShardIterator": "AAAAAAAAAAFWEhvAPrJc6dctkUTv5cFSIIcaQshFYv5wtlofGWJfmi8NjQljI5B4xzdVTE23zik9sbx+G0+T8CxTXScStjWVcZMNRi0Gt11lE0a8a+WkzP5/Zmm8Gf6X6f3w5P/tNzRUFCQc+Tg7eNOeevjiyRdn0271qOtfk5gS7NVtSaSGq13CwV3FWcCN2FzE9F8K04+8YihNrvBNhcuFIU3jyBhY", |
| "MillisBehindLatest": 0 |
| } |
| ``` |
| |
| As you may see we have now two records. |
| |
| |
| |
| |