[[Tryitoutlocally-Tryitoutlocally]]
= Try it out locally
[[Tryitoutlocally-RunKafka]]
== Run Kafka
First, get a locally running Kafka instance by following the Apache Kafka https://kafka.apache.org/quickstart[quickstart guide]. This usually boils down to:
.Set KAFKA_HOME
[source,bash]
----
export KAFKA_HOME=<your kafka install dir>
----
.Start ZooKeeper
[source,bash]
----
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
----
.Start Kafka broker
[source,bash]
----
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
----
.Create "mytopic" topic
[source,bash]
----
$KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic mytopic
----
Next, run the Camel Kafka Connector sources and/or sinks:
[NOTE]
====
In order to run more than one standalone Kafka Connect instance on the same machine, you need to duplicate the `$KAFKA_HOME/config/connect-standalone.properties` file, changing the HTTP port used for each instance:
[source,bash]
----
cp $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-standalone2.properties
echo rest.port=<your unused port number> >> $KAFKA_HOME/config/connect-standalone2.properties
----
====
You can use these Kafka utilities to consume from or produce to a Kafka topic:
.Run a Kafka consumer
[source,bash]
----
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
----
.Run an interactive CLI Kafka producer
[source,bash]
----
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
----
[[Tryitoutlocally-TryExamples]]
== Try some examples
For the following examples you need to fetch the `camel-kafka-connector` project and https://github.com/apache/camel-kafka-connector/blob/master/README.adoc#build-the-project[build] it locally by running `./mvnw package` from the root of the project. Look into the `config` and `examples` directories for the configuration files (`*.properties`) of the examples showcased here.
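For example:
[source,bash]
----
git clone https://github.com/apache/camel-kafka-connector.git
cd camel-kafka-connector
./mvnw package
----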
First you need to set the `CLASSPATH` environment variable to include the `jar` files from the `core/target/camel-kafka-connector-[version]-package/share/java/` directory. On UNIX systems this can be done by running:
[source,bash]
----
export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
----
[[Tryitoutlocally-SimpleLogger]]
=== Simple logger (sink)
This is an example of a _sink_ that logs messages consumed from `mytopic`.
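The configuration has roughly the following shape; this is an illustrative sketch, the actual values live in `config/CamelSinkConnector.properties`:
[source,bash]
----
# illustrative sketch -- see config/CamelSinkConnector.properties for the real values
name=CamelSinkConnector
connector.class=org.apache.camel.kafkaconnector.CamelSinkConnector
tasks.max=1
topics=mytopic
# a Camel log endpoint that prints each consumed record
camel.sink.url=log:info
----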
.Run the default sink, just a Camel logger:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSinkConnector.properties
----
[[Tryitoutlocally-Timer]]
=== Timer (source)
This is an example of a _source_ that produces a message every second to `mytopic`.
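A sketch of what such a source configuration looks like (illustrative; the actual values live in `config/CamelSourceConnector.properties`, and the timer endpoint name here is only a placeholder):
[source,bash]
----
# illustrative sketch -- see config/CamelSourceConnector.properties for the real values
name=CamelSourceConnector
connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector
tasks.max=1
topics=mytopic
# a Camel timer endpoint firing every 1000 ms
camel.source.url=timer:kafkaconnector?period=1000
----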
.Run the default source, just a Camel timer:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSourceConnector.properties
----
[[Tryitoutlocally-AwsKinesis]]
=== AWS Kinesis (source)
This example consumes from an AWS Kinesis data stream and transfers the payload to the `mytopic` topic in Kafka.
Adjust the properties in `examples/CamelAWSKinesisSourceConnector.properties` for your environment: you need to configure the access key, secret key and region by setting `camel.component.aws-kinesis.configuration.access-key=youraccesskey`, `camel.component.aws-kinesis.configuration.secret-key=yoursecretkey` and `camel.component.aws-kinesis.configuration.region=yourregion`.
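For instance, the relevant lines of the properties file would look like this, with placeholder values:
[source,bash]
----
camel.component.aws-kinesis.configuration.access-key=youraccesskey
camel.component.aws-kinesis.configuration.secret-key=yoursecretkey
camel.component.aws-kinesis.configuration.region=yourregion
----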
.Run the AWS Kinesis source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSKinesisSourceConnector.properties
----
[[Tryitoutlocally-AWSSQSSink]]
=== AWS SQS (sink)
This example consumes from the Kafka topic `mytopic` and transfers the payload to AWS SQS.
Adjust the properties in `examples/CamelAWSSQSSinkConnector.properties` for your environment: you need to configure the access key, secret key and region by setting `camel.component.aws-sqs.configuration.access-key=youraccesskey`, `camel.component.aws-sqs.configuration.secret-key=yoursecretkey` and `camel.component.aws-sqs.configuration.region=yourregion`.
.Run the AWS SQS sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSinkConnector.properties
----
[[Tryitoutlocally-AWSSQSSource]]
=== AWS SQS (source)
This example consumes from the AWS SQS queue `mysqs` and transfers the payload to the `mytopic` topic in Kafka.
Adjust the properties in `examples/CamelAWSSQSSourceConnector.properties` for your environment: you need to configure the access key, secret key and region by setting `camel.component.aws-sqs.configuration.access-key=youraccesskey`, `camel.component.aws-sqs.configuration.secret-key=yoursecretkey` and `camel.component.aws-sqs.configuration.region=yourregion`.
.Run the AWS SQS source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSourceConnector.properties
----
[[Tryitoutlocally-AWSSNSSink]]
=== AWS SNS (sink)
This example consumes from the `mytopic` Kafka topic and transfers the payload to the AWS SNS topic named `topic`.
Adjust the properties in `examples/CamelAWSSNSSinkConnector.properties` for your environment: you need to configure the access key, secret key and region by setting `camel.component.aws-sns.configuration.access-key=youraccesskey`, `camel.component.aws-sns.configuration.secret-key=yoursecretkey` and `camel.component.aws-sns.configuration.region=yourregion`.
.Run the AWS SNS sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSNSSinkConnector.properties
----
[[Tryitoutlocally-AWSS3Source]]
=== AWS S3 (source)
This example fetches objects from AWS S3 in the `camel-kafka-connector` bucket and transfers the payload to `mytopic` Kafka topic. This example shows how to implement a custom converter converting from bytes received from S3 to Kafka's `SchemaAndValue`.
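In Kafka Connect, a custom converter is wired in through the standard `value.converter` property; the class name below is only a placeholder, check `examples/CamelAWSS3SourceConnector.properties` for the converter actually shipped with the example:
[source,bash]
----
# placeholder class name -- see the example properties file for the actual converter
value.converter=org.apache.camel.kafkaconnector.examples.S3ObjectConverter
----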
Adjust the properties in `examples/CamelAWSS3SourceConnector.properties` for your environment: you need to configure the access key, secret key and region by adding `camel.component.aws-s3.configuration.access-key=youraccesskey`, `camel.component.aws-s3.configuration.secret-key=yoursecretkey` and `camel.component.aws-s3.configuration.region=yourregion`.
.Run the AWS S3 source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSS3SourceConnector.properties
----
[[Tryitoutlocally-CassandraQL]]
=== Apache Cassandra
This example requires a running Cassandra instance; for simplicity, the steps below show how to start Cassandra using Docker. First, run a Cassandra instance:
[source,bash]
----
docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
----
Next, make sure Cassandra is running:
[source,bash]
----
docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1
----
To populate the database using the `cqlsh` tool, you'll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory; we refer to the Cassandra installation directory as `LOCAL_CASSANDRA_HOME`. Here we use version 3.11.4 to connect to the Cassandra instance we started with Docker.
[source,bash]
----
<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
----
Next, execute the following CQL script to create the keyspace `test` and the table `users`, and to insert one row into it.
[source,sql]
----
create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;
----
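You can verify that the row was inserted by running the same query the source connector uses later:
[source,sql]
----
select * from users;
----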
In the configuration `.properties` files used below, the IP address of the Cassandra master node needs to be configured: replace the value `172.17.0.2` in the `camel.source.url` property, or `localhost` in the `camel.sink.url` property, with the IP address of the master node obtained from Docker. Each example uses a different `.properties` file, shown in the command line that runs the example.
[source,bash]
----
docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
----
[[Tryitoutlocally-CassandraQLSource]]
==== Apache Cassandra (source)
This example polls Cassandra via CQL (`select * from users`) in the `test` keyspace and transfers the result to the `mytopic` Kafka topic.
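The polling endpoint is configured through `camel.source.url`; the sketch below assumes the Docker master node IP from above, check the example properties file for the exact URI:
[source,bash]
----
# sketch -- replace 172.17.0.2 with the IP of your master node
camel.source.url=cql://172.17.0.2/test?cql=select * from users
----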
.Run the Cassandra CQL source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSourceConnector.properties
----
[[Tryitoutlocally-CassandraQLSink]]
==== Apache Cassandra (sink)
This example adds data to the `users` table in Cassandra from the data consumed from the `mytopic` Kafka topic. Notice how the `name` column is populated from the Kafka message using the CQL command `insert into users...`.
.Run the Cassandra CQL sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSinkConnector.properties
----
[[Tryitoutlocally-ElasticsearchSink]]
=== Elasticsearch (sink)
This example passes data from the `mytopic` Kafka topic to the `sampleIndexName` index in Elasticsearch. Adjust the properties in `examples/CamelElasticSearchSinkConnector.properties` to reflect your environment, for example change `hostAddresses` to a valid Elasticsearch instance hostname and port.
For the index operation, it might be necessary to provide or implement a `transformer`. A sample configuration would be similar to the one below:
[source,bash]
----
transforms=ElasticSearchTransformer
----
This is the sample transformer used in the integration test code; it transforms Kafka's `ConnectRecord` to a `Map`:
[source,bash]
----
transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer
----
This is a configuration for the sample transformer that defines the key used in the map:
[source,bash]
----
transforms.ElasticSearchTransformer.key=MyKey
----
When the configuration is ready, run the sink:
.Run the Elasticsearch sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelElasticSearchSinkConnector.properties
----
[[Tryitoutlocally-FileSink]]
=== File (sink)
This example appends data from the `mytopic` Kafka topic to the file `/tmp/kafkaconnect.txt`.
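The target file is set through `camel.sink.url`; a sketch using Camel's file endpoint options is shown below (assumed here, check the example properties file for the exact URI):
[source,bash]
----
# sketch -- append incoming records to /tmp/kafkaconnect.txt
camel.sink.url=file:/tmp?fileName=kafkaconnect.txt&fileExist=Append
----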
.Run the file sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelFileSinkConnector.properties
----
[[Tryitoutlocally-HttpSink]]
=== HTTP (sink)
This example sends data from the `mytopic` Kafka topic to an HTTP service. Adjust the properties in `examples/CamelHttpSinkConnector.properties` for your environment, for example by configuring the `camel.sink.url`.
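For example, pointing the sink at a hypothetical local service:
[source,bash]
----
# hypothetical endpoint -- replace with the URL of your HTTP service
camel.sink.url=http://localhost:8080/myservice
----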
.Run the HTTP sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelHttpSinkConnector.properties
----
[[Tryitoutlocally-JMSSource]]
=== JMS (source)
This example receives messages from a JMS queue named `myqueue` and transfers them to the `mytopic` Kafka topic. ActiveMQ is used here, configured to connect to a broker running on `localhost:61616`. Adjust the properties in `examples/CamelJmsSourceConnector.properties` for your environment: for example, configure the username and password by setting `camel.component.sjms2.connection-factory.userName=yourusername` and `camel.component.sjms2.connection-factory.password=yourpassword`, or change `camel.component.sjms2.connection-factory` and `camel.component.sjms2.connection-factory.brokerURL` to reflect your JMS implementation and broker URL.
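Put together, the connection factory settings would look something like this; a sketch assuming Camel's `#class:` property-binding syntax for instantiating ActiveMQ's connection factory, adjust for your broker:
[source,bash]
----
# sketch -- swap in your own JMS connection factory and broker URL
camel.component.sjms2.connection-factory=#class:org.apache.activemq.ActiveMQConnectionFactory
camel.component.sjms2.connection-factory.brokerURL=tcp://localhost:61616
camel.component.sjms2.connection-factory.userName=yourusername
camel.component.sjms2.connection-factory.password=yourpassword
----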
.Run the JMS source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSourceConnector.properties
----
[[Tryitoutlocally-JMSSink]]
=== JMS (sink)
This example receives messages from the `mytopic` Kafka topic and transfers them to a JMS queue named `myqueue`. ActiveMQ is used here, configured to connect to a broker running on `localhost:61616`. You can adjust the properties in `examples/CamelJmsSinkConnector.properties` for your environment: for example, configure the username and password by adding `camel.component.sjms2.connection-factory.userName=yourusername` and `camel.component.sjms2.connection-factory.password=yourpassword`, or change `camel.component.sjms2.connection-factory` and `camel.component.sjms2.connection-factory.brokerURL` to reflect your JMS implementation and broker URL.
.Run the JMS sink:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSinkConnector.properties
----
[[Tryitoutlocally-TelegramSource]]
=== Telegram (source)
This example transfers messages sent to a Telegram bot to the `mytopic` Kafka topic. Set the Telegram bot token in `examples/CamelTelegramSourceConnector.properties` to reflect your bot's token.
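The token typically ends up in the source endpoint URI; a sketch assuming a Camel 3 style Telegram endpoint:
[source,bash]
----
# sketch -- replace <your-bot-token> with the token obtained from Telegram
camel.source.url=telegram:bots?authorizationToken=<your-bot-token>
----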
.Run the Telegram source:
[source,bash]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelTelegramSourceConnector.properties
----