| \set ECHO none |
| |
| -- Start-up barrier shared with the other test process(es). |
| -- Whoever obtains advisory lock 1 first polls pg_locks every 0.1 s, at most |
| -- 1000 times (about 100 s), until exactly one backend is queued on that lock. |
| -- A later arrival simply blocks on the transaction-level lock until it is granted. |
| DO $$ |
| DECLARE |
|     waiting_count int; |
| BEGIN |
|     -- Late arrival: queue behind the current holder and finish once granted. |
|     IF NOT pg_try_advisory_xact_lock(1) THEN |
|         PERFORM pg_advisory_xact_lock(1); |
|         RETURN; |
|     END IF; |
| |
|     -- Lock holder: wait for one ungranted request on advisory lock 1. |
|     FOR attempt IN 1..1000 LOOP |
|         PERFORM pg_sleep(0.1); |
| |
|         SELECT count(*) |
|           INTO waiting_count |
|           FROM pg_locks |
|          WHERE locktype = 'advisory' |
|            AND objid = 1 |
|            AND NOT granted; |
| |
|         EXIT WHEN waiting_count = 1; |
|     END LOOP; |
| END; |
| $$; |
| |
| -- Terse verbosity keeps server messages short (no DETAIL, HINT or CONTEXT lines). |
| \set VERBOSITY terse |
| -- Wait (no explicit timeout) for a message on 'pipe_test_owner_created_notifier'; |
| -- session A is expected to send it once the role 'pipe_test_owner' exists. |
| SELECT dbms_pipe.receive_message('pipe_test_owner_created_notifier'); |
| |
| -- Continue in this same connection with 'pipe_test_owner' as the session user |
| -- (SET SESSION AUTHORIZATION switches identity; it does not open a new connection). |
| SET SESSION AUTHORIZATION pipe_test_owner; |
| |
| -- receiveFrom(pipename): drains one pipe and reports every item via NOTICE. |
| -- Exercises receive_message(text,integer), next_item_type(), all versions of |
| -- unpack_message_<type>() and purge(text). |
| -- |
| --   1. Poll receive_message(pipename, 0) every 0.5 s until it returns 0 |
| --      (0 means data is available). |
| --   2. Read next_item_type() in a loop: 0 ends the loop; a recognised type code |
| --      is unpacked and printed as 'RECEIVE <type>: <value>'; afterwards the |
| --      next message is requested with timeout argument 2. |
| --   3. Purge the pipe. |
| CREATE OR REPLACE FUNCTION receiveFrom(pipename text) RETURNS void AS $$ |
| DECLARE |
|     recv_status INTEGER; |
|     item_type   INTEGER; |
| BEGIN |
|     -- Zero-timeout polling until the first message has arrived. |
|     LOOP |
|         recv_status := dbms_pipe.receive_message(pipename, 0); |
|         EXIT WHEN recv_status = 0; |
|         PERFORM pg_sleep(0.5); |
|     END LOOP; |
| |
|     LOOP |
|         item_type := dbms_pipe.next_item_type(); |
|         EXIT WHEN item_type = 0; |
| |
|         CASE item_type |
|             WHEN 9 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_number(); |
|             WHEN 11 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_text(); |
|             WHEN 12 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_date(); |
|             WHEN 13 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_timestamp(); |
|             WHEN 23 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, encode(dbms_pipe.unpack_message_bytea(),'escape'); |
|             WHEN 24 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_record(); |
|             ELSE |
|                 -- Any other type code is skipped without unpacking; the explicit |
|                 -- ELSE keeps PL/pgSQL from raising CASE_NOT_FOUND. |
|                 NULL; |
|         END CASE; |
| |
|         -- Fetch the following message before inspecting the next item. |
|         PERFORM dbms_pipe.receive_message(pipename, 2); |
|     END LOOP; |
| |
|     PERFORM dbms_pipe.purge(pipename); |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| -- bulkReceive(): reads a message from 'named_pipe_2' and prints all of its items. |
| -- If the first receive (timeout argument 2) returns 1, 'Timeout' is reported, |
| -- the function sleeps 2 s and tries once more; the retry's status is not checked. |
| -- Items are then unpacked until next_item_type() returns 0, after which the |
| -- pipe is purged. |
| CREATE OR REPLACE FUNCTION bulkReceive() RETURNS void AS $$ |
| DECLARE |
|     src_pipe  CONSTANT text := 'named_pipe_2'; |
|     item_type INTEGER; |
| BEGIN |
|     IF dbms_pipe.receive_message(src_pipe, 2) = 1 THEN |
|         RAISE NOTICE 'Timeout'; |
|         PERFORM pg_sleep(2); |
|         PERFORM dbms_pipe.receive_message(src_pipe, 2); |
|     END IF; |
| |
|     LOOP |
|         item_type := dbms_pipe.next_item_type(); |
|         EXIT WHEN item_type = 0; |
| |
|         CASE item_type |
|             WHEN 9 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_number(); |
|             WHEN 11 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_text(); |
|             WHEN 12 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_date(); |
|             WHEN 13 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_timestamp(); |
|             WHEN 23 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, encode(CAST(dbms_pipe.unpack_message_bytea() AS bytea),'escape'); |
|             WHEN 24 THEN |
|                 RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_record(); |
|             ELSE |
|                 -- Other type codes are ignored; ELSE avoids CASE_NOT_FOUND. |
|                 NULL; |
|         END CASE; |
|     END LOOP; |
| |
|     PERFORM dbms_pipe.purge(src_pipe); |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| -- checkReceive1(pipename): tests the single-argument receive_message(text), |
| -- then prints the next item of the received message, unpacked as text. |
| CREATE OR REPLACE FUNCTION checkReceive1(pipename text) RETURNS void AS $$ |
| DECLARE |
|     payload text; |
| BEGIN |
|     PERFORM dbms_pipe.receive_message(pipename); |
|     payload := dbms_pipe.unpack_message_text(); |
|     RAISE NOTICE 'RECEIVE %', payload; |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| -- dropTempTable(): keeps calling receive_message('pipe_name_3') while it reports |
| -- a non-zero status, then drops the table named TEMP. |
| CREATE OR REPLACE FUNCTION dropTempTable() RETURNS void AS $$ |
| DECLARE |
|     recv_status integer; |
| BEGIN |
|     recv_status := dbms_pipe.receive_message('pipe_name_3'); |
|     WHILE recv_status <> 0 LOOP |
|         -- Nothing usable yet: ask again. |
|         recv_status := dbms_pipe.receive_message('pipe_name_3'); |
|     END LOOP; |
| |
|     DROP TABLE TEMP; |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| |
| -- checkUniqueSessionNameB(): receives a message from 'pipe_name_4' and returns |
| -- whether its text item equals this session's dbms_pipe.unique_session_name(). |
| CREATE OR REPLACE FUNCTION checkUniqueSessionNameB() RETURNS bool AS $$ |
| BEGIN |
|     PERFORM dbms_pipe.receive_message('pipe_name_4'); |
|     RETURN dbms_pipe.unpack_message_text() = dbms_pipe.unique_session_name(); |
| END; |
| $$ LANGUAGE plpgsql; |
| |
| -- Test driver. ECHO is still 'none' at this point, so this comment is not echoed. |
| -- From the next line on psql prints every input line, comments included. |
| -- NOTE(review): the text below is presumably compared verbatim with an |
| -- expected-output file, so any edit there (even fixing the 'recieve' typo near |
| -- the end) would need that file regenerated; confirm before touching it. |
| -- Sequence: receive from 'named_pipe' and in bulk, exercise the private/public |
| -- pipes as 'pipe_test_owner' and 'pipe_test_other', inspect db_pipes and |
| -- __list_pipes(), remove and purge the pipes, drop the helper functions and |
| -- finally the test roles. |
| \set ECHO all |
| |
| -- Receives messages sent via an implicit pipe |
| SELECT receiveFrom('named_pipe'); |
| |
| -- Bulk receive messages |
| SELECT bulkReceive(); |
| |
| -- Receives messages sent via an explicit private pipe under the same user |
| -- 'pipe_test_owner' |
| SELECT dbms_pipe.receive_message('recv_private1_notifier'); |
| SELECT receiveFrom('private_pipe_1'); |
| |
| -- Switch user to 'pipe_test_other' |
| DROP USER IF EXISTS pipe_test_other; |
| CREATE USER pipe_test_other; |
| SET SESSION AUTHORIZATION pipe_test_other; |
| |
| -- Try to receive messages sent via an explicit private pipe under the user |
| -- 'pipe_test_other' who is not the owner of pipe. |
| -- insufficient privileges in case of 'private_pipe_2'. |
| |
| SELECT dbms_pipe.receive_message('recv_private2_notifier'); |
| SELECT receiveFrom('private_pipe_2'); |
| |
| -- These are explicit private pipes created using create_pipe(text,integer) |
| -- and create_pipe(text) |
| SELECT dbms_pipe.receive_message('recv_public1_notifier'); |
| SELECT receiveFrom('public_pipe_3'); |
| |
| SELECT dbms_pipe.receive_message('recv_public2_notifier'); |
| SELECT receiveFrom('public_pipe_4'); |
| |
| -- Switch back to user 'pipe_test_owner' |
| SET SESSION AUTHORIZATION pipe_test_owner; |
| DROP USER pipe_test_other; |
| |
| -- Tests receive_message(text) |
| SELECT checkReceive1('pipe_name_1'); |
| SELECT checkReceive1('pipe_name_2'); |
| |
| -- Tests dbms_pipe.db_pipes view |
| SELECT name, items, "limit", private, owner |
| FROM dbms_pipe.db_pipes |
| WHERE name LIKE 'private%' |
| ORDER BY name; |
| |
| -- Tests dbms_pipe.__list_pipes(); attribute size is not included |
| -- since it can be different across runs. |
| SELECT name, items, "limit", private, owner |
| FROM dbms_pipe.__list_pipes() AS (name varchar, items int4, siz int4, "limit" int4, private bool, owner varchar) |
| WHERE name <> 'pipe_name_4' |
| ORDER BY 1; |
| |
| -- Tests remove_pipe(text) |
| SELECT dbms_pipe.remove_pipe('private_pipe_1'); |
| SELECT dbms_pipe.remove_pipe('private_pipe_2'); |
| SELECT dbms_pipe.remove_pipe('public_pipe_3'); |
| SELECT dbms_pipe.remove_pipe('public_pipe_4'); |
| SELECT dbms_pipe.purge('pipe_name_1'); |
| SELECT dbms_pipe.purge('pipe_name_2'); |
| |
| -- Receives drop table notification from session A via 'pipe_name_3' |
| SELECT dropTempTable(); |
| SELECT dbms_pipe.purge('pipe_name_3'); |
| |
| |
| -- tests unique_session_name() (uses 'pipe_name_4') |
| SELECT checkUniqueSessionNameB(); |
| SELECT dbms_pipe.purge('pipe_name_4'); |
| |
| DROP FUNCTION receiveFrom(text); |
| DROP FUNCTION checkReceive1(text); |
| DROP FUNCTION checkUniqueSessionNameB(); |
| DROP FUNCTION bulkReceive(); |
| DROP FUNCTION dropTempTable(); |
| |
| -- Perform a recieve on removed pipe resulting on timeout |
| SELECT dbms_pipe.receive_message('public_pipe_4',2); |
| SELECT dbms_pipe.purge('public_pipe_4'); |
| |
| SET SESSION AUTHORIZATION DEFAULT; |
| DROP USER pipe_test_owner; |