commit | 27812cfd17c78d79147522da503afce225eda709 | |
---|---|---|
author | Jark Wu <jark@apache.org> | Wed Nov 03 15:56:16 2021 +0800 |
committer | GitHub <noreply@github.com> | Wed Nov 03 15:56:16 2021 +0800 |
tree | 51e15e452201841c455c8596677d84b44d11a22b | |
parent | 7ca30b839e3c632702dd86a9528e4fa8e121aefa | |
[docs] Add GitHub star button to documentation (#553)
Flink CDC Connectors is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). The connectors integrate Debezium as the engine for capturing data changes, so they can fully leverage Debezium's capabilities. See more about what Debezium is.
This README is a brief walkthrough of the core features of Flink CDC Connectors. For complete documentation, please see the documentation.
Database | Database Version | JDBC Driver |
---|---|---|
MySQL | 5.7, 8.0.x | 8.0.16 |
PostgreSQL | 9.6, 10, 11, 12 | 42.2.12 |
Setting up a Flink cluster with the provided connector takes several steps, including placing the connector jars under FLINK_HOME/lib/.
The example shows how to create a MySQL CDC source in the Flink SQL Client and execute queries on it.

    -- creates a mysql cdc table source
    CREATE TABLE mysql_binlog (
      id INT NOT NULL,
      name STRING,
      description STRING,
      weight DECIMAL(10,3)
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'inventory',
      'table-name' = 'products'
    );

    -- read snapshot and binlog data from mysql, do some transformation, and show it on the client
    SELECT id, UPPER(name), description, weight FROM mysql_binlog;

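The WITH clause above is a flat set of key/value options, so the statement can also be generated when the connection settings live in configuration rather than in the SQL text. The following is a minimal sketch in plain Java, assuming a hypothetical helper class `MySqlCdcDdl` that is not part of Flink CDC Connectors. It only builds the statement string; submitting it to Flink is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Flink CDC Connectors) that renders a CREATE TABLE statement. */
public class MySqlCdcDdl {

    /** Renders CREATE TABLE for the given column definitions and connector options. */
    public static String build(String tableName, String columns, Map<String, String> options) {
        // One "'key' = 'value'" entry per line, indented by two spaces.
        StringJoiner with = new StringJoiner(",\n  ", "  ", "");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // Single quotes inside values are escaped by doubling them, as in SQL string literals.
            with.add("'" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'");
        }
        return "CREATE TABLE " + tableName + " (\n  " + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", "localhost");
        options.put("port", "3306");
        options.put("username", "flinkuser");
        options.put("password", "flinkpw");
        options.put("database-name", "inventory");
        options.put("table-name", "products");

        String columns = "id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3)";
        System.out.println(build("mysql_binlog", columns, options));
    }
}
```

The option keys and values here are the ones from the SQL example; only the helper class and its method name are invented for illustration.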
Include the following Maven dependency (available through Maven Central):

    <dependency>
      <groupId>com.ververica</groupId>
      <!-- add the dependency matching your database -->
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.2</version>
    </dependency>


    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import com.ververica.cdc.connectors.mysql.MySqlSource;

    public class MySqlBinlogSourceExample {
      public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none"); // do not use lock

        SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort) // placeholder: replace with your MySQL port, e.g. 3306
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
            .debeziumProperties(debeziumProperties)
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .addSource(sourceFunction)
            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
      }
    }

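Note that the example above passes the captured table as a database-qualified name (yourDatabaseName.yourTableName). When several tables from one database are captured, the qualified names can be derived instead of typed out. The following is a minimal sketch in plain Java; `TableListHelper` and `qualify` are hypothetical names, not part of Flink CDC Connectors.

```java
/** Hypothetical helper (not part of Flink CDC Connectors) that builds database-qualified table names. */
public class TableListHelper {

    /** Prefixes each table name with the database name, e.g. inventory + products -> inventory.products. */
    public static String[] qualify(String database, String... tables) {
        if (database == null || database.isEmpty()) {
            throw new IllegalArgumentException("database name must not be empty");
        }
        String[] qualified = new String[tables.length];
        for (int i = 0; i < tables.length; i++) {
            qualified[i] = database + "." + tables[i];
        }
        return qualified;
    }

    public static void main(String[] args) {
        String[] names = qualify("inventory", "products", "orders");
        // prints "inventory.products, inventory.orders"
        System.out.println(String.join(", ", names));
    }
}
```

Assuming the builder's tableList method accepts more than one name, the result could be passed as `.tableList(TableListHelper.qualify("inventory", "products", "orders"))`.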

    git clone https://github.com/ververica/flink-cdc-connectors.git
    cd flink-cdc-connectors
    mvn clean install -DskipTests

Flink CDC Connectors is now available in your local .m2 repository.
The code in this repository is licensed under the Apache Software License 2.
The Flink CDC Connectors project welcomes anyone who wants to help out in any way, whether by reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. You can report problems or request features in GitHub Issues.
DingTalk Chinese User Group
You can search for the group number [33121212] or scan the following QR code to join the group.
To get started, please see https://ververica.github.io/flink-cdc-connectors/