# Camel-Kafka-connector AWS2 Kinesis Sink
This is an example of the Camel-Kafka-connector AWS2 Kinesis sink.
## Standalone
### What is needed
- An AWS Kinesis stream
- The AWS CLI installed locally
- Access to the AWS console for some setup work
### Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
### Setting up the needed bits and running the example
You'll need to set the `plugin.path` property in your Kafka installation.
Open `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your chosen location.
In this example we'll use `/home/oscerd/connectors/`.
```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-kinesis-kafka-connector/0.10.1/camel-aws2-kinesis-kafka-connector-0.10.1-package.tar.gz
> tar -xzf camel-aws2-kinesis-kafka-connector-0.10.1-package.tar.gz
```
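If you prefer not to edit the properties file by hand, the `plugin.path` change can be scripted. The following is a minimal sketch: `set_plugin_path` is a hypothetical helper name (not part of Kafka), and it assumes any active setting appears as an uncommented `plugin.path=` line.

```shell
# Hypothetical helper: point plugin.path in a Connect properties file at a directory.
set_plugin_path() {
  props="$1"
  dir="$2"
  tmp="${props}.tmp"
  # Keep every line except an active plugin.path setting
  # (grep exits non-zero when nothing is left, hence "|| true")
  grep -v '^plugin\.path=' "$props" > "$tmp" || true
  # Append the new setting and swap the file into place
  printf 'plugin.path=%s\n' "$dir" >> "$tmp"
  mv "$tmp" "$props"
}
```

For the location used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/`.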
On the AWS console, create a Kinesis data stream named streamTest.
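The stream can also be created from the AWS CLI instead of the console. This is a minimal sketch, assuming the CLI is already configured with credentials and a default region; `create_stream` is a hypothetical helper name.

```shell
# Hypothetical helper: create a single-shard stream and block until it is ACTIVE.
# Assumes the AWS CLI is configured with credentials and a default region.
create_stream() {
  stream="$1"
  aws kinesis create-stream --stream-name "$stream" --shard-count 1 &&
    aws kinesis wait stream-exists --stream-name "$stream"
}
```

Calling `create_stream streamTest` yields a stream with one shard, which is the `shardId-000000000000` shard queried in the verification step.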
Now it's time to set up the connector.
Open the AWS2 Kinesis sink configuration file, `config/CamelAWS2KinesisSinkConnector.properties`:
```
name=CamelAws2-kinesisSinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.path.streamName=streamTest
camel.sink.endpoint.accessKey=xxxx
camel.sink.endpoint.secretKey=yyyy
camel.sink.endpoint.region=region
```
Replace `xxxx`, `yyyy`, and `region` with your AWS access key, secret key, and region.
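As an alternative to editing the placeholders by hand, the file can be generated from environment variables. This is only a sketch: `write_sink_config` is a hypothetical helper name, and reading the values from `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_DEFAULT_REGION` is an assumption about how your credentials are stored.

```shell
# Hypothetical helper: write the sink connector properties to a file,
# filling in credentials and region from environment variables (an assumption).
write_sink_config() {
  out="$1"
  if [ -z "${AWS_ACCESS_KEY_ID:-}" ] || [ -z "${AWS_SECRET_ACCESS_KEY:-}" ] ||
     [ -z "${AWS_DEFAULT_REGION:-}" ]; then
    echo "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION must be set" >&2
    return 1
  fi
  cat > "$out" <<EOF
name=CamelAws2-kinesisSinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.path.streamName=streamTest
camel.sink.endpoint.accessKey=${AWS_ACCESS_KEY_ID}
camel.sink.endpoint.secretKey=${AWS_SECRET_ACCESS_KEY}
camel.sink.endpoint.region=${AWS_DEFAULT_REGION}
EOF
  # The file now holds a secret key, so restrict it to the current user
  chmod 600 "$out"
}
```

With the variables exported, `write_sink_config config/CamelAWS2KinesisSinkConnector.properties` produces the file used by the run command.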
Now you can run the example:
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSinkConnector.properties
```
Now send messages to the Kafka topic, for example:
```
> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
% Auto-selecting Producer mode (use -P or -C to override)
> echo "hello there" | ./kafkacat -b localhost:9092 -H "CamelHeader.CamelAwsKinesisPartitionKey=partition-1" -t mytopic
% Auto-selecting Producer mode (use -P or -C to override)
```
To verify that the records are present in the streamTest stream, we can use the AWS CLI.
First we need to get the shard iterator:
```
> aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name streamTest
{
    "ShardIterator": "AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW"
}
```
After this we'll need to perform a get-records operation:
```
> aws kinesis get-records --shard-iterator AAAAAAAAAAGxdqX2OPHzjl3szvOLjdl21ylngnoD9zW3PSvRZHvQu825c0TCgA/M4Z5/dzZzBIJ1JR6h4VF2kmqFsEHOHXQ7gBq1mqXsBxUdk8Xvj1EkzUIbi3tcQFdmXSgW0O+9oTIJZ5ljiWFAwd1Czx1BsiB2c2RcqKUz/nRJjNL5MQBKywKuDEcplfVh+C2NnOCFdKqIamH0KeuK0UXhSHK1ghlW
{
    "Records": [
        {
            "Data": "aGVsbG8gdGhlcmU=",
            "PartitionKey": "partition-1",
            "ApproximateArrivalTimestamp": 1610729857.904,
            "SequenceNumber": "4961458467700449568901978308705626930.10.1414429070721026"
        },
        {
            "Data": "aGVsbG8gdGhlcmU=",
            "PartitionKey": "partition-1",
            "ApproximateArrivalTimestamp": 1610729861.765,
            "SequenceNumber": "49614584677004495689019783087057478230601029333123334146"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFWEhvAPrJc6dctkUTv5cFSIIcaQshFYv5wtlofGWJfmi8NjQljI5B4xzdVTE23zik9sbx+G0+T8CxTXScStjWVcZMNRi0Gt11lE0a8a+WkzP5/Zmm8Gf6X6f3w5P/tNzRUFCQc+Tg7eNOeevjiyRdn0271qOtfk5gS7NVtSaSGq13CwV3FWcCN2FzE9F8K04+8YihNrvBNhcuFIU3jyBhY",
    "MillisBehindLatest": 0
}
```
As you can see, the stream now contains two records.
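The `Data` field is base64-encoded, so `aGVsbG8gdGhlcmU=` is the `hello there` payload sent earlier. Copying the shard iterator by hand is error-prone, so the two CLI calls can be chained as in the sketch below. `read_shard` is a hypothetical helper name, and the sketch assumes a configured AWS CLI, a `base64` that accepts `--decode`, and text payloads.

```shell
# Hypothetical helper: print the decoded payload of the records in one shard.
# A single get-records call returns one batch, which is enough for this example.
read_shard() {
  stream="$1"
  shard="${2:-shardId-000000000000}"

  # --query with --output text strips the JSON wrapper, leaving only the iterator
  iterator=$(aws kinesis get-shard-iterator \
    --stream-name "$stream" \
    --shard-id "$shard" \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' --output text) || return 1

  # Records[].Data yields the base64 payloads, whitespace-separated in text output
  for data in $(aws kinesis get-records \
    --shard-iterator "$iterator" \
    --query 'Records[].Data' --output text); do
    printf '%s' "$data" | base64 --decode
    echo
  done
}
```

For the stream in this example, the call would be `read_shard streamTest`, which prints one decoded line per record.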