# Camel-Kafka-connector AWS2 Kinesis Source
This is an example of the Camel-Kafka-connector AWS2-Kinesis source connector.
## Standalone
### What is needed
- An AWS Kinesis stream
- Some work on AWS console
### Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
### Setting up the needed bits and running the example
You'll need to set up the `plugin.path` property in your Kafka installation.
Open `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your chosen location.
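For example, the relevant line in `connect-standalone.properties` could look like this (the `/home/connectors` path is just an illustration; use the folder where you'll unpack the connector):
```
# Directory (or comma-separated directories) where Kafka Connect looks for connector plugins
plugin.path=/home/connectors
```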
You'll need to build your connector starting from an archetype:
```
> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.7.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.2.0:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.2.0:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.2.0:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.7.0] found in catalog remote
Define value for property 'groupId': org.apache.camel.kafkaconnector
Define value for property 'artifactId': aws2-kinesis-extended
Define value for property 'version' 1.0-SNAPSHOT: : 0.7.0
Define value for property 'package' org.apache.camel.kafkaconnector: :
Define value for property 'camel-kafka-connector-name': camel-aws2-kinesis-kafka-connector
[INFO] Using property: camel-kafka-connector-version = 0.7.0
Confirm properties configuration:
groupId: org.apache.camel.kafkaconnector
artifactId: aws2-kinesis-extended
version: 0.7.0
package: org.apache.camel.kafkaconnector
camel-kafka-connector-name: camel-aws2-kinesis-kafka-connector
camel-kafka-connector-version: 0.7.0
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.7.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
[INFO] Parameter: version, Value: 0.7.0
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: version, Value: 0.7.0
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: camel-kafka-connector-name, Value: camel-aws2-kinesis-kafka-connector
[INFO] Parameter: camel-kafka-connector-version, Value: 0.7.0
[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
[INFO] Project created from Archetype in dir: /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:01 min
[INFO] Finished at: 2021-01-14T14:15:24+01:00
[INFO] ------------------------------------------------------------------------
> cd /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
```
We'll need to add a little transform for this example, so import the aws2-kinesis-extended project in your IDE and create a class in the only package there:
```
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;

/**
 * Single Message Transform (SMT) that converts the body of a Kinesis Record
 * from raw bytes to a plain String, so it can be handled by the configured converter.
 */
public class KinesisRecordDataTransforms<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String FIELD_KEY_CONFIG = "key";
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Transforms Data to String");

    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordDataTransforms.class);

    @Override
    public R apply(R r) {
        Object value = r.value();
        if (value instanceof Record) {
            LOG.debug("Converting record from Data to String");
            Record message = (Record) r.value();
            // Extract the payload bytes from the Kinesis record and decode them as a String
            String payload = new String(message.data().asByteArray());
            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
                SchemaHelper.buildSchemaBuilderForType(payload), payload, r.timestamp());
        } else {
            // Pass anything else through untouched
            LOG.debug("Unexpected message type: {}", r.value().getClass());
            return r;
        }
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> map) {
        // no configuration needed
    }
}
```
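To see what the transform does, here is a minimal, hypothetical sketch that feeds it a `SourceRecord` carrying a Kinesis `Record` (the `TransformSketch` class name is just an illustration):
```
import java.util.Collections;

import org.apache.kafka.connect.source.SourceRecord;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;

public class TransformSketch {
    public static void main(String[] args) {
        // Build a Kinesis record whose data is a UTF-8 string
        Record kinesisRecord = Record.builder()
            .data(SdkBytes.fromUtf8String("Kinesis Event 1"))
            .build();

        // Wrap it in a Connect SourceRecord, as the connector would
        SourceRecord in = new SourceRecord(
            Collections.emptyMap(), Collections.emptyMap(),
            "mytopic", null, kinesisRecord);

        KinesisRecordDataTransforms<SourceRecord> smt = new KinesisRecordDataTransforms<>();
        SourceRecord out = smt.apply(in);
        System.out.println(out.value()); // prints: Kinesis Event 1
    }
}
```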
On the AWS console, create a Kinesis data stream named streamTest.
Now it's time to set up the connector.
Open the AWS2 Kinesis configuration file:
```
name=CamelAws2-kinesisSourceConnector
connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=KinesisRecordDataTransforms
transforms.KinesisRecordDataTransforms.type=org.apache.camel.kafkaconnector.KinesisRecordDataTransforms
topics=mytopic
camel.source.path.streamName=streamTest
camel.source.endpoint.accessKey=xxxx
camel.source.endpoint.secretKey=yyyy
camel.source.endpoint.region=region
```
and add the correct AWS credentials and region.
Now you can run the example:
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSourceConnector.properties
```
Now send a record to the Kinesis streamTest stream with 'Kinesis Event 1' as data and a second record with 'Kinesis Event 2' as data.
As an example, you can use the KinesisComponentIntegrationTest.java from the Camel main repository, or adapt the sketch below.
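A minimal sketch using the AWS SDK v2 to put the two records (credentials come from the default provider chain; the region is an assumption, adjust it to where streamTest lives):
```
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

public class PutRecordsSketch {
    public static void main(String[] args) {
        // Region is illustrative; use the region of your stream
        try (KinesisClient client = KinesisClient.builder().region(Region.EU_WEST_1).build()) {
            for (String data : new String[]{"Kinesis Event 1.", "Kinesis Event 2."}) {
                client.putRecord(PutRecordRequest.builder()
                    .streamName("streamTest")
                    .partitionKey("partition-1")
                    .data(SdkBytes.fromUtf8String(data))
                    .build());
            }
        }
    }
}
```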
In a different terminal, run a consumer with kafkacat:
```
./kafkacat -b localhost:9092 -t mytopic
% Auto-selecting Consumer mode (use -P or -C to override)
{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 1."}
{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 2."}
% Reached end of topic mytopic [0] at offset 2
```
## Openshift
### What is needed
- An AWS Kinesis stream
- Some work on AWS console
- An Openshift instance
### Running Kafka using Strimzi Operator
First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
We need to create security objects as part of the installation, so it is necessary to switch to an admin user.
If you use Minishift, you can do it with the following command:
[source,bash,options="nowrap"]
----
oc login -u system:admin
----
We will use OpenShift project `myproject`.
If it doesn't exist yet, you can create it using the following command:
[source,bash,options="nowrap"]
----
oc new-project myproject
----
If the project already exists, you can switch to it with:
[source,bash,options="nowrap"]
----
oc project myproject
----
We can now install the Strimzi operator into this project:
[source,bash,options="nowrap",subs="attributes"]
----
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml
----
Next we will deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the Camel Kafka connectors installed:
[source,bash,options="nowrap",subs="attributes"]
----
# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/kafka/kafka-persistent-single.yaml
# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
----
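Before continuing, you may want to wait for the broker to become ready; a hedged sketch, assuming the cluster is named `my-cluster` as in the Strimzi example:
[source,bash,options="nowrap"]
----
oc wait kafka/my-cluster --for=condition=Ready --timeout=300s
----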
Optionally, enable the ability to instantiate Kafka connectors through a specific custom resource:
[source,bash,options="nowrap"]
----
oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
----
### Add Camel Kafka connector binaries
Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images.
We now need to build the connectors and add them to the image.
If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (e.g. `my-connectors/`)
so that each one is in its own subfolder
(alternatively, you can download the latest officially released and packaged connectors from Maven).
In this case we need to extend an existing connector and add a transform, so we need to leverage the archetype:
```
> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.7.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.2.0:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.2.0:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.2.0:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.7.0] found in catalog remote
Define value for property 'groupId': org.apache.camel.kafkaconnector
Define value for property 'artifactId': aws2-kinesis-extended
Define value for property 'version' 1.0-SNAPSHOT: : 0.7.0
Define value for property 'package' org.apache.camel.kafkaconnector: :
Define value for property 'camel-kafka-connector-name': camel-aws2-kinesis-kafka-connector
[INFO] Using property: camel-kafka-connector-version = 0.7.0
Confirm properties configuration:
groupId: org.apache.camel.kafkaconnector
artifactId: aws2-kinesis-extended
version: 0.7.0
package: org.apache.camel.kafkaconnector
camel-kafka-connector-name: camel-aws2-kinesis-kafka-connector
camel-kafka-connector-version: 0.7.0
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.7.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
[INFO] Parameter: version, Value: 0.7.0
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: version, Value: 0.7.0
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: camel-kafka-connector-name, Value: camel-aws2-kinesis-kafka-connector
[INFO] Parameter: camel-kafka-connector-version, Value: 0.7.0
[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
[INFO] Project created from Archetype in dir: /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:01 min
[INFO] Finished at: 2021-01-14T14:15:24+01:00
[INFO] ------------------------------------------------------------------------
> cd /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
```
We'll need to add a little transform for this example, so import the aws2-kinesis-extended project in your IDE and create a class in the only package there:
```
package org.apache.camel.kafkaconnector;

import java.util.Map;

import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;

/**
 * Single Message Transform (SMT) that converts the body of a Kinesis Record
 * from raw bytes to a plain String, so it can be handled by the configured converter.
 */
public class KinesisRecordDataTransforms<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String FIELD_KEY_CONFIG = "key";
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Transforms Data to String");

    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordDataTransforms.class);

    @Override
    public R apply(R r) {
        Object value = r.value();
        if (value instanceof Record) {
            LOG.debug("Converting record from Data to String");
            Record message = (Record) r.value();
            // Extract the payload bytes from the Kinesis record and decode them as a String
            String payload = new String(message.data().asByteArray());
            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
                SchemaHelper.buildSchemaBuilderForType(payload), payload, r.timestamp());
        } else {
            // Pass anything else through untouched
            LOG.debug("Unexpected message type: {}", r.value().getClass());
            return r;
        }
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> map) {
        // no configuration needed
    }
}
```
Now we need to build the connector:
```
> mvn clean package
```
Then move the zip package from `target/` to the `my-connectors/` folder and unzip it.
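For example (the exact zip file name depends on your artifactId and version; this is just a sketch):
```
mkdir -p my-connectors
unzip target/aws2-kinesis-extended-0.7.0-package.zip -d my-connectors/
```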
Now we can start the build:
[source,bash,options="nowrap"]
----
oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
----
We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
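A hedged way to watch the rollout, assuming the S2I build produced a deployment config named `my-connect-cluster-connect`:
[source,bash,options="nowrap"]
----
oc rollout status dc/my-connect-cluster-connect
----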
Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
Strimzi is running Kafka Connect in a distributed mode.
To check the available connector plugins, you can run the following command:
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
----
You should see something like this:
[source,json,options="nowrap"]
----
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.7.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.7.0"},{"class":"org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector","type":"sink","version":"0.7.0"},{"class":"org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector","type":"source","version":"0.7.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
----
### Set the AWS credentials as a secret (optional)
You can also provide the AWS credentials as a secret. You'll need to edit the file `config/openshift/aws2-kinesis-cred.properties` with the correct credentials and then execute the following command:
[source,bash,options="nowrap"]
----
oc create secret generic aws2-kinesis --from-file=config/openshift/aws2-kinesis-cred.properties
----
Now we need to edit the `KafkaConnectS2I` custom resource to reference the secret. For example:
[source,yaml,options="nowrap"]
----
spec:
# ...
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
#...
externalConfiguration:
volumes:
- name: aws-credentials
secret:
secretName: aws2-kinesis
----
In this way, the secret `aws2-kinesis` will be mounted as a volume at the path `/opt/kafka/external-configuration/aws-credentials/`.
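With the `FileConfigProvider` enabled above, the connector configuration can then reference values from the mounted properties file instead of hard-coding them; a sketch, assuming the file contains `accessKey` and `secretKey` entries (the key names are an assumption):
[source,yaml,options="nowrap"]
----
camel.source.endpoint.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws2-kinesis-cred.properties:accessKey}
camel.source.endpoint.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws2-kinesis-cred.properties:secretKey}
----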
### Create connector instance
Now we can create an instance of the AWS2 Kinesis source connector:
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
"name": "kinesis-source-connector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "KinesisRecordDataTransforms",
"transforms.KinesisRecordDataTransforms.type": "org.apache.camel.kafkaconnector.KinesisRecordDataTransforms",
"topics": "kinesis-topic",
"camel.source.path.streamName": "streamTest",
"camel.source.endpoint.accessKey": "xxx",
"camel.source.endpoint.secretKey": "xxx",
"camel.source.endpoint.region": "xxx"
}
}
EOF
----
Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource:
[source,bash,options="nowrap"]
----
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: kinesis-source-connector
namespace: myproject
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
transforms: KinesisRecordDataTransforms
transforms.KinesisRecordDataTransforms.type: org.apache.camel.kafkaconnector.KinesisRecordDataTransforms
topics: kinesis-topic
camel.source.path.streamName: streamTest
camel.source.endpoint.accessKey: xxx
camel.source.endpoint.secretKey: xxx
camel.source.endpoint.region: xxx
EOF
----
If you followed the optional step for secret credentials, you can run the following command:
[source,bash,options="nowrap"]
----
oc apply -f config/openshift/aws2-kinesis-source.yaml
----
You can check the status of the connector using:
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/kinesis-source-connector/status
----
Now send a record to the Kinesis streamTest stream with 'Kinesis Event 1' as data and a second record with 'Kinesis Event 2' as data.
As an example, you can use the KinesisComponentIntegrationTest.java from the Camel main repository, or the SDK sketch shown in the standalone section.
In a different terminal, run the Kafka console consumer to read the messages from your Kafka broker:
```
oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kinesis-topic
{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 1."}
{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 2."}
```