import ChangeLog from '../changelog/connector-cloudberry.md';
JDBC Cloudberry Sink Connector
Supported engines: Spark, Flink, SeaTunnel Zeta
Write data through JDBC. Cloudberry currently has no native driver of its own; it connects through the PostgreSQL driver and follows PostgreSQL's implementation.
Supports batch mode and streaming mode, concurrent writing, and exactly-once semantics (guaranteed through XA transactions).
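As a rough illustration of streaming mode with more than one writer, the `env` block of a job might look like the sketch below. The values are assumptions chosen for illustration, `checkpoint.interval` is given in milliseconds, and the `source` and `sink` blocks are omitted.

```hocon
env {
  # Degree of parallelism for the job (illustrative value)
  parallelism = 2
  # Run as an unbounded streaming job instead of a one-off batch
  job.mode = "STREAMING"
  # Take a checkpoint every 5 seconds (milliseconds; illustrative value)
  checkpoint.interval = 5000
}
```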
- For the Spark/Flink engine: make sure the JDBC driver jar has been placed in the directory `${SEATUNNEL_HOME}/plugins/`.
- For the SeaTunnel Zeta engine: make sure the JDBC driver jar has been placed in the directory `${SEATUNNEL_HOME}/lib/`.
Uses `XA transactions` to ensure `exactly-once` delivery, so `exactly-once` is supported only for databases that support `XA transactions`. Set `is_exactly_once=true` to enable it.
| Datasource | Supported Versions | Driver | Url | Maven |
|---|---|---|---|---|
| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download |
Download the PostgreSQL driver jar and copy it to the `$SEATUNNEL_HOME/plugins/jdbc/lib/` directory.
For example: `cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/`
Cloudberry uses PostgreSQL's data type implementation. Please refer to PostgreSQL documentation for data type compatibility and mappings.
Cloudberry connector uses the same options as PostgreSQL. For detailed configuration options, please refer to the PostgreSQL documentation.
Key options include:
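The sketch below annotates the options that appear in this document's examples, as a quick orientation. The grouping and comments are an interpretation rather than the authoritative option reference, and the commented-out lines are alternatives, not settings to enable all at once.

```hocon
sink {
  Jdbc {
    # Connection settings (PostgreSQL URL format and driver class)
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"

    # Either supply the write statement yourself ...
    query = "insert into test_table(name,age) values(?,?)"

    # ... or let the connector generate it from the database and table names
    # generate_sink_sql = true
    # database = "mydb"
    # table = "public.test_table"
    # primary_keys = ["id", "name"]

    # Exactly-once writes through XA transactions
    # is_exactly_once = "true"
    # xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"

    # Handling of an existing table and existing data
    # schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # data_save_mode = "APPEND_DATA"
  }
}
```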
Simple example, writing generated rows with an explicit insert statement:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
  }
}
```
Generating the sink SQL from the database and table names:

```hocon
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
  }
}
```
Exactly-once writes using XA transactions:

```hocon
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
    is_exactly_once = "true"
    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
  }
}
```
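Exactly-once jobs are sometimes tuned with a few additional options. The fragment below is a sketch under the assumption that `max_retries`, `max_commit_attempts`, and `transaction_timeout_sec` behave for Cloudberry as they are documented for the generic JDBC sink. The values are illustrative and should be checked against the PostgreSQL connector documentation.

```hocon
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
    is_exactly_once = "true"
    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
    # Assumption: no statement-level retries, so a failed batch is not replayed inside a transaction
    max_retries = 0
    # Assumption: how many times a failed transaction commit is retried
    max_commit_attempts = 3
    # Assumption: -1 means the opened transaction never times out
    transaction_timeout_sec = -1
  }
}
```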
With primary keys and upper-cased field identifiers:

```hocon
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "sink_table"
    primary_keys = ["id","name"]
    field_ide = UPPERCASE
  }
}
```
With schema and data save modes:

```hocon
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```
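For comparison, here is a sketch of a more destructive save-mode combination. It assumes that the values documented for the generic JDBC sink (`RECREATE_SCHEMA`, `DROP_DATA`) are accepted by the Cloudberry connector as well; verify this before running it against real data.

```hocon
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
    # Assumption: create the table if missing, otherwise drop and recreate it
    schema_save_mode = "RECREATE_SCHEMA"
    # Assumption: keep the table structure but delete existing rows before writing
    data_save_mode = "DROP_DATA"
  }
}
```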
For more detailed examples and options, please refer to the PostgreSQL connector documentation.