import {siteVariables} from ‘../../version’;
The PostgreSQL Extract Node
allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to set up the PostgreSQL Extract Node
to run SQL queries against PostgreSQL databases.
Extract Node | Version | Driver |
---|---|---|
PostgreSQL-CDC | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
In order to set up the PostgreSQL Extract Node
, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
Change Data Capture (CDC) allows you to track and propagate changes in a PostgreSQL database to downstream consumers based on its Write-Ahead Log (WAL). You need to ensure that the upstream database is configured to support logical replication. Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use. If you plan not to use the native pgoutput
logical replication stream support, then you must install the logical decoding plug-in into the PostgreSQL server.
pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be installed.
wal_level
configuration setting:SHOW wal_level;
The default value is replica. For CDC, you’ll need to set it to logical
in the database configuration file (postgresql.conf). Keep in mind that changing the wal_level requires a restart of the Postgres instance and can affect database performance.
wal_level = logical
decoderbufs is based on Protobuf and maintained by the Debezium community. installing it.
shared_preload_libraries = 'decoderbufs'
wal_level = logical
REPLICA IDENTITY
is a PostgreSQL-specific table-level setting that determines the amount of information that is available to the logical decoding plug-in for UPDATE and DELETE events. See more.
Please keep the replica indentity
level of source tables to FULL
. You can check and change this value by SQL:
-- show replica identity SELECT CASE relreplident WHEN 'd' THEN 'default' WHEN 'n' THEN 'nothing' WHEN 'f' THEN 'full' WHEN 'i' THEN 'index' END AS replica_identity FROM pg_class WHERE oid = 'mytablename'::regclass; -- change replica identity ALTER TABLE mytablename REPLICA IDENTITY FULL;
CREATE TABLE `postgresTable`( `name` STRING, `age` INT ) WITH ( 'connector' = 'postgres-cdc-inlong', 'hostname' = 'localhost', 'username' = 'postgres', 'password' = 'inlong', 'database-name' = 'postgres', 'schema-name' = 'public', 'port' = '5432', 'table-name' = 'user', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'feaafacbaddadc' )
TODO: It will be supported in the future.
TODO: It will be supported in the future.
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be postgres-cdc-inlong . |
hostname | required | (none) | String | IP address or hostname of the PostgreSQL database server. |
username | required | (none) | String | Name of the PostgreSQL database to use when connecting to the PostgreSQL database server. |
password | required | (none) | String | Password to use when connecting to the PostgreSQL database server. |
database-name | required | (none) | String | Database name of the PostgreSQL server to monitor. |
schema-name | required | (none) | String | Schema name of the PostgreSQL database to monitor. |
table-name | required | (none) | String | Table name of the PostgreSQL database to monitor. |
port | optional | 5432 | Integer | Integer port number of the PostgreSQL database server. |
decoding.plugin.name | optional | decoderbufs | String | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
slot.name | optional | flink | String | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.” |
debezium.* | optional | (none) | String | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Postgres server. For example: ‘debezium.snapshot.mode’ = ‘never’. See more about the Debezium's Postgres Connector properties. |
inlong.metric.labels | optional | (none) | String | Inlong metric label, format of value is groupId={groupId} &streamId={streamId} &nodeId={nodeId} . |
:::caution
slot.name
is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot “flink” is active for PID 974 error.SELECT * FROM pg_replication_slots; -- get slot name is flink. delete it SELECT pg_drop_replication_slot('flink');
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
key | Data Type | Description |
---|---|---|
table_name | STRING NOT NULL | Name of the table that contain the row. |
schema_name | STRING NOT NULL | Name of the schema that contain the row. |
database_name | STRING NOT NULL | Name of the database that contain the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
CREATE TABLE postgresTable ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, `name` STRING, `age` INT ) WITH ( 'connector' = 'postgres-cdc-inlong', 'hostname' = 'localhost', 'username' = 'postgres', 'password' = 'inlong', 'database-name' = 'postgres', 'schema-name' = 'public', 'port' = '5432', 'table-name' = 'user', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'feaafacbaddadc' );
PostgreSQL type | Flink SQL type |
---|---|
TINYINT | |
SMALLINT INT2 SMALLSERIAL SERIAL2 | SMALLINT |
INTEGER SERIAL | INT |
BIGINT BIGSERIAL | BIGINT |
DECIMAL(20, 0) | |
REAL FLOAT4 | FLOAT |
FLOAT8 DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT | STRING |
BYTEA | BYTES |