| = Camel-Kafka-connector PGEvent Source |
| |
| This example shows how to use the Camel-Kafka-connector PGEvent Source. |
| |
| == Standalone |
| |
| === What is needed |
| |
| - A running PostgreSQL instance, started through Docker |
| |
| === Running Kafka |
| |
| [source] |
| ---- |
| $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
| $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties |
| $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic |
| ---- |
| |
| === Download the connector package |
| |
| Download the connector package tar.gz and extract the content to a directory. In this example we'll use `/home/oscerd/connectors/` |
| |
| [source] |
| ---- |
| > cd /home/oscerd/connectors/ |
| > wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-pgevent-kafka-connector/0.10.0/camel-pgevent-kafka-connector-0.10.0-package.tar.gz |
| > tar -xzf camel-pgevent-kafka-connector-0.10.0-package.tar.gz |
| ---- |
| |
| === Configuring Kafka Connect |
| |
| You'll need to set the `plugin.path` property in your Kafka Connect configuration. |
| |
| Open `$KAFKA_HOME/config/connect-standalone.properties` and set the `plugin.path` property to your chosen location: |
| |
| [source] |
| ---- |
| ... |
| plugin.path=/home/oscerd/connectors |
| ... |
| ---- |
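| |
| The same edit can be scripted. The following is a minimal sketch rather than part of the original example: `set_plugin_path` is a hypothetical helper name, and it assumes the properties file holds at most simple `key=value` lines for `plugin.path`. It drops any active `plugin.path` line and appends the new one, so running it twice leaves a single entry. |
| |
```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): set or replace plugin.path
# in a Kafka Connect properties file.
# Usage: set_plugin_path <properties-file> <connector-dir>
set_plugin_path() {
  props_file=$1
  plugin_dir=$2
  tmp_file="${props_file}.tmp"
  # Keep every line except an active plugin.path assignment.
  # Commented-out lines such as "#plugin.path=" are preserved.
  # grep exits non-zero when nothing is left to print, hence "|| true".
  grep -v '^plugin\.path=' "$props_file" > "$tmp_file" || true
  # Append the new assignment and swap the file into place.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  mv "$tmp_file" "$props_file"
}
```
| |
| For the paths used in this example, the call would be `set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" /home/oscerd/connectors`. |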
| |
| === Setup the docker image |
| |
| We'll need a running PostgreSQL instance. |
| |
| The first step is to start it: |
| |
| [source] |
| ---- |
| > docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres |
| 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb |
| ---- |
| |
| Take note of the container id. |
| Next, we need to create the table we'll use. Its definition is: |
| |
| [source] |
| ---- |
| CREATE TABLE accounts ( |
| user_id serial PRIMARY KEY, |
| username VARCHAR ( 50 ) UNIQUE NOT NULL, |
| city VARCHAR ( 50 ) NOT NULL |
| ); |
| ---- |
| |
| We can now create the table from a psql session inside the container: |
| |
| [source] |
| ---- |
| > docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres |
| psql (13.0 (Debian 13.0-1.pgdg100+1)) |
| Type "help" for help. |
| |
| postgres=# CREATE TABLE accounts ( |
| postgres(# user_id serial PRIMARY KEY, |
| postgres(# username VARCHAR ( 50 ) UNIQUE NOT NULL, |
| postgres(# city VARCHAR ( 50 ) NOT NULL |
| postgres(# ); |
| ---- |
| |
| Now we need to create a trigger that sends a notification on a channel of our choice. In this case we name the channel after the table we created: `accounts`. |
| |
| The trigger function and the trigger are defined as follows: |
| |
| [source] |
| ---- |
| CREATE OR REPLACE FUNCTION row_inserted() |
| RETURNS trigger AS $$ |
| DECLARE |
| BEGIN |
| PERFORM pg_notify( |
| 'accounts', |
| row_to_json(NEW)::text); |
| RETURN NEW; |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| CREATE TRIGGER row_inserted |
| AFTER INSERT ON accounts |
| FOR EACH ROW |
| EXECUTE PROCEDURE row_inserted(); |
| ---- |
| |
| We need to execute these statements in our psql session: |
| |
| [source] |
| ---- |
| > docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres |
| psql (13.0 (Debian 13.0-1.pgdg100+1)) |
| Type "help" for help. |
| |
| postgres=# CREATE OR REPLACE FUNCTION row_inserted() |
| postgres-# RETURNS trigger AS $$ |
| postgres$# DECLARE |
| postgres$# BEGIN |
| postgres$# PERFORM pg_notify( |
| postgres$# 'accounts', |
| postgres$# row_to_json(NEW)::text); |
| postgres$# RETURN NEW; |
| postgres$# END; |
| postgres$# $$ LANGUAGE plpgsql; |
| CREATE FUNCTION |
| postgres=# |
| postgres=# CREATE TRIGGER row_inserted |
| postgres-# AFTER INSERT ON accounts |
| postgres-# FOR EACH ROW |
| postgres-# EXECUTE PROCEDURE row_inserted(); |
| CREATE TRIGGER |
| ---- |
| |
| We also need to take note of the container IP: |
| |
| [source] |
| ---- |
| > docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb |
| 172.17.0.2 |
| ---- |
| |
| === Setup the connectors |
| |
| Open the PGEvent configuration file at `$EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties` |
| |
| [source] |
| ---- |
| name=CamelPgeventSourceConnector |
| connector.class=org.apache.camel.kafkaconnector.pgevent.CamelPgeventSourceConnector |
| key.converter=org.apache.kafka.connect.storage.StringConverter |
| value.converter=org.apache.kafka.connect.storage.StringConverter |
| |
| topics=mytopic |
| |
| camel.source.endpoint.user=postgres |
| camel.source.endpoint.pass=mysecretpassword |
| camel.source.path.port=5432 |
| camel.source.path.host=172.17.0.2 |
| |
| camel.source.path.channel=accounts |
| camel.source.path.database=postgres |
| ---- |
| |
| Set `camel.source.path.host` to the IP address of your container. |
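| |
| To check the configuration before starting the connector, it can help to print the endpoint it describes. The sketch below is an illustration only: `prop` and `pgevent_uri` are hypothetical helper names, and it assumes the `camel.source.path.*` options fill the path of the Camel PGEvent URI in the form `pgevent:host:port/database/channel`. The `camel.source.endpoint.*` options (user and password) are query parameters and are left out of the printed URI on purpose. |
| |
```shell
#!/bin/sh
# Hypothetical helper: print the value of one key from a properties file.
# Usage: prop <key> <file>   (the last assignment of the key wins)
prop() {
  # Escape dots so the key matches literally in the sed pattern.
  key_re=$(printf '%s' "$1" | sed 's/\./\\./g')
  sed -n "s/^${key_re}=//p" "$2" | tail -n 1
}

# Hypothetical helper: print the path part of the PGEvent endpoint URI
# described by a connector properties file.
# Assumed URI form: pgevent:host:port/database/channel
# Usage: pgevent_uri <file>
pgevent_uri() {
  printf 'pgevent:%s:%s/%s/%s\n' \
    "$(prop camel.source.path.host "$1")" \
    "$(prop camel.source.path.port "$1")" \
    "$(prop camel.source.path.database "$1")" \
    "$(prop camel.source.path.channel "$1")"
}
```
| |
| With the configuration shown above, `pgevent_uri $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties` prints `pgevent:172.17.0.2:5432/postgres/accounts`. If the host does not match the output of `docker inspect`, the connector will not reach the database. |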
| |
| === Running the example |
| |
| Run Kafka Connect with the PGEvent Source connector: |
| |
| [source] |
| ---- |
| $KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $EXAMPLES/pgevent/pgevent-source/config/CamelPgeventSourceConnector.properties |
| ---- |
| |
| Now we need to insert some rows into our database: |
| |
| [source] |
| ---- |
| > docker exec -it 6cd4ba4696f2e8872f3787faaa8d03d1dae5cb5f22986648adf132823f3690eb psql -U postgres |
| postgres=# insert into accounts(username,city) values('andrea','Roma'); |
| INSERT 0 1 |
| Asynchronous notification "accounts" with payload "{"user_id":1,"username":"andrea","city":"Roma"}" received from server process with PID 152. |
| postgres=# insert into accounts(username,city) values('John','New York'); |
| INSERT 0 1 |
| Asynchronous notification "accounts" with payload "{"user_id":2,"username":"John","city":"New York"}" received from server process with PID 152. |
| ---- |
| |
| In a different terminal, run the kafkacat consumer: |
| |
| [source] |
| ---- |
| > ./kafkacat -b localhost:9092 -t mytopic |
| {"user_id":1,"username":"andrea","city":"Roma"} |
| {"user_id":2,"username":"John","city":"New York"} |
| % Reached end of topic mytopic [0] at offset 2 |
| ---- |
| |