# Camel-Kafka-connector CQL Sink
## Introduction
This is an example of using the Camel-Kafka-connector CQL sink to write Kafka messages into Apache Cassandra.
## What is needed
- A Cassandra instance
## Running Kafka
```
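# Run ZooKeeper and the Kafka broker each in its own terminal; both stay in the foreground
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
# Once the broker is up, create the topic used by this example
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic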
```
## Setting up the needed bits and running the example
You'll need to set the `plugin.path` property in your Kafka installation.
Open `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your chosen location.
In this example we'll use `/home/oscerd/connectors/`:
```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.5.0/camel-cql-kafka-connector-0.5.0-package.zip
> unzip camel-cql-kafka-connector-0.5.0-package.zip
```
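The `plugin.path` edit described above can also be scripted. The following is a minimal sketch, not part of the original example: `set_plugin_path` is a hypothetical helper name, and it assumes the property appears at most once in the file, either active or commented out as `#plugin.path=`.

```shell
# Hypothetical helper: set plugin.path in a Kafka Connect properties file.
# $1: path to the properties file; $2: directory holding the connectors.
# Assumes the directory path contains no '|' or '&' characters.
set_plugin_path() {
  props_file="$1"
  plugin_dir="$2"
  if grep -qE '^#?[[:space:]]*plugin\.path=' "$props_file"; then
    # Replace the existing (possibly commented-out) entry; a .bak copy is kept.
    sed -i.bak -E "s|^#?[[:space:]]*plugin\.path=.*|plugin.path=${plugin_dir}|" "$props_file"
  else
    # No entry yet: append one.
    printf 'plugin.path=%s\n' "$plugin_dir" >> "$props_file"
  fi
}
```

For the paths used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/`.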
## Setting up Apache Cassandra
This example requires a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First, run a Cassandra instance:
[source,bash]
----
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
----
Next, check and make sure Cassandra is running:
[source,bash]
----
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1
----
To populate the database using the `cqlsh` tool, you'll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We refer to the Cassandra installation directory as `LOCAL_CASSANDRA_HOME`. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
[source,bash]
----
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
----
Next, execute the following script to create keyspace `test`, the table `users` and insert one row into it.
[source,bash]
----
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id timeuuid primary key, name text );
insert into users (id,name) values (now(), 'oscerd');
quit;
----
In the connector `.properties` file used below, the IP address of the Cassandra master node needs to be configured: replace the value `172.17.0.2` in the `camel.sink.path.hosts` configuration property with the IP of the master node obtained from Docker. The `.properties` file is passed on the command line used to run the example.
[source,bash]
----
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
----
Now it's time to set up the connector.
Open the CQL Sink configuration file `config/CamelCassandraQLSinkConnector.properties`:
```
name=CamelCassandraQLSinkConnector
topics=mytopic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
camel.sink.path.hosts=172.17.0.2
camel.sink.path.port=9042
camel.sink.path.keyspace=test
camel.sink.endpoint.cql=insert into users(id, name) values (now(), ?)
```
and set `camel.sink.path.hosts` to the IP address of your Cassandra node.
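Instead of editing the host by hand, the configuration file can be generated. The following is a sketch under stated assumptions: `write_cql_sink_config` is a hypothetical helper name, and the property values are copied from the configuration shown above with only the host parameterized.

```shell
# Hypothetical helper: write the CQL sink connector configuration.
# $1: Cassandra contact point (IP or hostname); $2: output file path.
write_cql_sink_config() {
  cassandra_host="$1"
  out_file="$2"
  cat > "$out_file" <<EOF
name=CamelCassandraQLSinkConnector
topics=mytopic
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
camel.sink.path.hosts=${cassandra_host}
camel.sink.path.port=9042
camel.sink.path.keyspace=test
camel.sink.endpoint.cql=insert into users(id, name) values (now(), ?)
EOF
}
```

With the Docker container from this example, the host argument can be taken from the same `docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node` command used earlier, and the output path would be `config/CamelCassandraQLSinkConnector.properties`.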
Now you can run the example
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelCassandraQLSinkConnector.properties
```
In a different terminal, run the Kafka console producer and send a message. Each message is inserted as a new row in the `users` table of the Cassandra `test` keyspace, with the message body as the `name` value.
```
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
>message
```
You can verify the behavior through the following command
[source,bash]
----
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
----
Next, execute the following statements to query the `users` table in the `test` keyspace.
[source,bash]
----
use test;
select * from users;
----
and you should see
[source,bash]
----
cqlsh:test> select * from users;
 id                                   | name
--------------------------------------+----------
 6cbe74a0-96a6-11ea-a8ff-09d03512038e | message
 fc2c66c0-96a5-11ea-a8ff-09d03512038e | oscerd
(2 rows)
----
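For a scripted check, the row count can be extracted from the `cqlsh` output instead of being read by eye. This is a minimal sketch: `cql_row_count` is a hypothetical helper name, and it assumes the output ends with a summary line of the form `(N rows)` as in the listing above.

```shell
# Hypothetical helper: read cqlsh output on stdin and print the row count
# taken from the last "(N rows)" summary line. Prints nothing if none is found.
cql_row_count() {
  sed -n -E 's/^\(([0-9]+) rows?\)[[:space:]]*$/\1/p' | tail -n 1
}
```

It is meant to be fed from a non-interactive query, for example by piping the output of `cqlsh` run with `-e 'select * from test.users;'` into `cql_row_count`.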