| # Camel-Kafka-connector Slack Source |
| |
| This example shows how to use the Camel-Kafka-connector Slack Source. |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - A Slack app |
| - A Slack channel |
| |
| ### Setting up Slack |
| |
| You'll need a workspace and a channel. |
| |
| In your Slack settings, create an app. |
| |
| Add the following permissions to your Bot Token scopes: |
| * channels:history |
| * channels:read |
| |
| Install the app on your workspace and select the channel you want to consume from. |
| |
| Use the Bot User OAuth Access Token as the token for this example. |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ### Setting up the needed bits and running the example |
| |
| You'll need to set up the `plugin.path` property in your Kafka installation. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your chosen location. |
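
As a minimal sketch of that edit, the shell function below sets `plugin.path` in a properties file, replacing an existing uncommented entry or appending a new one. The function name `set_plugin_path` and its two arguments are hypothetical helpers for illustration and are not part of Kafka.

```shell
# Hypothetical helper: set plugin.path in a Kafka Connect properties file.
# $1 = properties file, $2 = directory that will hold the unzipped connectors.
set_plugin_path() {
  config_file="$1"
  plugin_dir="$2"

  if grep -q '^plugin\.path=' "$config_file"; then
    # Replace the existing (uncommented) entry; a .bak copy is left next to the file.
    sed -i.bak "s|^plugin\.path=.*|plugin.path=${plugin_dir}|" "$config_file"
  else
    # No active entry (the line may be commented out), so append one.
    # The leading newline protects against a file that lacks a final newline.
    printf '\nplugin.path=%s\n' "$plugin_dir" >> "$config_file"
  fi
}
```

With the paths used in this example, it could be called as `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors`. The sketch assumes the directory name contains no `|` or `&` characters, since those would be interpreted by `sed`.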
| |
| You'll need to build your connector starting from an archetype: |
| |
| ``` |
| > mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.5.0 |
| [INFO] Scanning for projects... |
| [INFO] |
| [INFO] ------------------< org.apache.maven:standalone-pom >------------------- |
| [INFO] Building Maven Stub Project (No POM) 1 |
| [INFO] --------------------------------[ pom ]--------------------------------- |
| [INFO] |
| [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> |
| [INFO] |
| [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< |
| [INFO] |
| [INFO] |
| [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- |
| [INFO] Generating project in Interactive mode |
| [INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote |
| Define value for property 'groupId': org.apache.camel.kafkaconnector |
| Define value for property 'artifactId': slack-extended |
| Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0 |
| Define value for property 'package' org.apache.camel.kafkaconnector: : |
| Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector |
| [INFO] Using property: camel-kafka-connector-version = 0.5.0 |
| Confirm properties configuration: |
| groupId: org.apache.camel.kafkaconnector |
| artifactId: slack-extended |
| version: 0.5.0 |
| package: org.apache.camel.kafkaconnector |
| camel-kafka-connector-name: camel-slack-kafka-connector |
| camel-kafka-connector-version: 0.5.0 |
| Y: : y |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0 |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Parameter: version, Value: 0.5.0 |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: version, Value: 0.5.0 |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector |
| [INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0 |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 39.295 s |
| [INFO] Finished at: 2020-10-13T09:16:51+02:00 |
| [INFO] ------------------------------------------------------------------------ |
| > cd /home/workspace/miscellanea/slack-extended |
| ``` |
| |
| Then add the following class in the main package: |
| |
| ``` |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.camel.kafkaconnector; |
| |
| import java.util.Map; |
| |
| import org.apache.camel.component.slack.helper.SlackMessage; |
| import org.apache.camel.kafkaconnector.utils.SchemaHelper; |
| import org.apache.kafka.common.config.ConfigDef; |
| import org.apache.kafka.connect.connector.ConnectRecord; |
| import org.apache.kafka.connect.transforms.Transformation; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> { |
| public static final String FIELD_KEY_CONFIG = "key"; |
| public static final ConfigDef CONFIG_DEF = new ConfigDef() |
| .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, |
| "Transforms String-based content from Kafka into a map"); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class); |
| |
| @Override |
| public R apply(R r) { |
| // Read the value once and reuse it below |
| Object value = r.value(); |
| |
| if (value instanceof SlackMessage) { |
| LOG.debug("Converting record from SlackMessage to text"); |
| SlackMessage message = (SlackMessage) value; |
| |
| LOG.debug("Received text: {}", message.getText()); |
| |
| // Keep topic, partition, key and timestamp; replace only the value and its schema |
| return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), |
| SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp()); |
| |
| } else { |
| // Guard against a null value before calling getClass() |
| LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass()); |
| |
| return r; |
| } |
| } |
| |
| @Override |
| public ConfigDef config() { |
| return CONFIG_DEF; |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| |
| @Override |
| public void configure(Map<String, ?> map) { |
| |
| } |
| } |
| ``` |
| |
| Now we need to build the connector: |
| |
| ``` |
| > mvn clean package |
| ``` |
| |
| In this example we'll use `/home/oscerd/connectors/` as `plugin.path`. Copy the zip generated by the previous build there and unzip it: |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.5.0-package.zip . |
| > unzip slack-extended-0.5.0-package.zip |
| ``` |
| |
| Now it's time to set up the connector. |
| |
| Open the Slack source configuration file (`config/CamelSlackSourceConnector.properties`) and set the channel and token: |
| |
| ``` |
| name=CamelSlackSourceConnector |
| connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| transforms=SlackTransformer |
| transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.SlackTransformer |
| |
| topics=mytopic |
| |
| camel.source.path.channel=general |
| camel.source.endpoint.token=<the token created for your Bot> |
| ``` |
| |
| Now you can run the example: |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSlackSourceConnector.properties |
| ``` |
| |
| Add a message to your channel, for example "Hello". |
| |
| In another terminal, using kafkacat, you should be able to see the messages: |
| |
| ``` |
| > kafkacat -b localhost:9092 -t mytopic -f 'Topic %t[%p], offset: %o, key: %k, payload: %s \n' |
| Topic mytopic[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} |
| ``` |
| |
| ## OpenShift |
| |
| ### What is needed |
| |
| - A Slack App |
| - A Slack channel |
| - An OpenShift instance |
| |
| ### Running Kafka using Strimzi Operator |
| |
| First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. |
| The installation creates security objects, so you need to switch to an admin user. |
| If you use Minishift, you can do it with the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc login -u system:admin |
| ---- |
| |
| We will use OpenShift project `myproject`. |
| If it doesn't exist yet, you can create it using the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc new-project myproject |
| ---- |
| |
| If the project already exists, you can switch to it with: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc project myproject |
| ---- |
| |
| We can now install the Strimzi operator into this project: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml |
| ---- |
| |
| Next we will deploy a Kafka broker cluster and a Kafka Connect cluster; the Kafka Connect image with the Camel Kafka connector installed is built in a later step: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| # Deploy a single node Kafka broker |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/kafka/kafka-persistent-single.yaml |
| |
| # Deploy a single instance of Kafka Connect with no plug-in installed |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml |
| ---- |
| |
| Optionally, enable creating Kafka connectors through the `KafkaConnector` custom resource: |
| [source,bash,options="nowrap"] |
| ---- |
| oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true |
| ---- |
| |
| ### Add Camel Kafka connector binaries |
| |
| Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images. |
| We now need to build the connectors and add them to the image. |
| If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (for example `my-connectors/`) |
| so that each one is in its own subfolder. |
| Alternatively, you can download the latest officially released and packaged connectors from Maven. |
| |
| In this case we need to extend an existing connector and add a Transform, so we need to leverage the archetype: |
| |
| ``` |
| > mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.5.0 |
| [INFO] Scanning for projects... |
| [INFO] |
| [INFO] ------------------< org.apache.maven:standalone-pom >------------------- |
| [INFO] Building Maven Stub Project (No POM) 1 |
| [INFO] --------------------------------[ pom ]--------------------------------- |
| [INFO] |
| [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> |
| [INFO] |
| [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< |
| [INFO] |
| [INFO] |
| [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- |
| [INFO] Generating project in Interactive mode |
| [INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote |
| Define value for property 'groupId': org.apache.camel.kafkaconnector |
| Define value for property 'artifactId': slack-extended |
| Define value for property 'version' 1.0-SNAPSHOT: : 0.5.0 |
| Define value for property 'package' org.apache.camel.kafkaconnector: : |
| Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector |
| [INFO] Using property: camel-kafka-connector-version = 0.5.0 |
| Confirm properties configuration: |
| groupId: org.apache.camel.kafkaconnector |
| artifactId: slack-extended |
| version: 0.5.0 |
| package: org.apache.camel.kafkaconnector |
| camel-kafka-connector-name: camel-slack-kafka-connector |
| camel-kafka-connector-version: 0.5.0 |
| Y: : y |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.5.0 |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Parameter: version, Value: 0.5.0 |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: version, Value: 0.5.0 |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector |
| [INFO] Parameter: camel-kafka-connector-version, Value: 0.5.0 |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 39.295 s |
| [INFO] Finished at: 2020-10-13T09:16:51+02:00 |
| [INFO] ------------------------------------------------------------------------ |
| > cd /home/workspace/miscellanea/slack-extended |
| ``` |
| Then add the following class in the main package: |
| |
| ``` |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.camel.kafkaconnector; |
| |
| import java.util.Map; |
| |
| import org.apache.camel.component.slack.helper.SlackMessage; |
| import org.apache.camel.kafkaconnector.utils.SchemaHelper; |
| import org.apache.kafka.common.config.ConfigDef; |
| import org.apache.kafka.connect.connector.ConnectRecord; |
| import org.apache.kafka.connect.transforms.Transformation; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class SlackTransformer <R extends ConnectRecord<R>> implements Transformation<R> { |
| public static final String FIELD_KEY_CONFIG = "key"; |
| public static final ConfigDef CONFIG_DEF = new ConfigDef() |
| .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, |
| "Transforms String-based content from Kafka into a map"); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SlackTransformer.class); |
| |
| @Override |
| public R apply(R r) { |
| // Read the value once and reuse it below |
| Object value = r.value(); |
| |
| if (value instanceof SlackMessage) { |
| LOG.debug("Converting record from SlackMessage to text"); |
| SlackMessage message = (SlackMessage) value; |
| |
| LOG.debug("Received text: {}", message.getText()); |
| |
| // Keep topic, partition, key and timestamp; replace only the value and its schema |
| return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), |
| SchemaHelper.buildSchemaBuilderForType(message.getText()), message.getText(), r.timestamp()); |
| |
| } else { |
| // Guard against a null value before calling getClass() |
| LOG.debug("Unexpected message type: {}", value == null ? null : value.getClass()); |
| |
| return r; |
| } |
| } |
| |
| @Override |
| public ConfigDef config() { |
| return CONFIG_DEF; |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| |
| @Override |
| public void configure(Map<String, ?> map) { |
| |
| } |
| } |
| ``` |
| |
| Now we need to build the connector: |
| |
| ``` |
| > mvn clean package |
| ``` |
| Then move the zip package from `target` to the `my-connectors` folder and unzip it. |
| |
| Now we can start the build: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow |
| ---- |
| |
| We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. |
| Once it is done, we can check that the connectors are available in our Kafka Connect cluster. |
| Strimzi is running Kafka Connect in a distributed mode. |
| |
| To check the available connector plugins, you can run the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins |
| ---- |
| |
| You should see something like this: |
| |
| [source,json,options="nowrap"] |
| ---- |
| [{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSinkConnector","type":"sink","version":"0.5.0"},{"class":"org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector","type":"source","version":"0.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}] |
| ---- |
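
Scanning that single long JSON line by eye is error-prone. As a small sketch, the function below reads the `/connector-plugins` response on standard input and succeeds only if a given connector class is listed. The name `has_connector_plugin` is hypothetical, and the match assumes the compact JSON formatting (no spaces around the colon) seen in the sample output above.

```shell
# Hypothetical helper: succeed if the /connector-plugins JSON read from stdin
# lists the given connector class. $1 = fully qualified class name.
has_connector_plugin() {
  # -F matches the class name literally, so the dots are not treated as wildcards.
  grep -qF "\"class\":\"$1\""
}
```

The output of the `oc exec ... curl` command above can be piped into it, for example `... | has_connector_plugin org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector && echo "plugin available"`.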
| |
| ### Set the Bot Token as secret (optional) |
| |
| You can also provide the Bot token as a secret. Edit the file `config/openshift/slack-token.properties` with the correct token and then execute the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc create secret generic slack-token --from-file=config/openshift/slack-token.properties |
| ---- |
| |
| Now we need to edit the KafkaConnectS2I custom resource to reference the secret. For example: |
| |
| [source,yaml,options="nowrap"] |
| ---- |
| spec: |
| # ... |
| config: |
| config.providers: file |
| config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider |
| #... |
| externalConfiguration: |
| volumes: |
| - name: slack-token |
| secret: |
| secretName: slack-token |
| ---- |
| |
| This way the secret `slack-token` will be mounted as a volume at the path `/opt/kafka/external-configuration/slack-token/`. |
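
The contents of `config/openshift/slack-token.properties` are not shown in this example. As a sketch, assuming the file holds a single key named `token` (a hypothetical name), the helpers below write that file and print the `FileConfigProvider` placeholder that would resolve the key from the mounted path. Both function names are illustrative only.

```shell
# Hypothetical helper: write the properties file passed to `oc create secret`.
# $1 = file to create, $2 = Bot token. The key name "token" is an assumption.
write_token_file() {
  mkdir -p "$(dirname "$1")"
  # Restrict permissions while writing, since the file contains a credential.
  ( umask 077; printf 'token=%s\n' "$2" > "$1" )
}

# Print the placeholder that Kafka Connect's FileConfigProvider would resolve,
# given the mount path /opt/kafka/external-configuration/slack-token/.
# $1 = the properties file; only its base name is used.
token_placeholder() {
  printf '${file:/opt/kafka/external-configuration/slack-token/%s:token}\n' "$(basename "$1")"
}
```

For instance, `write_token_file config/openshift/slack-token.properties "$SLACK_BOT_TOKEN"` (where `SLACK_BOT_TOKEN` is an assumed environment variable) would create the file before the secret is created, and the printed placeholder could then be used as the value of `camel.source.endpoint.token` in place of the literal token.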
| |
| ### Create connector instance |
| |
| Now we can create an instance of the Slack source connector: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \ |
| -H "Accept:application/json" \ |
| -H "Content-Type:application/json" \ |
| http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF' |
| { |
| "name": "slack-source-connector", |
| "config": { |
| "connector.class": "org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector", |
| "tasks.max": "1", |
| "key.converter": "org.apache.kafka.connect.storage.StringConverter", |
| "transforms": "SlackTransformer", |
| "transforms.SlackTransformer.type": "org.apache.camel.kafkaconnector.SlackTransformer", |
| "topics": "slack-topic", |
| "camel.source.path.channel": "general", |
| "camel.source.endpoint.token": "<token>" |
| } |
| } |
| EOF |
| ---- |
| |
| Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc apply -f - << EOF |
| apiVersion: kafka.strimzi.io/v1alpha1 |
| kind: KafkaConnector |
| metadata: |
| name: slack-source-connector |
| namespace: myproject |
| labels: |
| strimzi.io/cluster: my-connect-cluster |
| spec: |
| class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector |
| tasksMax: 1 |
| config: |
| key.converter: org.apache.kafka.connect.storage.StringConverter |
| transforms: SlackTransformer |
| transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer |
| topics: slack-topic |
| camel.source.path.channel: general |
| camel.source.endpoint.token: token |
| EOF |
| ---- |
| |
| If you followed the optional step for the Bot token secret, you can run the following command instead: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc apply -f config/openshift/slack-source.yaml |
| ---- |
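
The file `config/openshift/slack-source.yaml` is not reproduced in this example. One plausible shape for it is the `KafkaConnector` resource shown earlier with the literal token replaced by a `FileConfigProvider` placeholder. The sketch below assumes the secret file is named `slack-token.properties` and stores the token under a key called `token`; both are assumptions, as is the function name `write_slack_source_yaml`.

```shell
# Hypothetical helper: write a KafkaConnector resource that reads the token
# from the mounted secret instead of embedding it. $1 = output file.
write_slack_source_yaml() {
  # The quoted 'EOF' keeps the ${file:...} placeholder literal.
  cat > "$1" <<'EOF'
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: slack-source-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    transforms: SlackTransformer
    transforms.SlackTransformer.type: org.apache.camel.kafkaconnector.SlackTransformer
    topics: slack-topic
    camel.source.path.channel: general
    # Assumed file name and key inside the slack-token secret
    camel.source.endpoint.token: ${file:/opt/kafka/external-configuration/slack-token/slack-token.properties:token}
EOF
}
```

Calling `write_slack_source_yaml config/openshift/slack-source.yaml` would produce a file suitable for `oc apply -f`, provided the secret volume and the `file` config provider are configured as described in the optional step.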
| |
| Add a message to your channel, for example "Hello". |
| |
| Using kafkacat, you should be able to see the messages: |
| |
| ``` |
| > kafkacat -b localhost:9092 -t slack-topic -f 'Topic %t[%p], offset: %o, key: %k, payload: %s \n' |
| Topic slack-topic[0], offset: 22, key: , payload: {"schema":{"type":"string","optional":false},"payload":"Hello"} |
| ``` |
| |