| # Camel-Kafka-connector Slack Source with Apicurio Registry |
| |
| This example shows how to use the Camel-Kafka-connector Slack Source together with Apicurio Registry. |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - A Slack app |
| - A Slack channel |
| - An Apicurio registry instance |
| |
| ### Setting up Slack |
| |
| You'll need a workspace and a channel. |
| |
| In your Slack settings, create an app. |
| |
| Add the following permissions to your Bot Token scopes: |
| * channels:history |
| * channels:read |
| |
| Install the app on your workspace and select the channel you want to consume from. |
| |
| Use the Bot User OAuth Access Token as the token for this example. |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ### Running Apicurio Registry |
| |
| In this case we'll use the in-memory Docker image: |
| |
| ``` |
| docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:1.3.1.Final |
| exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/opt/agent-bond/agent-bond.jar=jmx_exporter{{9779:/opt/agent-bond/jmx_exporter_config.yml}} -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/apicurio-registry-app-1.3.1.Final-runner.jar |
| __ ____ __ _____ ___ __ ____ ______ |
| --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ |
| -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \ |
| --\___\_\____/_/ |_/_/|_/_/|_|\____/___/ |
| 2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.username" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo |
| 2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.driver" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo |
| 2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.url" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo |
| 2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.hibernate-orm.database.generation" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo |
| 2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.password" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo |
| 2020-10-27 06:29:53,806 INFO [io.quarkus] (main) apicurio-registry-app 1.3.1.Final on JVM (powered by Quarkus 1.8.0.Final) started in 1.233s. Listening on: http://0.0.0.0:8080 |
| 2020-10-27 06:29:53,806 INFO [io.quarkus] (main) Profile prod activated. |
| 2020-10-27 06:29:53,806 INFO [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jackson, servlet, smallrye-health, smallrye-metrics, smallrye-openapi] |
| ``` |
| |
| All the services we need are now up and running. |
| |
| ### Setting up the needed bits and running the example |
| |
| You'll need to set up the `plugin.path` property in your Kafka installation. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your chosen location. |
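
As a minimal sketch of that edit, the hypothetical helper below (`set_plugin_path` is not part of Kafka) drops any existing `plugin.path` line, commented out or not, and appends the new value. It assumes a POSIX shell and that replacing the property in place is acceptable for your setup.

```shell
# Hypothetical helper (not shipped with Kafka): set plugin.path in a
# Kafka Connect worker properties file.
# Usage: set_plugin_path <properties-file> <plugin-directory>
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  [ -f "$props_file" ] || { echo "no such file: $props_file" >&2; return 1; }
  tmp_file=$(mktemp) || return 1
  # Keep every line except an existing plugin.path entry (commented or not).
  grep -v -E '^[[:space:]]*#?[[:space:]]*plugin\.path=' "$props_file" > "$tmp_file" || true
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  # Write back through the original file so its permissions are preserved.
  cat "$tmp_file" > "$props_file"
  rm -f "$tmp_file"
}
```

For this example it could be called as `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors`.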
| |
| You'll need to build your connector starting from an archetype: |
| |
| ``` |
| > mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.11.0 |
| [INFO] Scanning for projects... |
| [INFO] |
| [INFO] ------------------< org.apache.maven:standalone-pom >------------------- |
| [INFO] Building Maven Stub Project (No POM) 1 |
| [INFO] --------------------------------[ pom ]--------------------------------- |
| [INFO] |
| [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> |
| [INFO] |
| [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< |
| [INFO] |
| [INFO] |
| [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- |
| [INFO] Generating project in Interactive mode |
| [INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote |
| Define value for property 'groupId': org.apache.camel.kafkaconnector |
| Define value for property 'artifactId': slack-extended |
| Define value for property 'version' 1.0-SNAPSHOT: : 0.11.0 |
| Define value for property 'package' org.apache.camel.kafkaconnector: : |
| Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector |
| [INFO] Using property: camel-kafka-connector-version = 0.11.0 |
| Confirm properties configuration: |
| groupId: org.apache.camel.kafkaconnector |
| artifactId: slack-extended |
| version: 0.11.0 |
| package: org.apache.camel.kafkaconnector |
| camel-kafka-connector-name: camel-slack-kafka-connector |
| camel-kafka-connector-version: 0.11.0 |
| Y: : y |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.11.0 |
| [INFO] ---------------------------------------------------------------------------- |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Parameter: version, Value: 0.11.0 |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector |
| [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: version, Value: 0.11.0 |
| [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector |
| [INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector |
| [INFO] Parameter: camel-kafka-connector-version, Value: 0.11.0 |
| [INFO] Parameter: artifactId, Value: slack-extended |
| [INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] BUILD SUCCESS |
| [INFO] ------------------------------------------------------------------------ |
| [INFO] Total time: 39.295 s |
| [INFO] Finished at: 2020.11.03T09:16:51+02:00 |
| [INFO] ------------------------------------------------------------------------ |
| > cd /home/workspace/miscellanea/slack-extended |
| ``` |
| |
| Now we need to edit the POM: |
| |
| |
| ``` |
| . |
| . |
| . |
| <version>0.11.0</version> |
| <name>A Camel Kafka Connector extended</name> |
| |
| <properties> |
| <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
| <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> |
| <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version> |
| </properties> |
| |
| <dependencies> |
| <dependency> |
| <groupId>org.apache.kafka</groupId> |
| <artifactId>connect-api</artifactId> |
| <scope>provided</scope> |
| <version>${kafka.version}</version> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.kafka</groupId> |
| <artifactId>connect-transforms</artifactId> |
| <scope>provided</scope> |
| <version>${kafka.version}</version> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.camel.kafkaconnector</groupId> |
| <artifactId>camel-kafka-connector</artifactId> |
| <version>0.11.0</version> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.camel.kafkaconnector</groupId> |
| <artifactId>camel-slack-kafka-connector</artifactId> |
| <version>0.11.0</version> |
| </dependency> |
| <dependency> |
| <groupId>io.apicurio</groupId> |
| <artifactId>apicurio-registry-utils-converter</artifactId> |
| <version>1.3.1.Final</version> |
| </dependency> |
| <dependency> |
| <groupId>io.apicurio</groupId> |
| <artifactId>apicurio-registry-rest-client</artifactId> |
| <version>1.3.1.Final</version> |
| </dependency> |
| </dependencies> |
| . |
| . |
| . |
| ``` |
| |
| and add the following class in the main package |
| |
| Now we need to build the connector: |
| |
| ``` |
| > mvn clean package |
| ``` |
| |
| In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`. Copy the tar.gz generated by the previous build there and extract it: |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.11.0-package.tar.gz . |
| > tar -xzf slack-extended-0.11.0-package.tar.gz |
| ``` |
| |
| Now it's time to set up the connector. |
| |
| Open the Slack source Apicurio configuration file, `config/CamelSlackSourceApicurioConnector.properties`, and set the channel and the token: |
| |
| ``` |
| name=CamelSlackSourceConnector |
| connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| transforms=SlackTransformer |
| transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms |
| value.converter.apicurio.registry.url=http://localhost:8080/api |
| value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter |
| value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy |
| |
| topics=mytopic |
| |
| camel.source.path.channel=general |
| camel.source.endpoint.token=<the token created for your Bot> |
| ``` |
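
Before starting the connector, it can help to confirm that the file is complete. The sketch below is a hypothetical check (`check_connector_config` is not part of Kafka or Camel): it looks for the keys this example relies on and flags the token if it still holds the `<...>` placeholder. Which keys count as required here is an assumption based on the configuration shown above.

```shell
# Hypothetical sanity check for the connector properties file.
# Usage: check_connector_config <properties-file>
check_connector_config() {
  config_file=$1
  status=0
  for key in name connector.class value.converter \
             value.converter.apicurio.registry.url \
             camel.source.path.channel camel.source.endpoint.token; do
    # Each key must be present with a non-empty value.
    if ! grep -q "^${key}=." "$config_file"; then
      echo "missing or empty: ${key}" >&2
      status=1
    fi
  done
  # The placeholder token starts with '<'; a real token does not.
  if grep -q '^camel\.source\.endpoint\.token=<' "$config_file"; then
    echo "camel.source.endpoint.token still holds the placeholder" >&2
    status=1
  fi
  return "$status"
}
```

Running `check_connector_config config/CamelSlackSourceApicurioConnector.properties` returns a non-zero status and prints the offending keys when something is missing.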
| |
| Now you can run the example: |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSlackSourceApicurioConnector.properties |
| ``` |
| |
| Post a message to your channel, for example "Hello". |
| |
| In another terminal, using kafkacat, you should be able to see the message body: |
| |
| ``` |
| > kafkacat -b localhost:9092 -t mytopic |
| {"schemaId":1,"payload":"Hello"} |
| ``` |
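
Each record arrives as a small JSON envelope holding the schema id and the payload. If only the Slack text is of interest, the payload can be pulled out with a short filter. The sketch below uses a hypothetical `extract_payload` function built on `sed`. It assumes the envelope has the shape shown above and that the payload contains no escaped double quotes, so a real JSON parser is the safer choice for arbitrary messages.

```shell
# Hypothetical filter: print the "payload" string from each JSON envelope
# read on stdin. Assumes the payload has no escaped double quotes.
extract_payload() {
  sed -n 's/.*"payload":"\([^"]*\)".*/\1/p'
}

# Example with the envelope shown above.
printf '%s\n' '{"schemaId":1,"payload":"Hello"}' | extract_payload   # prints: Hello
```

With a running broker, the same filter can be attached to the consumer: `kafkacat -b localhost:9092 -t mytopic | extract_payload`.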
| |
| ### What happened at registry level |
| |
| The transform takes the `text` field from the `SlackMessage` POJO and sets it as the record value, with a schema determined by the type of that field, which is a String. |
| |
| ``` |
| > curl -X GET http://localhost:8080/api/artifacts/ |
| ["mytopic-value"] |
| ``` |
| |
| We have just one artifact in the registry, and in the Apicurio logs we should see a single reference to it: |
| |
| ``` |
| 2020-10-27 06:30:08,175 WARN [io.api.reg.res.ArtifactsResourceImpl] (executor-thread-1) Artifact mytopic-value/1 not indexed, status: 0 |
| ``` |
| |
| We can also retrieve the metadata for version 1 of the schema: |
| |
| ``` |
| curl -X GET http://localhost:8080/api/artifacts/mytopic-value/versions/1/meta |
| {"version":1,"createdOn":1603780208148,"type":"KCONNECT","globalId":1,"state":"ENABLED","id":"mytopic-value"} |
| ``` |
| |
| We can retrieve the artifact-level metadata too: |
| |
| ``` |
| curl -X GET http://localhost:8080/api/artifacts/mytopic-value/meta |
| {"createdOn":1603780208148,"modifiedOn":1603780208148,"id":"mytopic-value","version":1,"type":"KCONNECT","globalId":1,"state":"ENABLED"} |
| ``` |
| |
| Finally, we can fetch the schema content: |
| |
| ``` |
| curl -X GET http://localhost:8080/api/artifacts/mytopic-value |
| {"type":"string","optional":false} |
| ``` |