Doris provides the following ways to load data from PostgreSQL:
Doris uses JDBC Catalog to map PostgreSQL as an external catalog, allowing direct SQL queries against PostgreSQL data. Combined with INSERT INTO or CREATE TABLE AS SELECT, this is suitable for one-time migration or periodic batch loading.
Doris uses Streaming Job to continuously sync full and incremental data from PostgreSQL to Doris. By integrating Flink CDC reading capability, Doris keeps the job running, reads WAL from PostgreSQL and writes it to Doris tables with exactly-once semantics. Two modes are supported: SQL Mapping Sync and Auto Table Creation Sync. Available since Doris 4.1.0.
Use Flink Doris Connector together with Flink Postgres CDC for real-time synchronization. This is suitable for scenarios that require additional Flink stream processing logic. The connector also provides a one-click full-database synchronization tool. For details, see Flink Doris Connector.
Data integration tools such as DataX, SeaTunnel, and CloudCanal also support syncing data from PostgreSQL to Doris.
In most cases, you can use JDBC Catalog directly for one-time data migration. When continuous full + incremental synchronization is required, Streaming Job is recommended.
Use JDBC Catalog to map PostgreSQL as an external catalog, then use INSERT INTO or CREATE TABLE AS SELECT to load data. For detailed syntax, see JDBC PostgreSQL Catalog.
CREATE TABLE public.students ( id INT PRIMARY KEY, name VARCHAR(64), age INT ); INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);
CREATE CATALOG pg_catalog PROPERTIES ( "type" = "jdbc", "user" = "postgres", "password" = "postgres", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver" );
CREATE DATABASE IF NOT EXISTS doris_db; CREATE TABLE doris_db.students ( id INT, name VARCHAR(64), age INT ) UNIQUE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1");
INSERT INTO doris_db.students SELECT id, name, age FROM pg_catalog.postgres.public.students;
If the target table does not exist yet, you can also use CREATE TABLE AS SELECT to create the table and load data in one step:
CREATE TABLE doris_db.students PROPERTIES ("replication_num" = "1") AS SELECT * FROM pg_catalog.postgres.public.students;
SELECT * FROM doris_db.students; +----+-------+------+ | id | name | age | +----+-------+------+ | 1 | Emily | 25 | | 2 | Bob | 30 | +----+-------+------+
Streaming Job continuously reads PostgreSQL WAL via Flink CDC and writes it to Doris. Two modes are supported:
include_tables to sync one, several, or all tables). Provides at-least-once semantics.Before submitting a Streaming Job, logical replication (wal_level=logical) must be enabled on the PostgreSQL side and the user must be granted the corresponding REPLICATION privileges. For environment-specific setup steps, see:
Auto Table Creation Sync uses the FROM POSTGRES ... TO DATABASE ... syntax. The target is a Doris database, and the downstream tables are automatically created on first sync.
CREATE TABLE public.students ( id INT PRIMARY KEY, name VARCHAR(64), age INT ); INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);
Auto Table Creation Sync does not require pre-creating tables, but the target database that hosts them must exist:
CREATE DATABASE IF NOT EXISTS doris_db;
The example below uses include_tables to sync only the students table (multiple tables can be comma-separated; leave empty to sync the whole database):
CREATE JOB pg_db_sync ON STREAMING FROM POSTGRES ( "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver", "user" = "postgres", "password" = "postgres", "database" = "postgres", "schema" = "public", "include_tables" = "students", "offset" = "initial" ) TO DATABASE doris_db ( "table.create.properties.replication_num" = "1" -- set to 1 in single-BE deployments );
SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
SHOW TABLES FROM doris_db; SELECT * FROM doris_db.students;
For more common operations and full parameter reference, see PostgreSQL CDC with Auto Table Creation.
CREATE TABLE public.students ( id INT PRIMARY KEY, name VARCHAR(64), age INT ); INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);
SQL Mapping Sync requires the target table to exist beforehand:
CREATE DATABASE IF NOT EXISTS doris_db; CREATE TABLE doris_db.students ( id INT, name VARCHAR(64), age INT ) UNIQUE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1");
Use CREATE STREAMING JOB with the INSERT INTO ... SELECT * FROM cdc_stream(...) syntax:
CREATE JOB pg_students_sync ON STREAMING DO INSERT INTO doris_db.students SELECT * FROM cdc_stream( "type" = "postgres", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver", "user" = "postgres", "password" = "postgres", "database" = "postgres", "schema" = "public", "table" = "students", "offset" = "initial" );
SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";
SELECT * FROM doris_db.students;
For more common operations (pause, resume, delete, check task, etc.) and full parameter reference, see PostgreSQL CDC with SQL Mapping.