import ChangeLog from '../changelog/connector-jdbc.md';

Redshift

JDBC Redshift sink Connector

Supported engines

Spark
Flink
SeaTunnel Zeta

Key features

Uses XA transactions to guarantee exactly-once delivery, so exactly-once is only supported for databases that support XA transactions. Set is_exactly_once=true to enable it.
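As a minimal sketch of enabling this option, the sink block below adds is_exactly_once to the connection settings used in the Task Example section. The xa_data_source_class_name key and its placeholder value are assumptions: neither is stated on this page, so check the generic JDBC sink options and your driver's documentation for whether an XA data source class is available for Redshift.

```hocon
sink {
    jdbc {
        url = "jdbc:redshift://localhost:5439/mydatabase"
        driver = "com.amazon.redshift.jdbc.Driver"
        username = "myUser"
        password = "myPassword"

        generate_sink_sql = true
        schema = "public"
        table = "sink_table"

        # Enable exactly-once writes through XA transactions
        is_exactly_once = true
        # Assumption: placeholder value, replace it with the XA data source
        # class shipped with your driver, if the driver provides one
        xa_data_source_class_name = "your.driver.XADataSourceClass"
    }
}
```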

Description

Writes data through JDBC. Supports batch mode and streaming mode, concurrent writing, and exactly-once semantics (guaranteed by XA transactions).

Supported DataSource list

| datasource | supported versions | driver | url | maven |
|------------|--------------------|--------|-----|-------|
| redshift | Different dependency versions have different driver classes. | com.amazon.redshift.jdbc.Driver | jdbc:redshift://localhost:5439/database | Download |

Database dependency

For Spark/Flink Engine

  1. Make sure the JDBC driver jar has been placed in the directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta Engine

  1. Make sure the JDBC driver jar has been placed in the directory ${SEATUNNEL_HOME}/lib/.

Data Type Mapping

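| SeaTunnel Data type | Redshift Data type |
|---------------------|--------------------|
| BOOLEAN | BOOLEAN |
| TINYINT, SMALLINT | SMALLINT |
| INT | INTEGER |
| BIGINT | BIGINT |
| FLOAT | REAL |
| DOUBLE | DOUBLE PRECISION |
| DECIMAL | NUMERIC |
| STRING(<=65535) | CHARACTER VARYING |
| STRING(>65535) | SUPER |
| BYTES | BINARY VARYING |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| MAP, ARRAY, ROW | SUPER |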

Task Example

Simple

sink {
    jdbc {
        url = "jdbc:redshift://localhost:5439/mydatabase"
        driver = "com.amazon.redshift.jdbc.Driver"
        username = "myUser"
        password = "myPassword"
        
        generate_sink_sql = true
        schema = "public"
        table = "sink_table"
    }
}
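For context, the sink block above can be placed in a complete job file. The following is a minimal sketch, not a definitive setup: the env block and the FakeSource test source are assumptions that do not come from this page, the field names id and name are hypothetical, and option names may differ between SeaTunnel versions.

```hocon
env {
    # Assumption: run as a single-parallelism batch job
    parallelism = 1
    job.mode = "BATCH"
}

source {
    # Assumption: FakeSource generates test rows; replace it with a real source
    FakeSource {
        row.num = 16
        schema = {
            fields {
                # Hypothetical columns; they should match the target table
                id = "bigint"
                name = "string"
            }
        }
    }
}

sink {
    jdbc {
        url = "jdbc:redshift://localhost:5439/mydatabase"
        driver = "com.amazon.redshift.jdbc.Driver"
        username = "myUser"
        password = "myPassword"

        # Let the connector generate the INSERT statement for public.sink_table
        generate_sink_sql = true
        schema = "public"
        table = "sink_table"
    }
}
```

With generate_sink_sql = true, the generated statement targets the configured schema and table, so the source fields are expected to line up with the columns of sink_table.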

CDC (Change Data Capture) event

CDC change data is also supported. In this case, you need to configure database, table and primary_keys.

sink {
    jdbc {
        url = "jdbc:redshift://localhost:5439/mydatabase"
        driver = "com.amazon.redshift.jdbc.Driver"
        username = "myUser"
        password = "mypassword"
        
        generate_sink_sql = true
        schema = "public"
        table = "sink_table"
        
        # config update/delete primary keys
        primary_keys = ["id","name"]
    }
}

Changelog