# Camel-Kafka-connector Slack Source with Apicurio Registry and Avro
This example shows how to run the Camel-Kafka-connector Slack Source with Apicurio Registry and Avro.
## Standalone
### What is needed
- A Slack app
- A Slack channel
- An Apicurio registry instance
### Setting up Slack
You'll need a workspace and a channel.
In your Slack settings, create an app.
Add the following permissions to your Bot Token scopes:
* channels:history
* channels:read
Install the app on your workspace and select the channel you want to consume from.
Use the Bot User OAuth Access Token as the token for this example.
### Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
### Running Apicurio Registry
In this case we'll use the in-memory Docker image:
```
docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:1.3.1.Final
exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/opt/agent-bond/agent-bond.jar=jmx_exporter{{9779:/opt/agent-bond/jmx_exporter_config.yml}} -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/apicurio-registry-app-1.3.1.Final-runner.jar
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.username" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.driver" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.url" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.hibernate-orm.database.generation" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
2020-10-27 06:29:52,739 WARN [io.qua.config] (main) Unrecognized configuration key "quarkus.datasource.password" was provided; it will be ignored; verify that the dependency extension for this configuration is set or you did not make a typo
2020-10-27 06:29:53,806 INFO [io.quarkus] (main) apicurio-registry-app 1.3.1.Final on JVM (powered by Quarkus 1.8.0.Final) started in 1.233s. Listening on: http://0.0.0.0:8080
2020-10-27 06:29:53,806 INFO [io.quarkus] (main) Profile prod activated.
2020-10-27 06:29:53,806 INFO [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jackson, servlet, smallrye-health, smallrye-metrics, smallrye-openapi]
```
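Before moving on, it can help to wait until the registry actually answers requests. The following is a minimal sketch, assuming a POSIX shell; the `retry` helper is a hypothetical name introduced here for illustration and is not part of Kafka or Apicurio.

```shell
# Retry a command until it succeeds or the attempts run out.
# Usage: retry <attempts> <delay-seconds> <command> [args...]
# NOTE: "retry" is an illustrative helper, not a tool shipped with any project.
retry() {
  attempts=$1
  delay=$2
  shift 2
  n=1
  # Run the command; on failure, sleep and try again up to <attempts> times
  while ! "$@"; do
    if [ "$n" -ge "$attempts" ]; then
      return 1
    fi
    n=$((n + 1))
    sleep "$delay"
  done
}

# Example (assumes curl is installed and the registry listens on localhost:8080):
# retry 30 1 curl -sf http://localhost:8080/api/artifacts
```

The helper returns 0 as soon as the command succeeds and 1 once all attempts have failed, so it can gate the next step of a script.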
All the services we need are now up and running.
### Setting up the needed bits and running the example
You'll need to set the `plugin.path` property in your Kafka installation.
Open `$KAFKA_HOME/config/connect-standalone.properties`
and set the `plugin.path` property to your chosen location.
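If you prefer to script this edit, the following is a minimal sketch, assuming a POSIX shell and a plugin directory whose path contains no `|` or `&` characters. The function name `set_plugin_path` is hypothetical and only used for illustration.

```shell
# Set (or append) plugin.path in a Kafka Connect worker properties file.
# Usage: set_plugin_path <properties-file> <plugin-dir>
# NOTE: illustrative helper; assumes <plugin-dir> contains no '|' or '&'.
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  if grep -q '^#\{0,1\}plugin\.path=' "$props_file"; then
    # Rewrite the existing entry, uncommenting it if needed
    sed "s|^#\{0,1\}plugin\.path=.*|plugin.path=${plugin_dir}|" "$props_file" > "$props_file.tmp" &&
      mv "$props_file.tmp" "$props_file"
  else
    # No entry yet: append one at the end of the file
    printf 'plugin.path=%s\n' "$plugin_dir" >> "$props_file"
  fi
}
```

For this example it could be called as `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/`.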
You'll need to build your connector starting from an archetype:
```
> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.11.5
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
Define value for property 'groupId': org.apache.camel.kafkaconnector
Define value for property 'artifactId': slack-extended
Define value for property 'version' 1.0-SNAPSHOT: : 0.11.5
Define value for property 'package' org.apache.camel.kafkaconnector: :
Define value for property 'camel-kafka-connector-name': camel-slack-kafka-connector
[INFO] Using property: camel-kafka-connector-version = 0.11.5
Confirm properties configuration:
groupId: org.apache.camel.kafkaconnector
artifactId: slack-extended
version: 0.11.5
package: org.apache.camel.kafkaconnector
camel-kafka-connector-name: camel-slack-kafka-connector
camel-kafka-connector-version: 0.11.5
Y: : y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.11.5
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: artifactId, Value: slack-extended
[INFO] Parameter: version, Value: 0.11.5
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: version, Value: 0.11.5
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
[INFO] Parameter: camel-kafka-connector-name, Value: camel-slack-kafka-connector
[INFO] Parameter: camel-kafka-connector-version, Value: 0.11.5
[INFO] Parameter: artifactId, Value: slack-extended
[INFO] Project created from Archetype in dir: /home/oscerd/playground/slack-extended
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39.295 s
[INFO] Finished at: 2020-10-23T09:16:51+02:00
[INFO] ------------------------------------------------------------------------
> cd /home/workspace/miscellanea/slack-extended
```
Now we need to edit the POM to add the Apicurio Registry converter and REST client dependencies:
```
.
.
.
<version>0.11.5</version>
<name>A Camel Kafka Connector extended</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<camel-kafka-connector-version>${project.version}</camel-kafka-connector-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<scope>provided</scope>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-kafka-connector</artifactId>
<version>0.11.5</version>
</dependency>
<dependency>
<groupId>org.apache.camel.kafkaconnector</groupId>
<artifactId>camel-slack-kafka-connector</artifactId>
<version>0.11.5</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-converter</artifactId>
<version>1.3.1.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-rest-client</artifactId>
<version>1.3.1.Final</version>
</dependency>
</dependencies>
.
.
.
```
Now we need to build the connector:
```
> mvn clean package
```
In this example we'll use `/home/oscerd/connectors/` as the `plugin.path`, and we'll need the tar.gz generated by the previous build:
```
> cd /home/oscerd/connectors/
> cp /home/workspace/miscellanea/slack-extended/target/slack-extended-0.11.5-package.tar.gz .
> tar -xzf slack-extended-0.11.5-package.tar.gz
```
Now it's time to set up the connector.
Open the Slack source Apicurio configuration file, `config/CamelSlackSourceAvroApicurioConnector.properties`:
```
name=CamelSlackSourceConnector
connector.class=org.apache.camel.kafkaconnector.slack.CamelSlackSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
transforms=SlackTransformer
transforms.SlackTransformer.type=org.apache.camel.kafkaconnector.slack.transformers.SlackTransforms
value.converter.apicurio.registry.url=http://localhost:8080/api
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
topics=mytopic
camel.source.path.channel=general
camel.source.endpoint.token=<the token created for your Bot>
```
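A quick sanity check of the configuration file can catch a missing key or a forgotten token before starting Kafka Connect. The following is a minimal sketch, assuming a POSIX shell; `check_connector_config` is a hypothetical helper, and the list of keys it treats as required is an assumption based on the configuration shown above.

```shell
# Check that a connector properties file has the keys used in this example
# and that the token placeholder has been replaced.
# Usage: check_connector_config <properties-file>
# NOTE: illustrative helper; the "required" key list is an assumption.
check_connector_config() {
  config_file=$1
  rc=0
  for key in name connector.class value.converter \
             value.converter.apicurio.registry.url topics \
             camel.source.path.channel camel.source.endpoint.token; do
    # Escape dots so they match literally in the grep pattern
    pattern=$(printf '%s' "$key" | sed 's/\./\\./g')
    if ! grep -q "^${pattern}=." "$config_file"; then
      echo "missing or empty: $key" >&2
      rc=1
    fi
  done
  # The sample file ships with a "<...>" placeholder instead of a real token
  if grep -q '^camel\.source\.endpoint\.token=<' "$config_file"; then
    echo "token placeholder not replaced" >&2
    rc=1
  fi
  return $rc
}
```

It returns 0 when every key is present with a value and the token no longer starts with `<`, for example `check_connector_config config/CamelSlackSourceAvroApicurioConnector.properties`.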
Now you can run the example:
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSlackSourceAvroApicurioConnector.properties
```
You'll need a small Java consumer to read the messages.
In the folder `kafka-avro-basic-consumer`, run the following command:
```
mvn clean compile exec:exec -Dkafka.topic.name=mytopic
```
Send a message such as `Hello` in your Slack channel and you should see it logged:
```
2020-10-27 11:43:42,819 [main ] INFO SimpleConsumer - Hello
```
### What happened at registry level
The transform takes the `text` field from the SlackMessage POJO and sets it as the record value, with a schema determined by the type of that field, which here is a String.
```
> curl -X GET http://localhost:8080/api/artifacts/
["mytopic-value"]
```
There is just one artifact in the registry, and the Apicurio logs should show a single reference to it:
```
2020-10-27 06:30:08,175 WARN [io.api.reg.res.ArtifactsResourceImpl] (executor-thread-1) Artifact mytopic-value/1 not indexed, status: 0
```
We can also retrieve the metadata for version 1 of the schema:
```
curl -X GET http://localhost:8080/api/artifacts/mytopic-value/versions/1/meta
{"version":1,"createdOn":1603780208148,"type":"KCONNECT","globalId":1,"state":"ENABLED","id":"mytopic-value"}
```
We can retrieve the metadata of the artifact itself as well:
```
curl -X GET http://localhost:8080/api/artifacts/mytopic-value/meta
{"createdOn":1603780208148,"modifiedOn":1603780208148,"id":"mytopic-value","version":1,"type":"KCONNECT","globalId":1,"state":"ENABLED"}
```
And finally the schema content:
```
curl -X GET http://localhost:8080/api/artifacts/mytopic-value
"string"
```