tree: c31fb4fa3ae1b75f8ea14244319e73296e82b8e2 [path history] [tgz]
  1. main/
  2. test/
  3. README.md
sdks/java/io/debezium/src/README.md

DebeziumIO

Connect your Debezium Databases to Apache Beam easily.

What is DebeziumIO?

DebeziumIO is an Apache Beam connector that lets users connect their Events-Driven Databases on Debezium to Apache Beam without the need to set up a Kafka instance.

Getting Started

DebeziumIO uses Debezium Connectors v1.3 to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a Serializable Function, then you will be able to connect to Apache Beam and start building your own Pipelines.

These connectors have been successfully tested and are known to work fine:

  • MySQL Connector
  • PostgreSQL Connector
  • SQLServer Connector
  • DB2 Connector

Other connectors might also work.

Setting up a connector and running a Pipeline should be as simple as:

Pipeline p = Pipeline.create();                   // Create a Pipeline
        p.apply(DebeziumIO.<String>read()
                .withConnectorConfiguration(...)  // Debezium Connector setup
                .withFormatFunction(...)          // Serializable Function to use
        ).setCoder(StringUtf8Coder.of());
p.run().waitUntilFinish();                        // Run your pipeline!

Setting up a Debezium Connector

DebeziumIO comes with a handy ConnectorConfiguration builder, which lets you provide all the configuration needed to access your Debezium Database.

A basic configuration such as username, password, port number, and host name must be specified along with the Debezium Connector class you will use by using these methods:

MethodParamDescription
.withConnectorClass(connectorClass)ClassDebezium Connector
.withUsername(username)StringDatabase Username
.withPassword(password)StringDatabase Password
.withHostName(hostname)StringDatabase Hostname
.withPort(portNumber)StringDatabase Port number

You can also add more configuration, such as Connector-specific Properties with the _withConnectionProperty_ method:

MethodParamsDescription
.withConnectionProperty(propName, propValue)String, StringAdds a custom property to the connector.

Note: For more information on custom properties, see your Debezium Connector specific documentation.

Example of a MySQL Debezium Connector setup:

DebeziumIO.ConnectorConfiguration.create()
        .withUsername("dbUsername")
        .withPassword("dbPassword")
        .withConnectorClass(MySqlConnector.class)
        .withHostName("127.0.0.1")
        .withPort("3306")
        .withConnectionProperty("database.server.id", "serverId")
        .withConnectionProperty("database.server.name", "serverName")
        .withConnectionProperty("database.include.list", "dbName")
        .withConnectionProperty("include.schema.changes", "false")

Setting a Serializable Function

A serializable function is required to depict each SourceRecord fetched from the Database.

DebeziumIO comes with a built-in JSON Mapper that you can optionally use to map every SourceRecord fetched from the Database to a JSON object. This helps users visualize and access their data in a simple way.

If you want to use this built-in JSON Mapper, you can do it by setting an instance of SourceRecordJsonMapper as a Serializable Function to the DebeziumIO:

.withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())

Note: SourceRecordJsonMappercomes out of the box, but you may use any Format Function you prefer.

Quick Example

The following example is how an actual setup would look like using a MySQL Debezium Connector and SourceRecordJsonMapper as the Serializable Function.

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
        withConnectorConfiguration(                     // Debezium Connector setup
                DebeziumIO.ConnectorConfiguration.create()
                        .withUsername("debezium")
                        .withPassword("dbz")
                        .withConnectorClass(MySqlConnector.class)
                        .withHostName("127.0.0.1")
                        .withPort("3306")
                        .withConnectionProperty("database.server.id", "184054")
                        .withConnectionProperty("database.server.name", "dbserver1")
                        .withConnectionProperty("database.include.list", "inventory")
                        .withConnectionProperty("include.schema.changes", "false")
        ).withFormatFunction(
                new SourceRecordJson.SourceRecordJsonMapper() // Serializable Function
        )
).setCoder(StringUtf8Coder.of());

p.run().waitUntilFinish();

Shortcut!

If you will be using the built-in SourceRecordJsonMapper as your Serializable Function for all your pipelines, you should use readAsJson().

DebeziumIO comes with a method called readAsJson, which automatically sets the SourceRecordJsonMapper as the Serializable Function for your pipeline. This way, you would need to setup your connector before running your pipeline, without explicitly setting a Serializable Function.

Example of using readAsJson:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(DebeziumIO.<String>read().
        withConnectorConfiguration(                     // Debezium Connector setup
                DebeziumIO.ConnectorConfiguration.create()
                        .withUsername("debezium")
                        .withPassword("dbz")
                        .withConnectorClass(MySqlConnector.class)
                        .withHostName("127.0.0.1")
                        .withPort("3306")
                        .withConnectionProperty("database.server.id", "184054")
                        .withConnectionProperty("database.server.name", "dbserver1")
                        .withConnectionProperty("database.include.list", "inventory")
                        .withConnectionProperty("include.schema.changes", "false"));

p.run().waitUntilFinish();

Under the hood

KafkaSourceConsumerFn and Restrictions

KafkaSourceConsumerFn (KSC onwards) is a DoFn in charge of the Database replication and CDC.

There are two ways of initializing KSC:

  • Restricted by number of records
  • Restricted by amount of time (minutes)

By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter:

FunctionParamDescription
KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)Class, SourceRecordMapper, IntRestrict run by number of records (Default).
KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)Class, SourceRecordMapper, LongRestrict run by amount of time (in minutes).

Requirements and Supported versions

  • JDK v8
  • Debezium Connectors v1.3
  • Apache Beam 2.25

Running Unit Tests

You can run Integration Tests using gradlew.

Example of running the MySQL Connector Integration Test:

./gradlew integrationTest -p sdks/java/io/debezium/ --tests org.apache.beam.io.debezium.DebeziumIOMySqlConnectorIT -DintegrationTestRunner=direct