| -- predictability |
| SET synchronous_commit = on; |
| |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
| -- fail because of an already existing slot |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
| -- fail because of an invalid name |
| SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding'); |
| |
| -- fail twice because of an invalid parameter values |
| SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); |
| SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nonexistent-option', 'frakbar'); |
| SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar'); |
| |
| -- succeed once |
| SELECT pg_drop_replication_slot('regression_slot'); |
| -- fail |
| SELECT pg_drop_replication_slot('regression_slot'); |
| |
| -- check that we're detecting a streaming rep slot used for logical decoding |
| SELECT 'init' FROM pg_create_physical_replication_slot('repl'); |
| SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| SELECT pg_drop_replication_slot('repl'); |
| |
| |
| SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
| |
| /* check whether status function reports us, only reproduceable columns */ |
| SELECT slot_name, plugin, slot_type, active, |
| NOT catalog_xmin IS NULL AS catalog_xmin_set, |
| xmin IS NULl AS data_xmin_not_set, |
| pg_wal_lsn_diff(restart_lsn, '0/01000000') > 0 AS some_wal |
| FROM pg_replication_slots; |
| |
| /* |
| * Check that changes are handled correctly when interleaved with ddl |
| */ |
| CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); |
| BEGIN; |
| INSERT INTO replication_example(somedata, text) VALUES (1, 1); |
| INSERT INTO replication_example(somedata, text) VALUES (1, 2); |
| COMMIT; |
| |
| ALTER TABLE replication_example ADD COLUMN bar int; |
| |
| INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4); |
| |
| BEGIN; |
| INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4); |
| INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4); |
| INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL); |
| COMMIT; |
| |
| ALTER TABLE replication_example DROP COLUMN bar; |
| INSERT INTO replication_example(somedata, text) VALUES (3, 1); |
| |
| BEGIN; |
| INSERT INTO replication_example(somedata, text) VALUES (3, 2); |
| INSERT INTO replication_example(somedata, text) VALUES (3, 3); |
| COMMIT; |
| |
| ALTER TABLE replication_example RENAME COLUMN text TO somenum; |
| |
| INSERT INTO replication_example(somedata, somenum) VALUES (4, 1); |
| |
| -- collect all changes |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4); |
| -- check that this doesn't produce any changes from the heap rewrite |
| SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| INSERT INTO replication_example(somedata, somenum) VALUES (5, 1); |
| |
| BEGIN; |
| INSERT INTO replication_example(somedata, somenum) VALUES (6, 1); |
| ALTER TABLE replication_example ADD COLUMN zaphod1 int; |
| INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1); |
| ALTER TABLE replication_example ADD COLUMN zaphod2 int; |
| INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1); |
| INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2); |
| COMMIT; |
| |
| -- show changes |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- ON CONFLICT DO UPDATE support |
| BEGIN; |
| INSERT INTO replication_example(id, somedata, somenum) SELECT i, i, i FROM generate_series(-15, 15) i |
| ON CONFLICT (id) DO UPDATE SET somenum = excluded.somenum + 1; |
| COMMIT; |
| |
| /* display results */ |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- MERGE support |
| BEGIN; |
| MERGE INTO replication_example t |
| USING (SELECT i as id, i as data, i as num FROM generate_series(-20, 5) i) s |
| ON t.id = s.id |
| WHEN MATCHED AND t.id < 0 THEN |
| UPDATE SET somenum = somenum + 1 |
| WHEN MATCHED AND t.id >= 0 THEN |
| DELETE |
| WHEN NOT MATCHED THEN |
| INSERT VALUES (s.*); |
| COMMIT; |
| |
| /* display results */ |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int); |
| INSERT INTO tr_unique(data) VALUES(10); |
| ALTER TABLE tr_unique RENAME TO tr_pkey; |
| ALTER TABLE tr_pkey ADD COLUMN id serial primary key; |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1'); |
| |
| INSERT INTO tr_pkey(data) VALUES(1); |
| --show deletion with primary key |
| DELETE FROM tr_pkey; |
| |
| /* display results */ |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| /* |
| * check that disk spooling works (also for logical messages) |
| */ |
| BEGIN; |
| CREATE TABLE tr_etoomuch (id serial primary key, data int); |
| INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); |
| SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); |
| DELETE FROM tr_etoomuch WHERE id < 5000; |
| UPDATE tr_etoomuch SET data = - data WHERE id > 5000; |
| CREATE TABLE tr_oddlength (id text primary key, data text); |
| INSERT INTO tr_oddlength VALUES('ab', 'foo'); |
| COMMIT; |
| |
| /* display results, but hide most of the output */ |
| SELECT count(*), min(data), max(data) |
| FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') |
| GROUP BY substring(data, 1, 24) |
| ORDER BY 1,2; |
| |
| -- check updates of primary keys work correctly |
| BEGIN; |
| CREATE TABLE spoolme AS SELECT g.i FROM generate_series(1, 5000) g(i); |
| UPDATE tr_etoomuch SET id = -id WHERE id = 5000; |
| UPDATE tr_oddlength SET id = 'x', data = 'quux'; |
| UPDATE tr_oddlength SET id = 'yy', data = 'a'; |
| DELETE FROM spoolme; |
| DROP TABLE spoolme; |
| COMMIT; |
| |
| SELECT data |
| FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') |
| WHERE data ~ 'UPDATE'; |
| |
| -- check that a large, spooled, upsert works |
| INSERT INTO tr_etoomuch (id, data) |
| SELECT g.i, -g.i FROM generate_series(8000, 12000) g(i) |
| ON CONFLICT(id) DO UPDATE SET data = EXCLUDED.data; |
| |
| SELECT substring(data, 1, 29), count(*) |
| FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') WITH ORDINALITY |
| GROUP BY 1 |
| ORDER BY min(ordinality); |
| |
| /* |
| * check whether we decode subtransactions correctly in relation with each |
| * other |
| */ |
| CREATE TABLE tr_sub (id serial primary key, path text); |
| |
| -- toplevel, subtxn, toplevel, subtxn, subtxn |
| BEGIN; |
| INSERT INTO tr_sub(path) VALUES ('1-top-#1'); |
| |
| SAVEPOINT a; |
| INSERT INTO tr_sub(path) VALUES ('1-top-1-#1'); |
| INSERT INTO tr_sub(path) VALUES ('1-top-1-#2'); |
| RELEASE SAVEPOINT a; |
| |
| SAVEPOINT b; |
| SAVEPOINT c; |
| INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1'); |
| INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2'); |
| RELEASE SAVEPOINT c; |
| INSERT INTO tr_sub(path) VALUES ('1-top-2-#1'); |
| RELEASE SAVEPOINT b; |
| COMMIT; |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- check that we handle xlog assignments correctly |
| BEGIN; |
| -- nest 80 subtxns |
| SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a; |
| -- assign xid by inserting |
| INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1'); |
| INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2'); |
| INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3'); |
| RELEASE SAVEPOINT subtop; |
| INSERT INTO tr_sub(path) VALUES ('2-top-#1'); |
| COMMIT; |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- make sure rollbacked subtransactions aren't decoded |
| BEGIN; |
| INSERT INTO tr_sub(path) VALUES ('3-top-2-#1'); |
| SAVEPOINT a; |
| INSERT INTO tr_sub(path) VALUES ('3-top-2-1-#1'); |
| SAVEPOINT b; |
| INSERT INTO tr_sub(path) VALUES ('3-top-2-2-#1'); |
| ROLLBACK TO SAVEPOINT b; |
| INSERT INTO tr_sub(path) VALUES ('3-top-2-#2'); |
| COMMIT; |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- test whether a known, but not yet logged toplevel xact, followed by a |
| -- subxact commit is handled correctly |
| BEGIN; |
| SELECT pg_current_xact_id() != '0'; -- so no fixed xid appears in the outfile |
| SAVEPOINT a; |
| INSERT INTO tr_sub(path) VALUES ('4-top-1-#1'); |
| RELEASE SAVEPOINT a; |
| COMMIT; |
| |
| -- test whether a change in a subtransaction, in an unknown toplevel |
| -- xact is handled correctly. |
| BEGIN; |
| SAVEPOINT a; |
| INSERT INTO tr_sub(path) VALUES ('5-top-1-#1'); |
| COMMIT; |
| |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- check that DDL in aborted subtransactions handled correctly |
| CREATE TABLE tr_sub_ddl(data int); |
| BEGIN; |
| SAVEPOINT a; |
| ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE text; |
| INSERT INTO tr_sub_ddl VALUES ('blah-blah'); |
| ROLLBACK TO SAVEPOINT a; |
| ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE bigint; |
| INSERT INTO tr_sub_ddl VALUES(43); |
| COMMIT; |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| |
| /* |
| * Check whether treating a table as a catalog table works somewhat |
| */ |
| CREATE TABLE replication_metadata ( |
| id serial primary key, |
| relation name NOT NULL, |
| options text[] |
| ) |
| WITH (user_catalog_table = true) |
| ; |
| \d+ replication_metadata |
| |
| INSERT INTO replication_metadata(relation, options) |
| VALUES ('foo', ARRAY['a', 'b']); |
| |
| ALTER TABLE replication_metadata RESET (user_catalog_table); |
| \d+ replication_metadata |
| |
| INSERT INTO replication_metadata(relation, options) |
| VALUES ('bar', ARRAY['a', 'b']); |
| |
| ALTER TABLE replication_metadata SET (user_catalog_table = true); |
| \d+ replication_metadata |
| |
| INSERT INTO replication_metadata(relation, options) |
| VALUES ('blub', NULL); |
| |
| -- make sure rewrites don't work |
| ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int; |
| ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text; |
| |
| ALTER TABLE replication_metadata SET (user_catalog_table = false); |
| \d+ replication_metadata |
| |
| INSERT INTO replication_metadata(relation, options) |
| VALUES ('zaphod', NULL); |
| |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| /* |
| * check whether we handle updates/deletes correct with & without a pkey |
| */ |
| |
| /* we should handle the case without a key at all more gracefully */ |
| CREATE TABLE table_without_key(id serial, data int); |
| INSERT INTO table_without_key(data) VALUES(1),(2); |
| DELETE FROM table_without_key WHERE data = 1; |
| -- won't log old keys |
| UPDATE table_without_key SET data = 3 WHERE data = 2; |
| UPDATE table_without_key SET id = -id; |
| UPDATE table_without_key SET id = -id; |
| -- should log the full old row now |
| ALTER TABLE table_without_key REPLICA IDENTITY FULL; |
| UPDATE table_without_key SET data = 3 WHERE data = 2; |
| UPDATE table_without_key SET id = -id; |
| UPDATE table_without_key SET id = -id; |
| -- ensure that FULL correctly deals with new columns |
| ALTER TABLE table_without_key ADD COLUMN new_column text; |
| UPDATE table_without_key SET id = -id; |
| UPDATE table_without_key SET id = -id, new_column = 'someval'; |
| DELETE FROM table_without_key WHERE data = 3; |
| |
| CREATE TABLE table_with_pkey(id serial primary key, data int); |
| INSERT INTO table_with_pkey(data) VALUES(1), (2); |
| DELETE FROM table_with_pkey WHERE data = 1; |
| -- should log the old pkey |
| UPDATE table_with_pkey SET data = 3 WHERE data = 2; |
| UPDATE table_with_pkey SET id = -id; |
| UPDATE table_with_pkey SET id = -id; |
| -- check that we log nothing despite having a pkey |
| ALTER TABLE table_without_key REPLICA IDENTITY NOTHING; |
| UPDATE table_with_pkey SET id = -id; |
| -- check that we log everything despite having a pkey |
| ALTER TABLE table_without_key REPLICA IDENTITY FULL; |
| UPDATE table_with_pkey SET id = -id; |
| DELETE FROM table_with_pkey WHERE data = 3; |
| |
| CREATE TABLE table_with_unique_not_null(id serial unique, data int); |
| ALTER TABLE table_with_unique_not_null ALTER COLUMN id SET NOT NULL; --already set |
| -- won't log anything, replica identity not setup |
| INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); |
| DELETE FROM table_with_unique_not_null WHERE data = 1; |
| UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; |
| UPDATE table_with_unique_not_null SET id = -id; |
| UPDATE table_with_unique_not_null SET id = -id; |
| DELETE FROM table_with_unique_not_null WHERE data = 3; |
| -- should log old key |
| ALTER TABLE table_with_unique_not_null REPLICA IDENTITY USING INDEX table_with_unique_not_null_id_key; |
| INSERT INTO table_with_unique_not_null(data) VALUES(1), (2); |
| DELETE FROM table_with_unique_not_null WHERE data = 1; |
| UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2; |
| UPDATE table_with_unique_not_null SET id = -id; |
| UPDATE table_with_unique_not_null SET id = -id; |
| DELETE FROM table_with_unique_not_null WHERE data = 3; |
| |
| -- check tables with dropped indexes used in REPLICA IDENTITY |
| -- table with primary key |
| CREATE TABLE table_dropped_index_with_pk (a int PRIMARY KEY, b int, c int); |
| CREATE UNIQUE INDEX table_dropped_index_with_pk_idx |
| ON table_dropped_index_with_pk(a); |
| ALTER TABLE table_dropped_index_with_pk REPLICA IDENTITY |
| USING INDEX table_dropped_index_with_pk_idx; |
| DROP INDEX table_dropped_index_with_pk_idx; |
| INSERT INTO table_dropped_index_with_pk VALUES (1,1,1), (2,2,2), (3,3,3); |
| UPDATE table_dropped_index_with_pk SET a = 4 WHERE a = 1; |
| UPDATE table_dropped_index_with_pk SET b = 5 WHERE a = 2; |
| UPDATE table_dropped_index_with_pk SET b = 6, c = 7 WHERE a = 3; |
| DELETE FROM table_dropped_index_with_pk WHERE b = 1; |
| DELETE FROM table_dropped_index_with_pk WHERE a = 3; |
| DROP TABLE table_dropped_index_with_pk; |
| |
| -- table without primary key |
| CREATE TABLE table_dropped_index_no_pk (a int NOT NULL, b int, c int); |
| CREATE UNIQUE INDEX table_dropped_index_no_pk_idx |
| ON table_dropped_index_no_pk(a); |
| ALTER TABLE table_dropped_index_no_pk REPLICA IDENTITY |
| USING INDEX table_dropped_index_no_pk_idx; |
| DROP INDEX table_dropped_index_no_pk_idx; |
| INSERT INTO table_dropped_index_no_pk VALUES (1,1,1), (2,2,2), (3,3,3); |
| UPDATE table_dropped_index_no_pk SET a = 4 WHERE a = 1; |
| UPDATE table_dropped_index_no_pk SET b = 5 WHERE a = 2; |
| UPDATE table_dropped_index_no_pk SET b = 6, c = 7 WHERE a = 3; |
| DELETE FROM table_dropped_index_no_pk WHERE b = 1; |
| DELETE FROM table_dropped_index_no_pk WHERE a = 3; |
| DROP TABLE table_dropped_index_no_pk; |
| |
| -- check toast support |
| BEGIN; |
| CREATE SEQUENCE toasttable_rand_seq START 79 INCREMENT 1499; -- portable "random" |
| CREATE TABLE toasttable( |
| id serial primary key, |
| toasted_col1 text, |
| rand1 float8 DEFAULT nextval('toasttable_rand_seq'), |
| toasted_col2 text, |
| rand2 float8 DEFAULT nextval('toasttable_rand_seq') |
| ); |
| COMMIT; |
| -- uncompressed external toast data |
| INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); |
| |
| -- compressed external toast data |
| INSERT INTO toasttable(toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i); |
| |
| -- update of existing column |
| UPDATE toasttable |
| SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) |
| WHERE id = 1; |
| |
| -- This output is extremely wide, and using aligned mode causes psql to |
| -- produce 200kB of useless dashes. Turn that off temporarily to avoid it. |
| \pset format unaligned |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| \pset format aligned |
| |
| INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i); |
| |
| -- update of second column, first column unchanged |
| UPDATE toasttable |
| SET toasted_col2 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) |
| WHERE id = 1; |
| |
| -- make sure we decode correctly even if the toast table is gone |
| DROP TABLE toasttable; |
| |
| \pset format unaligned |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| |
| -- done, free logical replication slot |
| SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); |
| \pset format aligned |
| |
| SELECT pg_drop_replication_slot('regression_slot'); |
| |
| /* check that the slot is gone */ |
| \x |
| SELECT * FROM pg_replication_slots; |
| \x |