| # Camel-Kafka-connector AWS2 Lambda Sink |
| |
| This is an example for Camel-Kafka-connector AWS2-Lambda Sink |
| |
| ## Standalone |
| |
| ### What is needed |
| |
| - An AWS Lambda function |
| - The following project here: https://github.com/oscerd/lambda-ckc |
| |
| ### Running Kafka |
| |
| ``` |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ``` |
| |
| ### Setting up the needed bits and running the example |
| |
| You'll need to setup the plugin.path property in your kafka |
| |
| Open the `$KAFKA_HOME/config/connect-standalone.properties` |
| |
| and set the `plugin.path` property to your choosen location |
| |
| In this example we'll use `/home/oscerd/connectors/` |
| |
| ``` |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-lambda-kafka-connector/0.9.0/camel-aws2-lambda-kafka-connector-0.9.0-package.tar.gz |
| > untar.gz camel-aws2-sns-kafka-connector-0.9.0-package.tar.gz |
| ``` |
| |
| ### Deploying the AWS Lambda function |
| |
| ``` |
| > git clone git@github.com:oscerd/lambda-ckc.git |
| > cd lambda-ckc/ |
| > mvn clean install |
| ``` |
| |
| In your AWS console for Lambda service, create a new function called `hello-ckc` and deploy the `target/test-ckc.jar` |
| |
| Wait for the function to be up and running and eventually do a test through the lambda console. |
| |
| ## Setting up the connector |
| |
| Now it's time to setup the connectors |
| |
| Open the AWS2 Lambda configuration file |
| |
| ``` |
| name=CamelAWS2LambdaSinkConnector |
| connector.class=org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.converters.ByteArrayConverter |
| |
| topics=mytopic |
| |
| camel.sink.path.function=hello-ckc |
| camel.sink.endpoint.operation=invokeFunction |
| |
| camel.component.aws2-lambda.accessKey=xxxx |
| camel.component.aws2-lambda.secretKey=yyyy |
| camel.component.aws2-lambda.region=eu-west-1 |
| ``` |
| |
| and add the correct credentials for AWS. |
| |
| Now you can run the example |
| |
| ``` |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2LambdaSinkConnector.properties |
| ``` |
| |
| On a different terminal run the kafka-producer and send messages to your Kafka Broker. |
| |
| ``` |
| >$ bin/kafka-console-producer.sh --topic test12 --broker-list localhost:9092 |
| >"Kafka message sent!" |
| ``` |
| |
| In the AWS console, in the CloudWatch Log insights section under the monitoring tabs, you should get the information about this invokation and you should see in the logs |
| |
| ``` |
| 2020-08-07T12:32:12.282+02:00 START RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Version: $LATEST |
| 2020-08-07T12:32:12.321+02:00 Event received: Kafka message sent! |
| 2020-08-07T12:32:12.325+02:00 END RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 |
| 2020-08-07T12:32:12.325+02:00 REPORT RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Duration: 43.33 ms Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 75 MB Init Duration: 293.52 ms |
| ``` |
| |
| ## Openshift |
| |
| ### What is needed |
| |
| - An AWS Lambda function |
| - The following project here: https://github.com/oscerd/lambda-ckc |
| - An Openshift instance |
| |
| ### Running Kafka using Strimzi Operator |
| |
| First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. |
| We need to create security objects as part of installation so it is necessary to switch to admin user. |
| If you use Minishift, you can do it with the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc login -u system:admin |
| ---- |
| |
| We will use OpenShift project `myproject`. |
| If it doesn't exist yet, you can create it using following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc new-project myproject |
| ---- |
| |
| If the project already exists, you can switch to it with: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc project myproject |
| ---- |
| |
| We can now install the Strimzi operator into this project: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.20.1/strimzi-cluster-operator-0.20.1.yaml |
| ---- |
| |
| Next we will deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the Debezium connectors installed: |
| |
| [source,bash,options="nowrap",subs="attributes"] |
| ---- |
| # Deploy a single node Kafka broker |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/kafka/kafka-persistent-single.yaml |
| |
| # Deploy a single instance of Kafka Connect with no plug-in installed |
| oc apply -f https://github.com/strimzi/strimzi-kafka-operator/raw/0.20.1/examples/connect/kafka-connect-s2i-single-node-kafka.yaml |
| ---- |
| |
| Optionally enable the possibility to instantiate Kafka Connectors through specific custom resource: |
| [source,bash,options="nowrap"] |
| ---- |
| oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true |
| ---- |
| |
| ### Add Camel Kafka connector binaries |
| |
| Strimzi uses `Source2Image` builds to allow users to add their own connectors to the existing Strimzi Docker images. |
| We now need to build the connectors and add them to the image, |
| if you have built the whole project (`mvn clean package`) decompress the connectors you need in a folder (i.e. like `my-connectors/`) |
| so that each one is in its own subfolder |
| (alternatively you can download the latest officially released and packaged connectors from maven): |
| |
| So we need to do something like this: |
| |
| ``` |
| > cd my-connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws2-lambda-kafka-connector/0.9.0/camel-aws2-lambda-kafka-connector-0.9.0-package.tar.gz |
| > untar.gz camel-aws2-lambda-kafka-connector-0.9.0-package.tar.gz |
| ``` |
| |
| Now we can start the build |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow |
| ---- |
| |
| We should now wait for the rollout of the new image to finish and the replica set with the new connector to become ready. |
| Once it is done, we can check that the connectors are available in our Kafka Connect cluster. |
| Strimzi is running Kafka Connect in a distributed mode. |
| |
| To check the available connector plugins, you can run the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins |
| ---- |
| |
| You should see something like this: |
| |
| [source,json,options="nowrap"] |
| ---- |
| [{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.9.0"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.9.0"},{"class":"org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector","type":"sink","version":"0.9.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}] |
| ---- |
| |
| ### Set the AWS credential as secret (optional) |
| |
| You can also set the aws creds option as secret, you'll need to edit the file config/aws2-lambda-cred.properties with the correct credentials and then execute the following command |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc create secret generic aws2-lambda --from-file=config/openshift/aws2-lambda-cred.properties |
| ---- |
| |
| Now we need to edit KafkaConnectS2I custom resource to reference the secret. For example: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| spec: |
| # ... |
| config: |
| config.providers: file |
| config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider |
| #... |
| externalConfiguration: |
| volumes: |
| - name: aws-credentials |
| secret: |
| secretName: aws2-lambda |
| ---- |
| |
| In this way the secret aws2-lambda will be mounted as volume with path /opt/kafka/external-configuration/aws-credentials/ |
| |
| ### Create connector instance |
| |
| Now we can create some instance of the AWS2 Lambda sink connector: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \ |
| -H "Accept:application/json" \ |
| -H "Content-Type:application/json" \ |
| http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF' |
| { |
| "name": "lambda-sink-connector", |
| "config": { |
| "connector.class": "org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector", |
| "tasks.max": "1", |
| "key.converter": "org.apache.kafka.connect.storage.StringConverter", |
| "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", |
| "topics": "lambda-topic", |
| "camel.sink.path.function": "hello-ckc", |
| "camel.sink.endpoint.operation": "invokeFunction", |
| "camel.component.aws2-lambda.accessKey": "xxx", |
| "camel.component.aws2-lambda.secretKey": "xxx", |
| "camel.component.aws2-lambda.region": "xxx" |
| } |
| } |
| EOF |
| ---- |
| |
| Altenatively, if have enabled `use-connector-resources`, you can create the connector instance by creating a specific custom resource: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc apply -f - << EOF |
| apiVersion: kafka.strimzi.io/v1alpha1 |
| kind: KafkaConnector |
| metadata: |
| name: lambda-sink-connector |
| namespace: myproject |
| labels: |
| strimzi.io/cluster: my-connect-cluster |
| spec: |
| class: org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector |
| tasksMax: 1 |
| config: |
| key.converter: org.apache.kafka.connect.storage.StringConverter |
| value.converter: org.apache.kafka.connect.converters.ByteArrayConverter |
| topics: lambda-topic |
| camel.sink.path.function: hello-ckc |
| camel.sink.endpoint.operation: invokeFunction |
| camel.component.aws2-lambda.accessKey: xxxx |
| camel.component.aws2-lambda.secretKey: yyyy |
| camel.component.aws2-lambda.region: region |
| EOF |
| ---- |
| |
| If you followed the optional step for secret credentials you can run the following command: |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc apply -f config/openshift/aws2-lambda-sink.yaml |
| ---- |
| |
| You can check the status of the connector using |
| |
| [source,bash,options="nowrap"] |
| ---- |
| oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/lambda-sink-connector/status |
| ---- |
| |
| Just connect to your AWS Console and check the content of camel-kafka-connector bucket. |
| |
| On a different terminal run the kafka-producer and send messages to your Kafka Broker. |
| |
| ``` |
| oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic lambda-topic |
| Kafka message sent! |
| ``` |
| |
| In the AWS console, in the CloudWatch Log insights section under the monitoring tabs, you should get the information about this invokation and you should see in the logs |
| |
| ``` |
| 2020-08-07T12:32:12.282+02:00 START RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Version: $LATEST |
| 2020-08-07T12:32:12.321+02:00 Event received: Kafka message sent! |
| 2020-08-07T12:32:12.325+02:00 END RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 |
| 2020-08-07T12:32:12.325+02:00 REPORT RequestId: 666d2257-1a22-406e-b112-2384ceac23a3 Duration: 43.33 ms Billed Duration: 100 ms Memory Size: 512 MB Max Memory Used: 75 MB Init Duration: 293.52 ms |
| ``` |
| |