= Camel-Kafka-connector JDBC Sink

This is an example of the Camel-Kafka-connector JDBC Sink.

== Standalone

=== What is needed

- A PostgreSQL instance running in Docker
- The PostgreSQL JDBC driver

=== Running Kafka

[source]
----
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
----
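
Before moving on, you can optionally verify that the topic was actually created. A small sketch, assuming the broker is listening on `localhost:9092`:

[source,shell]
----
# List all topics and check that `mytopic` is among them;
# grep -x matches the whole line, so `mytopic2` would not count
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | grep -x mytopic && echo "topic ready"
----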

=== Download the connector package

Download the connector package tar.gz and extract its content to a directory. In this example we'll use `/home/oscerd/connectors/`.

[source]
----
> cd /home/oscerd/connectors/
> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-jdbc-kafka-connector/0.11.0/camel-jdbc-kafka-connector-0.11.0-package.tar.gz
> tar -xzf camel-jdbc-kafka-connector-0.11.0-package.tar.gz
----

We also need the PostgreSQL JDBC driver for this example:

[source]
----
> cd /home/oscerd/connectors/camel-jdbc-kafka-connector/
> wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.14/postgresql-42.2.14.jar
----

=== Configuring Kafka Connect

You'll need to set the `plugin.path` property in your Kafka Connect configuration.

Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location:

[source]
----
...
plugin.path=/home/oscerd/connectors
...
----
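
This can also be done from the shell; the append is guarded so running it twice doesn't duplicate the line (the path is just this example's choice):

[source,shell]
----
# Append plugin.path only if the property is not already set
grep -q '^plugin.path=' $KAFKA_HOME/config/connect-standalone.properties || \
  echo 'plugin.path=/home/oscerd/connectors' >> $KAFKA_HOME/config/connect-standalone.properties
----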

=== Setup the docker image

We'll need a full running PostgreSQL instance.

The first step is running it:

[source]
----
> docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
----

Take note of the container id.
We now need to create the table we'll use; the table is the following:

[source]
----
CREATE TABLE accounts (
	user_id serial PRIMARY KEY,
	username VARCHAR ( 50 ) UNIQUE NOT NULL,
	city VARCHAR ( 50 ) NOT NULL
);
----

We are now ready to create the table:

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# CREATE TABLE accounts (
postgres(# 	user_id serial PRIMARY KEY,
postgres(# 	username VARCHAR ( 50 ) UNIQUE NOT NULL,
postgres(# 	city VARCHAR ( 50 ) NOT NULL
postgres(# );
----
We also need to take note of the container IP:

[source]
----
> docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb
172.17.0.2
----
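
Since the container was started with `--name some-postgres`, you can also capture the IP by name instead of by id. A convenience sketch; `PG_IP` is our own variable name, not something the example requires:

[source,shell]
----
# Store the container IP for later use in the connector configuration
PG_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' some-postgres)
echo "PostgreSQL container IP: $PG_IP"
----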

=== Setup the connectors

Open the JDBC configuration file at `$EXAMPLES/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties` and set the correct IP for the container:

[source]
----
name=CamelJdbcSinkConnector
connector.class=org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector
tasks.max=1

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topics=mytopic

camel.component.jdbc.dataSource.user=postgres
camel.component.jdbc.dataSource.password=mysecretpassword
camel.component.jdbc.dataSource.serverName=172.17.0.2
camel.component.jdbc.dataSource=#class:org.postgresql.ds.PGSimpleDataSource
camel.sink.path.dataSourceName=default
camel.sink.endpoint.useHeadersAsParameters=true
----
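
If you prefer not to edit the file by hand, a `sed` one-liner can substitute the IP. A sketch assuming GNU `sed`; `PG_IP` here stands for the address reported by `docker inspect`:

[source,shell]
----
# Replace the serverName line with the container's actual IP
PG_IP=172.17.0.2   # the IP reported by `docker inspect`
sed -i "s/^camel.component.jdbc.dataSource.serverName=.*/camel.component.jdbc.dataSource.serverName=$PG_IP/" \
  $EXAMPLES/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties
----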

=== Running the example

Run Kafka Connect with the JDBC Sink connector:

[source]
----
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/jdbc/jdbc-sink/config/CamelJdbcSinkConnector.properties
----

In a different terminal, run the kafkacat producer and send the following messages:

[source]
----
> echo "INSERT INTO accounts (username,city) VALUES (:?username,:?city)" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=andrea" -H "CamelHeader.city=Roma"
> echo "INSERT INTO accounts (username,city) VALUES (:?username,:?city)" | ./kafkacat -P -b localhost:9092 -t mytopic -H "CamelHeader.username=John" -H "CamelHeader.city=New York"
----

You can now query the inserted records through psql:

[source]
----
> docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres
psql (13.0 (Debian 13.0-1.pgdg100+1))
Type "help" for help.

postgres=# select * from accounts;
 user_id | username |   city
---------+----------+----------
       1 | andrea   | Roma
       2 | John     | New York
(2 rows)
----