= Debezium PostgreSQL Connector Component
:doctitle: Debezium PostgreSQL Connector
:shortname: debezium-postgres
:artifactid: camel-debezium-postgres
:description: Capture changes from a PostgreSQL database.
:since: 3.0
:supportlevel: Stable
:tabs-sync-option:
:component-header: Only consumer is supported
//Manually maintained attributes
:group: Debezium
:camel-spring-boot-name: debezium-postgres
*Since Camel {since}*
*{component-header}*
The Debezium PostgreSQL component is a wrapper around https://debezium.io/[Debezium] using https://debezium.io/documentation/reference/1.9/development/engine.html[Debezium Engine], which enables Change Data Capture from a PostgreSQL database using Debezium without the need for Kafka or Kafka Connect.
*Note on handling failures:* Per the https://debezium.io/documentation/reference/1.9/development/engine.html#_handling_failures[Debezium Embedded Engine] documentation, the engine actively records source offsets and periodically flushes them to persistent storage, so when the application restarts after a shutdown or crash, the engine resumes from the last recorded offset.
Thus, in normal operation, your downstream routes receive each event exactly once. However, after an application crash (that is, without a graceful shutdown), the application resumes from the last recorded offset,
which may result in duplicate events immediately after the restart. Therefore, your downstream routes should tolerate such duplicates and deduplicate events if needed.
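One way to tolerate replayed events is to make the downstream write idempotent. The following is a minimal sketch in plain Java, not part of this component: `IdempotentSink` and its in-memory maps are hypothetical stand-ins for a durable downstream store, and `position` is an assumed value that increases with every change to a key (for example, one derived from the source metadata). The stored positions must survive restarts, because duplicates appear right after a crash.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sink that ignores events it has already applied. */
final class IdempotentSink {
    // Stand-ins for durable storage; real state must survive an application restart.
    private final Map<String, Long> lastAppliedPosition = new HashMap<>();
    private final Map<String, String> rows = new HashMap<>();

    /** Applies the change unless an equal or newer position was already applied for this key. */
    boolean apply(String key, long position, String value) {
        Long previous = lastAppliedPosition.get(key);
        if (previous != null && previous >= position) {
            return false; // replayed event, skip it
        }
        lastAppliedPosition.put(key, position);
        if (value == null) {
            rows.remove(key); // a null value is treated as a delete
        } else {
            rows.put(key, value);
        }
        return true;
    }

    /** Returns the current value for the key, or null if none is stored. */
    String get(String key) {
        return rows.get(key);
    }
}
```

With this shape, re-delivering an already applied event after a restart leaves the stored row unchanged. Camel's Idempotent Consumer EIP is another option to evaluate, provided the repository behind it is persistent.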
Maven users will need to add the following dependency to their `pom.xml`
for this component.
[source,xml]
----
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-postgres</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
----
== URI format
---------------------------
debezium-postgres:name[?options]
---------------------------
// component-configure options: START
// component-configure options: END
// component options: START
include::partial$component-configure-options.adoc[]
include::partial$component-endpoint-options.adoc[]
// component options: END
// endpoint options: START
// endpoint options: END
For more information about configuration:
https://debezium.io/documentation/reference/0.10/operations/embedded.html#engine-properties[https://debezium.io/documentation/reference/0.10/operations/embedded.html#engine-properties]
https://debezium.io/documentation/reference/0.10/connectors/postgresql.html#connector-properties[https://debezium.io/documentation/reference/0.10/connectors/postgresql.html#connector-properties]
// component headers: START
include::partial$component-endpoint-headers.adoc[]
// component headers: END
== Message body
If the message body is not `null` (it is `null` for tombstones), it contains the state of the row after the event occurred, in `Struct` format, or in `Map` format if you use the included Type Converter from `Struct` to `Map` (see below for more explanation).
== Samples
=== Consuming events
Here is a simple route that listens to Debezium events from the PostgreSQL connector.
[source,java]
----
from("debezium-postgres:dbz-test-1?offsetStorageFileName=/usr/offset-file-1.dat&databaseHostname=localhost&databaseUser=debezium&databasePassword=dbz&databaseServerName=my-app-connector&databaseHistoryFileFilename=/usr/history-file-1.dat")
.log("Event received from Debezium : ${body}")
.log(" with this identifier ${headers.CamelDebeziumIdentifier}")
.log(" with these source metadata ${headers.CamelDebeziumSourceMetadata}")
.log(" the event occurred upon this operation '${headers.CamelDebeziumSourceOperation}'")
.log(" on this database '${headers.CamelDebeziumSourceMetadata[db]}' and this table '${headers.CamelDebeziumSourceMetadata[table]}'")
.log(" with the key ${headers.CamelDebeziumKey}")
.log(" the previous value is ${headers.CamelDebeziumBefore}")
----
By default, the component emits events in the body and the `CamelDebeziumBefore` header as the https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] data type. The reasoning behind this is to preserve the schema information in case it is needed.
However, the component also contains a xref:manual::type-converter.adoc[Type Converter] that converts
from the default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map` in order to leverage Camel's rich xref:manual::data-format.adoc[Data Format] types, many of which work out of the box with the `Map` data type.
To use it, you can either pass `Map.class` when you access the message body, e.g. `exchange.getIn().getBody(Map.class)`, or always convert the body to `Map` in the route builder by adding `.convertBodyTo(Map.class)` to your Camel Route DSL after the `from` statement.
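Once the body is a `Map`, downstream code still has to account for tombstones, where the body is `null`. The following is a minimal sketch in plain Java of such a check. `RowSummary` is a hypothetical helper, not part of this component, and the column names used with it are illustrative only. It could be called from a processor placed after `.convertBodyTo(Map.class)`.

```java
import java.util.Map;

/** Hypothetical helper for a body that was already converted to a Map. */
final class RowSummary {
    /** Returns "column=value" pairs for the row, or a marker when the body is a tombstone (null). */
    static String summarize(Map<String, Object> row) {
        if (row == null) {
            return "tombstone";
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            if (sb.length() > 0) {
                sb.append(", ");
            }
            sb.append(entry.getKey()).append('=').append(entry.getValue());
        }
        return sb.toString();
    }
}
```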
As mentioned above, the schema can be used if you need to perform advanced data transformations that require it. If you choose not to convert your body to `Map`,
you can obtain the schema information as a https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Schema.html[`Schema`] type from the `Struct` like this:
[source,java]
----
from("debezium-postgres:[name]?[options]")
.process(exchange -> {
final Struct bodyValue = exchange.getIn().getBody(Struct.class);
final Schema schemaValue = bodyValue.schema();
log.info("Body value is : {}", bodyValue);
log.info("With Schema : {}", schemaValue);
log.info("And fields of : {}", schemaValue.fields());
log.info("Field name has `{}` type", schemaValue.field("name").schema());
});
----
*Important Note:* As mentioned, this component is a thin wrapper around Debezium Engine. Therefore, before using this component in production, you need to understand how Debezium works and how its configuration affects the expected behavior, especially with regard to https://debezium.io/documentation/reference/1.9/development/engine.html#_handling_failures[handling failures].
include::spring-boot:partial$starter.adoc[]