# blob: 4d83b461c2d6d22168a6b444d23ad473f3ca81ed [file] [log] [blame]
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Tests for disable_on_error and SKIP transaction features.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Offset into the subscriber's log file from which the next log search
# starts. test_skip_lsn() advances it after each skipped transaction so that
# a later call does not match log lines belonging to an earlier one.
my $offset = 0;

# Test skipping the transaction. This function must be called after the caller
# has inserted data that conflicts with the subscriber. The finish LSN of the
# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
# fetched from the server logs. After executing ALTER SUBSCRIPTION ... SKIP, we
# check if logical replication can continue working by inserting $nonconflict_data
# on the publisher.
#
# Arguments:
#   $node_publisher   - publisher node; receives the non-conflicting INSERT
#   $node_subscriber  - subscriber node that owns subscription "sub"
#   $nonconflict_data - VALUES list (e.g. "(2, NULL)") inserted into tbl on
#                       the publisher once the transaction has been skipped
#   $expected         - expected row count of tbl on the subscriber afterwards
#   $msg              - description of the resulting test
#
# Dies if the subscription is not disabled or the skip is not completed
# before polling gives up, or if the error LSN cannot be found in the log.
# Returns the result of the final is() check.
sub test_skip_lsn
{
	my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
	  = @_;

	# Wait until a conflict occurs on the subscriber. Fail right here on a
	# timeout instead of carrying on and reporting a misleading error later.
	$node_subscriber->poll_query_until('postgres',
		"SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'"
	) or die "Timed out while waiting for subscriber to be disabled";

	# Get the finish LSN of the error transaction, mapping the expected
	# ERROR with its CONTEXT when retrieving this information. Only the part
	# of the log written since the previous call is searched.
	my $contents = slurp_file($node_subscriber->logfile, $offset);
	$contents =~
	  qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
	  or die "could not get error-LSN";
	my $lsn = $1;

	# Set skip lsn.
	$node_subscriber->safe_psql('postgres',
		"ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");

	# Re-enable the subscription.
	$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");

	# Wait for the failed transaction to be skipped; subskiplsn reads '0/0'
	# again once that has happened.
	$node_subscriber->poll_query_until('postgres',
		"SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'"
	) or die "Timed out while waiting for the transaction to be skipped";

	# Check the log to ensure that the transaction is skipped, and advance the
	# offset of the log file for the next test.
	$offset = $node_subscriber->wait_for_log(
		qr/LOG: ( [A-Z0-9]+:)? logical replication completed skipping transaction at LSN $lsn/,
		$offset);

	# Insert non-conflict data
	$node_publisher->safe_psql('postgres',
		"INSERT INTO tbl VALUES $nonconflict_data");
	$node_publisher->wait_for_catchup('sub');

	# Check replicated data
	my $res =
	  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
	return is($res, $expected, $msg);
}
# Publisher node. It is initialized for logical streaming and given a low
# logical_decoding_work_mem so that the streaming cases are exercised.
my $publisher_conf = qq[
logical_decoding_work_mem = 64kB
max_prepared_transactions = 10
];
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $publisher_conf);
$node_publisher->start;

# Subscriber node. Like the publisher, it gets max_prepared_transactions set
# (presumably for the two_phase subscription and the PREPARE TRANSACTION
# scenario used later in this file).
my $subscriber_conf = qq[
max_prepared_transactions = 10
];
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->append_conf('postgresql.conf', $subscriber_conf);
$node_subscriber->start;
# Seed both nodes with table "tbl" holding the same single row. Only the
# subscriber's copy has a primary key, so that row will conflict with the
# data arriving from the publisher later.
my $publisher_setup_sql = qq[
CREATE TABLE tbl (i INT, t TEXT);
INSERT INTO tbl VALUES (1, NULL);
];
my $subscriber_setup_sql = qq[
CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT);
INSERT INTO tbl VALUES (1, NULL);
];
$node_publisher->safe_psql('postgres', $publisher_setup_sql);
$node_subscriber->safe_psql('postgres', $subscriber_setup_sql);

# Set up logical replication with a publication/subscription pair. The
# uniqueness violation is expected to make the initial synchronization fail
# and, because of disable_on_error, leave the subscription disabled.
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $create_subscription_sql =
  "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)";
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION pub FOR TABLE tbl");
$node_subscriber->safe_psql('postgres', $create_subscription_sql);

# The failed initial synchronization must disable the subscription; give up
# if that never shows in the catalog.
my $sub_disabled = $node_subscriber->poll_query_until('postgres',
	"SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
);
die "Timed out while waiting for subscriber to be disabled"
  unless $sub_disabled;

# Clear out the subscriber's table, whose contents caused the failure, then
# switch subscription "sub" back on and wait for the data to replicate.
$node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');

# The table sync is done once the publisher's single row has arrived.
my $result =
  $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl");
is($result, qq(1), "subscription sub replicated data");
# Each scenario below makes the publisher commit a transaction whose rows
# collide with the subscriber's primary key, then hands over to
# test_skip_lsn() to skip that transaction and insert one more,
# non-conflicting row. The expected count therefore grows by exactly one
# per scenario: nothing from a skipped transaction reaches the subscriber.

# Insert data to tbl, raising an error on the subscriber due to violation
# of the unique constraint on tbl. Then skip the transaction.
$node_publisher->safe_psql(
	'postgres',
	qq[
BEGIN;
INSERT INTO tbl VALUES (1, NULL);
COMMIT;
]);
test_skip_lsn($node_publisher, $node_subscriber,
	"(2, NULL)", "2", "test skipping transaction");

# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
# PREPARE the transaction, raising an error. Then skip the transaction.
$node_publisher->safe_psql(
	'postgres',
	qq[
BEGIN;
INSERT INTO tbl VALUES (1, NULL);
PREPARE TRANSACTION 'gtx';
COMMIT PREPARED 'gtx';
]);
test_skip_lsn($node_publisher, $node_subscriber,
	"(3, NULL)", "3", "test skipping prepare and commit prepared");

# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB
# limit, also raising an error on the subscriber during applying spooled
# changes for the same reason. Then skip the transaction. None of the
# 10000 rows are expected on the subscriber; only the single row inserted
# afterwards is added to the count.
$node_publisher->safe_psql(
	'postgres',
	qq[
BEGIN;
INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
COMMIT;
]);
test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))",
	"4", "test skipping stream-commit");
# No prepared transaction may be left behind on the subscriber once all the
# scenarios have run.
my $prepared_xact_count = $node_subscriber->safe_psql('postgres',
	"SELECT COUNT(*) FROM pg_prepared_xacts");
is($prepared_xact_count, "0",
	"check all prepared transactions are resolved on the subscriber");

# Stop the subscriber first, then the publisher.
for my $node ($node_subscriber, $node_publisher)
{
	$node->stop;
}

done_testing();