blob: 9455ea41544d90c82a8a148d4a0aa20ff1047549 [file] [log] [blame]
= Camel-Kafka-connector Cron Source
This is an example for Camel-Kafka-connector Cron Source
== Standalone
=== Running Kafka
[source]
----
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
----
=== Download the connector package
Download the connector package tar.gz and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/`
[source]
----
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cron-kafka-connector/0.11.5/camel-cron-kafka-connector-0.11.5-package.tar.gz
> untar.gz camel-cron-kafka-connector-0.11.5-package.tar.gz
----
In 0.11.5, you'll need to add also camel-quartz and quartz dependency.
[source]
----
> cd /home/oscerd/connectors/camel-cron-kafka-connector
> wget https://repo1.maven.org/maven2/org/apache/camel/camel-quartz/3.7.0/camel-quartz-3.7.0.jar
> wget https://repo1.maven.org/maven2/org/quartz-scheduler/quartz/2.3.2/quartz-2.3.2.jar
----
We are now ready
=== Configuring Kafka Connect
You'll need to set up the `plugin.path` property in your kafka
Open the `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your choosen location:
[source]
----
...
plugin.path=/home/oscerd/connectors
...
----
=== Setup the connectors
Open the Cron configuration file at `$EXAMPLES/cron/cron-source/config/CamelCronSourceConnector.properties`
[source]
----
name=CamelCronSourceConnector
connector.class=org.apache.camel.kafkaconnector.cron.CamelCronSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=cron-topic
camel.source.endpoint.schedule=0/5+*+*+*+*+?
camel.source.path.name=cron-timer
----
In this case we are using local unix socket.
For the containerId you'll need to use the value of the running debian container.
=== Running the example
Run the kafka connect with the Cron Source connector:
[source]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/cron/cron-source/config/CamelCronSourceConnector.properties
----
We'll get multiple message with statistics
On a different terminal run the kafkacat consumer
[source]
----
./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n'
% Auto-selecting Consumer mode (use -P or -C to override)
Headers: CamelHeader.fireTime=2021-01-21T13:15:30.255Z,CamelHeader.jobRunTime=9,CamelHeader.nextFireTime=2021-01-21T13:15:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:30.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:15:35.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:40.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:30.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:35.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:15:40.002Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:15:45.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:40.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:15:45.002Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:50.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:40.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:45.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:15:50.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:15:55.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:45.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:50.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:15:55.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:00.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:50.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:15:55.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:00.002Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:05.000Z,CamelHeader.previousFireTime=2021-01-21T13:15:55.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:00.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:05.001Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:10.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:00.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:05.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:10.002Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:15.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:05.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:10.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:15.002Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:20.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:10.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:15.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:20.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:25.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:15.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:20.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:25.001Z,CamelHeader.jobRunTime=1,CamelHeader.nextFireTime=2021-01-21T13:16:30.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:20.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:25.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:30.001Z,CamelHeader.jobRunTime=-1,CamelHeader.nextFireTime=2021-01-21T13:16:35.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:25.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:30.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:35.001Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:40.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:30.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:35.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
Headers: CamelHeader.fireTime=2021-01-21T13:16:40.001Z,CamelHeader.jobRunTime=0,CamelHeader.nextFireTime=2021-01-21T13:16:45.000Z,CamelHeader.previousFireTime=2021-01-21T13:16:35.000Z,CamelHeader.refireCount=0,CamelHeader.scheduledFireTime=2021-01-21T13:16:40.000Z,CamelHeader.triggerGroup=Camel_camel-1,CamelHeader.triggerName=cron-timer,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value:
% Reached end of topic mytopic [0] at offset 15
----