blob: 19540b67e29198767eded648542a64222d7c1e9f [file]
#include "postgres.h"
#include "funcapi.h"
#include "fmgr.h"
#include "access/htup_details.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "storage/lwlock.h"
#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "catalog/pg_type.h"
#include "utils/builtins.h"
#include "utils/date.h"
#include "utils/numeric.h"
#if PG_VERSION_NUM >= 140000
#include "utils/wait_event.h"
#elif PG_VERSION_NUM >= 130000
#include "pgstat.h"
#endif
#include "shmmc.h"
#include "pipe.h"
#include "orafce.h"
#include "builtins.h"
#include <string.h>
/*
* @ Pavel Stehule 2006-2023
*/
#ifndef _GetCurrentTimestamp
#define _GetCurrentTimestamp() GetCurrentTimestamp()
#endif
#ifndef GetNowFloat
#ifdef HAVE_INT64_TIMESTAMP
#define GetNowFloat() ((float8) _GetCurrentTimestamp() / 1000000.0)
#else
#define GetNowFloat() _GetCurrentTimestamp()
#endif
#endif
#define RESULT_DATA 0
#define RESULT_TIMEOUT 1
/* in sec 1000 days */
#define MAXWAIT 86400000
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_text);
PG_FUNCTION_INFO_V1(dbms_pipe_send_message);
PG_FUNCTION_INFO_V1(dbms_pipe_receive_message);
PG_FUNCTION_INFO_V1(dbms_pipe_unique_session_name);
PG_FUNCTION_INFO_V1(dbms_pipe_list_pipes);
PG_FUNCTION_INFO_V1(dbms_pipe_next_item_type);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_2);
PG_FUNCTION_INFO_V1(dbms_pipe_create_pipe_1);
PG_FUNCTION_INFO_V1(dbms_pipe_reset_buffer);
PG_FUNCTION_INFO_V1(dbms_pipe_purge);
PG_FUNCTION_INFO_V1(dbms_pipe_remove_pipe);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_date);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_timestamp);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_number);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_bytea);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_unpack_message_record);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_integer);
PG_FUNCTION_INFO_V1(dbms_pipe_pack_message_bigint);
typedef enum {
IT_NO_MORE_ITEMS = 0,
IT_NUMBER = 9,
IT_VARCHAR = 11,
IT_DATE = 12,
IT_TIMESTAMPTZ = 13,
IT_BYTEA = 23,
IT_RECORD = 24
} message_data_type;
typedef struct _queue_item {
void *ptr;
struct _queue_item *next_item;
} queue_item;
typedef struct {
long identity;
bool is_valid;
bool registered;
char *pipe_name;
char *creator;
Oid uid;
struct _queue_item *items;
struct _queue_item *last_item;
int16 count;
int16 limit;
int size;
} orafce_pipe;
typedef struct {
int32 size;
message_data_type type;
Oid tupType;
} message_data_item;
typedef struct {
int32 size;
int32 items_count;
message_data_item *next;
} message_buffer;
#define message_buffer_size (MAXALIGN(sizeof(message_buffer)))
#define message_buffer_get_content(buf) ((message_data_item *) (((char*)buf)+message_buffer_size))
#define message_data_item_size (MAXALIGN(sizeof(message_data_item)))
#define message_data_get_content(msg) (((char *)msg) + message_data_item_size)
#define message_data_item_next(msg) \
((message_data_item *) (message_data_get_content(msg) + MAXALIGN(msg->size)))
typedef struct PipesFctx {
int pipe_nth;
} PipesFctx;
typedef struct
{
int tranche_id;
LWLock shmem_lock;
orafce_pipe *pipes;
alert_event *events;
alert_lock *locks;
#if PG_VERSION_NUM >= 130000
ConditionVariable pipe_cv;
ConditionVariable alert_cv;
#endif
size_t size;
int sid;
long identity_seq;
vardata data[1]; /* flexible array member */
} sh_memory;
#define sh_memory_size (offsetof(sh_memory, data))
message_buffer *output_buffer = NULL;
message_buffer *input_buffer = NULL;
orafce_pipe* pipes = NULL;
long *identity_seq = NULL;
#define NOT_INITIALIZED NULL
LWLockId shmem_lockid = NOT_INITIALIZED;
int sid; /* session id */
extern alert_event *events;
extern alert_lock *locks;
#if PG_VERSION_NUM >= 130000
ConditionVariable *pipe_cv = NULL;
ConditionVariable *alert_cv = NULL;
#endif
/*
* write on writer size bytes from ptr
*/
static void
pack_field(message_buffer *buffer, message_data_type type,
int32 size, void *ptr, Oid tupType)
{
int len;
message_data_item *message;
len = MAXALIGN(size) + message_data_item_size;
if (MAXALIGN(buffer->size) + len > LOCALMSGSZ - message_buffer_size)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Packed message is bigger than local buffer."),
errhint("Increase LOCALMSGSZ in 'pipe.h' and recompile library.")));
if (buffer->next == NULL)
buffer->next = message_buffer_get_content(buffer);
message = buffer->next;
message->size = size;
message->type = type;
message->tupType = tupType;
/* padding bytes have to be zeroed - buffer creator is responsible to clear memory */
memcpy(message_data_get_content(message), ptr, size);
buffer->size += len;
buffer->items_count++;
buffer->next = message_data_item_next(message);
}
static void*
unpack_field(message_buffer *buffer, message_data_type *type,
int32 *size, Oid *tupType)
{
void *ptr;
message_data_item *message;
Assert(buffer);
Assert(buffer->items_count > 0);
Assert(buffer->next);
message = buffer->next;
Assert(message);
*size = message->size;
*type = message->type;
*tupType = message->tupType;
ptr = message_data_get_content(message);
buffer->next = --buffer->items_count > 0 ? message_data_item_next(message) : NULL;
return ptr;
}
/*
* Add ptr to queue. If pipe doesn't exist, register new pipe
*/
bool
ora_lock_shmem(size_t size, int max_pipes, int max_events, int max_locks, bool reset)
{
bool found;
/* reset is always false, really */
Assert(!reset);
if (pipes == NULL)
{
sh_memory *sh_mem;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
sh_mem = ShmemInitStruct("dbms_pipe", size, &found);
if (!found)
{
int i;
sh_mem->tranche_id = LWLockNewTrancheId();
LWLockInitialize(&sh_mem->shmem_lock, sh_mem->tranche_id);
LWLockRegisterTranche(sh_mem->tranche_id, "orafce");
shmem_lockid = &sh_mem->shmem_lock;
sh_mem->identity_seq = 0;
sh_mem->size = size - sh_memory_size;
ora_sinit(sh_mem->data, size, true);
pipes = sh_mem->pipes = ora_salloc(max_pipes*sizeof(orafce_pipe));
sid = sh_mem->sid = 1;
for (i = 0; i < max_pipes; i++)
pipes[i].is_valid = false;
events = sh_mem->events = ora_salloc(max_events*sizeof(alert_event));
locks = sh_mem->locks = ora_salloc(max_locks*sizeof(alert_lock));
for (i = 0; i < max_events; i++)
{
events[i].event_name = NULL;
events[i].max_receivers = 0;
events[i].receivers = NULL;
events[i].messages = NULL;
}
for (i = 0; i < max_locks; i++)
{
locks[i].sid = -1;
locks[i].echo = NULL;
}
#if PG_VERSION_NUM >= 130000
ConditionVariableInit(&sh_mem->pipe_cv);
ConditionVariableInit(&sh_mem->alert_cv);
pipe_cv = &sh_mem->pipe_cv;
alert_cv = &sh_mem->alert_cv;
#endif
identity_seq = &sh_mem->identity_seq;
}
else
{
LWLockRegisterTranche(sh_mem->tranche_id, "orafce");
shmem_lockid = &sh_mem->shmem_lock;
#if PG_VERSION_NUM >= 130000
pipe_cv = &sh_mem->pipe_cv;
alert_cv = &sh_mem->alert_cv;
#endif
pipes = sh_mem->pipes;
ora_sinit(sh_mem->data, sh_mem->size, false);
sid = ++(sh_mem->sid);
events = sh_mem->events;
locks = sh_mem->locks;
identity_seq = &sh_mem->identity_seq;
}
LWLockRelease(AddinShmemInitLock);
}
Assert(pipes != NULL);
LWLockAcquire(shmem_lockid, LW_EXCLUSIVE);
return true;
}
#define NOT_ASSIGNED_IDENTITY -1
/*
* can be enhanced access/hash.h
*/
static orafce_pipe*
find_pipe(text* pipe_name,
bool* created,
bool only_check,
long *expected_identity,
bool *identity_alarm)
{
int i;
orafce_pipe *result = NULL;
*created = false;
Assert(!expected_identity || identity_alarm);
if (identity_alarm)
*identity_alarm = false;
for (i = 0; i < MAX_PIPES; i++)
{
if (pipes[i].is_valid &&
strncmp((char*)VARDATA(pipe_name), pipes[i].pipe_name, VARSIZE(pipe_name) - VARHDRSZ) == 0
&& (strlen(pipes[i].pipe_name) == (VARSIZE(pipe_name) - VARHDRSZ)))
{
if (expected_identity && *expected_identity >= 0
&& pipes[i].identity != *expected_identity)
{
*identity_alarm = true;
return result;
}
/* check owner if non public pipe */
if (pipes[i].creator != NULL && pipes[i].uid != GetUserId())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("insufficient privilege"),
errdetail("Insufficient privilege to access pipe")));
}
if (expected_identity)
*expected_identity = pipes[i].identity;
return &pipes[i];
}
}
if (only_check)
return result;
if (expected_identity && *expected_identity >= 0)
{
*identity_alarm = true;
return result;
}
for (i = 0; i < MAX_PIPES; i++)
if (!pipes[i].is_valid)
{
if (NULL != (pipes[i].pipe_name = ora_scstring(pipe_name)))
{
pipes[i].is_valid = true;
pipes[i].registered = false;
pipes[i].creator = NULL;
pipes[i].uid = -1;
pipes[i].count = 0;
pipes[i].limit = -1;
*created = true;
result = &pipes[i];
if (expected_identity)
*expected_identity = pipes[i].identity = *identity_seq++;
}
break;
}
return result;
}
static bool
new_last(orafce_pipe *p, void *ptr, size_t size)
{
queue_item *aux_q;
if (p->count >= p->limit && p->limit != -1)
return false;
if (p->limit == -1 &&
p->count > 0 &&
(p->size + size + sizeof(queue_item) > 8 * 1024))
return false;
if (p->items == NULL)
{
if (NULL == (p->items = ora_salloc(sizeof(queue_item))))
return false;
p->items->next_item = NULL;
p->items->ptr = ptr;
p->last_item = p->items;
p->count = 1;
return true;
}
if (NULL == (aux_q = ora_salloc(sizeof(queue_item))))
return false;
p->last_item->next_item = aux_q;
p->last_item = aux_q;
aux_q->next_item = NULL;
aux_q->ptr = ptr;
p->count += 1;
return true;
}
static void*
remove_first(orafce_pipe *p, bool *found)
{
struct _queue_item *q;
void *ptr = NULL;
*found = false;
if (NULL != (q = p->items))
{
p->count -= 1;
ptr = q->ptr;
p->items = q->next_item;
*found = true;
ora_sfree(q);
if (p->items == NULL && !p->registered)
{
ora_sfree(p->pipe_name);
if (p->creator)
{
ora_sfree(p->creator);
p->creator = NULL;
}
p->is_valid = false;
}
}
return ptr;
}
/* copy message to local memory, if exists */
static message_buffer*
get_from_pipe(text *pipe_name,
bool *found,
long *identity, bool *identity_alarm)
{
orafce_pipe *p;
bool created;
message_buffer *result = NULL;
if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
return NULL;
if (NULL != (p = find_pipe(pipe_name, &created, false, identity, identity_alarm)))
{
if (!created)
{
message_buffer *shm_msg;
if (NULL != (shm_msg = remove_first(p, found)))
{
p->size -= shm_msg->size;
result = (message_buffer*) MemoryContextAlloc(TopMemoryContext, shm_msg->size);
memcpy(result, shm_msg, shm_msg->size);
ora_sfree(shm_msg);
}
}
}
LWLockRelease(shmem_lockid);
return result;
}
/*
* if ptr is null, then only register pipe
*/
static bool
add_to_pipe(text *pipe_name,
message_buffer *ptr,
int limit, bool limit_is_valid,
long *identity, bool *identity_alarm)
{
bool created;
bool result = false;
message_buffer *sh_ptr;
if (!ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS,false))
return false;
for (;;)
{
orafce_pipe *p;
if (NULL != (p = find_pipe(pipe_name, &created, false, identity, identity_alarm)))
{
if (created)
p->registered = ptr == NULL;
if (limit_is_valid && (created || (p->limit < limit)))
p->limit = limit;
if (ptr != NULL)
{
if (NULL != (sh_ptr = ora_salloc(ptr->size)))
{
memcpy(sh_ptr,ptr,ptr->size);
if (new_last(p, sh_ptr, ptr->size))
{
p->size += ptr->size;
result = true;
break;
}
ora_sfree(sh_ptr);
}
if (created)
{
/* I created new pipe, but haven't memory for new value */
ora_sfree(p->pipe_name);
p->is_valid = false;
result = false;
}
}
else
result = true;
}
break;
}
LWLockRelease(shmem_lockid);
return result;
}
static void
remove_pipe(text *pipe_name, bool purge)
{
orafce_pipe *p;
bool created;
if (NULL != (p = find_pipe(pipe_name, &created, true, NULL, NULL)))
{
queue_item *q = p->items;
while (q != NULL)
{
queue_item *aux_q;
aux_q = q->next_item;
if (q->ptr)
ora_sfree(q->ptr);
ora_sfree(q);
q = aux_q;
}
p->items = NULL;
p->size = 0;
p->count = 0;
if (!(purge && p->registered))
{
ora_sfree(p->pipe_name);
p->is_valid = false;
if (p->creator)
{
ora_sfree(p->creator);
p->creator = NULL;
}
}
}
}
Datum
dbms_pipe_next_item_type (PG_FUNCTION_ARGS)
{
PG_RETURN_INT32(input_buffer != NULL ? input_buffer->next->type : IT_NO_MORE_ITEMS);
}
static void
reset_buffer(message_buffer *buffer, int32 size)
{
memset(buffer, 0, size);
buffer->size = message_buffer_size;
buffer->items_count = 0;
buffer->next = message_buffer_get_content(buffer);
}
static message_buffer*
check_buffer(message_buffer *buffer, int32 size)
{
if (buffer == NULL)
{
buffer = (message_buffer*) MemoryContextAlloc(TopMemoryContext, size);
if (buffer == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Failed while allocation block %d bytes in memory.", size)));
reset_buffer(buffer, size);
}
return buffer;
}
Datum
dbms_pipe_pack_message_text(PG_FUNCTION_ARGS)
{
text *str = PG_GETARG_TEXT_PP(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_VARCHAR,
VARSIZE_ANY_EXHDR(str), VARDATA_ANY(str), InvalidOid);
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_date(PG_FUNCTION_ARGS)
{
DateADT dt = PG_GETARG_DATEADT(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_DATE,
sizeof(dt), &dt, InvalidOid);
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_timestamp(PG_FUNCTION_ARGS)
{
TimestampTz dt = PG_GETARG_TIMESTAMPTZ(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_TIMESTAMPTZ,
sizeof(dt), &dt, InvalidOid);
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_number(PG_FUNCTION_ARGS)
{
Numeric num = PG_GETARG_NUMERIC(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_NUMBER,
VARSIZE(num) - VARHDRSZ, VARDATA(num), InvalidOid);
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_bytea(PG_FUNCTION_ARGS)
{
bytea *data = PG_GETARG_BYTEA_P(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_BYTEA,
VARSIZE_ANY_EXHDR(data), VARDATA_ANY(data), InvalidOid);
PG_RETURN_VOID();
}
static void
init_args_3(FunctionCallInfo info, Datum arg0, Datum arg1, Datum arg2)
{
#if PG_VERSION_NUM >= 120000
info->args[0].value = arg0;
info->args[1].value = arg1;
info->args[2].value = arg2;
info->args[0].isnull = false;
info->args[1].isnull = false;
info->args[2].isnull = false;
#else
info->arg[0] = arg0;
info->arg[1] = arg1;
info->arg[2] = arg2;
info->argnull[0] = false;
info->argnull[1] = false;
info->argnull[2] = false;
#endif
}
/*
* We can serialize only typed record
*/
Datum
dbms_pipe_pack_message_record(PG_FUNCTION_ARGS)
{
HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
Oid tupType;
bytea *data;
#if PG_VERSION_NUM >= 120000
LOCAL_FCINFO(info, 3);
#else
FunctionCallInfoData info_data;
FunctionCallInfo info = &info_data;
#endif
tupType = HeapTupleHeaderGetTypeId(rec);
/*
* Normally one would call record_send() using DirectFunctionCall3,
* but that does not work since record_send wants to cache some data
* using fcinfo->flinfo->fn_extra. So we need to pass it our own
* flinfo parameter.
*/
InitFunctionCallInfoData(*info, fcinfo->flinfo, 3, InvalidOid, NULL, NULL);
init_args_3(info, PointerGetDatum(rec), ObjectIdGetDatum(tupType), Int32GetDatum(-1));
data = (bytea*) DatumGetPointer(record_send(info));
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
pack_field(output_buffer, IT_RECORD,
VARSIZE(data), VARDATA(data), tupType);
PG_RETURN_VOID();
}
static Datum
dbms_pipe_unpack_message(PG_FUNCTION_ARGS, message_data_type dtype)
{
Oid tupType;
void *ptr;
int32 size;
Datum result;
message_data_type next_type;
message_data_type type;
if (input_buffer == NULL ||
input_buffer->items_count <= 0 ||
input_buffer->next == NULL ||
input_buffer->next->type == IT_NO_MORE_ITEMS)
PG_RETURN_NULL();
next_type = input_buffer->next->type;
if (next_type != dtype)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("datatype mismatch"),
errdetail("unpack unexpected type: %d", next_type)));
ptr = unpack_field(input_buffer, &type, &size, &tupType);
Assert(ptr != NULL);
switch (type)
{
case IT_TIMESTAMPTZ:
result = TimestampTzGetDatum(*(TimestampTz*)ptr);
break;
case IT_DATE:
result = DateADTGetDatum(*(DateADT*)ptr);
break;
case IT_VARCHAR:
case IT_NUMBER:
case IT_BYTEA:
result = PointerGetDatum(cstring_to_text_with_len(ptr, size));
break;
case IT_RECORD:
{
#if PG_VERSION_NUM >= 120000
LOCAL_FCINFO(info, 3);
#else
FunctionCallInfoData info_data;
FunctionCallInfo info = &info_data;
#endif
StringInfoData buf;
text *data = cstring_to_text_with_len(ptr, size);
buf.data = VARDATA(data);
buf.len = VARSIZE(data) - VARHDRSZ;
buf.maxlen = buf.len;
buf.cursor = 0;
/*
* Normally one would call record_recv() using DirectFunctionCall3,
* but that does not work since record_recv wants to cache some data
* using fcinfo->flinfo->fn_extra. So we need to pass it our own
* flinfo parameter.
*/
InitFunctionCallInfoData(*info, fcinfo->flinfo, 3, InvalidOid, NULL, NULL);
init_args_3(info, PointerGetDatum(&buf), ObjectIdGetDatum(tupType), Int32GetDatum(-1));
result = record_recv(info);
break;
}
default:
elog(ERROR, "unexpected type: %d", type);
result = (Datum) 0; /* keep compiler quiet */
}
if (input_buffer->items_count == 0)
{
pfree(input_buffer);
input_buffer = NULL;
}
PG_RETURN_DATUM(result);
}
Datum
dbms_pipe_unpack_message_text(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_VARCHAR);
}
Datum
dbms_pipe_unpack_message_date(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_DATE);
}
Datum
dbms_pipe_unpack_message_timestamp(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_TIMESTAMPTZ);
}
Datum
dbms_pipe_unpack_message_number(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_NUMBER);
}
Datum
dbms_pipe_unpack_message_bytea(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_BYTEA);
}
Datum
dbms_pipe_unpack_message_record(PG_FUNCTION_ARGS)
{
return dbms_pipe_unpack_message(fcinfo, IT_RECORD);
}
#define WATCH_PRE(t, et, c) \
et = GetNowFloat() + (float8)t; c = 0; (void) c;\
do \
{
#define WATCH_TM_POST(t,et,c) \
if (GetNowFloat() >= et) \
PG_RETURN_INT32(RESULT_TIMEOUT); \
if (cycle++ % 100 == 0) \
CHECK_FOR_INTERRUPTS(); \
pg_usleep(10000L); \
} while(true && t != 0);
Datum
dbms_pipe_receive_message(PG_FUNCTION_ARGS)
{
text *pipe_name = NULL;
int timeout;
bool found = false;
instr_time start_time;
int32 result = RESULT_TIMEOUT;
long identity = NOT_ASSIGNED_IDENTITY;
bool identity_alarm;
#if PG_VERSION_NUM < 130000
long cycle = 0;
#endif
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("pipe name is NULL"),
errdetail("Pipename may not be NULL.")));
else
pipe_name = PG_GETARG_TEXT_P(0);
if (!PG_ARGISNULL(1))
{
timeout = PG_GETARG_INT32(1);
if (timeout < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("negative timeout is not allowed")));
if (timeout > MAXWAIT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("timeout is too large (maximum: %d)", MAXWAIT)));
}
else
timeout = MAXWAIT;
if (input_buffer)
{
pfree(input_buffer);
input_buffer = NULL;
}
INSTR_TIME_SET_CURRENT(start_time);
for (;;)
{
input_buffer = get_from_pipe(pipe_name, &found, &identity, &identity_alarm);
if (found)
{
if (input_buffer)
input_buffer->next = message_buffer_get_content(input_buffer);
result = RESULT_DATA;
break;
}
if (identity_alarm)
break;
if (timeout > 0)
{
instr_time cur_time;
long cur_timeout;
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
#if PG_VERSION_NUM >= 130000
/*
* Timeout should be less than INT_MAX, but we set 1 sec as protection
* against deadlocks.
*/
if (cur_timeout > 1000)
cur_timeout = 1000;
if (ConditionVariableTimedSleep(pipe_cv, cur_timeout, PG_WAIT_EXTENSION))
{
/* exit on timeout */
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
}
#else
if (cycle++ % 10)
CHECK_FOR_INTERRUPTS();
pg_usleep(10000L);
/* exit on timeout */
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
#endif
}
else
break;
}
#if PG_VERSION_NUM >= 130000
ConditionVariableCancelSleep();
if (result == RESULT_DATA)
ConditionVariableBroadcast(pipe_cv);
#endif
PG_RETURN_INT32(result);
}
Datum
dbms_pipe_send_message(PG_FUNCTION_ARGS)
{
text *pipe_name = NULL;
int timeout;
int limit = 0;
bool valid_limit;
instr_time start_time;
int32 result = RESULT_TIMEOUT;
long identity = NOT_ASSIGNED_IDENTITY;
bool identity_alarm;
#if PG_VERSION_NUM < 130000
long cycle = 0;
#endif
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("pipe name is NULL"),
errdetail("Pipename may not be NULL.")));
else
pipe_name = PG_GETARG_TEXT_P(0);
output_buffer = check_buffer(output_buffer, LOCALMSGSZ);
if (!PG_ARGISNULL(1))
{
timeout = PG_GETARG_INT32(1);
if (timeout < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("negative timeout is not allowed")));
if (timeout > MAXWAIT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("timeout is too large (maximum: %d)", MAXWAIT)));
}
else
timeout = MAXWAIT;
if (PG_ARGISNULL(2))
valid_limit = false;
else
{
limit = PG_GETARG_INT32(2);
valid_limit = true;
}
INSTR_TIME_SET_CURRENT(start_time);
for (;;)
{
if (add_to_pipe(pipe_name, output_buffer,
limit, valid_limit,
&identity, &identity_alarm))
{
result = RESULT_DATA;
break;
}
if (identity_alarm)
break;
if (timeout > 0)
{
instr_time cur_time;
long cur_timeout;
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
#if PG_VERSION_NUM >= 130000
if (cur_timeout > 1000)
cur_timeout = 1000;
if (ConditionVariableTimedSleep(pipe_cv, cur_timeout, PG_WAIT_EXTENSION))
{
/* exit on timeout */
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
}
#else
if (cycle++ % 10)
CHECK_FOR_INTERRUPTS();
pg_usleep(10000L);
/* exit on timeout */
INSTR_TIME_SET_CURRENT(cur_time);
INSTR_TIME_SUBTRACT(cur_time, start_time);
cur_timeout = timeout * 1000L - (long) INSTR_TIME_GET_MILLISEC(cur_time);
if (cur_timeout <= 0)
break;
#endif
}
else
break;
}
#if PG_VERSION_NUM >= 130000
ConditionVariableCancelSleep();
if (result == RESULT_DATA)
ConditionVariableBroadcast(pipe_cv);
#endif
reset_buffer(output_buffer, LOCALMSGSZ);
PG_RETURN_INT32(result);
}
Datum
dbms_pipe_unique_session_name(PG_FUNCTION_ARGS)
{
StringInfoData strbuf;
float8 endtime;
int cycle = 0;
int timeout = 10;
WATCH_PRE(timeout, endtime, cycle);
if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
{
text *result;
initStringInfo(&strbuf);
appendStringInfo(&strbuf,"PG$PIPE$%d$%d",sid, MyProcPid);
result = cstring_to_text_with_len(strbuf.data, strbuf.len);
pfree(strbuf.data);
LWLockRelease(shmem_lockid);
PG_RETURN_TEXT_P(result);
}
WATCH_TM_POST(timeout, endtime, cycle);
LOCK_ERROR();
PG_RETURN_NULL();
}
#define DB_PIPES_COLS 6
Datum
dbms_pipe_list_pipes(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
TupleDesc tupdesc;
AttInMetadata *attinmeta;
PipesFctx *fctx;
float8 endtime;
int cycle;
int timeout = 10;
if (SRF_IS_FIRSTCALL())
{
int i;
MemoryContext oldcontext;
bool has_lock = false;
WATCH_PRE(timeout, endtime, cycle);
if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES, MAX_EVENTS, MAX_LOCKS, false))
{
has_lock = true;
break;
}
WATCH_TM_POST(timeout, endtime, cycle);
if (!has_lock)
LOCK_ERROR();
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx = palloc(sizeof(PipesFctx));
funcctx->user_fctx = fctx;
fctx->pipe_nth = 0;
#if PG_VERSION_NUM >= 120000
tupdesc = CreateTemplateTupleDesc(DB_PIPES_COLS);
#else
tupdesc = CreateTemplateTupleDesc(DB_PIPES_COLS, false);
#endif
i = 0;
TupleDescInitEntry(tupdesc, ++i, "name", VARCHAROID, -1, 0);
TupleDescInitEntry(tupdesc, ++i, "items", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, ++i, "size", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, ++i, "limit", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, ++i, "private", BOOLOID, -1, 0);
TupleDescInitEntry(tupdesc, ++i, "owner", VARCHAROID, -1, 0);
Assert(i == DB_PIPES_COLS);
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
fctx = (PipesFctx *) funcctx->user_fctx;
while (fctx->pipe_nth < MAX_PIPES)
{
if (pipes[fctx->pipe_nth].is_valid)
{
Datum result;
HeapTuple tuple;
char *values[DB_PIPES_COLS];
char items[16];
char size[16];
char limit[16];
/* name */
values[0] = pipes[fctx->pipe_nth].pipe_name;
/* items */
snprintf(items, lengthof(items), "%d", pipes[fctx->pipe_nth].count);
values[1] = items;
/* items */
snprintf(size, lengthof(size), "%d", pipes[fctx->pipe_nth].size);
values[2] = size;
/* limit */
if (pipes[fctx->pipe_nth].limit != -1)
{
snprintf(limit, lengthof(limit), "%d", pipes[fctx->pipe_nth].limit);
values[3] = limit;
}
else
values[3] = NULL;
/* private */
values[4] = (pipes[fctx->pipe_nth].creator ? "true" : "false");
/* owner */
values[5] = pipes[fctx->pipe_nth].creator;
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
result = HeapTupleGetDatum(tuple);
fctx->pipe_nth += 1;
SRF_RETURN_NEXT(funcctx, result);
}
fctx->pipe_nth += 1;
}
LWLockRelease(shmem_lockid);
SRF_RETURN_DONE(funcctx);
}
/*
* secondary functions
*/
/*
* Registration explicit pipes
* dbms_pipe.create_pipe(pipe_name varchar, limit := -1 int, private := false bool);
*/
Datum
dbms_pipe_create_pipe(PG_FUNCTION_ARGS)
{
text *pipe_name = NULL;
int limit = 0;
bool is_private;
bool limit_is_valid = false;
bool created;
float8 endtime;
int cycle;
int timeout = 10;
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("pipe name is NULL"),
errdetail("Pipename may not be NULL.")));
else
pipe_name = PG_GETARG_TEXT_P(0);
if (!PG_ARGISNULL(1))
{
limit = PG_GETARG_INT32(1);
limit_is_valid = true;
}
is_private = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
WATCH_PRE(timeout, endtime, cycle);
if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
{
orafce_pipe *p;
if (NULL != (p = find_pipe(pipe_name, &created, false, NULL, NULL)))
{
if (!created)
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("pipe creation error"),
errdetail("Pipe is registered.")));
}
if (is_private)
{
char *user;
p->uid = GetUserId();
user = (char*)DirectFunctionCall1(namein,
CStringGetDatum(GetUserNameFromId(p->uid, false)));
p->creator = ora_sstrcpy(user);
pfree(user);
}
p->limit = limit_is_valid ? limit : -1;
p->registered = true;
LWLockRelease(shmem_lockid);
PG_RETURN_VOID();
}
}
WATCH_TM_POST(timeout, endtime, cycle);
LOCK_ERROR();
PG_RETURN_VOID();
}
/*
* Clean local input, output buffers
*/
Datum
dbms_pipe_reset_buffer(PG_FUNCTION_ARGS)
{
if (output_buffer != NULL)
{
pfree(output_buffer);
output_buffer = NULL;
}
if (input_buffer != NULL)
{
pfree(input_buffer);
input_buffer = NULL;
}
PG_RETURN_VOID();
}
/*
* Remove all stored messages in pipe. Remove implicit created
* pipe.
*/
Datum
dbms_pipe_purge(PG_FUNCTION_ARGS)
{
text *pipe_name = PG_GETARG_TEXT_P(0);
float8 endtime;
int cycle = 0;
int timeout = 10;
WATCH_PRE(timeout, endtime, cycle);
if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
{
remove_pipe(pipe_name, true);
LWLockRelease(shmem_lockid);
PG_RETURN_VOID();
}
WATCH_TM_POST(timeout, endtime, cycle);
LOCK_ERROR();
#if PG_VERSION_NUM >= 130000
ConditionVariableBroadcast(pipe_cv);
#endif
PG_RETURN_VOID();
}
/*
* Remove pipe if exists
*/
Datum
dbms_pipe_remove_pipe(PG_FUNCTION_ARGS)
{
text *pipe_name = PG_GETARG_TEXT_P(0);
float8 endtime;
int cycle = 0;
int timeout = 10;
WATCH_PRE(timeout, endtime, cycle);
if (ora_lock_shmem(SHMEMMSGSZ, MAX_PIPES,MAX_EVENTS,MAX_LOCKS,false))
{
remove_pipe(pipe_name, false);
LWLockRelease(shmem_lockid);
PG_RETURN_VOID();
}
WATCH_TM_POST(timeout, endtime, cycle);
LOCK_ERROR();
#if PG_VERSION_NUM >= 130000
ConditionVariableBroadcast(pipe_cv);
#endif
PG_RETURN_VOID();
}
/*
* Some void udf which I can't wrap in sql
*/
Datum
dbms_pipe_create_pipe_2(PG_FUNCTION_ARGS)
{
Datum arg1;
int limit = -1;
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("pipe name is NULL"),
errdetail("Pipename may not be NULL.")));
arg1 = PG_GETARG_DATUM(0);
if (!PG_ARGISNULL(1))
limit = PG_GETARG_INT32(1);
DirectFunctionCall3(dbms_pipe_create_pipe,
arg1,
Int32GetDatum(limit),
BoolGetDatum(false));
PG_RETURN_VOID();
}
Datum
dbms_pipe_create_pipe_1(PG_FUNCTION_ARGS)
{
Datum arg1;
if (PG_ARGISNULL(0))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("pipe name is NULL"),
errdetail("Pipename may not be NULL.")));
arg1 = PG_GETARG_DATUM(0);
DirectFunctionCall3(dbms_pipe_create_pipe,
arg1,
(Datum) -1,
BoolGetDatum(false));
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_integer(PG_FUNCTION_ARGS)
{
/* Casting from int4 to numeric */
DirectFunctionCall1(dbms_pipe_pack_message_number,
DirectFunctionCall1(int4_numeric, PG_GETARG_DATUM(0)));
PG_RETURN_VOID();
}
Datum
dbms_pipe_pack_message_bigint(PG_FUNCTION_ARGS)
{
/* Casting from int8 to numeric */
DirectFunctionCall1(dbms_pipe_pack_message_number,
DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(0)));
PG_RETURN_VOID();
}