blob: 1f72b566555c92761a622dc889843a7a3011ed69 [file] [log] [blame]
\set ECHO none
-- wait for other processes, wait max 100 sec
do $$
declare c int;
begin
if pg_try_advisory_xact_lock(1) then
for i in 1..1000 loop
perform pg_sleep(0.1);
c := (select count(*) from pg_locks where locktype = 'advisory' and objid = 1 and not granted);
if c = 1 then
return;
end if;
end loop;
else
perform pg_advisory_xact_lock(1);
end if;
end;
$$;
\set VERBOSITY terse
--Wait for 'pipe_test_owner' created notification to be sent by session A
SELECT dbms_pipe.receive_message('pipe_test_owner_created_notifier');
-- create new connection under the userid of 'pipe_test_owner'
SET SESSION AUTHORIZATION pipe_test_owner;
/* Tests receive_message(text,integer), next_item_type() and all versions of
* unpack_message_<type>() and purge(text)
*/
CREATE OR REPLACE FUNCTION receiveFrom(pipename text) RETURNS void AS $$
DECLARE
typ INTEGER;
BEGIN
WHILE true LOOP
SELECT dbms_pipe.receive_message(pipename, 0) INTO typ;
-- 0 means data is available
IF typ = 0 THEN
EXIT;
END IF;
PERFORM pg_sleep(0.5);
END LOOP;
WHILE true LOOP
SELECT dbms_pipe.next_item_type() INTO typ;
IF typ = 0 THEN EXIT;
ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number();
ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text();
ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date();
ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp();
ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea(),'escape');
ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record();
END IF;
PERFORM dbms_pipe.receive_message(pipename, 2);
END LOOP;
PERFORM dbms_pipe.purge(pipename);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION bulkReceive() RETURNS void AS $$
DECLARE
typ INTEGER;
BEGIN
IF dbms_pipe.receive_message('named_pipe_2',2) = 1 THEN
RAISE NOTICE 'Timeout';
PERFORM pg_sleep(2);
PERFORM dbms_pipe.receive_message('named_pipe_2',2);
END IF;
WHILE true LOOP
SELECT dbms_pipe.next_item_type() INTO typ;
IF typ = 0 THEN EXIT;
ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number();
ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text();
ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date();
ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp();
ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea()::bytea,'escape');
ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record();
END IF;
END LOOP;
PERFORM dbms_pipe.purge('named_pipe_2');
END;
$$ LANGUAGE plpgsql;
-- Tests receive_message(text)
CREATE OR REPLACE FUNCTION checkReceive1(pipename text) RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.receive_message(pipename);
RAISE NOTICE 'RECEIVE %',dbms_pipe.unpack_message_text();
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION dropTempTable() RETURNS void AS $$
BEGIN
WHILE dbms_pipe.receive_message('pipe_name_3') <> 0 LOOP
CONTINUE;
END LOOP;
DROP TABLE TEMP;
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION checkUniqueSessionNameB() RETURNS bool AS $$
DECLARE
result bool;
BEGIN
PERFORM dbms_pipe.receive_message('pipe_name_4');
SELECT dbms_pipe.unpack_message_text() = dbms_pipe.unique_session_name() INTO result;
RETURN result;
END; $$ LANGUAGE plpgsql;
\set ECHO all
-- Receives messages sent via an implicit pipe
SELECT receiveFrom('named_pipe');
-- Bulk receive messages
SELECT bulkReceive();
-- Receives messages sent via an explicit private pipe under the same user
-- 'pipe_test_owner'
SELECT dbms_pipe.receive_message('recv_private1_notifier');
SELECT receiveFrom('private_pipe_1');
-- Switch user to 'pipe_test_other'
DROP USER IF EXISTS pipe_test_other;
CREATE USER pipe_test_other;
SET SESSION AUTHORIZATION pipe_test_other;
-- Try to receive messages sent via an explicit private pipe under the user
-- 'pipe_test_other' who is not the owner of pipe.
-- insufficient privileges in case of 'private_pipe_2'.
SELECT dbms_pipe.receive_message('recv_private2_notifier');
SELECT receiveFrom('private_pipe_2');
-- These are explicit private pipes created using create_pipe(text,integer)
-- and create_pipe(text)
SELECT dbms_pipe.receive_message('recv_public1_notifier');
SELECT receiveFrom('public_pipe_3');
SELECT dbms_pipe.receive_message('recv_public2_notifier');
SELECT receiveFrom('public_pipe_4');
-- Switch back to user 'pipe_test_owner'
SET SESSION AUTHORIZATION pipe_test_owner;
DROP USER pipe_test_other;
-- Tests receive_message(text)
SELECT checkReceive1('pipe_name_1');
SELECT checkReceive1('pipe_name_2');
-- Tests dbms_pipe.db_pipes view
SELECT name, items, "limit", private, owner
FROM dbms_pipe.db_pipes
WHERE name LIKE 'private%'
ORDER BY name;
-- Tests dbms_pipe.__list_pipes(); attribute size is not included
-- since it can be different across runs.
SELECT name, items, "limit", private, owner
FROM dbms_pipe.__list_pipes() AS (name varchar, items int4, siz int4, "limit" int4, private bool, owner varchar)
WHERE name <> 'pipe_name_4'
ORDER BY 1;
-- Tests remove_pipe(text)
SELECT dbms_pipe.remove_pipe('private_pipe_1');
SELECT dbms_pipe.remove_pipe('private_pipe_2');
SELECT dbms_pipe.remove_pipe('public_pipe_3');
SELECT dbms_pipe.remove_pipe('public_pipe_4');
SELECT dbms_pipe.purge('pipe_name_1');
SELECT dbms_pipe.purge('pipe_name_2');
-- Receives drop table notification from session A via 'pipe_name_3'
SELECT dropTempTable();
SELECT dbms_pipe.purge('pipe_name_3');
-- tests unique_session_name() (uses 'pipe_name_4')
SELECT checkUniqueSessionNameB();
SELECT dbms_pipe.purge('pipe_name_4');
DROP FUNCTION receiveFrom(text);
DROP FUNCTION checkReceive1(text);
DROP FUNCTION checkUniqueSessionNameB();
DROP FUNCTION bulkReceive();
DROP FUNCTION dropTempTable();
-- Perform a recieve on removed pipe resulting on timeout
SELECT dbms_pipe.receive_message('public_pipe_4',2);
SELECT dbms_pipe.purge('public_pipe_4');
SET SESSION AUTHORIZATION DEFAULT;
DROP USER pipe_test_owner;