# Camel-Kafka-connector CQL Source
## Introduction
This is an example of the Camel-Kafka-connector CQL source connector.
## What is needed
- A Cassandra instance
## Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
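Optionally, you can verify that the topic was created:
```
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```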
## Setting up the needed bits and running the example
You'll need to set up the `plugin.path` property in your Kafka installation:
open `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your chosen location.
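For example, assuming `/home/oscerd/connectors/` as the chosen location (the folder used later in this example), the relevant excerpt of `connect-standalone.properties` would be:
```
# Directory where Kafka Connect looks for connector plugins
plugin.path=/home/oscerd/connectors
```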
You'll need to build your connector starting from an archetype:
```
> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.10.1
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended
Define value for property 'artifactId': cql-extended
Define value for property 'version' 1.0-SNAPSHOT: : 0.10.1
Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: :
Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector
[INFO] Using property: camel-kafka-connector-version = 0.10.1
Confirm properties configuration:
groupId: org.apache.camel.kafkaconnector.cql.extended
artifactId: cql-extended
version: 0.10.1
package: org.apache.camel.kafkaconnector.cql.extended
camel-kafka-connector-name: camel-cql-kafka-connector
camel-kafka-connector-version: 0.10.1
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.10.1
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
[INFO] Parameter: artifactId, Value: cql-extended
[INFO] Parameter: version, Value: 0.10.1
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended
[INFO] Parameter: version, Value: 0.10.1
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended
[INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector
[INFO] Parameter: camel-kafka-connector-version, Value: 0.10.1
[INFO] Parameter: artifactId, Value: cql-extended
[INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 55.314 s
[INFO] Finished at: 2020-10-22T18:06:34+02:00
[INFO] ------------------------------------------------------------------------
> cd /home/oscerd/playground/cql-extended
```
Import the cql-extended project in your favorite IDE and create the following `RowConversionStrategy` class:
```
package org.apache.camel.kafkaconnector.cql.extended;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.camel.component.cassandra.ResultSetConversionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowConversionStrategy implements ResultSetConversionStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RowConversionStrategy.class);

    @Override
    public Object getBody(ResultSet resultSet) {
        // Convert each row of the result set into a simple "Row[id, name]" string
        List<String> ret = new ArrayList<>();
        Iterator<Row> iterator = resultSet.iterator();
        while (iterator.hasNext()) {
            Row row = iterator.next();
            int id = row.getInt("id");
            String name = row.getString("name");
            ret.add("Row[" + id + ", " + name + "]");
        }
        return ret;
    }
}
```
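If you want to sanity-check the strategy before packaging it, here is a minimal sketch of a standalone check; it assumes Mockito is on the classpath, and the `RowConversionStrategyCheck` class is hypothetical, not part of the generated project.
```
package org.apache.camel.kafkaconnector.cql.extended;

import java.util.List;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Hypothetical sanity check for RowConversionStrategy (requires Mockito)
public class RowConversionStrategyCheck {

    public static void main(String[] args) {
        // Mock a single row with id=1 and name='oscerd'
        Row row = mock(Row.class);
        when(row.getInt("id")).thenReturn(1);
        when(row.getString("name")).thenReturn("oscerd");

        // Mock a result set that yields just that row
        ResultSet resultSet = mock(ResultSet.class);
        when(resultSet.iterator()).thenReturn(List.of(row).iterator());

        // Prints: [Row[1, oscerd]]
        System.out.println(new RowConversionStrategy().getBody(resultSet));
    }
}
```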
Now we need to build the connector:
```
> mvn clean package
```
In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`; we'll need the tar.gz generated by the previous build:
```
> cd /home/oscerd/connectors/
> cp /home/oscerd/playground/cql-extended/target/cql-extended-0.10.1-package.tar.gz .
> tar -xvzf cql-extended-0.10.1-package.tar.gz
```
We're now ready to set up the Cassandra cluster.
## Setting up Apache Cassandra
This example requires a running Cassandra instance; for simplicity, the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance:
```
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
```
Next, check and make sure Cassandra is running:
```
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack
UN  172.17.0.2  251.32 KiB  256     100.0%            5126aaad-f143-43e9-920a-0f9540a93967  rack1
```
To populate the database using the `cqlsh` tool, you'll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We reference the Cassandra installation directory with `LOCAL_CASSANDRA_HOME`. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
```
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
```
Next, execute the following script to create the keyspace `test` and the table `users`, and insert one row into it.
```
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
```
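Optionally, before `quit;`, you can verify the insert; for the single row above, `cqlsh` should print something like:
```
cqlsh:test> select * from users;

 id | name
----+--------
  1 | oscerd

(1 rows)
```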
In the `.properties` configuration file used below, the IP address of the Cassandra master node needs to be configured: replace the value `172.17.0.2` with the IP of the master node obtained from Docker. Each example uses a different `.properties` file, shown on the command line used to run the example.
```
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
```
Now it's time to set up the connector.
Open the CQL source configuration file:
```
name=CamelCassandraQLSourceConnector
connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.source.path.hosts=172.17.0.2
camel.source.path.port=9042
camel.source.path.keyspace=test
camel.source.endpoint.cql=select * from users
camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy
```
Set the correct options in the file. Now you can run the example:
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelCassandraQLSourceConnector.properties
```
In a different terminal, run the Kafka console consumer and you should see the messages arriving in Kafka from Cassandra:
```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
[Row[1, oscerd]]
```
## OpenShift
### What is needed
- An OpenShift instance
### Running Kafka using Strimzi Operator
First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
We need to create security objects as part of the installation, so it is necessary to switch to the admin user.
If you use Minishift, you can do it with the following command:
```
oc login -u system:admin
```
We will use OpenShift project `myproject`.
If it doesn't exist yet, you can create it using the following command:
```
oc new-project myproject
```
If the project already exists, you can switch to it with:
```
oc project myproject
```
We can now install the Strimzi operator into this project:
```
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml
```
Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connector installed:
```
# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml

# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
```
Optionally, enable the ability to instantiate Kafka connectors through a specific custom resource:
```
oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
```
### Add Camel Kafka connector binaries
Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images.
We now need to build the connector and add it to the image.
If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (e.g. `my-connectors/`) so that each one is in its own subfolder.
(Alternatively, you can download the latest officially released and packaged connectors from Maven.)
As in the standalone section, build the connector starting from the archetype:
```
> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.10.1
> cd /home/oscerd/playground/cql-extended
```
Answer the interactive prompts with the same values used in the standalone section above; the generated project and build output are identical.
Import the cql-extended project in your favorite IDE and add the same `RowConversionStrategy` class shown in the standalone section above.
Now we need to build the connector:
```
> mvn clean package
```
Copy the tar.gz generated by the build into the `my-connectors/` folder and decompress it:
```
> cd my-connectors/
> cp /home/oscerd/playground/cql-extended/target/cql-extended-0.10.1-package.tar.gz .
> tar -xvzf cql-extended-0.10.1-package.tar.gz
```
Now we can start the build:
```
oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
```
We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
Strimzi runs Kafka Connect in distributed mode.
To check the available connector plugins, you can run the following command:
```
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins | jq .
```
You should see something like this:
```
[
{
"class": "org.apache.camel.kafkaconnector.CamelSinkConnector",
"type": "sink",
"version": "0.10.1"
},
{
"class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
"type": "source",
"version": "0.10.1"
},
{
"class": "org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector",
"type": "sink",
"version": "0.10.1"
},
{
"class": "org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector",
"type": "source",
"version": "0.10.1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.5.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
```
### Deploy the Cassandra instance
Next, we need to deploy a Cassandra instance:
```
oc create -f config/openshift/cassandra.yaml
```
This will create a Cassandra deployment and a service that will allow other pods to connect to it.
We then create the table in Cassandra using the following command:
```
cat config/openshift/cql-init | oc run -i --restart=Never --attach --rm --image centos/cassandra-311-centos7 cassandra-client --command bash -- -c 'cqlsh -u admin -p admin cassandra'
```
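The contents of `config/openshift/cql-init` are not shown here; a plausible sketch, mirroring the statements used in the standalone section (the real script ships with the example), would be:
```
-- Hypothetical sketch of config/openshift/cql-init
create keyspace if not exists test with replication = {'class':'SimpleStrategy', 'replication_factor':1};
create table if not exists test.users ( id int primary key, name text );
```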
### Create connector instance
Now we can create an instance of the CQL source connector:
```
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
"name": "cql-source-connector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"topics": "mytopic",
"camel.source.path.hosts": "cassandra",
"camel.source.path.port": "9042",
"camel.source.path.keyspace": "test",
"camel.source.endpoint.cql": "select * from users",
"camel.source.endpoint.username": "admin",
"camel.source.endpoint.password": "admin",
"camel.source.endpoint.resultSetConversionStrategy": "#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy"
}
}
EOF
```
Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource:
```
oc create -f config/openshift/cql-source-connector.yaml
```
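The custom resource ships with the example; a minimal sketch of what it might contain, assuming the `KafkaConnector` API of Strimzi 0.20.x (`kafka.strimzi.io/v1alpha1`) and mirroring the REST payload above:
```
# Hypothetical sketch of config/openshift/cql-source-connector.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: cql-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: mytopic
    camel.source.path.hosts: cassandra
    camel.source.path.port: 9042
    camel.source.path.keyspace: test
    camel.source.endpoint.cql: select * from users
    camel.source.endpoint.username: admin
    camel.source.endpoint.password: admin
    camel.source.endpoint.resultSetConversionStrategy: "#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy"
```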
You can check the status of the connector using:
```
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/cql-source-connector/status
```
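If the connector started correctly, the status endpoint returns JSON shaped like the standard Kafka Connect REST response, something like (worker IDs will differ):
```
{
  "name": "cql-source-connector",
  "connector": { "state": "RUNNING", "worker_id": "..." },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "..." } ],
  "type": "source"
}
```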
### Receive messages
Run the following command to get an interactive `cqlsh` session and insert some data into Cassandra:
```
oc run -ti --restart=Never --attach --rm --image centos/cassandra-311-centos7 cassandra-client --command bash -- -c 'cqlsh -u admin -p admin cassandra'
If you don't see a command prompt, try pressing enter.
Connected to Test Cluster at cassandra:9042.
[cqlsh 5.0.1 | Cassandra 3.11.1 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
admin@cqlsh> insert into test.users(id, name) values (1, 'oscerd');
```
Then check that the messages were received using the console consumer:
```
oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
[Row[1, oscerd]]
```