| -- predictability: force synchronous commit for this session |
| SET synchronous_commit = on; |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding'); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('regression_slot_p'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT pg_create_logical_replication_slot('foo', 'nonexistent'); |
| ERROR: could not access file "nonexistent": No such file or directory |
| -- reconnect to start a new session, then poll pg_replication_slots until no slot is still active for the old backend (oldpid) |
| select pg_backend_pid() as oldpid \gset |
| \c - |
| SET synchronous_commit = on; |
| do 'declare c int = 0; |
| begin |
| while (select count(*) from pg_replication_slots where active_pid = ' |
| :'oldpid' |
| ') > 0 loop c := c + 1; perform pg_sleep(0.01); end loop; |
| raise log ''slot test looped % times'', c; |
| end'; |
| -- should fail because the temporary slots were dropped automatically when the old session ended |
| SELECT pg_drop_replication_slot('regression_slot_t'); |
| ERROR: replication slot "regression_slot_t" does not exist |
| SELECT pg_drop_replication_slot('regression_slot_t2'); |
| ERROR: replication slot "regression_slot_t2" does not exist |
| -- the permanent slot survived the reconnect, so dropping it succeeds |
| SELECT pg_drop_replication_slot('regression_slot_p'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| -- test switching between two temporary slots within a single session |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); |
| BEGIN; |
| INSERT INTO replication_example(somedata, text) VALUES (1, 1); |
| INSERT INTO replication_example(somedata, text) VALUES (1, 2); |
| COMMIT; |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| INSERT INTO replication_example(somedata, text) VALUES (1, 3); |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| data |
| --------------------------------------------------------------------------------------------------------- |
| BEGIN |
| table public.replication_example: INSERT: id[integer]:1 somedata[integer]:1 text[character varying]:'1' |
| table public.replication_example: INSERT: id[integer]:2 somedata[integer]:1 text[character varying]:'2' |
| COMMIT |
| BEGIN |
| table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3' |
| COMMIT |
| (7 rows) |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| data |
| --------------------------------------------------------------------------------------------------------- |
| BEGIN |
| table public.replication_example: INSERT: id[integer]:3 somedata[integer]:1 text[character varying]:'3' |
| COMMIT |
| (3 rows) |
| |
| INSERT INTO replication_example(somedata, text) VALUES (1, 4); |
| INSERT INTO replication_example(somedata, text) VALUES (1, 5); |
| SELECT pg_current_wal_lsn() AS wal_lsn \gset |
| INSERT INTO replication_example(somedata, text) VALUES (1, 6); |
| SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset |
| SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn()); |
| slot_name |
| ------------------ |
| regression_slot2 |
| (1 row) |
| |
| SELECT :'wal_lsn' = :'end_lsn'; |
| ?column? |
| ---------- |
| t |
| (1 row) |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| data |
| --------------------------------------------------------------------------------------------------------- |
| BEGIN |
| table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6' |
| COMMIT |
| (3 rows) |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| data |
| ------ |
| (0 rows) |
| |
| DROP TABLE replication_example; |
| -- error: a slot named regression_slot1 already exists |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); |
| ERROR: replication slot "regression_slot1" already exists |
| -- both drops should fail: the session's temporary slots are dropped when an error is raised, as the statement above just did |
| SELECT pg_drop_replication_slot('regression_slot1'); |
| ERROR: replication slot "regression_slot1" does not exist |
| SELECT pg_drop_replication_slot('regression_slot2'); |
| ERROR: replication slot "regression_slot2" does not exist |
| -- slot advance with a physical slot: fails for a slot that has never reserved WAL |
| SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); |
| slot_name |
| ------------------ |
| regression_slot3 |
| (1 row) |
| |
| SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN |
| ERROR: invalid target WAL LSN |
| SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error: the slot has never reserved WAL |
| ERROR: replication slot "regression_slot3" cannot be advanced |
| DETAIL: This slot has never previously reserved WAL, or it has been invalidated. |
| SELECT pg_drop_replication_slot('regression_slot3'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| -- |
| -- Test copy functions for logical replication slots |
| -- |
| -- Create a permanent logical slot and copy it (same plugin, changed plugin, changed plugin + temporary) |
| SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| -- Check the status of all copied slots: each copy is matched to its original by equal restart_lsn and confirmed_flush_lsn |
| SELECT |
| o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary |
| FROM |
| (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o |
| LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn |
| WHERE |
| o.slot_name != c.slot_name |
| ORDER BY o.slot_name, c.slot_name; |
| slot_name | plugin | temporary | slot_name | plugin | temporary |
| ------------+---------------+-----------+---------------------------------+---------------+----------- |
| orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f |
| orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t |
| orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f |
| (3 rows) |
| |
| -- We have now reached the maximum of 4 replication slots. Check that slots are |
| -- properly released even when an error is raised while creating the target slot. |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error |
| ERROR: all replication slots are in use |
| HINT: Free one or increase max_replication_slots. |
| -- temporary slots were dropped automatically by the error above; drop the remaining permanent slots |
| SELECT pg_drop_replication_slot('orig_slot1'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('copied_slot1_no_change'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| -- Same copy tests, starting from a temporary logical slot |
| SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| -- Check the status of all copied slots: each copy is matched to its original by equal restart_lsn and confirmed_flush_lsn |
| SELECT |
| o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary |
| FROM |
| (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o |
| LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn |
| WHERE |
| o.slot_name != c.slot_name |
| ORDER BY o.slot_name, c.slot_name; |
| slot_name | plugin | temporary | slot_name | plugin | temporary |
| ------------+---------------+-----------+---------------------------------+---------------+----------- |
| orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t |
| orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f |
| orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t |
| (3 rows) |
| |
| -- Cannot copy a logical slot to a physical slot. NOTE(review): the recorded message below names the slot types the other way round; it is captured server output, so confirm against the server before changing it |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error |
| ERROR: cannot copy physical replication slot "orig_slot2" as a logical replication slot |
| -- temporary slots were dropped automatically; only copied_slot2_change_plugin_temp (permanent despite its name) is left to drop |
| SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| -- |
| -- Test copy functions for physical replication slots |
| -- |
| -- Create two permanent physical slots (only orig_slot1 reserves WAL immediately) and copy orig_slot1 |
| SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| -- Check all copied slots status. Since not all slots reserve WAL, we check only the other fields. NOTE(review): there is no ORDER BY, so the row order below is not guaranteed |
| SELECT slot_name, slot_type, temporary FROM pg_replication_slots; |
| slot_name | slot_type | temporary |
| ------------------------+-----------+----------- |
| orig_slot1 | physical | f |
| orig_slot2 | physical | f |
| copied_slot1_no_change | physical | f |
| copied_slot1_temp | physical | t |
| (4 rows) |
| |
| -- Cannot copy a physical slot to a logical slot. NOTE(review): the recorded message below names the slot types the other way round; it is captured server output, so confirm against the server before changing it |
| SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error |
| ERROR: cannot copy logical replication slot "orig_slot1" as a physical replication slot |
| -- Cannot copy a physical slot that doesn't reserve WAL |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error |
| ERROR: cannot copy a replication slot that doesn't reserve WAL |
| -- temporary slots were dropped automatically by the errors above; drop the remaining permanent slots |
| SELECT pg_drop_replication_slot('orig_slot1'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('orig_slot2'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('copied_slot1_no_change'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| -- Same copy tests, starting from a temporary physical slot that reserves WAL immediately |
| SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); |
| ?column? |
| ---------- |
| copy |
| (1 row) |
| |
| -- Check the status of all copied slots: each copy is matched to its original by equal restart_lsn; no error occurs here, so all three slots are dropped explicitly afterwards |
| SELECT |
| o.slot_name, o.temporary, c.slot_name, c.temporary |
| FROM |
| (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o |
| LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn |
| WHERE |
| o.slot_name != c.slot_name |
| ORDER BY o.slot_name, c.slot_name; |
| slot_name | temporary | slot_name | temporary |
| ------------+-----------+------------------------+----------- |
| orig_slot2 | t | copied_slot2_no_change | t |
| orig_slot2 | t | copied_slot2_notemp | f |
| (2 rows) |
| |
| SELECT pg_drop_replication_slot('orig_slot2'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('copied_slot2_no_change'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('copied_slot2_notemp'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |