| |
| # Copyright (c) 2021-2023, PostgreSQL Global Development Group |
| |
| # Tests that logical decoding messages |
| use strict; |
| use warnings; |
| use PostgreSQL::Test::Cluster; |
| use PostgreSQL::Test::Utils; |
| use Test::More; |
| |
| # Create publisher node |
| my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); |
| $node_publisher->init(allows_streaming => 'logical'); |
| $node_publisher->append_conf('postgresql.conf', 'autovacuum = off'); |
| $node_publisher->start; |
| |
| # Create subscriber node |
| my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); |
| $node_subscriber->init(allows_streaming => 'logical'); |
| $node_subscriber->start; |
| |
| # Create some preexisting content on publisher |
| $node_publisher->safe_psql('postgres', |
| "CREATE TABLE tab_test (a int primary key)"); |
| |
| # Setup structure on subscriber |
| $node_subscriber->safe_psql('postgres', |
| "CREATE TABLE tab_test (a int primary key)"); |
| |
| # Setup logical replication |
| my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; |
| $node_publisher->safe_psql('postgres', |
| "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); |
| |
| $node_subscriber->safe_psql('postgres', |
| "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" |
| ); |
| |
| $node_publisher->wait_for_catchup('tap_sub'); |
| |
| # Ensure a transactional logical decoding message shows up on the slot |
| $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); |
| |
| # wait for the replication slot to become inactive on the publisher |
| $node_publisher->poll_query_until( |
| 'postgres', |
| "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", |
| 1); |
| |
| $node_publisher->safe_psql('postgres', |
| "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" |
| ); |
| |
| my $result = $node_publisher->safe_psql( |
| 'postgres', qq( |
| SELECT get_byte(data, 0) |
| FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, |
| 'proto_version', '1', |
| 'publication_names', 'tap_pub', |
| 'messages', 'true') |
| )); |
| |
| # 66 77 67 == B M C == BEGIN MESSAGE COMMIT |
| is( $result, qq(66 |
| 77 |
| 67), |
| 'messages on slot are B M C with message option'); |
| |
| $result = $node_publisher->safe_psql( |
| 'postgres', qq( |
| SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') |
| FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, |
| 'proto_version', '1', |
| 'publication_names', 'tap_pub', |
| 'messages', 'true') |
| OFFSET 1 LIMIT 1 |
| )); |
| |
| is($result, qq(1|pgoutput), |
| "flag transactional is set to 1 and prefix is pgoutput"); |
| |
| $result = $node_publisher->safe_psql( |
| 'postgres', qq( |
| SELECT get_byte(data, 0) |
| FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, |
| 'proto_version', '1', |
| 'publication_names', 'tap_pub') |
| )); |
| |
| # no message and no BEGIN and COMMIT because of empty transaction optimization |
| is($result, qq(), |
| 'option messages defaults to false so message (M) is not available on slot' |
| ); |
| |
| $node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); |
| |
| my $message_lsn = $node_publisher->safe_psql('postgres', |
| "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')" |
| ); |
| |
| $node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); |
| |
| $result = $node_publisher->safe_psql( |
| 'postgres', qq( |
| SELECT get_byte(data, 0), get_byte(data, 1) |
| FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, |
| 'proto_version', '1', |
| 'publication_names', 'tap_pub', |
| 'messages', 'true') |
| WHERE lsn = '$message_lsn' AND xid = 0 |
| )); |
| |
| is($result, qq(77|0), 'non-transactional message on slot is M'); |
| |
| # Ensure a non-transactional logical decoding message shows up on the slot when |
| # it is emitted within an aborted transaction. The message won't emit until |
| # something advances the LSN, hence, we intentionally forces the server to |
| # switch to a new WAL file. |
| $node_publisher->safe_psql( |
| 'postgres', qq( |
| BEGIN; |
| SELECT pg_logical_emit_message(false, 'pgoutput', |
| 'a non-transactional message is available even if the transaction is aborted 1'); |
| INSERT INTO tab_test VALUES (3); |
| SELECT pg_logical_emit_message(true, 'pgoutput', |
| 'a transactional message is not available if the transaction is aborted'); |
| SELECT pg_logical_emit_message(false, 'pgoutput', |
| 'a non-transactional message is available even if the transaction is aborted 2'); |
| ROLLBACK; |
| SELECT pg_switch_wal(); |
| )); |
| |
| $result = $node_publisher->safe_psql( |
| 'postgres', qq( |
| SELECT get_byte(data, 0), get_byte(data, 1) |
| FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, |
| 'proto_version', '1', |
| 'publication_names', 'tap_pub', |
| 'messages', 'true') |
| )); |
| |
| is( $result, qq(77|0 |
| 77|0), |
| 'non-transactional message on slot from aborted transaction is M'); |
| |
| $node_subscriber->stop('fast'); |
| $node_publisher->stop('fast'); |
| |
| done_testing(); |