| |
| # Copyright (c) 2021-2023, PostgreSQL Global Development Group |
| |
| # Binary mode logical replication test |
| |
| use strict; |
| use warnings; |
| use PostgreSQL::Test::Cluster; |
| use PostgreSQL::Test::Utils; |
| use Test::More; |
| |
| # Create and initialize a publisher node |
| my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); |
| $node_publisher->init(allows_streaming => 'logical'); |
| $node_publisher->start; |
| |
| # Create and initialize subscriber node |
| my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); |
| $node_subscriber->init(allows_streaming => 'logical'); |
| $node_subscriber->start; |
| |
| # Create tables on both sides of the replication |
| my $ddl = qq( |
| CREATE TABLE public.test_numerical ( |
| a INTEGER PRIMARY KEY, |
| b NUMERIC, |
| c FLOAT, |
| d BIGINT |
| ); |
| CREATE TABLE public.test_arrays ( |
| a INTEGER[] PRIMARY KEY, |
| b NUMERIC[], |
| c TEXT[] |
| );); |
| |
| $node_publisher->safe_psql('postgres', $ddl); |
| $node_subscriber->safe_psql('postgres', $ddl); |
| |
| # Configure logical replication |
| $node_publisher->safe_psql('postgres', |
| "CREATE PUBLICATION tpub FOR ALL TABLES"); |
| |
| # ------------------------------------------------------ |
| # Ensure binary mode also executes COPY in binary format |
| # ------------------------------------------------------ |
| |
| # Insert some content before creating a subscription |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| INSERT INTO public.test_numerical (a, b, c, d) VALUES |
| (1, 1.2, 1.3, 10), |
| (2, 2.2, 2.3, 20); |
| INSERT INTO public.test_arrays (a, b, c) VALUES |
| ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), |
| ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); |
| )); |
| |
| my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; |
| $node_subscriber->safe_psql('postgres', |
| "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " |
| . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); |
| |
| # Ensure the COPY command is executed in binary format on the publisher |
| $node_publisher->wait_for_log( |
| qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/ |
| ); |
| |
| # Ensure nodes are in sync with each other |
| $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); |
| |
| my $sync_check = qq( |
| SELECT a, b, c, d FROM test_numerical ORDER BY a; |
| SELECT a, b, c FROM test_arrays ORDER BY a; |
| ); |
| |
| # Check the synced data on the subscriber |
| my $result = $node_subscriber->safe_psql('postgres', $sync_check); |
| |
| is( $result, '1|1.2|1.3|10 |
| 2|2.2|2.3|20 |
| {1,2,3}|{1.1,1.2,1.3}|{one,two,three} |
| {3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber'); |
| |
| # ---------------------------------- |
| # Ensure apply works in binary mode |
| # ---------------------------------- |
| |
| # Insert some content and make sure it's replicated across |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| INSERT INTO public.test_arrays (a, b, c) VALUES |
| ('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'), |
| ('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}'); |
| |
| INSERT INTO public.test_numerical (a, b, c, d) VALUES |
| (3, 3.2, 3.3, 30), |
| (4, 4.2, 4.3, 40); |
| )); |
| |
| $node_publisher->wait_for_catchup('tsub'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a, b, c, d FROM test_numerical ORDER BY a"); |
| |
| is( $result, '1|1.2|1.3|10 |
| 2|2.2|2.3|20 |
| 3|3.2|3.3|30 |
| 4|4.2|4.3|40', 'check replicated data on subscriber'); |
| |
| # Test updates as well |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| UPDATE public.test_arrays SET b[1] = 42, c = NULL; |
| UPDATE public.test_numerical SET b = 42, c = NULL; |
| )); |
| |
| $node_publisher->wait_for_catchup('tsub'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a, b, c FROM test_arrays ORDER BY a"); |
| |
| is( $result, '{1,2,3}|{42,1.2,1.3}| |
| {1,3,2}|{42,1.3,1.2}| |
| {2,1,3}|{42,1.1,1.3}| |
| {3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a, b, c, d FROM test_numerical ORDER BY a"); |
| |
| is( $result, '1|42||10 |
| 2|42||20 |
| 3|42||30 |
| 4|42||40', 'check updated replicated data on subscriber'); |
| |
| # ------------------------------------------------------------------------------ |
| # Use ALTER SUBSCRIPTION to change to text format and then back to binary format |
| # ------------------------------------------------------------------------------ |
| |
| # Test to reset back to text formatting, and then to binary again |
| $node_subscriber->safe_psql('postgres', |
| "ALTER SUBSCRIPTION tsub SET (binary = false);"); |
| |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| INSERT INTO public.test_numerical (a, b, c, d) VALUES |
| (5, 5.2, 5.3, 50); |
| )); |
| |
| $node_publisher->wait_for_catchup('tsub'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a, b, c, d FROM test_numerical ORDER BY a"); |
| |
| is( $result, '1|42||10 |
| 2|42||20 |
| 3|42||30 |
| 4|42||40 |
| 5|5.2|5.3|50', 'check replicated data on subscriber'); |
| |
| $node_subscriber->safe_psql('postgres', |
| "ALTER SUBSCRIPTION tsub SET (binary = true);"); |
| |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| INSERT INTO public.test_arrays (a, b, c) VALUES |
| ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}'); |
| )); |
| |
| $node_publisher->wait_for_catchup('tsub'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a, b, c FROM test_arrays ORDER BY a"); |
| |
| is( $result, '{1,2,3}|{42,1.2,1.3}| |
| {1,3,2}|{42,1.3,1.2}| |
| {2,1,3}|{42,1.1,1.3}| |
| {2,3,1}|{1.2,1.3,1.1}|{two,three,one} |
| {3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber'); |
| |
| # --------------------------------------------------------------- |
| # Test binary replication without and with send/receive functions |
| # --------------------------------------------------------------- |
| |
| # Create a custom type without send/rcv functions |
| $ddl = qq( |
| CREATE TYPE myvarchar; |
| CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar |
| LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin'; |
| CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring |
| LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout'; |
| CREATE TYPE myvarchar ( |
| input = myvarcharin, |
| output = myvarcharout); |
| CREATE TABLE public.test_myvarchar ( |
| a myvarchar |
| );); |
| |
| $node_publisher->safe_psql('postgres', $ddl); |
| $node_subscriber->safe_psql('postgres', $ddl); |
| |
| # Insert some initial data |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| INSERT INTO public.test_myvarchar (a) VALUES |
| ('a'); |
| )); |
| |
| # Check the subscriber log from now on. |
| my $offset = -s $node_subscriber->logfile; |
| |
| # Refresh the publication to trigger the tablesync |
| $node_subscriber->safe_psql('postgres', |
| "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION"); |
| |
| # It should fail |
| $node_subscriber->wait_for_log( |
| qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/, |
| $offset); |
| |
| # Create and set send/rcv functions for the custom type |
| $ddl = qq( |
| CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea |
| LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend'; |
| CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar |
| LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv'; |
| ALTER TYPE myvarchar SET ( |
| send = myvarcharsend, |
| receive = myvarcharrecv |
| );); |
| |
| $node_publisher->safe_psql('postgres', $ddl); |
| $node_subscriber->safe_psql('postgres', $ddl); |
| |
| # Now tablesync should succeed |
| $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); |
| |
| # Check the synced data on the subscriber |
| $result = |
| $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;'); |
| |
| is($result, 'a', 'check synced data on subscriber with custom type'); |
| |
| # ----------------------------------------------------- |
| # Test mismatched column types with/without binary mode |
| # ----------------------------------------------------- |
| |
| # Test syncing tables with mismatching column types |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| CREATE TABLE public.test_mismatching_types ( |
| a bigint PRIMARY KEY |
| ); |
| INSERT INTO public.test_mismatching_types (a) |
| VALUES (1), (2); |
| )); |
| |
| # Check the subscriber log from now on. |
| $offset = -s $node_subscriber->logfile; |
| |
| $node_subscriber->safe_psql( |
| 'postgres', qq( |
| CREATE TABLE public.test_mismatching_types ( |
| a int PRIMARY KEY |
| ); |
| ALTER SUBSCRIPTION tsub REFRESH PUBLICATION; |
| )); |
| |
| # Cannot sync due to type mismatch |
| $node_subscriber->wait_for_log( |
| qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/, $offset); |
| |
| # Check the publisher log from now on. |
| $offset = -s $node_publisher->logfile; |
| |
| # Setting binary to false should allow syncing |
| $node_subscriber->safe_psql( |
| 'postgres', qq( |
| ALTER SUBSCRIPTION tsub SET (binary = false);)); |
| |
| # Ensure the COPY command is executed in text format on the publisher |
| $node_publisher->wait_for_log( |
| qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/, $offset); |
| |
| $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); |
| |
| # Check the synced data on the subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| 'SELECT a FROM test_mismatching_types ORDER BY a;'); |
| |
| is( $result, '1 |
| 2', 'check synced data on subscriber with binary = false'); |
| |
| $node_subscriber->stop('fast'); |
| $node_publisher->stop('fast'); |
| |
| done_testing(); |