| # Copyright (c) 2022-2023, PostgreSQL Global Development Group |
| |
| # Test logical replication behavior with subscriber using available index |
| use strict; |
| use warnings; |
| use PostgreSQL::Test::Cluster; |
| use PostgreSQL::Test::Utils; |
| use Test::More; |
| |
| # Bring up the publisher node, initialised with allows_streaming => 'logical'. |
| my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); |
| $node_publisher->init(allows_streaming => 'logical'); |
| $node_publisher->start(); |
| |
| # Bring up the subscriber node with the same init option. |
| my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); |
| $node_subscriber->init(allows_streaming => 'logical'); |
| $node_subscriber->start(); |
| |
| # Values shared by every testcase below: the connection string used in |
| # CREATE SUBSCRIPTION, the application_name placed in that connection string, |
| # and a scratch variable holding the latest query result. |
| my $publisher_connstr = |
|     join(' ', scalar($node_publisher->connstr), 'dbname=postgres'); |
| my ($appname, $result) = ('tap_sub', ''); |
| |
| # ============================================================================= |
| # Testcase start: Subscription can use index with multiple rows and columns |
| # |
| # The publisher table is REPLICA IDENTITY FULL and the subscriber has a |
| # two-column index on (x, y).  After 2 rows are deleted and 2 rows are updated |
| # on the publisher, the subscriber's index is expected to report 4 scans. |
| # |
| |
| # create tables pub and sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE test_replica_id_full (x int, y text)"); |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE test_replica_id_full (x int, y text)"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)"); |
| |
| # insert 11 rows of initial data; x and y stay within the range 0-9 (the |
| # value 0 occurs twice because generate_series includes both 0 and 10) |
| $node_publisher->safe_psql('postgres', |
|     "INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,10) i" |
| ); |
| |
| # create pub/sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" |
| ); |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # delete 2 rows |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM test_replica_id_full WHERE x IN (5, 6)"); |
| |
| # update 2 rows |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE test_replica_id_full SET x = 100, y = '200' WHERE x IN (1, 2)"); |
| |
| # wait until the index is used on the subscriber (2 deletes + 2 updates) |
| $node_publisher->wait_for_catchup($appname); |
| $node_subscriber->poll_query_until('postgres', |
|     q{select (idx_scan = 4) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';} |
|   ) |
|   or die |
|   "Timed out while waiting for check subscriber tap_sub_rep_full updates 4 rows via index"; |
| |
| # make sure that the subscriber has the correct data after the UPDATE |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select count(*) from test_replica_id_full WHERE (x = 100 and y = '200')" |
| ); |
| is($result, qq(2), |
|     'multi-column index: subscriber has both updated rows'); |
| |
| # make sure that the subscriber has the correct data after the DELETE |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select count(*) from test_replica_id_full where x in (5, 6)"); |
| is($result, qq(0), |
|     'multi-column index: subscriber no longer has the deleted rows'); |
| |
| # cleanup pub |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); |
| $node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); |
| # cleanup sub |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); |
| $node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); |
| |
| # Testcase end: Subscription can use index with multiple rows and columns |
| # ============================================================================= |
| |
| # ============================================================================= |
| # Testcase start: Subscription can use index on partitioned tables |
| |
| # create tables pub and sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1)" |
| ); |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10)" |
| ); |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20)" |
| ); |
| |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE users_table_part REPLICA IDENTITY FULL"); |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL"); |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL"); |
| |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1)" |
| ); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10)" |
| ); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20)" |
| ); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)" |
| ); |
| |
| # insert 101 rows of initial data: user_id = i%100, value_1 = i%20 (the |
| # partition key) and value_2 = i |
| $node_publisher->safe_psql('postgres', |
|     "INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,100) i" |
| ); |
| |
| # create pub/sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" |
| ); |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # update one row: user_id = 4 only matches the row with value_1 = 4. |
| # NOTE(review): the new value_1 = 0 still falls into users_table_part_0, so |
| # this UPDATE does not move the row to another partition. |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4"); |
| |
| # delete rows from different partitions (one row from each partition) |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1"); |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12"); |
| |
| # wait until the index is used on the subscriber: 1 UPDATE + 2 DELETEs are |
| # expected to add up to 3 scans over all indexes matching the name pattern |
| # (presumably the per-partition indexes derived from users_table_part_idx) |
| $node_publisher->wait_for_catchup($appname); |
| $node_subscriber->poll_query_until('postgres', |
|     q{select sum(idx_scan)=3 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';} |
|   ) |
|   or die |
|   "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table"; |
| |
| # make sure that the subscriber has the correct data. |
| # The initial sum over all three columns is 10950; the UPDATE lowers it by 4 |
| # and the two DELETEs remove rows summing to 3 and 36, leaving 10907. |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select sum(user_id+value_1+value_2) from users_table_part"); |
| is($result, qq(10907), |
|     'partitioned table: subscriber column sums match after UPDATE and DELETEs' |
| ); |
| # 101 inserted rows minus the 2 deleted ones |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select count(DISTINCT(user_id,value_1, value_2)) from users_table_part"); |
| is($result, qq(99), |
|     'partitioned table: subscriber has 99 distinct rows after the DELETEs'); |
| |
| # cleanup pub |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); |
| $node_publisher->safe_psql('postgres', "DROP TABLE users_table_part"); |
| |
| # cleanup sub |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); |
| $node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part"); |
| |
| # Testcase end: Subscription can use index on partitioned tables |
| # ============================================================================= |
| |
| # ============================================================================= |
| # Testcase start: Subscription will not use indexes with only expressions or |
| # partial index |
| |
| # create tables pub and sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE people (firstname text, lastname text)"); |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE people REPLICA IDENTITY FULL"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE people (firstname text, lastname text)"); |
| |
| # index with only an expression |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX people_names_expr_only ON people ((firstname || ' ' || lastname))" |
| ); |
| |
| # partial index |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX people_names_partial ON people(firstname) WHERE (firstname = 'first_name_1')" |
| ); |
| |
| # insert some initial data (201 rows) |
| $node_publisher->safe_psql('postgres', |
|     "INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200) i" |
| ); |
| |
| # create pub/sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE PUBLICATION tap_pub_rep_full FOR TABLE people"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" |
| ); |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # update 2 rows |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_1'" |
| ); |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2'" |
| ); |
| |
| # make sure none of the indexes is used on the subscriber for the UPDATEs |
| $node_publisher->wait_for_catchup($appname); |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select sum(idx_scan) from pg_stat_all_indexes where indexrelname IN ('people_names_expr_only', 'people_names_partial')" |
| ); |
| is($result, qq(0), |
|     'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with expression-only and partial indexes' |
| ); |
| |
| # delete 2 rows |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM people WHERE firstname = 'first_name_3'"); |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4'" |
| ); |
| |
| # make sure none of the indexes is used on the subscriber for the DELETEs |
| $node_publisher->wait_for_catchup($appname); |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select sum(idx_scan) from pg_stat_all_indexes where indexrelname IN ('people_names_expr_only', 'people_names_partial')" |
| ); |
| is($result, qq(0), |
|     'ensure subscriber tap_sub_rep_full deletes two rows via seq. scan with expression-only and partial indexes' |
| ); |
| |
| # make sure that the subscriber has the correct data: 201 inserted rows |
| # minus the 2 deleted ones |
| $result = |
|   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM people"); |
| is($result, qq(199), |
|     'expression-only/partial index: subscriber has the expected row count'); |
| |
| # cleanup pub |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); |
| $node_publisher->safe_psql('postgres', "DROP TABLE people"); |
| # cleanup sub |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); |
| $node_subscriber->safe_psql('postgres', "DROP TABLE people"); |
| |
| # Testcase end: Subscription will not use indexes with only expressions or |
| # partial index |
| # ============================================================================= |
| |
| # ============================================================================= |
| # Testcase start: Subscription can use index having expressions and columns |
| |
| # create tables pub and sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE people (firstname text, lastname text)"); |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE people REPLICA IDENTITY FULL"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE people (firstname text, lastname text)"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname))" |
| ); |
| |
| # insert some initial data (21 rows) |
| $node_publisher->safe_psql('postgres', |
|     "INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 20) i" |
| ); |
| |
| # create pub/sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE PUBLICATION tap_pub_rep_full FOR TABLE people"); |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" |
| ); |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # update 1 row |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_1'" |
| ); |
| |
| # delete the updated row |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM people WHERE firstname = 'no-name'"); |
| |
| # wait until the index is used on the subscriber (1 UPDATE + 1 DELETE) |
| $node_publisher->wait_for_catchup($appname); |
| $node_subscriber->poll_query_until('postgres', |
|     q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';} |
|   ) |
|   or die |
|   "Timed out while waiting for check subscriber tap_sub_rep_full updates one row and deletes one row via index scan with index on expressions and columns"; |
| |
| # make sure that the subscriber has the correct data: 21 inserted rows |
| # minus the 1 deleted row |
| $result = |
|   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM people"); |
| is($result, qq(20), |
|     'index with columns and expression: subscriber has 20 rows left'); |
| |
| $result = $node_subscriber->safe_psql('postgres', |
|     "SELECT count(*) FROM people WHERE firstname = 'no-name'"); |
| is($result, qq(0), |
|     'index with columns and expression: the updated row was deleted on subscriber' |
| ); |
| |
| # now, drop the index with the expression; with no index left on the |
| # subscriber table the next change has to be applied without it |
| $node_subscriber->safe_psql('postgres', "DROP INDEX people_names"); |
| |
| # delete 1 row |
| $node_publisher->safe_psql('postgres', |
|     "DELETE FROM people WHERE lastname = 'last_name_18'"); |
| |
| # make sure that the subscriber has the correct data |
| $node_publisher->wait_for_catchup($appname); |
| $result = $node_subscriber->safe_psql('postgres', |
|     "SELECT count(*) FROM people WHERE lastname = 'last_name_18'"); |
| is($result, qq(0), |
|     'after dropping the index: subscriber still applies the DELETE'); |
| |
| # cleanup pub |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); |
| $node_publisher->safe_psql('postgres', "DROP TABLE people"); |
| # cleanup sub |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); |
| $node_subscriber->safe_psql('postgres', "DROP TABLE people"); |
| |
| # Testcase end: Subscription can use index having expressions and columns |
| # ============================================================================= |
| |
| # ============================================================================= |
| # Testcase start: Null values and missing column |
| # |
| # The publisher table only has column x; the subscriber table has an extra |
| # column y, which is also part of the subscriber's index on (x, y). |
| |
| $node_publisher->safe_psql('postgres', |
|     "CREATE TABLE test_replica_id_full (x int)"); |
| |
| $node_publisher->safe_psql('postgres', |
|     "ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL"); |
| |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE TABLE test_replica_id_full (x int, y int)"); |
| |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)"); |
| |
| # create pub/sub |
| $node_publisher->safe_psql('postgres', |
|     "CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full"); |
| |
| $node_subscriber->safe_psql('postgres', |
|     "CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full" |
| ); |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # load some data, and update 1 tuple (only the row with x = 1 matches) |
| $node_publisher->safe_psql('postgres', |
|     "INSERT INTO test_replica_id_full VALUES (1), (2), (3)"); |
| $node_publisher->safe_psql('postgres', |
|     "UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1"); |
| |
| # check if the index is used even when the index has NULL values |
| $node_publisher->wait_for_catchup($appname); |
| $node_subscriber->poll_query_until('postgres', |
|     q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';} |
|   ) |
|   or die |
|   "Timed out while waiting for check subscriber tap_sub_rep_full updates test_replica_id_full table"; |
| |
| # make sure that the subscriber has the correct data: after the UPDATE the |
| # x values are 2, 2 and 3 |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select sum(x) from test_replica_id_full WHERE y IS NULL"); |
| is($result, qq(7), |
|     'missing column: sum(x) on subscriber reflects the UPDATE'); |
| |
| # make sure that every replicated row has a NULL in the extra column |
| $result = $node_subscriber->safe_psql('postgres', |
|     "select count(*) from test_replica_id_full WHERE y IS NULL"); |
| is($result, qq(3), |
|     'missing column: all three subscriber rows have NULL y'); |
| |
| # cleanup pub |
| $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full"); |
| $node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full"); |
| |
| # cleanup sub |
| $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full"); |
| $node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full"); |
| |
| # Testcase end: Null values and missing column |
| # ============================================================================= |
| |
| # ============================================================================= |
| # Testcase start: Subscription using a unique index when Pub/Sub has different |
| # data |
| # |
| # The subscriber has extra tuples, sharing y values with the replicated ones, |
| # that the publisher does not have. When the publisher updates 1 row, the |
| # subscriber is expected to use the unique index and update exactly 1 row. |
| # |
| |
| # Set up, in order: publisher table (REPLICA IDENTITY FULL), subscriber table |
| # with a unique index on (x, y), initial publisher data, then pub/sub. |
| for my $step ( |
|     [ $node_publisher, q{CREATE TABLE test_replica_id_full (x int, y int)} ], |
|     [ $node_publisher, q{ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL} ], |
|     [ $node_subscriber, q{CREATE TABLE test_replica_id_full (x int, y int)} ], |
|     [ $node_subscriber, q{CREATE UNIQUE INDEX test_replica_id_full_idxy ON test_replica_id_full(x,y)} ], |
|     [ $node_publisher, q{INSERT INTO test_replica_id_full SELECT i, i FROM generate_series(0,21) i} ], |
|     [ $node_publisher, q{CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full} ], |
|     [ $node_subscriber, qq{CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full} ], |
|   ) |
| { |
|     my ($node, $sql) = @{$step}; |
|     $node->safe_psql('postgres', $sql); |
| } |
| |
| # wait for initial table synchronization to finish |
| $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); |
| |
| # subscriber-only rows: same y values as the replicated rows, x shifted by 100 |
| $node_subscriber->safe_psql('postgres', |
|     q{INSERT INTO test_replica_id_full SELECT i+100, i FROM generate_series(0,21) i} |
| ); |
| |
| # update only 1 row on the publisher; the subscriber holds two tuples with |
| # y = 15 but is expected to update just one of them |
| $node_publisher->safe_psql('postgres', |
|     q{UPDATE test_replica_id_full SET x = 2000 WHERE y = 15}); |
| |
| # wait until the index is used on the subscriber |
| $node_publisher->wait_for_catchup($appname); |
| unless ( |
|     $node_subscriber->poll_query_until( |
|         'postgres', |
|         q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idxy';} |
|     )) |
| { |
|     die |
|       "Timed out while waiting for check subscriber tap_sub_rep_full updates one row via index"; |
| } |
| |
| # exactly one subscriber row must carry the new x value |
| $result = $node_subscriber->safe_psql('postgres', |
|     q{SELECT count(*) FROM test_replica_id_full WHERE x = 2000}); |
| is($result, '1', |
|     'ensure subscriber has the correct data at the end of the test'); |
| |
| # cleanup: publisher objects first, then subscriber objects |
| for my $step ( |
|     [ $node_publisher, q{DROP PUBLICATION tap_pub_rep_full} ], |
|     [ $node_publisher, q{DROP TABLE test_replica_id_full} ], |
|     [ $node_subscriber, q{DROP SUBSCRIPTION tap_sub_rep_full} ], |
|     [ $node_subscriber, q{DROP TABLE test_replica_id_full} ], |
|   ) |
| { |
|     my ($node, $sql) = @{$step}; |
|     $node->safe_psql('postgres', $sql); |
| } |
| |
| # Testcase end: Subscription using a unique index when Pub/Sub has different |
| # data |
| # ============================================================================= |
| |
| # Stop both nodes in 'fast' mode, subscriber first, then finish the TAP run. |
| $_->stop('fast') for ($node_subscriber, $node_publisher); |
| |
| done_testing; |