| = Camel-Kafka-connector AWS2 SNS Sink |
| |
| This is an example of the Camel-Kafka-connector AWS2-SNS Sink. |
| |
| == Standalone |
| |
| === What is needed |
| |
| - An AWS SNS topic |
| |
| === Running Kafka |
| |
| [source] |
| ---- |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ---- |
| |
| === Download the connector package |
| |
| Download the connector package (`tar.gz`) and extract its content to a directory. This example uses `/home/oscerd/connectors/`. |
| |
| [source] |
| ---- |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-sns-kafka-connector/0.10.1/camel-aws2-sns-kafka-connector-0.10.1-package.tar.gz |
| > tar -xzf camel-aws2-sns-kafka-connector-0.10.1-package.tar.gz |
| ---- |
| |
| === Configuring Kafka Connect |
| |
| You'll need to set the `plugin.path` property in your Kafka Connect configuration. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location: |
| |
| [source] |
| ---- |
| ... |
| plugin.path=/home/oscerd/connectors |
| ... |
| ---- |
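
If you would rather script this step, the following is a minimal sketch and not part of the original example. The function name `set_plugin_path` is hypothetical. It drops any active `plugin.path` line from the given file and appends the new value, leaving commented-out lines untouched.

```shell
# Hypothetical helper (not part of Kafka): set plugin.path in a
# Kafka Connect properties file.
# Usage: set_plugin_path <properties-file> <connectors-dir>
set_plugin_path() {
  props_file="$1"
  plugin_dir="$2"
  tmp_file="$(mktemp)"
  # Copy every line except an active plugin.path entry.
  # grep exits non-zero when it copies nothing, which is fine here.
  grep -v -E '^[[:space:]]*plugin\.path=' "$props_file" > "$tmp_file" || true
  # Append the new value, then write the result back in place.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  cat "$tmp_file" > "$props_file"
  rm -f "$tmp_file"
}
```

With the paths used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors`.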
| |
| === Set up the connectors |
| |
| Open the AWS2 SNS configuration file at `$EXAMPLES/aws2-sns/aws2-sns-sink/config/CamelAWS2SNSSinkConnector.properties`: |
| |
| [source] |
| ---- |
| name=CamelAWS2SNSSinkConnector |
| connector.class=org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| |
| topics=mytopic |
| |
| camel.sink.path.topicNameOrArn=camel-1 |
| |
| camel.component.aws2-sns.accessKey=xxxx |
| camel.component.aws2-sns.secretKey=yyyy |
| camel.component.aws2-sns.region=eu-west-1 |
| ---- |
| |
| Replace the placeholder values `xxxx` and `yyyy` with your AWS credentials, and adjust the region if needed. |
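
To avoid editing credentials into the file by hand, the configuration can be generated instead. This is a sketch under assumptions, not part of the original example: `render_sns_sink_config` is a hypothetical function name, and the property values other than the three arguments are copied from the file shown above.

```shell
# Hypothetical helper: print the sink connector properties with the
# credentials and region supplied as arguments.
# Usage: render_sns_sink_config <accessKey> <secretKey> <region>
render_sns_sink_config() {
  if [ "$#" -ne 3 ]; then
    echo "usage: render_sns_sink_config <accessKey> <secretKey> <region>" >&2
    return 1
  fi
  cat <<EOF
name=CamelAWS2SNSSinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.path.topicNameOrArn=camel-1
camel.component.aws2-sns.accessKey=$1
camel.component.aws2-sns.secretKey=$2
camel.component.aws2-sns.region=$3
EOF
}
```

Redirect the output to a file of your choice and pass that file to `connect-standalone.sh` in place of the example configuration.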
| |
| === Running the example |
| |
| Run Kafka Connect with the SNS sink connector: |
| |
| [source] |
| ---- |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/aws2-sns/aws2-sns-sink/config/CamelAWS2SNSSinkConnector.properties |
| ---- |
| |
| In a different terminal, run the Kafka console producer and send messages to your Kafka broker: |
| |
| [source] |
| ---- |
| $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic |
| Kafka to SNS message 1 |
| Kafka to SNS message 2 |
| ---- |
| |
| Connect to your AWS Console and create a subscription for the `camel-1` topic. You should then receive the messages on the chosen subscriber. |
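
A subscription can also be created from the command line instead of the console. The sketch below is an assumption and not part of the original example: `sns_topic_arn` is a hypothetical helper that builds a topic ARN in the standard `aws` partition, and the account ID and e-mail address in the commented usage are placeholders.

```shell
# Hypothetical helper: build the ARN of an SNS topic from its parts.
# Assumes the standard "aws" partition.
# Usage: sns_topic_arn <region> <account-id> <topic-name>
sns_topic_arn() {
  printf 'arn:aws:sns:%s:%s:%s\n' "$1" "$2" "$3"
}

# Example (requires a configured AWS CLI; account ID and address are
# placeholders):
#   aws sns subscribe \
#     --topic-arn "$(sns_topic_arn eu-west-1 123456789012 camel-1)" \
#     --protocol email \
#     --notification-endpoint you@example.com
```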
| |
| == OpenShift |
| |
| === What is needed |
| |
| - An AWS SNS topic |
| - An OpenShift instance |
| |
| === Running Kafka using Strimzi Operator |
| |
| First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. |
| We need to create security objects as part of the installation, so it is necessary to switch to the admin user. |
| If you use Minishift, you can do it with the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc login -u system:admin |
| ---- |
| |
| We will use OpenShift project `myproject`. |
| If it doesn't exist yet, you can create it using the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc new-project myproject |
| ---- |
| |
| If the project already exists, you can switch to it with: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc project myproject |
| ---- |
| |
| We can now install the Strimzi operator into this project: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml |
| ---- |
| |
| Next we will deploy a Kafka broker cluster and a Kafka Connect cluster. The Kafka Connect image with the Camel Kafka connector installed is built in a later step: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| # Deploy a single node Kafka broker |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml |
| |
| # Deploy a single instance of Kafka Connect with no plug-in installed |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml |
| ---- |
| |
| Optionally, enable the creation of Kafka connectors through a dedicated custom resource: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true |
| ---- |
| |
| === Add Camel Kafka connector binaries |
| |
| Strimzi uses Source-to-Image (S2I) builds to allow users to add their own connectors to the existing Strimzi Docker images. |
| We now need to build the connectors and add them to the image. |
| If you have built the whole `Camel Kafka Connector` project (`mvn clean package`), decompress the connectors you need into a folder (for example `my-connectors/`) |
| so that each one is in its own subfolder. |
| Alternatively, you can download the latest officially released and packaged connectors from Maven Central, for example: |
| |
| [source] |
| ---- |
| > cd my-connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-sns-kafka-connector/0.10.1/camel-aws2-sns-kafka-connector-0.10.1-package.tar.gz |
| > tar -xzf camel-aws2-sns-kafka-connector-0.10.1-package.tar.gz |
| ---- |
| |
| Now we can start the build: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow |
| ---- |
| |
| We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. |
| Once it is done, we can check that the connectors are available in our Kafka Connect cluster. |
| Strimzi is running Kafka Connect in a distributed mode. |
| |
| To check the available connector plugins, you can run the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins |
| ---- |
| |
| You should see something like this: |
| |
| [source,json,options="nowrap"] |
| ---- |
| [{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.10.1"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.10.1"},{"class":"org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector","type":"sink","version":"0.10.1"},{"class":"org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSourceConnector","type":"source","version":"0.10.1"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}] |
| ---- |
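
Rather than reading the list by eye, the check can be automated. The following is a minimal sketch, not part of the original example: `has_connector_plugin` is a hypothetical function name, and the match assumes the compact JSON layout (no spaces around the colon) shown above.

```shell
# Hypothetical helper: read the /connector-plugins response on stdin and
# succeed only if the given connector class is listed.
# Assumes compact JSON, i.e. "class":"<name>" without extra whitespace.
# Usage: <command printing the JSON> | has_connector_plugin <class-name>
has_connector_plugin() {
  grep -q -F "\"class\":\"$1\""
}

# Example: pipe the output of the `oc exec ... curl` command above into
#   has_connector_plugin org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector
```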
| |
| === Set the AWS credentials as OpenShift secret (optional) |
| |
| Credentials to your AWS account can be specified directly in the connector instance definition in plain text, or you can create an OpenShift secret object beforehand and then reference the secret. |
| |
| If you want to use the secret, you'll need to edit the file `$EXAMPLES/aws2-sns/aws2-sns-sink/config/openshift/aws2-sns-cred.properties` with the correct credentials and then create the secret with the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc create secret generic aws2-sns --from-file=$EXAMPLES/aws2-sns/aws2-sns-sink/config/openshift/aws2-sns-cred.properties |
| ---- |
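
The connector configuration later in this example reads the keys `accessKey`, `secretKey` and `region` from this file. As a sketch (the function name `check_cred_file` is hypothetical and not part of the example), you can verify that all three keys are present before creating the secret:

```shell
# Hypothetical helper: check that a credentials file defines every key
# that the connector configuration references.
# Usage: check_cred_file <properties-file>
check_cred_file() {
  cred_file="$1"
  missing=0
  for key in accessKey secretKey region; do
    if ! grep -q "^${key}=" "$cred_file"; then
      echo "missing key: ${key}" >&2
      missing=1
    fi
  done
  # Returns 0 when all keys are present, 1 otherwise.
  return "$missing"
}
```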
| |
| Then you need to edit the `KafkaConnectS2I` custom resource to reference the secret. You can do that either in the OpenShift console or using the `oc edit KafkaConnectS2I` command. |
| |
| Add the following configuration to the custom resource: |
| |
| [source,yaml,options="nowrap"] |
| ---- |
| spec: |
|   # ... |
|   config: |
|     config.providers: file |
|     config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider |
|   #... |
|   externalConfiguration: |
|     volumes: |
|       - name: aws-credentials |
|         secret: |
|           secretName: aws2-sns |
| ---- |
| |
| This way, the secret `aws2-sns` is mounted as a volume at the path `/opt/kafka/external-configuration/aws-credentials/`. |
| |
| === Create connector instance |
| |
| If you have enabled the connector custom resources using the `use-connector-resources` annotation, you can create the connector instance by creating a specific custom resource: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| # The heredoc delimiter is quoted so the shell does not try to expand the ${file:...} placeholders |
| oc apply -f - <<'EOF' |
| apiVersion: kafka.strimzi.io/v1alpha1 |
| kind: KafkaConnector |
| metadata: |
|   name: sns-sink-connector |
|   namespace: myproject |
|   labels: |
|     strimzi.io/cluster: my-connect-cluster |
| spec: |
|   class: org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector |
|   tasksMax: 1 |
|   config: |
|     key.converter: org.apache.kafka.connect.storage.StringConverter |
|     value.converter: org.apache.kafka.connect.storage.StringConverter |
|     topics: sns-topic |
|     camel.sink.path.topicNameOrArn: camel-connector-test |
|     camel.component.aws2-sns.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:accessKey} |
|     camel.component.aws2-sns.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:secretKey} |
|     camel.component.aws2-sns.region: ${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:region} |
| EOF |
| ---- |
| |
| If you don't want to use the OpenShift secret for storing the credentials, replace the `${file:...}` properties in the custom resource with the actual values. |
| Otherwise, you can create the custom resource directly from the example file: |
| |
| [source] |
| ---- |
| oc apply -f $EXAMPLES/aws2-sns/aws2-sns-sink/config/openshift/aws2-sns-sink-connector.yaml |
| ---- |
| |
| The other option, if you are not using the custom resources, is to create an instance of the AWS2 SNS sink connector through the Kafka Connect API: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \ |
| -H "Accept:application/json" \ |
| -H "Content-Type:application/json" \ |
| http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF' |
| { |
|   "name": "sns-sink-connector", |
|   "config": { |
|     "connector.class": "org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector", |
|     "tasks.max": "1", |
|     "key.converter": "org.apache.kafka.connect.storage.StringConverter", |
|     "value.converter": "org.apache.kafka.connect.storage.StringConverter", |
|     "topics": "sns-topic", |
|     "camel.sink.path.topicNameOrArn": "camel-connector-test", |
|     "camel.component.aws2-sns.accessKey": "${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:accessKey}", |
|     "camel.component.aws2-sns.secretKey": "${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:secretKey}", |
|     "camel.component.aws2-sns.region": "${file:/opt/kafka/external-configuration/aws-credentials/aws2-sns-cred.properties:region}" |
|   } |
| } |
| EOF |
| ---- |
| |
| Again, if you don't use the OpenShift secret, replace the properties with your actual AWS credentials. |
| |
| You can check the status of the connector using: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/sns-sink-connector/status |
| ---- |
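
The status response is JSON that reports a `state` for the connector and for each of its tasks. The sketch below is an assumption, not part of the original example: `connector_is_running` is a hypothetical function name, and the match relies on the compact JSON layout that the Kafka Connect REST API returns.

```shell
# Hypothetical helper: read the /connectors/<name>/status response on stdin
# and succeed only if every reported state (connector and tasks) is RUNNING.
# Assumes compact JSON, i.e. "state":"<VALUE>" without extra whitespace.
connector_is_running() {
  # Collect the distinct state entries found in the response.
  states="$(grep -o '"state":"[A-Z_]*"' | sort -u)"
  # Exactly one distinct entry, and it must be RUNNING.
  [ "$states" = '"state":"RUNNING"' ]
}

# Example: pipe the output of the `oc exec ... curl` command above into
#   connector_is_running && echo "connector is up"
```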
| |
| === Check enqueued messages |
| |
| Connect to your AWS Console and create a subscription for the `camel-connector-test` topic. You should then receive the messages on the chosen subscriber. |
| |
| Run the Kafka console producer and send messages to your Kafka broker: |
| |
| [source] |
| ---- |
| oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic sns-topic |
| Kafka to SNS message 1 |
| Kafka to SNS message 2 |
| ---- |
| |