| -- predictability |
| SET synchronous_commit = on; |
| -- superuser required by default |
| CREATE ROLE regress_origin_replication REPLICATION; |
| SET ROLE regress_origin_replication; |
| SELECT pg_replication_origin_advance('regress_test_decoding: perm', '0/1'); |
| ERROR: permission denied for function pg_replication_origin_advance |
| SELECT pg_replication_origin_create('regress_test_decoding: perm'); |
| ERROR: permission denied for function pg_replication_origin_create |
| SELECT pg_replication_origin_drop('regress_test_decoding: perm'); |
| ERROR: permission denied for function pg_replication_origin_drop |
| SELECT pg_replication_origin_oid('regress_test_decoding: perm'); |
| ERROR: permission denied for function pg_replication_origin_oid |
| SELECT pg_replication_origin_progress('regress_test_decoding: perm', false); |
| ERROR: permission denied for function pg_replication_origin_progress |
| SELECT pg_replication_origin_session_is_setup(); |
| ERROR: permission denied for function pg_replication_origin_session_is_setup |
| SELECT pg_replication_origin_session_progress(false); |
| ERROR: permission denied for function pg_replication_origin_session_progress |
| SELECT pg_replication_origin_session_reset(); |
| ERROR: permission denied for function pg_replication_origin_session_reset |
| SELECT pg_replication_origin_session_setup('regress_test_decoding: perm'); |
| ERROR: permission denied for function pg_replication_origin_session_setup |
| SELECT pg_replication_origin_xact_reset(); |
| ERROR: permission denied for function pg_replication_origin_xact_reset |
| SELECT pg_replication_origin_xact_setup('0/1', '2013-01-01 00:00'); |
| ERROR: permission denied for function pg_replication_origin_xact_setup |
| SELECT pg_show_replication_origin_status(); |
| ERROR: permission denied for function pg_show_replication_origin_status |
| RESET ROLE; |
| DROP ROLE regress_origin_replication; |
| CREATE TABLE origin_tbl(id serial primary key, data text); |
| CREATE TABLE target_tbl(id serial primary key, data text); |
| SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); |
| pg_replication_origin_create |
| ------------------------------ |
| 1 |
| (1 row) |
| |
| -- ensure duplicate creations fail |
| SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); |
| ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index" |
| DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists. |
| --ensure deletions work (once) |
| SELECT pg_replication_origin_create('regress_test_decoding: temp'); |
| pg_replication_origin_create |
| ------------------------------ |
| 2 |
| (1 row) |
| |
| SELECT pg_replication_origin_drop('regress_test_decoding: temp'); |
| pg_replication_origin_drop |
| ---------------------------- |
| |
| (1 row) |
| |
| SELECT pg_replication_origin_drop('regress_test_decoding: temp'); |
| ERROR: replication origin "regress_test_decoding: temp" does not exist |
| -- various failure checks for undefined slots |
| select pg_replication_origin_advance('regress_test_decoding: temp', '0/1'); |
| ERROR: replication origin "regress_test_decoding: temp" does not exist |
| select pg_replication_origin_session_setup('regress_test_decoding: temp'); |
| ERROR: replication origin "regress_test_decoding: temp" does not exist |
| select pg_replication_origin_progress('regress_test_decoding: temp', true); |
| ERROR: replication origin "regress_test_decoding: temp" does not exist |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| -- origin tx |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); |
| INSERT INTO target_tbl(data) |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| -- as is normal, the insert into target_tbl shows up |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| data |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BEGIN |
| table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN' |
| table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again''' |
| table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT' |
| COMMIT |
| (5 rows) |
| |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); |
| -- mark session as replaying |
| SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); |
| pg_replication_origin_session_setup |
| ------------------------------------- |
| |
| (1 row) |
| |
| -- ensure we prevent duplicate setup |
| SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot'); |
| ERROR: cannot setup replication origin when one is already setup |
| SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded'); |
| ?column? |
| ---------- |
| |
| (1 row) |
| |
| BEGIN; |
| -- setup transaction origin |
| SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); |
| pg_replication_origin_xact_setup |
| ---------------------------------- |
| |
| (1 row) |
| |
| INSERT INTO target_tbl(data) |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); |
| COMMIT; |
| -- check replication progress for the session is correct |
| SELECT pg_replication_origin_session_progress(false); |
| pg_replication_origin_session_progress |
| ---------------------------------------- |
| 0/AABBCCDD |
| (1 row) |
| |
| SELECT pg_replication_origin_session_progress(true); |
| pg_replication_origin_session_progress |
| ---------------------------------------- |
| 0/AABBCCDD |
| (1 row) |
| |
| SELECT pg_replication_origin_session_reset(); |
| pg_replication_origin_session_reset |
| ------------------------------------- |
| |
| (1 row) |
| |
| SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status; |
| local_id | external_id | remote_lsn | ?column? |
| ----------+----------------------------------------+------------+---------- |
| 1 | regress_test_decoding: regression_slot | 0/AABBCCDD | t |
| (1 row) |
| |
| -- check replication progress identified by name is correct |
| SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', false); |
| pg_replication_origin_progress |
| -------------------------------- |
| 0/AABBCCDD |
| (1 row) |
| |
| SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', true); |
| pg_replication_origin_progress |
| -------------------------------- |
| 0/AABBCCDD |
| (1 row) |
| |
| -- ensure reset requires previously setup state |
| SELECT pg_replication_origin_session_reset(); |
| ERROR: no replication origin is configured |
| -- and magically the replayed xact will be filtered! |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); |
| data |
| ------ |
| (0 rows) |
| |
| --but new original changes still show up |
| INSERT INTO origin_tbl(data) VALUES ('will be replicated'); |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); |
| data |
| -------------------------------------------------------------------------------- |
| BEGIN |
| table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated' |
| COMMIT |
| (3 rows) |
| |
| SELECT pg_drop_replication_slot('regression_slot'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot'); |
| pg_replication_origin_drop |
| ---------------------------- |
| |
| (1 row) |
| |
| -- Set of transactions with no origin LSNs and commit timestamps set for |
| -- this session. |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_no_lsn', 'test_decoding'); |
| ?column? |
| ---------- |
| init |
| (1 row) |
| |
| SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_no_lsn'); |
| pg_replication_origin_create |
| ------------------------------ |
| 1 |
| (1 row) |
| |
| -- mark session as replaying |
| SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_no_lsn'); |
| pg_replication_origin_session_setup |
| ------------------------------------- |
| |
| (1 row) |
| |
| -- Simple transactions |
| BEGIN; |
| INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit'); |
| COMMIT; |
| BEGIN; |
| INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback'); |
| ROLLBACK; |
| -- 2PC transactions |
| BEGIN; |
| INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit prepared'); |
| PREPARE TRANSACTION 'replorigin_prepared'; |
| COMMIT PREPARED 'replorigin_prepared'; |
| BEGIN; |
| INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback prepared'); |
| PREPARE TRANSACTION 'replorigin_prepared'; |
| ROLLBACK PREPARED 'replorigin_prepared'; |
| SELECT local_id, external_id, |
| remote_lsn <> '0/0' AS valid_remote_lsn, |
| local_lsn <> '0/0' AS valid_local_lsn |
| FROM pg_replication_origin_status; |
| local_id | external_id | valid_remote_lsn | valid_local_lsn |
| ----------+-----------------------------------------------+------------------+----------------- |
| 1 | regress_test_decoding: regression_slot_no_lsn | f | t |
| (1 row) |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); |
| data |
| ------------------------------------------------------------------------------------- |
| BEGIN |
| table public.origin_tbl: INSERT: id[integer]:4 data[text]:'no_lsn, commit' |
| COMMIT |
| BEGIN |
| table public.origin_tbl: INSERT: id[integer]:6 data[text]:'no_lsn, commit prepared' |
| COMMIT |
| (6 rows) |
| |
| -- Clean up |
| SELECT pg_replication_origin_session_reset(); |
| pg_replication_origin_session_reset |
| ------------------------------------- |
| |
| (1 row) |
| |
| SELECT pg_drop_replication_slot('regression_slot_no_lsn'); |
| pg_drop_replication_slot |
| -------------------------- |
| |
| (1 row) |
| |
| SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn'); |
| pg_replication_origin_drop |
| ---------------------------- |
| |
| (1 row) |
| |