blob: 8a82d98b31a5818b27badd0c315d6a0d45b75b2f [file] [log] [blame]
= Camel Debezium example
=== Introduction
An example which shows how to integrate Camel with Debezium and sink everything into a target database.
This project consists of the following examples:
1. Send events using Debezium component to Kinesis .
2. Example how data can be sinked into Cassandra that produced by Debezium.
== Prerequisites
=== MySQL
In order to stream changes from MySQL, you will need to have https://debezium.io/documentation/reference/0.9/connectors/mysql.html#enabling-the-binlog[_row-level_] binary binlog enabled. However,
for the sake of this example, we will use the following docker image which is setup with row enabled binary logs and some sample data:
```
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
```
The above docker image will start a MySQL server exposed to port `3306` with root password set.
=== Amazon Kinesis
Since we will use Kinesis to stream changes from Debezium as an example, you need to create a stream called `camel-debezium-example` in `eu-central-1`. As well, you will need to create AWS access and secret keys, once you are done from creating the keys, update the following properties in `src/main/resources/application.properties`:
```
kinesis.accessKey ={{generated-access-key}}
kinesis.secretKey = {{generated-secret-key}}
```
=== Cassandra
In this example, we will use Cassandra to sink the events into, you will need to either to download and run Cassandra on your machine or you can simply use the following docker image that exposes a Cassandra instance on port 9042:
```
docker run -p 9042:9042 --rm --name cassandra -d cassandra
```
Once you have your Cassandra instance, using your favorite CQL gui or even https://docs.datastax.com/en/archived/cql/3.3/cql/cql_reference/cqlsh.html[cqlsh], you will need to execute the following CQL commands to prepare the necessary keyspace and table for our example:
```
CREATE KEYSPACE dbzSink WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
USE dbzSink;
CREATE TABLE products (
id int PRIMARY KEY,
name varchar,
description varchar,
weight float
);
```
*Note:* We will stream a table called `product` from MySQL docker image which is already set. Most of the configurations that will get you started with this example are already set in `application.properties`.
=== Build
Due to licensing issues, you will need to add the dependency for `mysql-conenctor-java`, just add the following to your POM file:
[source,xml]
------------------------------------------------------------
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
------------------------------------------------------------
You will need to compile this example first:
$ mvn clean compile
=== Run
Run the Kinesis producer first
$ mvn compile exec:java -Pkinesis-producer
Run the Debezium consumer in the seperate shell
$ mvn compile exec:java -Pdebezium-consumer
Initially, you will Debezium will perform a snapshot of the whitelisted tables per `application.properties`, hence you should expect
the data to be replicated into Cassandra. Once the snapshot mode is done, you can try to insert a new row, update fields, delete .. etc on MySQL whitelisted table(s), you should see
the changes reflecting on Cassandra as well, you can verify that by running the following query on cqlsh:
```
select * from dbzSink.products;
```
=== Configuration
You can configure the details in the file:
`src/main/resources/application.properties`
You can enable verbose logging by adjusting the `src/main/resources/log4j2.properties`
file as documented in the file.
=== Help and contributions
If you hit any problem using Camel or have some feedback,
then please https://camel.apache.org/support.html[let us know].
We also love contributors,
so https://camel.apache.org/contributing.html[get involved] :-)
The Camel riders!