| # Camel-Kafka-connector CQL Source |
| |
| ## Introduction |
| |
| This is an example of the Camel-Kafka-connector CQL source connector, extended with a custom `ResultSetConversionStrategy`. |
| |
| ## What is needed |
| |
| - A Cassandra instance |
| |
| ## Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ## Setting up the needed bits and running the example |
| |
| You'll need to set the `plugin.path` property in your Kafka Connect configuration. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your chosen location. |
| |
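The edit above can also be scripted. The following is a minimal sketch, not part of the original example: the helper name `set_plugin_path` is hypothetical, and it assumes a POSIX shell with `grep` and `sed` available. It replaces an existing uncommented `plugin.path` entry, or appends one if the file only contains the commented-out default.

```shell
# Hypothetical helper: set (or append) plugin.path in a Kafka Connect properties file.
# Usage: set_plugin_path <properties-file> <plugin-directory>
set_plugin_path() {
    props_file="$1"
    plugin_dir="$2"
    if grep -q '^plugin\.path=' "$props_file"; then
        # Replace the existing entry; '|' is the sed delimiter because paths contain '/'.
        sed -i.bak "s|^plugin\.path=.*|plugin.path=${plugin_dir}|" "$props_file"
    else
        # No active entry yet (the default file ships it commented out), so append one.
        printf '\nplugin.path=%s\n' "$plugin_dir" >> "$props_file"
    fi
}
```

For the paths used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors`. When an entry is replaced, the `sed` call leaves a `.bak` copy of the previous file next to it.
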
| You'll need to build your connector starting from an archetype: |
| |
| ``` |
| > mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.10.1 |
| [INFO] Scanning for projects... |
| [INFO] |
| [INFO] ------------------< org.apache.maven:standalone-pom >------------------- |
| [INFO] Building Maven Stub Project (No POM) 1 |
| [INFO] --------------------------------[ pom ]--------------------------------- |
| [INFO] |
| [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> |
| [INFO] |
| [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< |
| [INFO] |
| [INFO] |
| [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- |
| [INFO] Generating project in Interactive mode |
| Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended |
| Define value for property 'artifactId': cql-extended |
| Define value for property 'version' 1.0-SNAPSHOT: : 0.10.1 |
| Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: : |
| Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector |
| [INFO] Using property: camel-kafka-connector-version = 0.10.1 |
| Confirm properties configuration: |
| groupId: org.apache.camel.kafkaconnector.cql.extended |
| artifactId: cql-extended |
| version: 0.10.1 |
| package: org.apache.camel.kafkaconnector.cql.extended |
| camel-kafka-connector-name: camel-cql-kafka-connector |
| camel-kafka-connector-version: 0.10.1 |
| Y: : Y |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.10.1 |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: artifactId, Value: cql-extended |
| [INFO] Parameter: version, Value: 0.10.1 |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: version, Value: 0.10.1 |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector |
| [INFO] Parameter: camel-kafka-connector-version, Value: 0.10.1 |
| [INFO] Parameter: artifactId, Value: cql-extended |
| [INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 55.314 s |
| [INFO] Finished at: 2020-10-22T18:06:34+02:00 |
| [INFO] ------------------------------------------------------------------------ |
| > cd /home/oscerd/playground/cql-extended |
| ``` |
| |
| Import the `cql-extended` project into your favorite IDE and create the following `RowConversionStrategy` class: |
| |
| ``` |
| package org.apache.camel.kafkaconnector.cql.extended; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import com.datastax.oss.driver.api.core.cql.ResultSet; |
| import com.datastax.oss.driver.api.core.cql.Row; |
| import org.apache.camel.component.cassandra.ResultSetConversionStrategy; |
| |
| // Converts each row of the users table into a string such as "Row[1, oscerd]". |
| public class RowConversionStrategy implements ResultSetConversionStrategy { |
| |
|     @Override |
|     public Object getBody(ResultSet resultSet) { |
|         List<String> ret = new ArrayList<>(); |
| |
|         // ResultSet is Iterable<Row>, so it can be traversed directly. |
|         for (Row row : resultSet) { |
|             int id = row.getInt("id"); |
|             String name = row.getString("name"); |
|             ret.add("Row[" + id + ", " + name + "]"); |
|         } |
| |
|         return ret; |
|     } |
| } |
| ``` |
| |
| Now we need to build the connector: |
| |
| ``` |
| > mvn clean package |
| ``` |
| |
| In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`. Copy the tar.gz generated by the previous build into it and extract it: |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > cp /home/oscerd/playground/cql-extended/target/cql-extended-0.10.1-package.tar.gz . |
| > tar -xzf cql-extended-0.10.1-package.tar.gz |
| ``` |
| |
| We're now ready to set up the Cassandra cluster. |
| |
| ## Setting up Apache Cassandra |
| |
| This example requires a running Cassandra instance. For simplicity, the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance: |
| |
| [source,bash] |
| ---- |
| docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra |
| ---- |
| |
| Next, check and make sure Cassandra is running: |
| |
| [source,bash] |
| ---- |
| docker exec -ti master_node /opt/cassandra/bin/nodetool status |
| Datacenter: datacenter1 |
| ======================= |
| Status=Up/Down |
| |/ State=Normal/Leaving/Joining/Moving |
| -- Address Load Tokens Owns (effective) Host ID Rack |
| UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1 |
| ---- |
| |
| To populate the database using the `cqlsh` tool, you'll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We refer to the Cassandra installation directory as `LOCAL_CASSANDRA_HOME`. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker. |
| |
| [source,bash] |
| ---- |
| <LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) |
| ---- |
| |
| Next, execute the following script to create keyspace `test`, the table `users` and insert one row into it. |
| |
| [source,bash] |
| ---- |
| create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3}; |
| use test; |
| create table users ( id int primary key, name text ); |
| insert into users (id,name) values (1, 'oscerd'); |
| quit; |
| ---- |
| |
| The `.properties` configuration file used below must contain the IP address of the Cassandra master node: replace the value `172.17.0.2` of the `camel.source.path.hosts` property with the IP address of the master node obtained from Docker. Each example uses a different `.properties` file, shown in the command line used to run the example. |
| |
| [source,bash] |
| ---- |
| docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node |
| ---- |
| |
| Now it's time to set up the connector. |
| |
| Open the CQL Source configuration file, `config/CamelCassandraQLSourceConnector.properties`: |
| |
| ``` |
| name=CamelCassandraQLSourceConnector |
| connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| |
| topics=mytopic |
| |
| camel.source.path.hosts=172.17.0.2 |
| camel.source.path.port=9042 |
| camel.source.path.keyspace=test |
| camel.source.endpoint.cql=select * from users |
| camel.source.endpoint.resultSetConversionStrategy=#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy |
| ``` |
| |
| Set the correct options in the file. |
| |
| Now you can run the example |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelCassandraQLSourceConnector.properties |
| ``` |
| |
| In a different terminal, run the Kafka console consumer. You should see the messages sent from Cassandra to Kafka: |
| |
| ``` |
| $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning |
| [Row[1, oscerd]] |
| ``` |
| |
| ## OpenShift |
| |
| ### What is needed |
| |
| - An OpenShift instance |
| |
| ### Running Kafka using Strimzi Operator |
| |
| First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. |
| The installation creates security objects, so it is necessary to switch to an admin user. |
| If you use Minishift, you can do it with the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc login -u system:admin |
| ---- |
| |
| We will use OpenShift project `myproject`. |
| If it doesn't exist yet, you can create it using the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc new-project myproject |
| ---- |
| |
| If the project already exists, you can switch to it with: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc project myproject |
| ---- |
| |
| We can now install the Strimzi operator into this project: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml |
| ---- |
| |
| Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connector installed: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| # Deploy a single node Kafka broker |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml |
| |
| # Deploy a single instance of Kafka Connect with no plug-in installed |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml |
| ---- |
| |
| Optionally, enable the creation of Kafka connectors through a specific custom resource: |
| [source,bash,options="nowrap"] |
| ---- |
| oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true |
| ---- |
| |
| ### Add Camel Kafka connector binaries |
| |
| Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images. |
| We now need to build the connectors and add them to the image. |
| Once you have built the project (`mvn clean package`), decompress the connectors you need into a folder (for example `my-connectors/`) |
| so that each one is in its own subfolder |
| (alternatively, you can download the latest officially released and packaged connectors from Maven). |
| For this example, generate the extended connector from the archetype: |
| |
| ``` |
| > mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.10.1 |
| [INFO] Scanning for projects... |
| [INFO] |
| [INFO] ------------------< org.apache.maven:standalone-pom >------------------- |
| [INFO] Building Maven Stub Project (No POM) 1 |
| [INFO] --------------------------------[ pom ]--------------------------------- |
| [INFO] |
| [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> |
| [INFO] |
| [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< |
| [INFO] |
| [INFO] |
| [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- |
| [INFO] Generating project in Interactive mode |
| Define value for property 'groupId': org.apache.camel.kafkaconnector.cql.extended |
| Define value for property 'artifactId': cql-extended |
| Define value for property 'version' 1.0-SNAPSHOT: : 0.10.1 |
| Define value for property 'package' org.apache.camel.kafkaconnector.cql.extended: : |
| Define value for property 'camel-kafka-connector-name': camel-cql-kafka-connector |
| [INFO] Using property: camel-kafka-connector-version = 0.10.1 |
| Confirm properties configuration: |
| groupId: org.apache.camel.kafkaconnector.cql.extended |
| artifactId: cql-extended |
| version: 0.10.1 |
| package: org.apache.camel.kafkaconnector.cql.extended |
| camel-kafka-connector-name: camel-cql-kafka-connector |
| camel-kafka-connector-version: 0.10.1 |
| Y: : Y |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.10.1 |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: artifactId, Value: cql-extended |
| [INFO] Parameter: version, Value: 0.10.1 |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/cql/extended |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: version, Value: 0.10.1 |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.cql.extended |
| [INFO] Parameter: camel-kafka-connector-name, Value: camel-cql-kafka-connector |
| [INFO] Parameter: camel-kafka-connector-version, Value: 0.10.1 |
| [INFO] Parameter: artifactId, Value: cql-extended |
| [INFO] Project created from Archetype in dir: /home/oscerd/playground/cql-extended |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 55.314 s |
| [INFO] Finished at: 2020-10-22T18:06:34+02:00 |
| [INFO] ------------------------------------------------------------------------ |
| > cd /home/oscerd/playground/cql-extended |
| ``` |
| |
| Import the `cql-extended` project into your favorite IDE and create the following `RowConversionStrategy` class: |
| |
| ``` |
| package org.apache.camel.kafkaconnector.cql.extended; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import com.datastax.oss.driver.api.core.cql.ResultSet; |
| import com.datastax.oss.driver.api.core.cql.Row; |
| import org.apache.camel.component.cassandra.ResultSetConversionStrategy; |
| |
| // Converts each row of the users table into a string such as "Row[1, oscerd]". |
| public class RowConversionStrategy implements ResultSetConversionStrategy { |
| |
|     @Override |
|     public Object getBody(ResultSet resultSet) { |
|         List<String> ret = new ArrayList<>(); |
| |
|         // ResultSet is Iterable<Row>, so it can be traversed directly. |
|         for (Row row : resultSet) { |
|             int id = row.getInt("id"); |
|             String name = row.getString("name"); |
|             ret.add("Row[" + id + ", " + name + "]"); |
|         } |
| |
|         return ret; |
|     } |
| } |
| ``` |
| |
| Now we need to build the connector: |
| |
| ``` |
| > mvn clean package |
| ``` |
| |
| Copy the tar.gz generated by the previous build into the `my-connectors/` folder and extract it there: |
| |
| ``` |
| > cd my-connectors/ |
| > cp /home/oscerd/playground/cql-extended/target/cql-extended-0.10.1-package.tar.gz . |
| > tar -xzf cql-extended-0.10.1-package.tar.gz |
| > cd .. |
| ``` |
| |
| Now we can start the build: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow |
| ---- |
| |
| We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. |
| Once it is done, we can check that the connectors are available in our Kafka Connect cluster. |
| Strimzi is running Kafka Connect in a distributed mode. |
| |
| To check the available connector plugins, you can run the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins | jq . |
| ---- |
| |
| You should see something like this: |
| |
| [source,json,options="nowrap"] |
| ---- |
| [ |
| { |
| "class": "org.apache.camel.kafkaconnector.CamelSinkConnector", |
| "type": "sink", |
| "version": "0.10.1" |
| }, |
| { |
| "class": "org.apache.camel.kafkaconnector.CamelSourceConnector", |
| "type": "source", |
| "version": "0.10.1" |
| }, |
| { |
| "class": "org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector", |
| "type": "sink", |
| "version": "0.10.1" |
| }, |
| { |
| "class": "org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector", |
| "type": "source", |
| "version": "0.10.1" |
| }, |
| { |
| "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", |
| "type": "sink", |
| "version": "2.5.0" |
| }, |
| { |
| "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", |
| "type": "source", |
| "version": "2.5.0" |
| }, |
| { |
| "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", |
| "type": "source", |
| "version": "1" |
| }, |
| { |
| "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", |
| "type": "source", |
| "version": "1" |
| }, |
| { |
| "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", |
| "type": "source", |
| "version": "1" |
| } |
| ] |
| ---- |
| |
| |
| ### Deploy the Cassandra instance |
| |
| Next, we need to deploy a Cassandra instance: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc create -f config/openshift/cassandra.yaml |
| ---- |
| |
| This will create a Cassandra deployment and a service that will allow other pods to connect to it. |
| |
| |
| We then create the table in Cassandra using the following command: |
| |
| ---- |
| cat config/openshift/cql-init | oc run -i --restart=Never --attach --rm --image centos/cassandra-311-centos7 cassandra-client --command bash -- -c 'cqlsh -u admin -p admin cassandra' |
| ---- |
| |
| |
| ### Create connector instance |
| |
| Now we can create an instance of the CQL source connector: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \ |
| -H "Accept:application/json" \ |
| -H "Content-Type:application/json" \ |
| http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF' |
| { |
| "name": "cql-source-connector", |
| "config": { |
| "connector.class": "org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector", |
| "tasks.max": "1", |
| "key.converter": "org.apache.kafka.connect.storage.StringConverter", |
| "value.converter": "org.apache.kafka.connect.storage.StringConverter", |
| "topics": "mytopic", |
| "camel.source.path.hosts": "cassandra", |
| "camel.source.path.port": "9042", |
| "camel.source.path.keyspace": "test", |
| "camel.source.endpoint.cql": "select * from users", |
| "camel.source.endpoint.username": "admin", |
| "camel.source.endpoint.password": "admin", |
| "camel.source.endpoint.resultSetConversionStrategy": "#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy" |
| } |
| } |
| EOF |
| ---- |
| |
| Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc create -f config/openshift/cql-source-connector.yaml |
| ---- |
| |
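The contents of `config/openshift/cql-source-connector.yaml` are not reproduced here. As a rough sketch only, assuming the `kafka.strimzi.io/v1alpha1` `KafkaConnector` kind of the Strimzi release installed above and the `my-connect-cluster` name used earlier, such a resource might mirror the REST payload as follows. The helper name `write_connector_cr` is hypothetical, and the actual file in the repository may differ.

```shell
# Hypothetical helper: write a KafkaConnector resource to the given file.
# The YAML is an assumed equivalent of the REST payload shown above,
# not a copy of the repository's cql-source-connector.yaml.
write_connector_cr() {
    cat > "$1" <<'EOF'
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: cql-source-connector
  labels:
    # Must match the name of the Kafka Connect cluster.
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: mytopic
    camel.source.path.hosts: cassandra
    camel.source.path.port: 9042
    camel.source.path.keyspace: test
    camel.source.endpoint.cql: select * from users
    camel.source.endpoint.username: admin
    camel.source.endpoint.password: admin
    # Quoted so that YAML does not treat the leading '#' as a comment.
    camel.source.endpoint.resultSetConversionStrategy: "#class:org.apache.camel.kafkaconnector.cql.extended.RowConversionStrategy"
EOF
}
```

After calling, for example, `write_connector_cr /tmp/cql-source-connector.yaml`, the file could be passed to `oc apply -f`. The `strimzi.io/cluster` label is what ties the connector to the Kafka Connect cluster.
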
| |
| You can check the status of the connector using: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/cql-source-connector/status |
| ---- |
| |
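The status endpoint returns a JSON document with a `state` field for the connector and one for each task. For scripted checks, a minimal sketch could look like the following. The helper name `connector_is_healthy` is hypothetical, and the check is a plain text match rather than a real JSON parser: it succeeds only if no state is `FAILED` and at least one is `RUNNING`.

```shell
# Hypothetical helper: read a Kafka Connect status document on stdin.
# Returns 0 if no state is FAILED and at least one state is RUNNING, 1 otherwise.
connector_is_healthy() {
    status_json="$(cat)"
    # Any FAILED connector or task state makes the check fail.
    if printf '%s' "$status_json" | grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'; then
        return 1
    fi
    # Otherwise require at least one RUNNING state.
    printf '%s' "$status_json" | grep -Eq '"state"[[:space:]]*:[[:space:]]*"RUNNING"'
}
```

The output of the status command above can be piped into it, for example `... /connectors/cql-source-connector/status | connector_is_healthy && echo healthy`. Since `jq` is already used earlier in this guide, a `jq` filter would be a more robust choice where it is available.
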
| |
| |
| ### Receive messages |
| |
| |
| Run the following command to get an interactive `cqlsh` session and insert some data into Cassandra: |
| |
| ---- |
| oc run -ti --restart=Never --attach --rm --image centos/cassandra-311-centos7 cassandra-client --command bash -- -c 'cqlsh -u admin -p admin cassandra' |
| If you don't see a command prompt, try pressing enter. |
| Connected to Test Cluster at cassandra:9042. |
| [cqlsh 5.0.1 | Cassandra 3.11.1 | CQL spec 3.4.4 | Native protocol v4] |
| Use HELP for help. |
| admin@cqlsh> insert into test.users(id, name) values (1, 'oscerd'); |
| ---- |
| |
| |
| Then check that the messages were received using the console consumer: |
| |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning |
| [Row[1, oscerd]] |
| ---- |
| |