The SQL
configuration file appears as follows.
/* config env { parallelism = 1 job.mode = "BATCH" } */ CREATE TABLE source_table WITH ( 'connector'='jdbc', 'type'='source', 'url' = 'jdbc:mysql://localhost:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'query' = 'select * from source', 'properties'= '{ useSSL = false, rewriteBatchedStatements = true }' ); CREATE TABLE sink_table WITH ( 'connector'='jdbc', 'type'='sink', 'url' = 'jdbc:mysql://localhost:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'generate_sink_sql' = 'true', 'database' = 'seatunnel', 'table' = 'sink' ); INSERT INTO sink_table SELECT id, name, age, email FROM source_table;
SQL
Configuration File/* config env { parallelism = 1 job.mode = "BATCH" } */
In the SQL
file, common configuration sections are defined using /* config */
comments. Inside, common configurations like env
can be defined using HOCON
format.
CREATE TABLE source_table WITH ( 'connector'='jdbc', 'type'='source', 'url' = 'jdbc:mysql://localhost:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'query' = 'select * from source', 'properties' = '{ useSSL = false, rewriteBatchedStatements = true }' );
CREATE TABLE ... WITH (...)
syntax creates a mapping for the source table. The TABLE
name is the name of the source-mapped table, and the WITH
syntax contains source-related configuration parameters.connector
and type
, representing connector plugin name (such as jdbc
, FakeSource
, etc.) and source type (fixed as source
), respectively.'key' = 'value',
.'value'
is a sub-configuration, you can directly use a string in HOCON
format. Note: if using a sub-configuration in HOCON
format, the internal property items must be separated by ,
, like this:'properties' = '{ useSSL = false, rewriteBatchedStatements = true }'
'
within 'value'
, it needs to be escaped with ''
, like this:'query' = 'select * from source where name = ''Joy Ding'''
CREATE TABLE sink_table WITH ( 'connector'='jdbc', 'type'='sink', 'url' = 'jdbc:mysql://localhost:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'generate_sink_sql' = 'true', 'database' = 'seatunnel', 'table' = 'sink' );
CREATE TABLE ... WITH (...)
syntax creates a mapping for the target table. The TABLE
name is the name of the target-mapped table, and the WITH
syntax contains sink-related configuration parameters.WITH
syntax: connector
and type
, representing connector plugin name (such as jdbc
, console
, etc.) and target type (fixed as sink
), respectively.'key' = 'value',
.INSERT INTO sink_table SELECT id, name, age, email FROM source_table;
SELECT FROM
part is the table name of the source-mapped table.INSERT INTO
part is the table name of the target-mapped table.INSERT
, like this: INSERT INTO sink_table (id, name, age, email) SELECT id, name, age, email FROM source_table;
INSERT INTO sink_table SELECT source_table;
SELECT
part directly uses the name of the source-mapped table, indicating that all data from the source table will be inserted into the target table.transform
configurations. This syntax is generally used in multi-table synchronization scenarios. For example:CREATE TABLE source_table WITH ( 'connector'='jdbc', 'type' = 'source', 'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'table_list' = '[ { table_path = "source.table1" }, { table_path = "source.table2", query = "select * from source.table2" } ]' ); CREATE TABLE sink_table WITH ( 'connector'='jdbc', 'type' = 'sink', 'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', 'driver' = 'com.mysql.cj.jdbc.Driver', 'user' = 'root', 'password' = '123456', 'generate_sink_sql' = 'true', 'database' = 'sink' ); INSERT INTO sink_table SELECT source_table;
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table;
SELECT
query, used for INSERT INTO
operations.SELECT
part refers to: SQL-transform query
configuration itemCREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table; INSERT INTO sink_table SELECT * FROM temp1;
./bin/seatunnel.sh --config ./config/sample.sql