blob: cc8a1775bc69fbf9f407212f21f0dff2bb12f915 [file] [log] [blame]
\set ECHO none
-- Rendezvous with the other test processes through advisory lock 1.
-- The session that obtains the lock polls pg_locks every 0.1 s, at most
-- 1000 times (about 100 s), until exactly one ungranted advisory request
-- for objid 1 is visible. A session that cannot obtain the lock simply
-- blocks until it is granted. The lock is transaction-scoped, so it goes
-- away when this statement's transaction ends.
DO $$
DECLARE
    pending_waiters int;
BEGIN
    IF NOT pg_try_advisory_xact_lock(1) THEN
        -- Somebody else holds the lock: queue up behind it and finish.
        PERFORM pg_advisory_xact_lock(1);
        RETURN;
    END IF;

    FOR attempt IN 1..1000 LOOP
        PERFORM pg_sleep(0.1);

        SELECT count(*)
          INTO pending_waiters
          FROM pg_locks
         WHERE locktype = 'advisory'
           AND objid = 1
           AND NOT granted;

        -- Stop polling as soon as exactly one other process is waiting.
        EXIT WHEN pending_waiters = 1;
    END LOOP;
END;
$$;
-- Build the fixture table and its owning role. Messages below WARNING are
-- suppressed while doing so, so the "does not exist, skipping" notices of the
-- IF EXISTS drops do not reach the test output.
SET client_min_messages = warning;
DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP(id integer,name text);
-- Target columns are named explicitly so the rows do not depend on column order.
INSERT INTO TEMP (id, name) VALUES (1,'bob'),(2,'rob'),(3,'john');
DROP ROLE IF EXISTS pipe_test_owner;
CREATE ROLE pipe_test_owner WITH CREATEROLE;
ALTER TABLE TEMP OWNER TO pipe_test_owner;
SET client_min_messages = notice;
-- Notify session B of 'pipe_test_owner' having been created.
SELECT dbms_pipe.pack_message(1);
SELECT dbms_pipe.send_message('pipe_test_owner_created_notifier');
-- Run the rest of this script as pipe_test_owner. This switches the user of
-- the current session; no new connection is opened.
SET SESSION AUTHORIZATION pipe_test_owner;
/* Sends the message currently held in the local buffer to pipe `pipename`
 * using send_message(text,integer,integer); for a pipe that was not created
 * explicitly this is the call that creates it implicitly.
 * A return value of 1 is reported as 'Timeout'; the send is then repeated
 * once after a 2 second pause. The outcome of that second attempt is not
 * inspected.
 */
CREATE OR REPLACE FUNCTION send(pipename text) RETURNS void AS $$
DECLARE
    send_status integer;
BEGIN
    send_status := dbms_pipe.send_message(pipename, 2, 10);

    -- Anything other than 1 (including NULL) needs no retry.
    IF send_status IS DISTINCT FROM 1 THEN
        RETURN;
    END IF;

    RAISE NOTICE 'Timeout';
    PERFORM pg_sleep(2);
    PERFORM dbms_pipe.send_message(pipename, 2, 10);
END; $$ LANGUAGE plpgsql;
-- Exercises pack_message once per value type and sends each value as its own
-- message to 'named_pipe' through send().
CREATE OR REPLACE FUNCTION createImplicitPipe() RETURNS void AS $$
DECLARE
    target_pipe CONSTANT text := 'named_pipe';
    temp_rec TEMP%ROWTYPE;
BEGIN
    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send(target_pipe);

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send(target_pipe);

    -- timestamp without time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send(target_pipe);

    -- timestamp with time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send(target_pipe);

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send(target_pipe);

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send(target_pipe);

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send(target_pipe);

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send(target_pipe);

    -- whole row of TEMP (the row with id 2)
    SELECT t.* INTO temp_rec FROM TEMP AS t WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_rec);
    PERFORM send(target_pipe);
END; $$ LANGUAGE plpgsql;
-- Packs one value of every tested type into the local buffer and then sends
-- the whole buffer with a single send() call to 'named_pipe_2'.
CREATE OR REPLACE FUNCTION bulkSend() RETURNS void AS $$
DECLARE
    temp_rec TEMP%ROWTYPE;
BEGIN
    -- Scalars first, in the same order the per-type tests use.
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));

    -- Then the row of TEMP with id 2.
    SELECT t.* INTO temp_rec FROM TEMP AS t WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_rec);

    -- One send for everything packed above.
    PERFORM send('named_pipe_2');
END; $$ LANGUAGE plpgsql;
/* Creates an explicit pipe called `name`, choosing the create_pipe overload
 * from `ver`:
 *   3          -> create_pipe(text,integer,bool) with the flag set to true
 *   2          -> create_pipe(text,integer)
 *   otherwise  -> create_pipe(text)
 * When the third (bool) parameter is absent it defaults to false, i.e. the
 * pipe is public.
 */
CREATE OR REPLACE FUNCTION createPipe(name text,ver integer) RETURNS void AS $$
BEGIN
    CASE ver
        WHEN 3 THEN
            PERFORM dbms_pipe.create_pipe(name, 4, true);
        WHEN 2 THEN
            PERFORM dbms_pipe.create_pipe(name, 4);
        ELSE
            -- Any other value, including NULL, uses the one-argument form.
            PERFORM dbms_pipe.create_pipe(name);
    END CASE;
END; $$ LANGUAGE plpgsql;
/* Creates pipe `pipename` explicitly through createPipe(), using the
 * create_pipe variant selected by `create_version` (one of the variants
 * yields a private pipe), resets the local message buffer, and then sends
 * one message per tested value type to that pipe.
 */
CREATE OR REPLACE FUNCTION createExplicitPipe(pipename text,create_version integer) RETURNS void AS $$
DECLARE
    temp_rec TEMP%ROWTYPE;
BEGIN
    PERFORM createPipe(pipename, create_version);
    PERFORM dbms_pipe.reset_buffer();

    -- text
    PERFORM dbms_pipe.pack_message(CAST('Message From Session A' AS text));
    PERFORM send(pipename);

    -- date
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01' AS date));
    PERFORM send(pipename);

    -- timestamp without time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00' AS timestamp));
    PERFORM send(pipename);

    -- timestamp with time zone
    PERFORM dbms_pipe.pack_message(CAST('2013-01-01 09:00:00-08' AS timestamptz));
    PERFORM send(pipename);

    -- numeric
    PERFORM dbms_pipe.pack_message(CAST(12345.6789 AS numeric));
    PERFORM send(pipename);

    -- integer
    PERFORM dbms_pipe.pack_message(CAST(12345 AS integer));
    PERFORM send(pipename);

    -- bigint
    PERFORM dbms_pipe.pack_message(CAST(99999999999 AS bigint));
    PERFORM send(pipename);

    -- bytea
    PERFORM dbms_pipe.pack_message(CAST(E'\\201' AS bytea));
    PERFORM send(pipename);

    -- whole row of TEMP (the row with id 2)
    SELECT t.* INTO temp_rec FROM TEMP AS t WHERE t.id = 2;
    PERFORM dbms_pipe.pack_message(temp_rec);
    PERFORM send(pipename);
END; $$ LANGUAGE plpgsql;
-- Exercises the one-argument send_message(text): packs a fixed text and
-- sends it to 'pipe_name_1'. The return value is discarded.
CREATE OR REPLACE FUNCTION checkSend1() RETURNS void AS $$
DECLARE
    target_pipe CONSTANT text := 'pipe_name_1';
BEGIN
    PERFORM dbms_pipe.pack_message('checking one-argument send_message()');
    PERFORM dbms_pipe.send_message(target_pipe);
END; $$ LANGUAGE plpgsql;
-- Exercises the two-argument send_message(text,integer) on 'pipe_name_2'.
-- A return value of 1 is reported as 'Timeout' and the send is repeated once
-- after a 2 second pause; the result of the repeat is not inspected.
CREATE OR REPLACE FUNCTION checkSend2() RETURNS void AS $$
DECLARE
    target_pipe CONSTANT text := 'pipe_name_2';
    send_status integer;
BEGIN
    PERFORM dbms_pipe.pack_message('checking two-argument send_message()');

    send_status := dbms_pipe.send_message(target_pipe, 2);

    -- Anything other than 1 (including NULL) needs no retry.
    IF send_status IS DISTINCT FROM 1 THEN
        RETURN;
    END IF;

    RAISE NOTICE 'Timeout';
    PERFORM pg_sleep(2);
    PERFORM dbms_pipe.send_message(target_pipe, 2);
END; $$ LANGUAGE plpgsql;
-- Sends the integer 1 on 'pipe_name_3'.
-- NOTE(review): judging by the name, this presumably tells the peer session
-- that the TEMP table may be dropped -- confirm against the receiving script.
CREATE OR REPLACE FUNCTION notifyDropTemp() RETURNS void AS $$
DECLARE
    target_pipe CONSTANT text := 'pipe_name_3';
    signal_value CONSTANT integer := 1;
BEGIN
    PERFORM dbms_pipe.pack_message(signal_value);
    PERFORM dbms_pipe.send_message(target_pipe);
END; $$ LANGUAGE plpgsql;
-- Packs this session's dbms_pipe.unique_session_name() and sends it on
-- 'pipe_name_4'.
CREATE OR REPLACE FUNCTION checkUniqueSessionNameA() RETURNS void AS $$
DECLARE
    target_pipe CONSTANT text := 'pipe_name_4';
BEGIN
    PERFORM dbms_pipe.pack_message(
        dbms_pipe.unique_session_name()
    );
    PERFORM dbms_pipe.send_message(target_pipe);
END; $$ LANGUAGE plpgsql;
-- Generic notifier: sends the integer 1 on the pipe named by `pipename`.
-- The return value of send_message is discarded.
CREATE OR REPLACE FUNCTION notify(pipename text) RETURNS void AS $$
DECLARE
    signal_value CONSTANT integer := 1;
BEGIN
    PERFORM dbms_pipe.pack_message(signal_value);
    PERFORM dbms_pipe.send_message(pipename);
END; $$ LANGUAGE plpgsql;
-- Driver and cleanup. Once ECHO is switched to "all", psql prints every input
-- line, so the statements and comments below become part of the test output
-- and must stay in step with the expected-output file.
\set ECHO all
SELECT createImplicitPipe();
-- Bulk send messages
SELECT bulkSend();
-- An explicit private pipe
SELECT notify('recv_private1_notifier');
SELECT createExplicitPipe('private_pipe_1',3);
-- An explicit private pipe
SELECT notify('recv_private2_notifier');
SELECT createExplicitPipe('private_pipe_2',3);
-- An explicit public pipe (uses two-argument create_pipe)
SELECT notify('recv_public1_notifier');
SELECT createExplicitPipe('public_pipe_3',2);
-- An explicit public pipe (uses one-argument create_pipe)
SELECT notify('recv_public2_notifier');
SELECT createExplicitPipe('public_pipe_4',1);
-- tests send_message(text)
SELECT checkSend1();
-- tests send_message(text,integer)
SELECT checkSend2();
SELECT notifyDropTemp();
-- tests unique_session_name()
SELECT checkUniqueSessionNameA();
DROP FUNCTION createImplicitPipe();
DROP FUNCTION createExplicitPipe(text,integer);
DROP FUNCTION createPipe(text,integer);
DROP FUNCTION checkSend1();
DROP FUNCTION checkSend2();
DROP FUNCTION checkUniqueSessionNameA();
DROP FUNCTION bulkSend();
DROP FUNCTION notifyDropTemp();
DROP FUNCTION notify(text);
DROP FUNCTION send(text);