Load Nodes are a set of Sink Connectors based on Apache Flink® for loading data into different storage systems.
Load Node | Version | Driver |
---|---|---|
Kafka | Kafka: 0.10+ | None |
HBase | HBase: 2.2.x | None |
PostgreSQL | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
Oracle | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
MySQL | MySQL: 5.6, 5.7, 8.0.x<br/>RDS MySQL: 5.6, 5.7, 8.0.x<br/>PolarDB MySQL: 5.6, 5.7, 8.0.x<br/>Aurora MySQL: 5.6, 5.7, 8.0.x<br/>MariaDB: 10.x<br/>PolarDB X: 2.0.1 | JDBC Driver: 8.0.21 |
TDSQL-PostgreSQL | TDSQL-PostgreSQL: 10.17 | JDBC Driver: 42.2.12 |
Greenplum | Greenplum: 4.x, 5.x, 6.x | JDBC Driver: 42.2.12 |
Elasticsearch | Elasticsearch: 6.x, 7.x | None |
ClickHouse | ClickHouse: 20.7+ | JDBC Driver: 0.3.1 |
Hive | Hive: 1.x, 2.x, 3.x | None |
SQLServer | SQLServer: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
HDFS | HDFS: 2.x, 3.x | None |
Iceberg | Iceberg: 0.13.1+ | None |
The following table shows the version mapping between InLong® Load Nodes and Flink®:
InLong® Load Nodes Version | Flink® Version |
---|---|
1.2.0 | 1.13.5 |
Several steps are needed to set up a Flink cluster with the provided connector. In particular, the connector JARs must be placed under `FLINK_HOME/lib/`.
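As a rough illustration of the JAR placement step, the sketch below wraps the copy into `FLINK_HOME/lib/` in a small shell function with a few sanity checks. The function name `install_connector` is hypothetical and not part of InLong or Flink, and the JAR path is whatever connector file you downloaded or built.

```shell
# Sketch only: copy a connector JAR into FLINK_HOME/lib/.
# install_connector is a hypothetical helper, not an InLong or Flink command.
install_connector() {
  # $1: path to the connector JAR
  # $2: Flink home directory (optional, defaults to $FLINK_HOME)
  local jar="$1"
  local flink_home="${2:-${FLINK_HOME:-}}"

  if [ -z "$flink_home" ]; then
    echo "FLINK_HOME is not set" >&2
    return 1
  fi
  if [ ! -f "$jar" ]; then
    echo "connector JAR not found: $jar" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib/ directory under: $flink_home" >&2
    return 1
  fi

  # Flink loads every JAR in lib/ onto the cluster classpath at startup.
  cp "$jar" "$flink_home/lib/"
}
```

A call might look like `install_connector ./your-connector.jar "$FLINK_HOME"`, where the JAR name is a placeholder. Because Flink reads `lib/` only at startup, a running standalone cluster needs a restart afterwards (for example with Flink's `bin/stop-cluster.sh` and `bin/start-cluster.sh`) before the connector is visible to `bin/sql-client.sh`.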
The following example shows how to create a MySQL Load Node in the Flink SQL Client and load data into it.
```sql
-- Creates a MySQL Extract Node
CREATE TABLE mysql_extract_node (
  id INT NOT NULL,
  name STRING,
  age INT,
  weight DECIMAL(10,3),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc-inlong',
  'hostname' = 'YourHostname',
  'port' = '3306',
  'username' = 'YourUsername',
  'password' = 'YourPassword',
  'database-name' = 'YourDatabaseName',
  'table-name' = 'YourTableName'
);

-- Creates a MySQL Load Node
CREATE TABLE mysql_load_node (
  id INT NOT NULL,
  name STRING,
  age INT,
  weight DECIMAL(10,3),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc-inlong',
  'url' = 'jdbc:mysql://YourHostname:3306/YourDatabaseName',
  'username' = 'YourUsername',
  'password' = 'YourPassword',
  'table-name' = 'YourTableName'
);

-- Loads data from the Extract Node into the Load Node
INSERT INTO mysql_load_node
SELECT id, name, age, weight
FROM mysql_extract_node;
```