# Camel-Kafka-connector Telegram Sink
This example shows how to use the Camel-Kafka-connector Telegram Sink.
## Standalone
### What is needed
- A Telegram Bot
- A ChatId
### Setting up the Telegram Bot and getting the Chat ID
First, create a Telegram bot through BotFather in Telegram and take note of the bot authorization token.
Create a group chat with yourself and the bot you just created, or add the bot to an existing group chat.
To find the group chat ID quickly, you can also invite the RawDataBot bot to the group and take note of the group chatId.
In this example we created a bot named ApacheCamelBot.
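If you would rather not invite a third-party bot, the chat ID can usually also be read from the Telegram Bot API `getUpdates` method after a message has been posted in the group. The following is a minimal sketch, assuming `curl` is installed; the variable name `BOT_TOKEN` and the helper `telegram_api_url` are chosen here for illustration and are not part of this example project.

```shell
#!/bin/sh
# Hypothetical helper: build a Telegram Bot API URL from a bot token and a method name.
telegram_api_url() {
  printf 'https://api.telegram.org/bot%s/%s\n' "$1" "$2"
}

# Only call the API when a token has been exported (BOT_TOKEN is an assumed variable name).
if [ -n "${BOT_TOKEN:-}" ]; then
  # The JSON response lists recent updates; look for the "chat" object and its "id" field.
  curl -s "$(telegram_api_url "$BOT_TOKEN" getUpdates)"
fi
```

Group chat IDs are typically negative numbers, so keep the leading minus sign when you copy the value.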
### Running Kafka
```
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
### Setting up the needed bits and running the example
You'll need to set up the `plugin.path` property in your Kafka installation.
Open the `$KAFKA_HOME/config/connect-standalone.properties` file
and set the `plugin.path` property to your chosen location.
In this example we'll use `/home/oscerd/connectors/`
```
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
> tar -xzf camel-telegram-kafka-connector-0.11.5-package.tar.gz
```
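The `plugin.path` change described above can also be scripted. This is a minimal sketch rather than an official tool: the function name `set_plugin_path` is hypothetical, and it assumes the property sits on a single line (possibly commented out with `#`).

```shell
#!/bin/sh
# Hypothetical helper: set plugin.path in a Kafka Connect properties file.
# $1 = properties file, $2 = directory that contains the unpacked connectors.
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  tmp_file=$(mktemp)
  # Drop any existing (possibly commented-out) plugin.path line ...
  grep -v '^#*[[:space:]]*plugin\.path=' "$props_file" > "$tmp_file" || true
  # ... then append the new value.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  # Write back in place so the original file keeps its permissions.
  cat "$tmp_file" > "$props_file"
  rm -f "$tmp_file"
}

# Example call (commented out so that nothing is modified by accident):
# set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors/
```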
Now it's time to set up the connector.
Open the Telegram sink configuration file (`config/CamelTelegramSinkConnector.properties`):
```
name=CamelTelegramSinkConnector
connector.class=org.apache.camel.kafkaconnector.telegram.CamelTelegramSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.endpoint.authorizationToken=<bot_authorization_token>
camel.sink.path.type=bots
camel.sink.endpoint.chatId=<group_chat_id>
```
Set the correct options in the file.
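If you prefer not to edit the file by hand, the two placeholders can be filled in with `sed`. This is a minimal sketch under stated assumptions: the function name `render_sink_config` is hypothetical, and the token and chat ID are assumed not to contain the `|` or `&` characters.

```shell
#!/bin/sh
# Hypothetical helper: print the sink configuration with both placeholders replaced.
# $1 = template file, $2 = bot authorization token, $3 = group chat ID.
render_sink_config() {
  template=$1
  token=$2
  chat_id=$3
  # '|' is used as the sed delimiter because bot tokens contain ':' characters.
  sed -e "s|<bot_authorization_token>|${token}|" \
      -e "s|<group_chat_id>|${chat_id}|" "$template"
}

# Example call (commented out; the output file name is an assumption):
# render_sink_config config/CamelTelegramSinkConnector.properties "$BOT_TOKEN" "$CHAT_ID" > /tmp/telegram-sink.properties
```

Writing the result to a separate file keeps the token out of the version-controlled template.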
Now you can run the example
```
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelTelegramSinkConnector.properties
```
In another terminal, send a message to the topic with kafkacat:
```
> echo "Hello from Apache Camel" | ./kafkacat -b localhost:9092 -t mytopic
% Auto-selecting Producer mode (use -P or -C to override)
```
In your group chat you should now see the message coming from ApacheCamelBot, or from whatever name you gave your bot.
## OpenShift
### What is needed
- A Telegram Bot
- An OpenShift instance
### Running Kafka using Strimzi Operator
First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project.
We need to create security objects as part of the installation, so it is necessary to switch to the admin user.
If you use Minishift, you can do it with the following command:
[source,bash,options="nowrap"]
----
oc login -u system:admin
----
We will use OpenShift project `myproject`.
If it doesn't exist yet, you can create it using the following command:
[source,bash,options="nowrap"]
----
oc new-project myproject
----
If the project already exists, you can switch to it with:
[source,bash,options="nowrap"]
----
oc project myproject
----
We can now install the Strimzi operator into this project:
[source,bash,options="nowrap",subs="attributes"]
----
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml
----
Next we will deploy a Kafka broker cluster and a Kafka Connect cluster. The Kafka Connect image with the Camel Telegram connector installed is built in a later step:
[source,bash,options="nowrap",subs="attributes"]
----
# Deploy a single node Kafka broker
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml
# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
----
Optionally, enable the creation of Kafka connectors through a dedicated custom resource:
[source,bash,options="nowrap"]
----
oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
----
### Add Camel Kafka connector binaries
Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images.
We now need to build the connectors and add them to the image.
If you have built the whole project (`mvn clean package`), decompress the connectors you need into a folder (e.g. `my-connectors/`)
so that each one is in its own subfolder.
Alternatively, you can download the latest officially released and packaged connectors from Maven Central, for example:
```
> cd my-connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
> tar -xzf camel-telegram-kafka-connector-0.11.5-package.tar.gz
```
Now we can start the build
[source,bash,options="nowrap"]
----
oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
----
We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready.
Once it is done, we can check that the connectors are available in our Kafka Connect cluster.
Strimzi is running Kafka Connect in a distributed mode.
To check the available connector plugins, you can run the following command:
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins
----
You should see something like this:
[source,json,options="nowrap"]
----
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.telegram.CamelTelegramSinkConnector","type":"sink","version":"0.11.5"},{"class":"org.apache.camel.kafkaconnector.telegram.CamelTelegramSourceConnector","type":"source","version":"0.11.5"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
----
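Because the response is a single long line, it can be easier to test for the Telegram sink class than to read the output by eye. A minimal sketch, assuming the JSON is piped in on standard input; the function name `has_connector_plugin` is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: succeed if the JSON on stdin mentions the given connector class.
# $1 = fully qualified connector class name.
has_connector_plugin() {
  # -F treats the class name as a fixed string (its dots are not regex wildcards);
  # -q prints nothing and reports the result through the exit status only.
  grep -qF "\"$1\""
}

# Example call (commented out; it needs a running Kafka Connect cluster):
# oc exec -i <connect-pod> -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins \
#   | has_connector_plugin org.apache.camel.kafkaconnector.telegram.CamelTelegramSinkConnector \
#   && echo "Telegram sink connector is installed"
```

Here `<connect-pod>` stands for the pod name that the `oc get pods` expression in the command above resolves to.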
### Set the Telegram token as secret (optional)
You can also store the Telegram bot token as a secret. Edit the file `config/openshift/telegram-token.properties` with the correct values and then execute the following command:
[source,bash,options="nowrap"]
----
oc create secret generic telegram-token --from-file=config/openshift/telegram-token.properties
----
Now we need to edit the KafkaConnectS2I custom resource to reference the secret. For example:
[source,bash,options="nowrap"]
----
spec:
  # ...
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  #...
  externalConfiguration:
    volumes:
      - name: telegram-token
        secret:
          secretName: telegram-token
----
In this way the secret telegram-token will be mounted as a volume at the path `/opt/kafka/external-configuration/telegram-token/`.
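With the `FileConfigProvider` enabled, a connector option can point at a key in the mounted file using Kafka's `${file:<path>:<key>}` placeholder syntax instead of carrying the value itself. The sketch below only builds such placeholder strings. The function name `secret_ref` and the key name `authorizationToken` are assumptions for illustration; use whatever key names your `telegram-token.properties` actually contains.

```shell
#!/bin/sh
# Hypothetical helper: print a FileConfigProvider placeholder for a key in the mounted secret.
# $1 = file name inside the secret volume, $2 = property key inside that file.
secret_ref() {
  # Single quotes keep the leading '$' literal; printf substitutes only the two %s fields.
  printf '${file:/opt/kafka/external-configuration/telegram-token/%s:%s}\n' "$1" "$2"
}

# Prints the placeholder for an assumed key named authorizationToken.
secret_ref telegram-token.properties authorizationToken
```

The resulting string can then be used as the value of `camel.sink.endpoint.authorizationToken` in the connector configuration.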
### Create connector instance
Now we can create an instance of the Telegram sink connector:
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
  "name": "telegram-sink-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.telegram.CamelTelegramSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "telegram-topic",
    "camel.sink.endpoint.authorizationToken": "<bot_authorization_token>",
    "camel.sink.path.type": "bots",
    "camel.sink.endpoint.chatId": "<group_chat_id>"
  }
}
EOF
----
Alternatively, if you have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource:
[source,bash,options="nowrap"]
----
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: telegram-sink-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.telegram.CamelTelegramSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: telegram-topic
    camel.sink.endpoint.authorizationToken: <bot_authorization_token>
    camel.sink.path.type: "bots"
    camel.sink.endpoint.chatId: <group_chat_id>
EOF
----
If you followed the optional step for secret credentials you can run the following command:
[source,bash,options="nowrap"]
----
oc apply -f config/openshift/telegram-sink-connector.yaml
----
You can check the status of the connector using
[source,bash,options="nowrap"]
----
oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/telegram-sink-connector/status
----
Check for messages in your chat.
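The status response is compact JSON, so a small filter can make the connector and task states easier to spot. A minimal sketch, assuming a `grep` that supports `-o` (GNU and BSD versions do) and the compact, no-whitespace JSON that the Kafka Connect REST API normally returns; the function name `connector_states` is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print every "state" value found in the status JSON on stdin,
# one per line (the connector state first, then one line per task).
connector_states() {
  # grep -o emits each match of "state":"<VALUE>" on its own line;
  # cut then keeps the fourth double-quote-delimited field, which is <VALUE>.
  grep -o '"state":"[A-Z_]*"' | cut -d'"' -f4
}

# Example call (commented out; it needs a running Kafka Connect cluster):
# oc exec -i <connect-pod> -- curl -s http://my-connect-cluster-connect-api:8083/connectors/telegram-sink-connector/status \
#   | connector_states
```

A healthy connector should report `RUNNING` on every line; a `FAILED` line usually means the full status output is worth reading for the error trace.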
### Check received messages
You can run the Kafka console producer to send messages to the topic; each line you type should then appear in your Telegram chat:
[source,bash,options="nowrap"]
----
oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic telegram-topic
Kafka to Telegram message 1
Kafka to Telegram message 2
Kafka to Telegram message 3
Kafka to Telegram message 4
Kafka to Telegram message 5
----