\set ECHO none

-- Rendezvous with the peer session on advisory lock key 1.
-- Waits at most about 100 s (1000 polls, 0.1 s apart).
--
-- Whichever session obtains the transaction-level lock first polls pg_locks
-- until exactly one other backend is queued (not granted) on the same key,
-- then returns; the lock is released when the DO block's transaction ends.
-- The session that fails to obtain the lock simply blocks on it until then.
--
-- The probe is restricted to this database and to the pg_locks shape of the
-- single-bigint key 1 (classid = 0, objid = 1, objsubid = 1), so that a
-- waiter in another database, or on a two-integer key, is not mistaken for
-- the peer session.
--
-- If no waiter shows up within the polling window, the block falls through
-- silently and the script continues without having synchronized.
do $$
declare
    waiters int;
begin
    if pg_try_advisory_xact_lock(1) then
        for i in 1..1000 loop
            perform pg_sleep(0.1);

            select count(*)
              into waiters
              from pg_locks as l
             where l.locktype = 'advisory'
               and l.database = (select d.oid
                                   from pg_database as d
                                  where d.datname = current_database())
               and l.classid = 0
               and l.objid = 1
               and l.objsubid = 1
               and not l.granted;

            if waiters = 1 then
                return;
            end if;
        end loop;
    else
        perform pg_advisory_xact_lock(1);
    end if;
end;
$$;
| |
-- Fixture: a three-row table and a dedicated role that ends up owning it.
-- Messages are limited to WARNING while the objects are (re)created --
-- presumably to keep the "does not exist, skipping" notices of
-- DROP ... IF EXISTS out of the output -- and set back to NOTICE afterwards.
SET client_min_messages TO warning;

DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP (
    id   integer,
    name text
);
INSERT INTO TEMP (id, name)
VALUES
    (1, 'bob'),
    (2, 'rob'),
    (3, 'john');

-- The table is dropped before the role, and recreated before ownership is
-- handed over; keep this order.
DROP ROLE IF EXISTS pipe_test_owner;
CREATE ROLE pipe_test_owner CREATEROLE;
ALTER TABLE TEMP OWNER TO pipe_test_owner;

SET client_min_messages TO notice;

-- Notify session B of 'pipe_test_owner' having been created.
SELECT dbms_pipe.pack_message(1);
SELECT dbms_pipe.send_message('pipe_test_owner_created_notifier');

-- Continue in this same session as pipe_test_owner. SET SESSION AUTHORIZATION
-- switches the session user; it does not open a new connection.
SET SESSION AUTHORIZATION pipe_test_owner;
| |
/*
 * send(pipename)
 *
 * Sends the currently packed message to `pipename` using
 * send_message(text,integer,integer); sending to a pipe that was not created
 * explicitly creates it implicitly.
 *
 * A return status of 1 is treated as a timeout: a NOTICE 'Timeout' is raised,
 * the function sleeps for 2 seconds and tries exactly once more.  If the
 * retry reports 1 as well, a WARNING is raised so that the undelivered
 * message is visible in the output instead of being dropped silently.
 *
 * NOTE(review): the literals 2 and 10 are presumably the timeout (seconds)
 * and the pipe size limit -- confirm against the dbms_pipe documentation.
 */
CREATE OR REPLACE FUNCTION send(pipename text) RETURNS void AS $$
BEGIN
    IF dbms_pipe.send_message(pipename, 2, 10) = 1 THEN
        RAISE NOTICE 'Timeout';
        PERFORM pg_sleep(2);

        -- One retry only; report a second timeout rather than ignoring it.
        IF dbms_pipe.send_message(pipename, 2, 10) = 1 THEN
            RAISE WARNING 'send_message to pipe "%" timed out again; message was not sent', pipename;
        END IF;
    END IF;
END;
$$ LANGUAGE plpgsql;
| |
-- createImplicitPipe(): exercises pack_message for every supported type.
-- Each value is packed and then sent on its own, as a separate message,
-- to 'named_pipe' through send().
CREATE OR REPLACE FUNCTION createImplicitPipe() RETURNS void AS $$
DECLARE
    target   CONSTANT text := 'named_pipe';
    temp_row TEMP%ROWTYPE;
BEGIN
    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send(target);

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send(target);

    -- timestamp without time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send(target);

    -- timestamp with time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send(target);

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send(target);

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send(target);

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send(target);

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send(target);

    -- whole row of TEMP (the one with id = 2)
    SELECT t.id, t.name
      INTO temp_row
      FROM TEMP AS t
     WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_row);
    PERFORM send(target);
END;
$$ LANGUAGE plpgsql;
| |
-- bulkSend(): packs one value of every supported type into the message
-- buffer back to back, then issues a single send() to 'named_pipe_2'.
CREATE OR REPLACE FUNCTION bulkSend() RETURNS void AS $$
DECLARE
    temp_row TEMP%ROWTYPE;
BEGIN
    -- Scalar items, in the order they are packed.
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));

    -- Last item: the TEMP row with id = 2.
    SELECT t.id, t.name
      INTO temp_row
      FROM TEMP AS t
     WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_row);

    -- One send for everything packed above.
    PERFORM send('named_pipe_2');
END;
$$ LANGUAGE plpgsql;
| |
| |
/*
 * createPipe(name, ver)
 *
 * Creates an explicit pipe called `name`; `ver` picks the create_pipe
 * overload to exercise:
 *   ver = 3       -> create_pipe(text,integer,bool), called as (name, 4, true)
 *   ver = 2       -> create_pipe(text,integer),      called as (name, 4)
 *   anything else -> create_pipe(text)
 * When the third (bool) parameter is absent it defaults to false, i.e. the
 * pipe is public.
 */
CREATE OR REPLACE FUNCTION createPipe(name text,ver integer) RETURNS void AS $$
BEGIN
    CASE ver
        WHEN 3 THEN
            PERFORM dbms_pipe.create_pipe(name, 4, true);
        WHEN 2 THEN
            PERFORM dbms_pipe.create_pipe(name, 4);
        ELSE
            PERFORM dbms_pipe.create_pipe(name);
    END CASE;
END;
$$ LANGUAGE plpgsql;
| |
| |
/*
 * createExplicitPipe(pipename, create_version)
 *
 * Creates `pipename` through createPipe() -- create_version selects the
 * create_pipe overload, one of which yields a private pipe -- resets the
 * message buffer, and then sends one message per supported type to that
 * pipe through send().
 */
CREATE OR REPLACE FUNCTION createExplicitPipe(pipename text,create_version integer) RETURNS void AS $$
DECLARE
    temp_row TEMP%ROWTYPE;
BEGIN
    PERFORM createPipe(pipename, create_version);

    -- Reset the message buffer before the first pack_message below.
    PERFORM dbms_pipe.reset_buffer();

    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send(pipename);

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send(pipename);

    -- timestamp without time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send(pipename);

    -- timestamp with time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send(pipename);

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send(pipename);

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send(pipename);

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send(pipename);

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send(pipename);

    -- whole row of TEMP (the one with id = 2)
    SELECT t.id, t.name
      INTO temp_row
      FROM TEMP AS t
     WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_row);
    PERFORM send(pipename);
END;
$$ LANGUAGE plpgsql;
| |
| |
-- checkSend1(): covers the one-argument send_message(text) overload by
-- packing a fixed string and sending it to 'pipe_name_1'.
CREATE OR REPLACE FUNCTION checkSend1() RETURNS void AS $$
DECLARE
    target CONSTANT text := 'pipe_name_1';
BEGIN
    PERFORM dbms_pipe.pack_message('checking one-argument send_message()');
    PERFORM dbms_pipe.send_message(target);
END;
$$ LANGUAGE plpgsql;
| |
| |
-- Test send_message(text,integer)
--
-- Packs a fixed string and sends it to 'pipe_name_2' with the two-argument
-- overload.  A return status of 1 is treated as a timeout: NOTICE 'Timeout',
-- a 2 second sleep, then exactly one retry.  If the retry reports 1 as well,
-- a WARNING is raised so the undelivered message does not go unnoticed.
--
-- NOTE(review): the literal 2 is presumably the timeout in seconds --
-- confirm against the dbms_pipe documentation.
CREATE OR REPLACE FUNCTION checkSend2() RETURNS void AS $$
BEGIN
    PERFORM dbms_pipe.pack_message('checking two-argument send_message()');

    IF dbms_pipe.send_message('pipe_name_2', 2) = 1 THEN
        RAISE NOTICE 'Timeout';
        PERFORM pg_sleep(2);

        -- One retry only; report a second timeout rather than ignoring it.
        IF dbms_pipe.send_message('pipe_name_2', 2) = 1 THEN
            RAISE WARNING 'send_message to pipe "%" timed out again; message was not sent', 'pipe_name_2';
        END IF;
    END IF;
END;
$$ LANGUAGE plpgsql;
| |
-- notifyDropTemp(): sends the integer 1 as a one-item message to
-- 'pipe_name_3'.
-- NOTE(review): judging by the name, the receiver uses this as the cue that
-- table TEMP may be dropped -- confirm against the receiving script.
CREATE OR REPLACE FUNCTION notifyDropTemp() RETURNS void AS $$
DECLARE
    token  CONSTANT integer := 1;
    target CONSTANT text    := 'pipe_name_3';
BEGIN
    PERFORM dbms_pipe.pack_message(token);
    PERFORM dbms_pipe.send_message(target);
END;
$$ LANGUAGE plpgsql;
| |
| |
-- checkUniqueSessionNameA(): packs this session's
-- dbms_pipe.unique_session_name() value and sends it to 'pipe_name_4'.
CREATE OR REPLACE FUNCTION checkUniqueSessionNameA() RETURNS void AS $$
DECLARE
    target CONSTANT text := 'pipe_name_4';
BEGIN
    PERFORM dbms_pipe.pack_message(
        dbms_pipe.unique_session_name()
    );
    PERFORM dbms_pipe.send_message(target);
END;
$$ LANGUAGE plpgsql;
| |
-- notify(pipename): sends the integer 1 as a one-item message to the given
-- pipe, using the one-argument send_message.
CREATE OR REPLACE FUNCTION notify(pipename text) RETURNS void AS $$
DECLARE
    token CONSTANT integer := 1;
BEGIN
    PERFORM dbms_pipe.pack_message(token);
    PERFORM dbms_pipe.send_message(pipename);
END;
$$ LANGUAGE plpgsql;
| |
-- Driver and cleanup.
-- Echoing is switched back on here, so from the next line onwards psql prints
-- every input line -- comments included -- into the session output.
-- NOTE(review): presumably that output is compared with an expected-output
-- file; confirm before rewording or reformatting anything below.
-- The helpers are run once each: for every explicit pipe, notify() is sent on
-- the matching *_notifier pipe before createExplicitPipe() creates and fills
-- the pipe.  Finally all ten functions created by this script are dropped;
-- table TEMP and role pipe_test_owner are not dropped by this script.
\set ECHO all

SELECT createImplicitPipe();

-- Bulk send messages
SELECT bulkSend();

-- An explicit private pipe
SELECT notify('recv_private1_notifier');
SELECT createExplicitPipe('private_pipe_1',3);

-- An explicit private pipe
SELECT notify('recv_private2_notifier');
SELECT createExplicitPipe('private_pipe_2',3);

-- An explicit public pipe (uses two-argument create_pipe)
SELECT notify('recv_public1_notifier');
SELECT createExplicitPipe('public_pipe_3',2);

-- An explicit public pipe (uses one-argument create_pipe)
SELECT notify('recv_public2_notifier');
SELECT createExplicitPipe('public_pipe_4',1);

-- tests send_message(text)
SELECT checkSend1();

-- tests send_message(text,integer)
SELECT checkSend2();

SELECT notifyDropTemp();

-- tests unique_session_name()
SELECT checkUniqueSessionNameA();

DROP FUNCTION createImplicitPipe();
DROP FUNCTION createExplicitPipe(text,integer);
DROP FUNCTION createPipe(text,integer);
DROP FUNCTION checkSend1();
DROP FUNCTION checkSend2();
DROP FUNCTION checkUniqueSessionNameA();
DROP FUNCTION bulkSend();
DROP FUNCTION notifyDropTemp();
DROP FUNCTION notify(text);
DROP FUNCTION send(text);