| |
| # Copyright (c) 2021-2023, PostgreSQL Global Development Group |
| |
| # logical replication of 2PC test |
| use strict; |
| use warnings; |
| use PostgreSQL::Test::Cluster; |
| use PostgreSQL::Test::Utils; |
| use Test::More; |
| |
| ############################### |
| # Setup |
| ############################### |
| |
| # Initialize publisher node |
| my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); |
| $node_publisher->init(allows_streaming => 'logical'); |
| $node_publisher->append_conf('postgresql.conf', |
| qq(max_prepared_transactions = 10)); |
| $node_publisher->start; |
| |
| # Create subscriber node |
| my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); |
| $node_subscriber->init(allows_streaming => 'logical'); |
| $node_subscriber->append_conf('postgresql.conf', |
| qq(max_prepared_transactions = 0)); |
| $node_subscriber->start; |
| |
| # Create some pre-existing content on publisher |
| $node_publisher->safe_psql('postgres', |
| "CREATE TABLE tab_full (a int PRIMARY KEY)"); |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full SELECT generate_series(1,10); |
| PREPARE TRANSACTION 'some_initial_data'; |
| COMMIT PREPARED 'some_initial_data';"); |
| |
| # Setup structure on subscriber |
| $node_subscriber->safe_psql('postgres', |
| "CREATE TABLE tab_full (a int PRIMARY KEY)"); |
| |
| # Setup logical replication |
| my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; |
| $node_publisher->safe_psql('postgres', |
| "CREATE PUBLICATION tap_pub FOR TABLE tab_full"); |
| |
| my $appname = 'tap_sub'; |
| $node_subscriber->safe_psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION tap_sub |
| CONNECTION '$publisher_connstr application_name=$appname' |
| PUBLICATION tap_pub |
| WITH (two_phase = on)"); |
| |
| # Wait for initial table sync to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # Also wait for two-phase to be enabled |
| my $twophase_query = |
| "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; |
| $node_subscriber->poll_query_until('postgres', $twophase_query) |
| or die "Timed out while waiting for subscriber to enable twophase"; |
| |
| ############################### |
| # check that 2PC gets replicated to subscriber |
| # then COMMIT PREPARED |
| ############################### |
| |
| # Save the log location, to see the failure of the application |
| my $log_location = -s $node_subscriber->logfile; |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (11); |
| PREPARE TRANSACTION 'test_prepared_tab_full';"); |
| |
| # Confirm the ERROR is reported becasue max_prepared_transactions is zero |
| $node_subscriber->wait_for_log( |
| qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/); |
| |
| # Set max_prepared_transactions to correct value to resume the replication |
| $node_subscriber->append_conf('postgresql.conf', |
| qq(max_prepared_transactions = 10)); |
| $node_subscriber->restart; |
| |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is in prepared state on subscriber |
| my $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(1), 'transaction is prepared on subscriber'); |
| |
| # check that 2PC gets committed on subscriber |
| $node_publisher->safe_psql('postgres', |
| "COMMIT PREPARED 'test_prepared_tab_full';"); |
| |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is committed on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a = 11;"); |
| is($result, qq(1), 'Row inserted via 2PC has committed on subscriber'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(0), 'transaction is committed on subscriber'); |
| |
| ############################### |
| # check that 2PC gets replicated to subscriber |
| # then ROLLBACK PREPARED |
| ############################### |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (12); |
| PREPARE TRANSACTION 'test_prepared_tab_full';"); |
| |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is in prepared state on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(1), 'transaction is prepared on subscriber'); |
| |
| # check that 2PC gets aborted on subscriber |
| $node_publisher->safe_psql('postgres', |
| "ROLLBACK PREPARED 'test_prepared_tab_full';"); |
| |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is aborted on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a = 12;"); |
| is($result, qq(0), 'Row inserted via 2PC is not present on subscriber'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(0), 'transaction is aborted on subscriber'); |
| |
| ############################### |
| # Check that ROLLBACK PREPARED is decoded properly on crash restart |
| # (publisher and subscriber crash) |
| ############################### |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (12); |
| INSERT INTO tab_full VALUES (13); |
| PREPARE TRANSACTION 'test_prepared_tab';"); |
| |
| $node_subscriber->stop('immediate'); |
| $node_publisher->stop('immediate'); |
| |
| $node_publisher->start; |
| $node_subscriber->start; |
| |
| # rollback post the restart |
| $node_publisher->safe_psql('postgres', |
| "ROLLBACK PREPARED 'test_prepared_tab';"); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check inserts are rolled back |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a IN (12,13);"); |
| is($result, qq(0), 'Rows rolled back are not on the subscriber'); |
| |
| ############################### |
| # Check that COMMIT PREPARED is decoded properly on crash restart |
| # (publisher and subscriber crash) |
| ############################### |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (12); |
| INSERT INTO tab_full VALUES (13); |
| PREPARE TRANSACTION 'test_prepared_tab';"); |
| |
| $node_subscriber->stop('immediate'); |
| $node_publisher->stop('immediate'); |
| |
| $node_publisher->start; |
| $node_subscriber->start; |
| |
| # commit post the restart |
| $node_publisher->safe_psql('postgres', |
| "COMMIT PREPARED 'test_prepared_tab';"); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check inserts are visible |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a IN (12,13);"); |
| is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); |
| |
| ############################### |
| # Check that COMMIT PREPARED is decoded properly on crash restart |
| # (subscriber only crash) |
| ############################### |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (14); |
| INSERT INTO tab_full VALUES (15); |
| PREPARE TRANSACTION 'test_prepared_tab';"); |
| |
| $node_subscriber->stop('immediate'); |
| $node_subscriber->start; |
| |
| # commit post the restart |
| $node_publisher->safe_psql('postgres', |
| "COMMIT PREPARED 'test_prepared_tab';"); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check inserts are visible |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a IN (14,15);"); |
| is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); |
| |
| ############################### |
| # Check that COMMIT PREPARED is decoded properly on crash restart |
| # (publisher only crash) |
| ############################### |
| |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (16); |
| INSERT INTO tab_full VALUES (17); |
| PREPARE TRANSACTION 'test_prepared_tab';"); |
| |
| $node_publisher->stop('immediate'); |
| $node_publisher->start; |
| |
| # commit post the restart |
| $node_publisher->safe_psql('postgres', |
| "COMMIT PREPARED 'test_prepared_tab';"); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check inserts are visible |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM tab_full where a IN (16,17);"); |
| is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber'); |
| |
| ############################### |
| # Test nested transaction with 2PC |
| ############################### |
| |
| # check that 2PC gets replicated to subscriber |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (21); |
| SAVEPOINT sp_inner; |
| INSERT INTO tab_full VALUES (22); |
| ROLLBACK TO SAVEPOINT sp_inner; |
| PREPARE TRANSACTION 'outer'; |
| "); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is in prepared state on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(1), 'transaction is prepared on subscriber'); |
| |
| # COMMIT |
| $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';"); |
| |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check the transaction state on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(0), 'transaction is ended on subscriber'); |
| |
| # check inserts are visible. 22 should be rolled back. 21 should be committed. |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT a FROM tab_full where a IN (21,22);"); |
| is($result, qq(21), 'Rows committed are on the subscriber'); |
| |
| ############################### |
| # Test using empty GID |
| ############################### |
| |
| # check that 2PC gets replicated to subscriber |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_full VALUES (51); |
| PREPARE TRANSACTION '';"); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # check that transaction is in prepared state on subscriber |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(1), 'transaction is prepared on subscriber'); |
| |
| # ROLLBACK |
| $node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';"); |
| |
| # check that 2PC gets aborted on subscriber |
| $node_publisher->wait_for_catchup($appname); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(0), 'transaction is aborted on subscriber'); |
| |
| ############################### |
| # copy_data=false and two_phase |
| ############################### |
| |
| #create some test tables for copy tests |
| $node_publisher->safe_psql('postgres', |
| "CREATE TABLE tab_copy (a int PRIMARY KEY)"); |
| $node_publisher->safe_psql('postgres', |
| "INSERT INTO tab_copy SELECT generate_series(1,5);"); |
| $node_subscriber->safe_psql('postgres', |
| "CREATE TABLE tab_copy (a int PRIMARY KEY)"); |
| $node_subscriber->safe_psql('postgres', "INSERT INTO tab_copy VALUES (88);"); |
| $result = |
| $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); |
| is($result, qq(1), 'initial data in subscriber table'); |
| |
| # Setup logical replication |
| $node_publisher->safe_psql('postgres', |
| "CREATE PUBLICATION tap_pub_copy FOR TABLE tab_copy;"); |
| |
| my $appname_copy = 'appname_copy'; |
| $node_subscriber->safe_psql( |
| 'postgres', " |
| CREATE SUBSCRIPTION tap_sub_copy |
| CONNECTION '$publisher_connstr application_name=$appname_copy' |
| PUBLICATION tap_pub_copy |
| WITH (two_phase=on, copy_data=false);"); |
| |
| # Wait for initial table sync to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); |
| |
| # Also wait for two-phase to be enabled |
| $node_subscriber->poll_query_until('postgres', $twophase_query) |
| or die "Timed out while waiting for subscriber to enable twophase"; |
| |
| # Check that the initial table data was NOT replicated (because we said copy_data=false) |
| $result = |
| $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); |
| is($result, qq(1), 'initial data in subscriber table'); |
| |
| # Now do a prepare on publisher and check that it IS replicated |
| $node_publisher->safe_psql( |
| 'postgres', " |
| BEGIN; |
| INSERT INTO tab_copy VALUES (99); |
| PREPARE TRANSACTION 'mygid';"); |
| |
| # Wait for both subscribers to catchup |
| $node_publisher->wait_for_catchup($appname_copy); |
| $node_publisher->wait_for_catchup($appname); |
| |
| # Check that the transaction has been prepared on the subscriber, there will be 2 |
| # prepared transactions for the 2 subscriptions. |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_prepared_xacts;"); |
| is($result, qq(2), 'transaction is prepared on subscriber'); |
| |
| # Now commit the insert and verify that it IS replicated |
| $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';"); |
| |
| $result = |
| $node_publisher->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); |
| is($result, qq(6), 'publisher inserted data'); |
| |
| $node_publisher->wait_for_catchup($appname_copy); |
| |
| $result = |
| $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); |
| is($result, qq(2), 'replicated data in subscriber table'); |
| |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;"); |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); |
| |
| ############################### |
| # check all the cleanup |
| ############################### |
| |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_subscription"); |
| is($result, qq(0), 'check subscription was dropped on subscriber'); |
| |
| $result = $node_publisher->safe_psql('postgres', |
| "SELECT count(*) FROM pg_replication_slots"); |
| is($result, qq(0), 'check replication slot was dropped on publisher'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_subscription_rel"); |
| is($result, qq(0), |
| 'check subscription relation status was dropped on subscriber'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
| "SELECT count(*) FROM pg_replication_origin"); |
| is($result, qq(0), 'check replication origin was dropped on subscriber'); |
| |
| $node_subscriber->stop('fast'); |
| $node_publisher->stop('fast'); |
| |
| done_testing(); |