| == Try it out locally |
| |
| Get a locally running kafka instance by following https://kafka.apache.org/quickstart[apache Kafka quickstart guide]. |
| |
| ==== This usually boils down to: |
| .Set KAFKA_HOME |
| [source,bash] |
| ---- |
| export KAFKA_HOME=<your kafka install dir> |
| ---- |
| |
| .Start Zookeeper cluster |
| [source,bash] |
| ---- |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| ---- |
| |
| .Start Kafka broker |
| [source,bash] |
| ---- |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| ---- |
| |
| .Create "mytopic" topic |
| [source,bash] |
| ---- |
| $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic |
| ---- |
| |
| ==== Then run Camel kafka connectors source and/or syncs: |
| [NOTE] |
| ==== |
| In order to run more than one instance of a standalone kafka connect on the same machine you neet to duplicate `$KAFKA_HOME/config/connect-standalone.properties` file changing the http port used for each instance: |
| [source,bash] |
| ---- |
| cp $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-standalone2.properties |
| |
| echo rest.port=<your unused port number> >> $KAFKA_HOME/config/connect-standalone2.properties |
| ---- |
| ==== |
| |
| .Run the default sink, just a camel logger: |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSinkConnector.properties |
| ---- |
| |
| .Run the default source, just a camel timer: |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelSourceConnector.properties |
| ---- |
| |
| .Run the AWS Kinesis source: |
| You can adjust properties in `examples/CamelAWSKinesisSourceConnector.properties` for example configuring access key, secret key and region |
| by adding `camel.component.aws-kinesis.configuration.access-key=youraccesskey`, `camel.component.aws-kinesis.configuration.secret-key=yoursecretkey` and `camel.component.aws-kinesis.configuration.region=yourregion` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSKinesisSourceConnector.properties |
| ---- |
| |
| .Run the AWS SQS sink: |
| You can adjust properties in `examples/CamelAWSSQSSinkConnector.properties` for example configuring access key, secret key and region |
| by adding `camel.component.aws-sqs.configuration.access-key=youraccesskey`, `camel.component.aws-sqs.configuration.secret-key=yoursecretkey` and `camel.component.aws-sqs.configuration.region=yourregion` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSinkConnector.properties |
| ---- |
| |
| .Run the AWS SQS source: |
| You can adjust properties in `examples/CamelAWSSQSSourceConnector.properties` for example configuring access key, secret key and region |
| by adding `camel.component.aws-sqs.configuration.access-key=youraccesskey`, `camel.component.aws-sqs.configuration.secret-key=yoursecretkey` and `camel.component.aws-sqs.configuration.region=yourregion` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSQSSourceConnector.properties |
| ---- |
| |
| .Run the AWS SNS sink: |
| You can adjust properties in `examples/CamelAWSSNSSinkConnector.properties` for example configuring access key, secret key and region |
| by adding `camel.component.aws-sns.configuration.access-key=youraccesskey`, `camel.component.aws-sns.configuration.secret-key=yoursecretkey` and `camel.component.aws-sns.configuration.region=yourregion` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSSNSSinkConnector.properties |
| ---- |
| |
| .Run the AWS S3 source: |
| You can adjust properties in `examples/CamelAWSS3SourceConnector.properties` for example configuring access key, secret key and region |
| by adding `camel.component.aws-s3.configuration.access-key=youraccesskey`, `camel.component.aws-s3.configuration.secret-key=yoursecretkey` and `camel.component.aws-s3.configuration.region=yourregion` |
| Here you also have a converter specific for S3Object. |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelAWSS3SourceConnector.properties |
| ---- |
| |
| .Run the cassandraql source: |
| |
| To run this example you'll need a bit more work: |
| |
| First you'll need to run a cassandra instance: |
| |
| [source,bash] |
| ---- |
| docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra |
| ---- |
| |
| Check everything is fine: |
| |
| [source,bash] |
| ---- |
| docker exec -ti master_node /opt/cassandra/bin/nodetool status |
| Datacenter: datacenter1 |
| ======================= |
| Status=Up/Down |
| |/ State=Normal/Leaving/Joining/Moving |
| -- Address Load Tokens Owns (effective) Host ID Rack |
| UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1 |
| ---- |
| |
| You'll need a local installation of cassandra, in particular the 3.11.4. |
| Now we can populate the database: |
| |
| [source,bash] |
| ---- |
| <LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) |
| ---- |
| |
| and run the script: |
| |
| [source,bash] |
| ---- |
| create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3}; |
| use test; |
| create table users ( id int primary key, name text ); |
| insert into users (id,name) values (1, 'oscerd'); |
| quit; |
| ---- |
| |
| The output of the following command should be used in the configuration of CamelCassandraQLSourceConnector.properties |
| |
| [source,bash] |
| ---- |
| <LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) |
| ---- |
| |
| in particular it should be used as address instead of localhost in the `camel.source.url` |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSourceConnector.properties |
| ---- |
| |
| .Run the cassandraql sink: |
| |
| To run this example you'll need a bit more work: |
| |
| First you'll need to run a cassandra instance: |
| |
| [source,bash] |
| ---- |
| docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra |
| ---- |
| |
| Check everything is fine: |
| |
| [source,bash] |
| ---- |
| docker exec -ti master_node /opt/cassandra/bin/nodetool status |
| Datacenter: datacenter1 |
| ======================= |
| Status=Up/Down |
| |/ State=Normal/Leaving/Joining/Moving |
| -- Address Load Tokens Owns (effective) Host ID Rack |
| UN 172.17.0.2 251.32 KiB 256 100.0% 5126aaad-f143-43e9-920a-0f9540a93967 rack1 |
| ---- |
| |
| You'll need a local installation of cassandra, in particular the 3.11.4. |
| Now we can populate the database: |
| |
| [source,bash] |
| ---- |
| <LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) |
| ---- |
| |
| and run the script: |
| |
| [source,bash] |
| ---- |
| create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3}; |
| use test; |
| create table users (id uuid primary key, name text ); |
| insert into users (id,name) values (now(), 'oscerd'); |
| quit; |
| ---- |
| |
| The output of the following command should be used in the configuration of CamelCassandraQLSourceConnector.properties |
| |
| [source,bash] |
| ---- |
| <LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node) |
| ---- |
| |
| in particular it should be used as address instead of localhost in the `camel.sink.url` |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelCassandraQLSinkConnector.properties |
| ---- |
| |
| .Run the Elasticsearch sink: |
| You can adjust properties in `examples/CamelElasticSearchSinkConnector.properties` for example configuring the hostAddresses. |
| |
| For the index operation, it might be necessary to provide or implement a `transformer`. A sample configuration would be similar to the one below: |
| [source,bash] |
| ---- |
| transforms=ElasticSearchTransformer |
| ---- |
| |
| This is the sample Transformer used in the integration test code that transforms Kafka's ConnectRecord to a Map: |
| [source,bash] |
| ---- |
| transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.sink.elasticsearch.transforms.ConnectRecordValueToMapTransformer |
| ---- |
| |
| This is a configuration for the sample transformer that defines the key used in the map: |
| [source,bash] |
| ---- |
| transforms.ElasticSearchTransformer.key=MyKey |
| ---- |
| |
| When the configuration is ready run the sink with: |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelElasticSearchSinkConnector.properties |
| ---- |
| |
| .Run the file sink, just a camel file appending to /tmp/kafkaconnect.txt: |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelFileSinkConnector.properties |
| ---- |
| |
| .Run the http sink: |
| You can adjust properties in `examples/CamelHttpSinkConnector.properties` for example configuring the called url. |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelHttpSinkConnector.properties |
| ---- |
| |
| .Run the JMS source: |
| You can adjust properties in `examples/CamelJmsSourceConnector.properties` for example configuring username and password |
| by adding `camel.component.sjms2.connection-factory.userName=yourusername` and `camel.component.sjms2.connection-factory.password=yourpassword` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSourceConnector.properties |
| ---- |
| |
| .Run the JMS sink: |
| You can adjust properties in `examples/CamelJmsSourceConnector.properties` for example configuring username and password |
| by adding `camel.component.sjms2.connection-factory.userName=yourusername` and `camel.component.sjms2.connection-factory.password=yourpassword` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelJmsSinkConnector.properties |
| ---- |
| |
| .Run the telegram source: |
| Change your telegram bot token in `examples/CamelTelegramSourceConnector.properties` |
| |
| [source,bash] |
| ---- |
| export CLASSPATH="$(find core/target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')" |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties examples/CamelTelegramSourceConnector.properties |
| ---- |
| |
| ==== Listen or produce from a Kafka topic using Kafka utilities: |
| |
| .Run an Kafka Consumer |
| [source,bash] |
| ---- |
| $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning |
| ---- |
| |
| .Run an interactive CLI kafka producer |
| [source,bash] |
| ---- |
| $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic |
| ---- |