| |
| # Copyright (c) 2021-2023, PostgreSQL Global Development Group |
| |
| # Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with |
| # 'copy_data' parameter. |
| use strict; |
| use warnings; |
| use PostgreSQL::Test::Cluster; |
| use PostgreSQL::Test::Utils; |
| use Test::More; |
| |
| my $subname_AB = 'tap_sub_A_B'; |
| my $subname_AB2 = 'tap_sub_A_B_2'; |
| my $subname_BA = 'tap_sub_B_A'; |
| my $subname_BC = 'tap_sub_B_C'; |
| |
| my $result; |
| my $stdout; |
| my $stderr; |
| |
| ############################################################################### |
| # Setup a bidirectional logical replication between node_A & node_B |
| ############################################################################### |
| |
| # Initialize nodes |
| # node_A |
| my $node_A = PostgreSQL::Test::Cluster->new('node_A'); |
| $node_A->init(allows_streaming => 'logical'); |
| $node_A->start; |
| # node_B |
| my $node_B = PostgreSQL::Test::Cluster->new('node_B'); |
| $node_B->init(allows_streaming => 'logical'); |
| $node_B->start; |
| |
| # Create table on node_A |
| $node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); |
| |
| # Create the same table on node_B |
| $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); |
| |
| # Setup logical replication |
| # node_A (pub) -> node_B (sub) |
| my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; |
| $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); |
| $node_B->safe_psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION $subname_BA |
| CONNECTION '$node_A_connstr application_name=$subname_BA' |
| PUBLICATION tap_pub_A |
| WITH (origin = none)"); |
| |
| # node_B (pub) -> node_A (sub) |
| my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; |
| $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); |
| $node_A->safe_psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION $subname_AB |
| CONNECTION '$node_B_connstr application_name=$subname_AB' |
| PUBLICATION tap_pub_B |
| WITH (origin = none, copy_data = off)"); |
| |
| # Wait for initial table sync to finish |
| $node_A->wait_for_subscription_sync($node_B, $subname_AB); |
| $node_B->wait_for_subscription_sync($node_A, $subname_BA); |
| |
| is(1, 1, 'Bidirectional replication setup is complete'); |
| |
| ############################################################################### |
| # Check that bidirectional logical replication setup does not cause infinite |
| # recursive insertion. |
| ############################################################################### |
| |
| # insert a record |
| $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);"); |
| $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);"); |
| |
| $node_A->wait_for_catchup($subname_BA); |
| $node_B->wait_for_catchup($subname_AB); |
| |
| # check that transaction was committed on subscriber(s) |
| $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is( $result, qq(11 |
| 21), |
| 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' |
| ); |
| $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is( $result, qq(11 |
| 21), |
| 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' |
| ); |
| |
| $node_A->safe_psql('postgres', "DELETE FROM tab;"); |
| |
| $node_A->wait_for_catchup($subname_BA); |
| $node_B->wait_for_catchup($subname_AB); |
| |
| ############################################################################### |
| # Check that remote data of node_B (that originated from node_C) is not |
| # published to node_A. |
| ############################################################################### |
| $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is($result, qq(), 'Check existing data'); |
| |
| $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is($result, qq(), 'Check existing data'); |
| |
| # Initialize node node_C |
| my $node_C = PostgreSQL::Test::Cluster->new('node_C'); |
| $node_C->init(allows_streaming => 'logical'); |
| $node_C->start; |
| |
| $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); |
| |
| # Setup logical replication |
| # node_C (pub) -> node_B (sub) |
| my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; |
| $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab"); |
| $node_B->safe_psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION $subname_BC |
| CONNECTION '$node_C_connstr application_name=$subname_BC' |
| PUBLICATION tap_pub_C |
| WITH (origin = none)"); |
| $node_B->wait_for_subscription_sync($node_C, $subname_BC); |
| |
| # insert a record |
| $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); |
| |
| $node_C->wait_for_catchup($subname_BC); |
| $node_B->wait_for_catchup($subname_AB); |
| $node_A->wait_for_catchup($subname_BA); |
| |
| $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is($result, qq(32), 'The node_C data replicated to node_B'); |
| |
| # check that the data published from node_C to node_B is not sent to node_A |
| $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); |
| is($result, qq(), |
| 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' |
| ); |
| |
| ############################################################################### |
| # Specifying origin = NONE indicates that the publisher should only replicate the |
| # changes that are generated locally from node_B, but in this case since the |
| # node_B is also subscribing data from node_A, node_B can have remotely |
| # originated data from node_A. We log a warning, in this case, to draw |
| # attention to there being possible remote data. |
| ############################################################################### |
| ($result, $stdout, $stderr) = $node_A->psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION $subname_AB2 |
| CONNECTION '$node_B_connstr application_name=$subname_AB2' |
| PUBLICATION tap_pub_B |
| WITH (origin = none, copy_data = on)"); |
| like( |
| $stderr, |
| qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, |
| "Create subscription with origin = none and copy_data when the publisher has subscribed same table" |
| ); |
| |
| $node_A->wait_for_subscription_sync($node_B, $subname_AB2); |
| |
| # Alter subscription ... refresh publication should be successful when no new |
| # table is added |
| $node_A->safe_psql( |
| 'postgres', " |
| ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); |
| |
| # Check Alter subscription ... refresh publication when there is a new |
| # table that is subscribing data from a different publication |
| $node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); |
| $node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); |
| |
| # add a new table to the publication |
| $node_A->safe_psql('postgres', |
| "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new"); |
| $node_B->safe_psql( |
| 'postgres', " |
| ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION"); |
| |
| $node_B->wait_for_subscription_sync($node_A, $subname_BA); |
| |
| # add a new table to the publication |
| $node_B->safe_psql('postgres', |
| "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new"); |
| |
| # Alter subscription ... refresh publication should log a warning when a new |
| # table on the publisher is subscribing data from a different publication |
| ($result, $stdout, $stderr) = $node_A->psql( |
| 'postgres', " |
| ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION"); |
| like( |
| $stderr, |
| qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/, |
| "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none" |
| ); |
| |
| # Ensure that relation has reached 'ready' state before we try to drop it |
| my $synced_query = |
| "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; |
| $node_A->poll_query_until('postgres', $synced_query) |
| or die "Timed out while waiting for subscriber to synchronize data"; |
| |
| $node_B->wait_for_catchup($subname_AB2); |
| |
| # clear the operations done by this test |
| $node_A->safe_psql( |
| 'postgres', qq( |
| DROP TABLE tab_new; |
| DROP SUBSCRIPTION $subname_AB2; |
| DROP SUBSCRIPTION $subname_AB; |
| DROP PUBLICATION tap_pub_A; |
| )); |
| $node_B->safe_psql( |
| 'postgres', qq( |
| DROP TABLE tab_new; |
| DROP SUBSCRIPTION $subname_BA; |
| DROP PUBLICATION tap_pub_B; |
| )); |
| |
| ############################################################################### |
| # Specifying origin = NONE and copy_data = on must raise WARNING if we subscribe |
| # to a partitioned table and this table contains any remotely originated data. |
| # |
| # node_B |
| # __________________________ |
| # | tab_main | --------------> node_C (tab_main) |
| # |__________________________| |
| # | tab_part1 | tab_part2 | <-------------- node_A (tab_part2) |
| # |____________|_____________| |
| # | tab_part2_1 | |
| # |_____________| |
| # |
| # node_B |
| # __________________________ |
| # | tab_main | |
| # |__________________________| |
| # | tab_part1 | tab_part2 | <-------------- node_A (tab_part2) |
| # |____________|_____________| |
| # | tab_part2_1 | --------------> node_C (tab_part2_1) |
| # |_____________| |
| ############################################################################### |
| |
| # create a table on node A which will act as a source for a partition on node B |
| $node_A->safe_psql( |
| 'postgres', qq( |
| CREATE TABLE tab_part2(a int); |
| CREATE PUBLICATION tap_pub_A FOR TABLE tab_part2; |
| )); |
| |
| # create a partition table on node B |
| $node_B->safe_psql( |
| 'postgres', qq( |
| CREATE TABLE tab_main(a int) PARTITION BY RANGE(a); |
| CREATE TABLE tab_part1 PARTITION OF tab_main FOR VALUES FROM (0) TO (5); |
| CREATE TABLE tab_part2(a int) PARTITION BY RANGE(a); |
| CREATE TABLE tab_part2_1 PARTITION OF tab_part2 FOR VALUES FROM (5) TO (10); |
| ALTER TABLE tab_main ATTACH PARTITION tab_part2 FOR VALUES FROM (5) to (10); |
| CREATE SUBSCRIPTION tap_sub_A_B CONNECTION '$node_A_connstr' PUBLICATION tap_pub_A; |
| )); |
| |
| # create a table on node C |
| $node_C->safe_psql( |
| 'postgres', qq( |
| CREATE TABLE tab_main(a int); |
| CREATE TABLE tab_part2_1(a int); |
| )); |
| |
| # create a logical replication setup between node B and node C with |
| # subscription on node C having origin = NONE and copy_data = on |
| $node_B->safe_psql( |
| 'postgres', qq( |
| CREATE PUBLICATION tap_pub_B FOR TABLE tab_main WITH (publish_via_partition_root); |
| CREATE PUBLICATION tap_pub_B_2 FOR TABLE tab_part2_1; |
| )); |
| |
| ($result, $stdout, $stderr) = $node_C->psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B WITH (origin = none, copy_data = on); |
| "); |
| |
| # A warning must be logged as a partition 'tab_part2' in node B is subscribed to |
| # node A so partition 'tab_part2' can have remotely originated data |
| like( |
| $stderr, |
| qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/, |
| "Create subscription with origin = none and copy_data when the publisher's partition is subscribing from different origin" |
| ); |
| $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C"); |
| |
| ($result, $stdout, $stderr) = $node_C->psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B_2 WITH (origin = none, copy_data = on); |
| "); |
| |
| # A warning must be logged as ancestor of table 'tab_part2_1' in node B is |
| # subscribed to node A so table 'tab_part2_1' can have remotely originated |
| # data |
| like( |
| $stderr, |
| qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/, |
| "Create subscription with origin = none and copy_data when the publisher's ancestor is subscribing from different origin" |
| ); |
| |
| # clear the operations done by this test |
| $node_C->safe_psql( |
| 'postgres', qq( |
| DROP SUBSCRIPTION tap_sub_B_C; |
| DROP TABLE tab_main; |
| DROP TABLE tab_part2_1; |
| )); |
| $node_B->safe_psql( |
| 'postgres', qq( |
| DROP SUBSCRIPTION tap_sub_A_B; |
| DROP PUBLICATION tap_pub_B; |
| DROP PUBLICATION tap_pub_B_2; |
| DROP TABLE tab_main; |
| )); |
| $node_A->safe_psql( |
| 'postgres', qq( |
| DROP PUBLICATION tap_pub_A; |
| DROP TABLE tab_part2; |
| )); |
| |
| # shutdown |
| $node_B->stop('fast'); |
| $node_A->stop('fast'); |
| $node_C->stop('fast'); |
| |
| done_testing(); |