#include "postgres.h"
#include "funcapi.h"
#include "fmgr.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "storage/lwlock.h"
#include "miscadmin.h"
#include "string.h"
#include "lib/stringinfo.h"
#include "catalog/pg_type.h"
#include "utils/builtins.h"
#include "utils/date.h"
#include "utils/numeric.h"

#include "shmmc.h"
#include "pipe.h"
#include "orafunc.h"
#include "builtins.h"

/*
 * @ Pavel Stehule 2006
 */

#ifndef _GetCurrentTimestamp
#define _GetCurrentTimestamp()	GetCurrentTimestamp()
#endif

#ifndef GetNowFloat
#ifdef HAVE_INT64_TIMESTAMP
#define GetNowFloat()   ((float8) _GetCurrentTimestamp() / 1000000.0)
#else
#define GetNowFloat()   _GetCurrentTimestamp()
#endif
#endif

#define RESULT_DATA	0
#define RESULT_WAIT	1

#define NOT_INITIALIZED -1
#define ONE_YEAR (60*60*24*365)

PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_send_message);
PG_FUNCTION_INFO_V1(dbms_pipe_receive_message);
PG_FUNCTION_INFO_V1(dbms_pipe_unique_session_name);
PG_FUNCTION_INFO_V1(dbms_pipe_list_pipes);
PG_FUNCTION_INFO_V1(dbms_pipe_next_item_type);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_2);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_1);
PG_FUNCTION_INFO_V1(dbms_pipe_reset_buffer);
PG_FUNCTION_INFO_V1(dbms_pipe_purge);
PG_FUNCTION_INFO_V1(dbms_pipe_remove_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_integer);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bigint);

typedef enum {
	IT_NO_MORE_ITEMS = 0,
	IT_NUMBER = 9,
	IT_VARCHAR = 11,
	IT_DATE = 12,
	IT_TIMESTAMPTZ = 13,
	IT_BYTEA = 23,
	IT_RECORD = 24
} message_data_type;

typedef struct _queue_item {
	void *ptr;
	struct _queue_item *next_item;
} queue_item;

typedef struct {
	bool is_valid;
	bool registered;
	char *pipe_name;
	char *creator;
	Oid  uid;
	struct _queue_item *items;
	int16 count;
	int16 limit;
	int size;
} pipe;

typedef struct {
	int32 size;
	message_data_type type;
	Oid tupType;
} message_data_item;

typedef struct {
	int32 size;
	int32 items_count;
	message_data_item *next;
} message_buffer;

#define message_buffer_size		(MAXALIGN(sizeof(message_buffer)))
#define message_buffer_get_content(buf)	((message_data_item *) (((char*)buf)+message_buffer_size))


#define message_data_item_size	(MAXALIGN(sizeof(message_data_item)))
#define message_data_get_content(msg) (((char *)msg) + message_data_item_size)
#define message_data_item_next(msg) \
	((message_data_item *) (message_data_get_content(msg) + MAXALIGN(msg->size)))

typedef struct PipesFctx {
	int pipe_nth;
} PipesFctx;

typedef struct
{
	LWLockId shmem_lock;
	pipe *pipes;
	alert_event *events;
	alert_lock *locks;
	size_t size;
	unsigned int sid;
	vardata data[1]; /* flexible array member */
} sh_memory;

#define sh_memory_size			(offsetof(sh_memory, data))

message_buffer *output_buffer = NULL;
message_buffer *input_buffer = NULL;

pipe* pipes = NULL;
LWLockId shmem_lock = NOT_INITIALIZED;
unsigned int sid;                                 /* session id */
Oid uid;

extern alert_event *events;
extern alert_lock  *locks;

/*
 * write on writer size bytes from ptr
 */

static void
pack_field(message_buffer *buffer, message_data_type type,
			int32 size, void *ptr, Oid tupType)
{
	int len;
	message_data_item *message;

	len = MAXALIGN(size) + message_data_item_size;
	if (MAXALIGN(buffer->size) + len > LOCALMSGSZ - message_buffer_size)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory"),
				 errdetail("Packed message is bigger than local buffer."),
				 errhint("Increase LOCALMSGSZ in 'pipe.h' and recompile library.")));

	if (buffer->next == NULL)
		buffer->next =  message_buffer_get_content(buffer);

	message = buffer->next;

	message->size = size;
	message->type = type;
	message->tupType = tupType;

	/* padding bytes have to be zeroed - buffer creator is responsible to clear memory */

	memcpy(message_data_get_content(message), ptr, size);

	buffer->size += len;
	buffer->items_count++;
	buffer->next = message_data_item_next(message);
}


static void*
unpack_field(message_buffer *buffer, message_data_type *type,
				int32 *size, Oid *tupType)
{
	void *ptr;
	message_data_item *message;

	Assert(buffer != NULL);
	Assert(buffer->items_count > 0);
	Assert(buffer->next != NULL);

	message = buffer->next;
	*size = message->size;
	*type = message->type;
	*tupType = message->tupType;
	ptr = message_data_get_content(message);

	buffer->next = --buffer->items_count > 0 ? message_data_item_next(message) : NULL;

	return ptr;
}


/*
 * Add ptr to queue. If pipe doesn't exist, register new pipe
 */

bool
ora_lock_shmem(size_t size, int max_pipes, int max_events, int max_locks, bool reset)
{
	int i;
	bool found;

	sh_memory *sh_mem;

	if (pipes == NULL)
	{
		sh_mem = ShmemInitStruct("dbms_pipe", size, &found);
		uid = GetUserId();
		if (sh_mem == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory"),
					 errdetail("Failed while allocation block %lu bytes in shared memory.", (unsigned long) size)));

		if (!found)
		{
			shmem_lock = sh_mem->shmem_lock = LWLockAssign();
			LWLockAcquire(sh_mem->shmem_lock, LW_EXCLUSIVE);
			sh_mem->size = size - sh_memory_size;
			ora_sinit(sh_mem->data, size, true);
			pipes = sh_mem->pipes = ora_salloc(max_pipes*sizeof(pipe));
			sid = sh_mem->sid = 1;
			for (i = 0; i < max_pipes; i++)
				pipes[i].is_valid = false;

			events = sh_mem->events = ora_salloc(max_events*sizeof(alert_event));
			locks = sh_mem->locks = ora_salloc(max_locks*sizeof(alert_lock));

			for (i = 0; i < max_events; i++)
			{
				events[i].event_name = NULL;
				events[i].max_receivers = 0;
				events[i].receivers = NULL;
				events[i].messages = NULL;
			}
			for (i = 0; i < max_locks; i++)
			{
				locks[i].sid = -1;
				locks[i].echo = NULL;
			}

		}
		else if (sh_mem->shmem_lock != 0)
		{
			pipes = sh_mem->pipes;
			shmem_lock = sh_mem->shmem_lock;
			LWLockAcquire(sh_mem->shmem_lock, LW_EXCLUSIVE);
			ora_sinit(sh_mem->data, sh_mem->size, reset);
			sid = ++(sh_mem->sid);
			events = sh_mem->events;
			locks = sh_mem->locks;
		}
	}
	else
	{
		LWLockAcquire(shmem_lock, LW_EXCLUSIVE);
	}
/*
	if (reset && pipes == NULL)
		elog(ERROR, "Can't purge memory");
*/

	return pipes != NULL;
}


/*
 * can be enhanced access/hash.h
 */

static pipe*
find_pipe(text* pipe_name, bool* created, bool only_check)
{
	int i;
	pipe *result = NULL;

	*created = false;
	for (i = 0; i < MAX_PIPES; i++)
	{
		if (pipes[i].is_valid &&
			strncmp((char*)VARDATA(pipe_name), pipes[i].pipe_name, VARSIZE(pipe_name) - VARHDRSZ) == 0
			&& (strlen(pipes[i].pipe_name) == (VARSIZE(pipe_name) - VARHDRSZ)))
		{
			/* check owner if non public pipe */

			if (pipes[i].creator != NULL && pipes[i].uid != uid)
			{
				LWLockRelease(shmem_lock);
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("insufficient privilege"),
						 errdetail("Insufficient privilege to access pipe")));
			}

			return &pipes[i];
		}
	}

	if (only_check)
		return result;

	for (i = 0; i < MAX_PIPES; i++)
		if (!pipes[i].is_valid)
		{
			if (NULL != (pipes[i].pipe_name = ora_scstring(pipe_name)))
			{
				pipes[i].is_valid = true;
				pipes[i].registered = false;
				pipes[i].creator = NULL;
				pipes[i].uid = -1;
				pipes[i].count = 0;
				pipes[i].limit = -1;

				*created = true;
				result = &pipes[i];
			}
			break;
		}

	return result;
}


static bool
new_last(pipe *p, void *ptr)
{
	queue_item *q, *aux_q;

	if (p->count >= p->limit && p->limit != -1)
		return false;

	if (p->items == NULL)
	{
		if (NULL == (p->items = ora_salloc(sizeof(queue_item))))
			return false;
		p->items->next_item = NULL;
		p->items->ptr = ptr;
		p->count = 1;
		return true;
	}
	q = p->items;
	while (q->next_item != NULL)
		q = q->next_item;


	if (NULL == (aux_q = ora_salloc(sizeof(queue_item))))
		return false;

	q->next_item = aux_q;
	aux_q->next_item = NULL;
	aux_q->ptr = ptr;

	p->count += 1;

	return true;
}


static void*
remove_first(pipe *p, bool *found)
{
	struct _queue_item *q;
	void *ptr = NULL;

	*found = false;

	if (NULL != (q = p->items))
	{
		p->count -= 1;
		ptr = q->ptr;
		p->items = q->next_item;
		*found = true;

		ora_sfree(q);
		if (p->items == NULL && !p->registered)
		{
			ora_sfree(p->pipe_name);
			p->is_valid = false;
		}

	}

	return ptr;
}


/* copy message to local memory, if exists */

static message_buffer*
get_from_pipe(text *pipe_name, bool *found)
{
	pipe *p;
	bool created;
	message_buffer *shm_msg;
	message_buffer *result = NULL;

	if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
		return NULL;

	if (NULL != (p = find_pipe(pipe_name, &created,false)))
	{
		if (!created)
		{
			if (NULL != (shm_msg = remove_first(p, found)))
			{
				p->size -= shm_msg->size;

				result = (message_buffer*) MemoryContextAlloc(TopMemoryContext, shm_msg->size);
				memcpy(result, shm_msg, shm_msg->size);
				ora_sfree(shm_msg);
			}
		}
	}

	LWLockRelease(shmem_lock);

	return result;
}


/*
 * if ptr is null, then only register pipe
 */

static bool
add_to_pipe(text *pipe_name, message_buffer *ptr, int limit, bool limit_is_valid)
{
	pipe *p;
	bool created;
	bool result = false;
	message_buffer *sh_ptr;

	if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS,false))
		return false;

	for (;;)
	{
		if (NULL != (p = find_pipe(pipe_name, &created, false)))
		{
			if (created)
				p->registered = ptr == NULL;

			if (limit_is_valid && (created || (p->limit < limit)))
				p->limit = limit;

			if (ptr != NULL)
			{
				if (NULL != (sh_ptr = ora_salloc(ptr->size)))
				{
					memcpy(sh_ptr,ptr,ptr->size);
					if (new_last(p, sh_ptr))
					{
						p->size += ptr->size;
						result = true;
						break;
					}
					ora_sfree(sh_ptr);
				}
				if (created)
				{
					/* I created new pipe, but haven't memory for new value */
					ora_sfree(p->pipe_name);
					p->is_valid = false;
					result = false;
				}
			}
			else
				result = true;
		}
		break;
	}
	LWLockRelease(shmem_lock);
	return result;
}


static void
remove_pipe(text *pipe_name, bool purge)
{
	pipe *p;
	bool created;

	if (NULL != (p = find_pipe(pipe_name, &created, true)))
	{
		queue_item *q = p->items;
		while (q != NULL)
		{
			queue_item *aux_q;

			aux_q = q->next_item;
			if (q->ptr)
				ora_sfree(q->ptr);
			ora_sfree(q);
			q = aux_q;
		}
		p->items = NULL;
		p->size = 0;
		p->count = 0;
		if (!(purge && p->registered))
		{
			ora_sfree(p->pipe_name);
			p->is_valid = false;
		}
	}
}


Datum
dbms_pipe_next_item_type (PG_FUNCTION_ARGS)
{
	PG_RETURN_INT32(input_buffer != NULL ? input_buffer->next->type : IT_NO_MORE_ITEMS);
}


static void
init_buffer(message_buffer *buffer, int32 size)
{
	memset(buffer, 0, size);
	buffer->size = message_buffer_size;
	buffer->items_count = 0;
	buffer->next = message_buffer_get_content(buffer);
}

static message_buffer*
check_buffer(message_buffer *buffer, int32 size)
{
	if (buffer == NULL)
	{
		buffer = (message_buffer*) MemoryContextAlloc(TopMemoryContext, size);
		if (buffer == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of memory"),
					 errdetail("Failed while allocation block %d bytes in memory.", size)));

		init_buffer(buffer, size);
	}

	return buffer;
}

Datum
dbms_pipe_pack_message_text(PG_FUNCTION_ARGS)
{
	text *str = PG_GETARG_TEXT_PP(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_VARCHAR,
		VARSIZE_ANY_EXHDR(str), VARDATA_ANY(str), InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_date(PG_FUNCTION_ARGS)
{
	DateADT dt = PG_GETARG_DATEADT(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_DATE,
			   sizeof(dt), &dt, InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_timestamp(PG_FUNCTION_ARGS)
{
	TimestampTz dt = PG_GETARG_TIMESTAMPTZ(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_TIMESTAMPTZ,
			   sizeof(dt), &dt, InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_number(PG_FUNCTION_ARGS)
{
	Numeric num = PG_GETARG_NUMERIC(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_NUMBER,
			   VARSIZE(num) - VARHDRSZ, VARDATA(num), InvalidOid);

	PG_RETURN_VOID();
}


Datum
dbms_pipe_pack_message_bytea(PG_FUNCTION_ARGS)
{
	bytea *data = PG_GETARG_BYTEA_P(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_BYTEA,
		VARSIZE_ANY_EXHDR(data), VARDATA_ANY(data), InvalidOid);

	PG_RETURN_VOID();
}


/*
 *  We can serialize only typed record
 */

Datum
dbms_pipe_pack_message_record(PG_FUNCTION_ARGS)
{
	HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
	Oid tupType;
	bytea *data;
	FunctionCallInfoData info;

	tupType = HeapTupleHeaderGetTypeId(rec);

	/*
	 * Normally one would call record_send() using DirectFunctionCall3,
	 * but that does not work since record_send wants to cache some data
	 * using fcinfo->flinfo->fn_extra.  So we need to pass it our own
	 * flinfo parameter.
	 */
	InitFunctionCallInfoData(info, fcinfo->flinfo, 3, NULL, NULL);

	info.arg[0] = PointerGetDatum(rec);
	info.arg[1] = ObjectIdGetDatum(tupType);
	info.arg[2] = Int32GetDatum(-1);
	info.argnull[0] = false;
	info.argnull[1] = false;
	info.argnull[2] = false;

	data = (bytea*) DatumGetPointer(record_send(&info));

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
	pack_field(output_buffer, IT_RECORD,
			   VARSIZE(data), VARDATA(data), tupType);

	PG_RETURN_VOID();
}


static Datum
dbms_pipe_unpack_message(PG_FUNCTION_ARGS, message_data_type dtype)
{
	Oid		tupType;
	void *ptr;
	message_data_type type;
	int32 size;
	Datum result;
	message_data_type next_type;

	if (input_buffer == NULL ||
		input_buffer->items_count <= 0 ||
		input_buffer->next == NULL ||
		input_buffer->next->type == IT_NO_MORE_ITEMS)
		PG_RETURN_NULL();

	next_type = input_buffer->next->type;
	if (next_type != dtype)
		ereport(ERROR,
				(errcode(ERRCODE_DATATYPE_MISMATCH),
				 errmsg("datatype mismatch"),
				 errdetail("unpack unexpected type: %d", next_type)));

	ptr = unpack_field(input_buffer, &type, &size, &tupType);
	Assert(ptr != NULL);

	switch (type)
	{
		case IT_TIMESTAMPTZ:
			result = TimestampTzGetDatum(*(TimestampTz*)ptr);
			break;
		case IT_DATE:
			result = DateADTGetDatum(*(DateADT*)ptr);
			break;
		case IT_VARCHAR:
		case IT_NUMBER:
		case IT_BYTEA:
			result = PointerGetDatum(cstring_to_text_with_len(ptr, size));
			break;
		case IT_RECORD:
		{
			FunctionCallInfoData	info;
			StringInfoData	buf;
			text		   *data = cstring_to_text_with_len(ptr, size);

			buf.data = VARDATA(data);
			buf.len = VARSIZE(data) - VARHDRSZ;
			buf.maxlen = buf.len;
			buf.cursor = 0;

			/*
			 * Normally one would call record_recv() using DirectFunctionCall3,
			 * but that does not work since record_recv wants to cache some data
			 * using fcinfo->flinfo->fn_extra.  So we need to pass it our own
			 * flinfo parameter.
			 */
			InitFunctionCallInfoData(info, fcinfo->flinfo, 3, NULL, NULL);

			info.arg[0] = PointerGetDatum(&buf);
			info.arg[1] = ObjectIdGetDatum(tupType);
			info.arg[2] = Int32GetDatum(-1);
			info.argnull[0] = false;
			info.argnull[1] = false;
			info.argnull[2] = false;

			result = record_recv(&info);
			break;
		}
		default:
			elog(ERROR, "unexpected type: %d", type);
			result = (Datum) 0;	/* keep compiler quiet */
	}

	if (input_buffer->items_count == 0)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	PG_RETURN_DATUM(result);
}


Datum
dbms_pipe_unpack_message_text(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_VARCHAR);
}


Datum
dbms_pipe_unpack_message_date(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_DATE);
}

Datum
dbms_pipe_unpack_message_timestamp(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_TIMESTAMPTZ);
}


Datum
dbms_pipe_unpack_message_number(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_NUMBER);
}


Datum
dbms_pipe_unpack_message_bytea(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_BYTEA);
}


Datum
dbms_pipe_unpack_message_record(PG_FUNCTION_ARGS)
{
	return dbms_pipe_unpack_message(fcinfo, IT_RECORD);
}


#define WATCH_PRE(t, et, c) \
et = GetNowFloat() + (float8)t; c = 0; \
do \
{ \

#define WATCH_POST(t,et,c) \
if (GetNowFloat() >= et) \
PG_RETURN_INT32(RESULT_WAIT); \
if (cycle++ % 100 == 0) \
CHECK_FOR_INTERRUPTS(); \
pg_usleep(10000L); \
} while(true && t != 0);


Datum
dbms_pipe_receive_message(PG_FUNCTION_ARGS)
{
	text *pipe_name = NULL;
	int timeout = ONE_YEAR;
	int cycle = 0;
	float8 endtime;
	bool found = false;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	else
		pipe_name = PG_GETARG_TEXT_P(0);

	if (!PG_ARGISNULL(1))
		timeout = PG_GETARG_INT32(1);

	if (input_buffer != NULL)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	WATCH_PRE(timeout, endtime, cycle);
	if (NULL != (input_buffer = get_from_pipe(pipe_name, &found)))
	{
		input_buffer->next = message_buffer_get_content(input_buffer);
		break;
	}
/* found empty message */
	if (found)
		break;

	WATCH_POST(timeout, endtime, cycle);
	PG_RETURN_INT32(RESULT_DATA);
}


Datum
dbms_pipe_send_message(PG_FUNCTION_ARGS)
{
	text *pipe_name = NULL;
	int timeout = ONE_YEAR;
	int limit = 0;
	bool valid_limit;

	int cycle = 0;
	float8 endtime;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	else
		pipe_name = PG_GETARG_TEXT_P(0);

	output_buffer = check_buffer(output_buffer, LOCALMSGSZ);

	if (!PG_ARGISNULL(1))
		timeout = PG_GETARG_INT32(1);

	if (PG_ARGISNULL(2))
		valid_limit = false;
	else
	{
		limit = PG_GETARG_INT32(2);
		valid_limit = true;
	}

	if (input_buffer != NULL) /* XXX Strange? */
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	WATCH_PRE(timeout, endtime, cycle);
	if (add_to_pipe(pipe_name, output_buffer,
					limit, valid_limit))
		break;
	WATCH_POST(timeout, endtime, cycle);

	init_buffer(output_buffer, LOCALMSGSZ);

	PG_RETURN_INT32(RESULT_DATA);
}


Datum
dbms_pipe_unique_session_name (PG_FUNCTION_ARGS)
{
	StringInfoData strbuf;
	text *result;

	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		initStringInfo(&strbuf);
		appendStringInfo(&strbuf,"PG$PIPE$%d$%d",sid, MyProcPid);

		result = cstring_to_text_with_len(strbuf.data, strbuf.len);
		pfree(strbuf.data);
		LWLockRelease(shmem_lock);

		PG_RETURN_TEXT_P(result);
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_NULL();
}

#define DB_PIPES_COLS		6

Datum
dbms_pipe_list_pipes(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;
	TupleDesc        tupdesc;
	TupleTableSlot  *slot;
	AttInMetadata   *attinmeta;
	PipesFctx       *fctx;

	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	if (SRF_IS_FIRSTCALL())
	{
		int		i;
		MemoryContext  oldcontext;
		bool has_lock = false;

		WATCH_PRE(timeout, endtime, cycle);
		if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
		{
			has_lock = true;
			break;
		}
		WATCH_POST(timeout, endtime, cycle);
		if (!has_lock)
			LOCK_ERROR();

		funcctx = SRF_FIRSTCALL_INIT();
		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

		fctx = palloc(sizeof(PipesFctx));
		funcctx->user_fctx = fctx;
		fctx->pipe_nth = 0;

		tupdesc = CreateTemplateTupleDesc(DB_PIPES_COLS , false);
		i = 0;
		TupleDescInitEntry(tupdesc, ++i, "name",    VARCHAROID, -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "items",   INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "size",    INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "limit",   INT4OID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "private", BOOLOID,    -1, 0);
		TupleDescInitEntry(tupdesc, ++i, "owner",   VARCHAROID, -1, 0);
		Assert(i == DB_PIPES_COLS);

		slot = TupleDescGetSlot(tupdesc);
		funcctx->slot = slot;

		attinmeta = TupleDescGetAttInMetadata(tupdesc);
		funcctx->attinmeta = attinmeta;

		MemoryContextSwitchTo(oldcontext);
	}

	funcctx = SRF_PERCALL_SETUP();
	fctx = (PipesFctx *) funcctx->user_fctx;

	while (fctx->pipe_nth < MAX_PIPES)
	{
		if (pipes[fctx->pipe_nth].is_valid)
		{
			Datum		result;
			HeapTuple	tuple;
			char	   *values[DB_PIPES_COLS];
			char		items[16];
			char		size[16];
			char		limit[16];

			/* name */
			values[0] = pipes[fctx->pipe_nth].pipe_name;
			/* items */
			snprintf(items, lengthof(items), "%d", pipes[fctx->pipe_nth].count);
			values[1] = items;
			/* items */
			snprintf(size, lengthof(size), "%d", pipes[fctx->pipe_nth].size);
			values[2] = size;
			/* limit */
			if (pipes[fctx->pipe_nth].limit != -1)
			{
				snprintf(limit, lengthof(limit), "%d", pipes[fctx->pipe_nth].limit);
				values[3] = limit;
			}
			else
				values[3] = NULL;
			/* private */
			values[4] = (pipes[fctx->pipe_nth].creator ? "true" : "false");
			/* owner */
			values[5] = pipes[fctx->pipe_nth].creator;

			tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
			result = TupleGetDatum(funcctx->slot, tuple);

			fctx->pipe_nth += 1;
			SRF_RETURN_NEXT(funcctx, result);
		}
		fctx->pipe_nth += 1;
	}

	LWLockRelease(shmem_lock);
	SRF_RETURN_DONE(funcctx);
}

/*
 * secondary functions
 */

/*
 * Registration explicit pipes
 *   dbms_pipe.create_pipe(pipe_name varchar, limit := -1 int, private := false bool);
 */

Datum
dbms_pipe_create_pipe (PG_FUNCTION_ARGS)
{
	text *pipe_name = NULL;
	int   limit = 0;
	bool  is_private;
	bool  limit_is_valid = false;
	bool  created;
	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("pipe name is NULL"),
				 errdetail("Pipename may not be NULL.")));
	else
		pipe_name = PG_GETARG_TEXT_P(0);

	if (!PG_ARGISNULL(1))
	{
		limit = PG_GETARG_INT32(1);
		limit_is_valid = true;
	}

	is_private = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{
		pipe *p;
		if (NULL != (p = find_pipe(pipe_name, &created, false)))
		{
			if (!created)
			{
				LWLockRelease(shmem_lock);
				ereport(ERROR,
						(errcode(ERRCODE_DUPLICATE_OBJECT),
						 errmsg("pipe creation error"),
						 errdetail("Pipe is registered.")));
			}
			if (is_private)
			{
				char *user;

				p->uid = GetUserId();
#ifdef GP_VERSION_NUM
				user = (GetUserNameFromId(p->uid)); 
#else
				user = (char*)DirectFunctionCall1(namein, CStringGetDatum(GetUserNameFromId(p->uid)));
#endif
				p->creator = ora_sstrcpy(user);
				pfree(user);
			}
			p->limit = limit_is_valid ? limit : -1;
			p->registered = true;

			LWLockRelease(shmem_lock);
			PG_RETURN_VOID();
		}
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}


/*
 * Clean local input, output buffers
 */

Datum
dbms_pipe_reset_buffer(PG_FUNCTION_ARGS)
{
	if (output_buffer != NULL)
	{
		pfree(output_buffer);
		output_buffer = NULL;
	}

	if (input_buffer != NULL)
	{
		pfree(input_buffer);
		input_buffer = NULL;
	}

	PG_RETURN_VOID();
}


/*
 * Remove all stored messages in pipe. Remove implicit created
 * pipe.
 */

Datum
dbms_pipe_purge (PG_FUNCTION_ARGS)
{
	text *pipe_name = PG_GETARG_TEXT_P(0);

	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{

		remove_pipe(pipe_name, true);
		LWLockRelease(shmem_lock);

		PG_RETURN_VOID();
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}

/*
 * Remove pipe if exists
 */

Datum
dbms_pipe_remove_pipe (PG_FUNCTION_ARGS)
{
	text *pipe_name = PG_GETARG_TEXT_P(0);

	float8 endtime;
	int cycle = 0;
	int timeout = 10;

	WATCH_PRE(timeout, endtime, cycle);
	if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
	{

		remove_pipe(pipe_name, false);
		LWLockRelease(shmem_lock);

		PG_RETURN_VOID();
	}
	WATCH_POST(timeout, endtime, cycle);
	LOCK_ERROR();

	PG_RETURN_VOID();
}


/*
 * Some void udf which I can't wrap in sql
 */

Datum
dbms_pipe_create_pipe_2 (PG_FUNCTION_ARGS)
{
	Datum arg1 = PG_GETARG_DATUM(0);
	Datum arg2 = PG_GETARG_DATUM(1);

	DirectFunctionCall3(dbms_pipe_create_pipe,
						arg1,
						arg2,
						BoolGetDatum(false));

	PG_RETURN_VOID();
}

Datum
dbms_pipe_create_pipe_1 (PG_FUNCTION_ARGS)
{
	Datum arg1 = PG_GETARG_DATUM(0);

	DirectFunctionCall3(dbms_pipe_create_pipe,
						arg1,
						(Datum)0,
						BoolGetDatum(false));

	PG_RETURN_VOID();
}

Datum
dbms_pipe_pack_message_integer(PG_FUNCTION_ARGS)
{
	/* Casting from int4 to numeric */
	DirectFunctionCall1(dbms_pipe_pack_message_number,
				DirectFunctionCall1(int4_numeric, PG_GETARG_DATUM(0)));

	PG_RETURN_VOID();
}

Datum
dbms_pipe_pack_message_bigint(PG_FUNCTION_ARGS)
{
	/* Casting from int8 to numeric */
	DirectFunctionCall1(dbms_pipe_pack_message_number,
				DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(0)));

	PG_RETURN_VOID();

}
