= Camel-Kafka-connector SQL Sink
This is an example for the Camel-Kafka-connector SQL Sink.
== Standalone
=== What is needed
- A running PostgreSQL instance through Docker
- PostgreSQL JDBC driver
=== Running Kafka
$KAFKA_HOME/bin/ $KAFKA_HOME/config/
$KAFKA_HOME/bin/ $KAFKA_HOME/config/
$KAFKA_HOME/bin/ --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
=== Download the connector package
Download the connector package tar.gz and extract its content to a directory. In this example we'll use `/home/oscerd/connectors/`.
> cd /home/oscerd/connectors/
> wget
> tar -xzf camel-sql-kafka-connector-0.9.0-package.tar.gz
This example also needs the PostgreSQL JDBC driver, placed in the extracted connector directory:
> cd /home/oscerd/connectors/camel-sql-kafka-connector/
> wget
=== Configuring Kafka Connect
You'll need to set the `plugin.path` property in your Kafka configuration.
Open the `$KAFKA_HOME/config/` and set the `plugin.path` property to your chosen location, which in this example is `/home/oscerd/connectors`.
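If you prefer to script this step, the following is a minimal sketch. The helper name `set_plugin_path` is hypothetical, and the properties file is passed as an argument because it depends on how you start Kafka Connect.

```shell
# Sketch: set or replace plugin.path in a Kafka Connect properties file.
# set_plugin_path is a hypothetical helper, not part of Kafka.
# Usage: set_plugin_path <properties-file> <plugin-dir>
set_plugin_path() {
  file="$1"
  dir="$2"
  tmp="$(mktemp)" || return 1
  # Keep every line except an existing, uncommented plugin.path entry
  grep -v '^plugin\.path=' "$file" > "$tmp" || true
  # Append the new value
  printf 'plugin.path=%s\n' "$dir" >> "$tmp"
  # Write back in place so the original file permissions are preserved
  cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

For this example the second argument would be `/home/oscerd/connectors`.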
=== Setup the docker image
We'll need a fully running PostgreSQL instance.
The first step is to start it:
> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
Take note of the container ID.
Next, we need to create the table used by the example. Its definition is the following:
CREATE TABLE accounts (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
city VARCHAR ( 50 ) NOT NULL
);
We are now ready to create the table:
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# CREATE TABLE accounts (
postgres(# user_id serial PRIMARY KEY,
postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(# city VARCHAR ( 50 ) NOT NULL
postgres(# );
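The same table can also be created without an interactive session. This is only a sketch: the function name `create_accounts_table` is hypothetical, and it assumes the container accepts local connections for the `postgres` user, as in the session above.

```shell
# Sketch: create the accounts table non-interactively.
# create_accounts_table is a hypothetical helper name.
# Usage: create_accounts_table <container-id>
create_accounts_table() {
  # -i keeps stdin open so psql reads the statement from the here-document
  docker exec -i "$1" psql -U postgres <<'SQL'
CREATE TABLE accounts (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
city VARCHAR ( 50 ) NOT NULL
);
SQL
}
```

Since the container was started with `--name some-postgres`, that name can be passed instead of the container ID.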
We also need to take note of the container IP:
> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
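To avoid copying the address by hand, the command above can be wrapped so its output lands in a shell variable. The function name `container_ip` is hypothetical; the `docker inspect` call is the same one shown above.

```shell
# Sketch: print the IP address of a container.
# container_ip is a hypothetical helper name.
# Usage: container_ip <container-id-or-name>
container_ip() {
  docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$1"
}
```

A call such as `POSTGRES_IP="$(container_ip some-postgres)"` then keeps the address available for the connector configuration.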
=== Setup the connectors
Open the SQL configuration file at `$EXAMPLES/sql/sql-sink/config/`
camel.sink.path.query=INSERT INTO accounts (username,city) VALUES (:#username,:#city)
and add the correct IP for the container.
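As an illustration of where the container IP ends up, here is a sketch that builds a standard PostgreSQL JDBC URL from it. It assumes the data source in the configuration file takes such a URL; the property that holds it is not shown here, and `jdbc_url` is a hypothetical helper name.

```shell
# Sketch: build a PostgreSQL JDBC URL for the container.
# jdbc_url is a hypothetical helper name.
# Usage: jdbc_url <host> [port] [database]
jdbc_url() {
  host="$1"
  port="${2:-5432}"          # 5432 is PostgreSQL's default port
  database="${3:-postgres}"  # database used in the psql session of this example
  printf 'jdbc:postgresql://%s:%s/%s\n' "$host" "$port" "$database"
}
```

Calling `jdbc_url` with the address printed by `docker inspect` gives the value to check against the configuration file.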
=== Running the example
Run Kafka Connect with the SQL Sink connector:
$KAFKA_HOME/bin/ $KAFKA_HOME/config/ $EXAMPLES/sql/sql-sink/config/
In a different terminal, run the kafkacat producer and send the following messages:
> echo "test" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=andrea" -H ""
> echo "test" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=John" -H " York"
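The query uses two named parameters, `:#username` and `:#city`, so each message has to carry a value for both. The sketch below wraps the producer call in a hypothetical `send_account` function. The header name `CamelHeader.city` is an assumption that mirrors `CamelHeader.username`, and the `KAFKACAT` variable is only there so the binary location can be overridden.

```shell
# Sketch: send one record with both query parameters as Kafka headers.
# send_account and KAFKACAT are hypothetical names used for illustration.
KAFKACAT="${KAFKACAT:-./kafkacat}"

# Usage: send_account <username> <city>
send_account() {
  # ASSUMPTION: CamelHeader.city follows the same pattern as CamelHeader.username
  echo "test" | "$KAFKACAT" -P -b localhost:9092 -t mytopic \
    -H "CamelHeader.username=$1" \
    -H "CamelHeader.city=$2"
}
```

Quoting the arguments, as in `send_account John "New York"`, keeps a city name with spaces in a single header value.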
You can now query the inserted records through psql:
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.
postgres=# select * from accounts;
 user_id | username |   city
---------+----------+----------
       1 | andrea   | Roma
       2 | John     | New York
(2 rows)
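For a scripted check, the row count can be read without opening an interactive session. This is a sketch with a hypothetical function name, `count_accounts`, using standard psql options.

```shell
# Sketch: print the number of rows in the accounts table.
# count_accounts is a hypothetical helper name.
# Usage: count_accounts <container-id-or-name>
count_accounts() {
  # -t prints tuples only, -A disables column alignment, -c runs a single command
  docker exec "$1" psql -U postgres -t -A -c 'SELECT count(*) FROM accounts;'
}
```

After the two messages above have been processed, the count should match the two rows shown in the query result.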