blob: 6a61be917f545093527b6a836a308414b011ae45 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* async.c
* Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.147 2009/06/11 14:48:55 momjian Exp $
*
*-------------------------------------------------------------------------
*/
/*-------------------------------------------------------------------------
* New Async Notification Model:
* 1. Multiple backends on same machine. Multiple backends listening on
* one relation. (Note: "listening on a relation" is not really the
* right way to think about it, since the notify names need not have
* anything to do with the names of relations actually in the database.
* But this terminology is all over the code and docs, and I don't feel
* like trying to replace it.)
*
* 2. There is a tuple in relation "pg_listener" for each active LISTEN,
* ie, each relname/listenerPID pair. The "notification" field of the
* tuple is zero when no NOTIFY is pending for that listener, or the PID
* of the originating backend when a cross-backend NOTIFY is pending.
* (We skip writing to pg_listener when doing a self-NOTIFY, so the
* notification field should never be equal to the listenerPID field.)
*
* 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
* relname to a list of outstanding NOTIFY requests. Actual processing
* happens if and only if we reach transaction commit. At that time (in
* routine AtCommit_Notify) we scan pg_listener for matching relnames.
* If the listenerPID in a matching tuple is ours, we just send a notify
* message to our own front end. If it is not ours, and "notification"
* is not already nonzero, we set notification to our own PID and send a
* SIGUSR2 signal to the receiving process (indicated by listenerPID).
* BTW: if the signal operation fails, we presume that the listener backend
* crashed without removing this tuple, and remove the tuple for it.
*
* 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
* notify processing immediately if this backend is idle (ie, it is
* waiting for a frontend command and is not within a transaction block).
* Otherwise the handler may only set a flag, which will cause the
* processing to occur just before we next go idle.
*
* 5. Inbound-notify processing consists of scanning pg_listener for tuples
* matching our own listenerPID and having nonzero notification fields.
* For each such tuple, we send a message to our frontend and clear the
* notification field. BTW: this routine has to start/commit its own
* transaction, since by assumption it is only called from outside any
* transaction.
*
* Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
* of pending actions. If we reach transaction commit, the changes are
 * applied to pg_listener just before executing any pending NOTIFYs.  This
 * approach is necessary because, to avoid race conditions, we must hold a
 * lock on pg_listener from the time we insert a new listener tuple until
 * we commit.
* To do that and not create undue hazard of deadlock, we don't want to
* touch pg_listener until we are otherwise done with the transaction;
* in particular it'd be uncool to still be taking user-commanded locks
* while holding the pg_listener lock.
*
* Although we grab ExclusiveLock on pg_listener for any operation,
* the lock is never held very long, so it shouldn't cause too much of
* a performance problem. (Previously we used AccessExclusiveLock, but
* there's no real reason to forbid concurrent reads.)
*
* An application that listens on the same relname it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
* by comparing be_pid in the NOTIFY message to the application's own backend's
* PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies. Note,
* however, that we do *not* guarantee that a separate frontend message will
* be sent for every outside NOTIFY. Since there is only room for one
* originating PID in pg_listener, outside notifies occurring at about the
* same time may be collapsed into a single message bearing the PID of the
* first outside backend to perform the NOTIFY.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include <signal.h>
#include "access/heapam.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/tqual.h"
/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction.  As explained above,
 * we don't actually modify pg_listener until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum
{
	LISTEN_LISTEN,				/* LISTEN condname */
	LISTEN_UNLISTEN,			/* UNLISTEN condname */
	LISTEN_UNLISTEN_ALL			/* UNLISTEN * (queued with condname "") */
} ListenActionKind;

typedef struct
{
	ListenActionKind action;
	/*
	 * Variable-length, NUL-terminated condition name.  The declared single
	 * byte provides room for the terminator: queue_listen allocates
	 * sizeof(ListenAction) + strlen(name), so do not change this to a
	 * flexible array member without also changing that allocation.
	 */
	char		condname[1];	/* actually, as long as needed */
} ListenAction;

/* Actions queued at the current (sub)transaction level, in issue order */
static List *pendingActions = NIL;	/* list of ListenAction */

/*
 * Saved pendingActions lists of the enclosing transaction levels, innermost
 * first (pushed by AtSubStart_Notify, popped at subxact commit/abort).
 */
static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
 * State for outbound notifies consists of a list of all relnames NOTIFYed
 * in the current transaction.  We do not actually perform a NOTIFY until
 * and unless the transaction commits.  pendingNotifies is NIL if no
 * NOTIFYs have been done in the current transaction.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 *
 * Async_Notify suppresses duplicates only within the current level's list;
 * merging at subtransaction commit does not de-duplicate, so the combined
 * list may contain repeats.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit.  This is a bit odd
 * but is consistent with our historical behavior.
 */
static List *pendingNotifies = NIL;		/* list of C strings (pstrdup'd) */
static List *upperPendingNotifies = NIL;	/* list of upper-xact lists,
											 * innermost first */
/*
 * State for inbound notifies consists of two flags: one saying whether
 * the signal handler is currently allowed to call ProcessIncomingNotify
 * directly, and one saying whether the signal has occurred but the handler
 * was not allowed to call ProcessIncomingNotify at the time.
 *
 * NB: the "volatile" on these declarations is critical!  If your compiler
 * does not grok "volatile", you'd be best advised to compile this file
 * with all optimization turned off.
 */
static volatile sig_atomic_t notifyInterruptEnabled = 0;
static volatile sig_atomic_t notifyInterruptOccurred = 0;

/*
 * True if we've registered an on_shmem_exit cleanup.  Exec_Listen sets this
 * the first time it inserts a pg_listener row; Async_Unlisten and
 * Async_UnlistenAll also read it as "this backend may have committed LISTENs".
 */
static bool unlistenExitRegistered = false;

/*
 * When true, emit DEBUG1 trace messages from this module.  Not static;
 * presumably set through a configuration variable elsewhere — verify.
 */
bool		Trace_notify = false;
/* Forward declarations of file-local routines */
static void queue_listen(ListenActionKind action, const char *condname);
static void Async_UnlistenOnExit(int code, Datum arg);
/* Commit-time workers; each operates on an already-opened pg_listener */
static void Exec_Listen(Relation lRel, const char *relname);
static void Exec_Unlisten(Relation lRel, const char *relname);
static void Exec_UnlistenAll(Relation lRel);
static void Send_Notify(Relation lRel);
/* Inbound-notify processing and frontend reporting */
static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
/* Pending-list helpers */
static bool AsyncExistsPendingNotify(const char *relname);
static void ClearPendingActionsAndNotifies(void);
/*
 * Async_Notify
 *
 * Implements the SQL NOTIFY command.
 *
 * The condition name is merely remembered in pendingNotifies; nothing is
 * delivered unless and until the current transaction commits, at which
 * point AtCommit_Notify does the real work.
 */
void
Async_Notify(const char *relname)
{
	MemoryContext savedcxt;

	if (Trace_notify)
		elog(DEBUG1, "Async_Notify(%s)", relname);

	/* A name already queued at this level need not be queued again. */
	if (AsyncExistsPendingNotify(relname))
		return;

	/*
	 * The copy must survive until end of transaction, so it goes into
	 * CurTransactionContext.  New names are pushed on the front; order is
	 * irrelevant, and this may make duplicate detection a bit cheaper when
	 * the same condition is signaled repeatedly.
	 */
	savedcxt = MemoryContextSwitchTo(CurTransactionContext);
	pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
	MemoryContextSwitchTo(savedcxt);
}
/*
 * queue_listen
 *		Shared implementation of LISTEN, UNLISTEN and UNLISTEN *.
 *
 * Appends one ListenAction to pendingActions.  pg_listener itself is only
 * touched at transaction commit (AtCommit_Notify).
 *
 * Duplicates are deliberately kept: getting the interactions among
 * conflicting LISTEN/UNLISTEN/UNLISTEN * right would be complicated, and
 * sane applications gain nothing from it.
 */
static void
queue_listen(ListenActionKind action, const char *condname)
{
	ListenAction *newact;
	MemoryContext savedcxt;

	savedcxt = MemoryContextSwitchTo(CurTransactionContext);

	/* the one-byte condname[] in sizeof(ListenAction) holds the '\0' */
	newact = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
	newact->action = action;
	strcpy(newact->condname, condname);

	pendingActions = lappend(pendingActions, newact);

	MemoryContextSwitchTo(savedcxt);
}
/*
* Async_Listen
*
* This is executed by the SQL listen command.
*/
void
Async_Listen(const char *relname)
{
if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
queue_listen(LISTEN_LISTEN, relname);
}
/*
* Async_Unlisten
*
* This is executed by the SQL unlisten command.
*/
void
Async_Unlisten(const char *relname)
{
if (Trace_notify)
elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered)
return;
queue_listen(LISTEN_UNLISTEN, relname);
}
/*
* Async_UnlistenAll
*
* This is invoked by UNLISTEN * command, and also at backend exit.
*/
void
Async_UnlistenAll(void)
{
if (Trace_notify)
elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NIL && !unlistenExitRegistered)
return;
queue_listen(LISTEN_UNLISTEN_ALL, "");
}
/*
 * Async_UnlistenOnExit
 *
 * Clean up the pg_listener table at backend exit.
 *
 * This is executed if we have done any LISTENs in this backend: it is
 * registered via on_shmem_exit by Exec_Listen.  The code/arg parameters
 * required by the callback signature are unused.
 * It might not be necessary anymore, if the user UNLISTENed everything,
 * but we don't try to detect that case.
 */
static void
Async_UnlistenOnExit(int code, Datum arg)
{
	/*
	 * We need to start/commit a transaction for the unlisten, but if there is
	 * already an active transaction we had better abort that one first.
	 * Otherwise we'd end up committing changes that probably ought to be
	 * discarded.
	 */
	AbortOutOfAnyTransaction();
	/*
	 * Now we can do the unlisten.  Async_UnlistenAll only queues the
	 * request; the commit runs AtCommit_Notify, which applies it.
	 */
	StartTransactionCommand();
	Async_UnlistenAll();
	CommitTransactionCommand();
}
/*
* AtPrepare_Notify
*
* This is called at the prepare phase of a two-phase
* transaction. Save the state for possible commit later.
*/
void
AtPrepare_Notify(void)
{
ListCell *p;
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
if (pendingActions)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
/* We can deal with pending NOTIFY though */
foreach(p, pendingNotifies)
{
const char *relname = (const char *) lfirst(p);
RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
relname, strlen(relname) + 1);
}
/*
* We can clear the state immediately, rather than needing a separate
* PostPrepare call, because if the transaction fails we'd just discard
* the state anyway.
*/
ClearPendingActionsAndNotifies();
}
/*
 * AtCommit_Notify
 *
 * Transaction-commit hook.
 *
 * First applies the queued LISTEN/UNLISTEN actions to pg_listener, in the
 * order they were issued.  Then, if any NOTIFYs are pending, it either
 * signals the listening backends or reports to our own frontend
 * (Send_Notify).
 *
 * We are still inside the committing transaction, so all pg_listener
 * changes made here ride along with its commit.
 */
void
AtCommit_Notify(void)
{
	Relation	listenerRel;
	ListCell   *cell;

	/* Nothing to do unless this xact issued LISTEN/UNLISTEN/NOTIFY. */
	if (pendingActions == NIL && pendingNotifies == NIL)
		return;

	/*
	 * Outside normal processing mode NOTIFY is disabled, so just forget
	 * whatever was queued.  (This test formerly lived in xact.c.)
	 */
	if (!IsNormalProcessingMode())
	{
		ClearPendingActionsAndNotifies();
		return;
	}

	if (Trace_notify)
		elog(DEBUG1, "AtCommit_Notify");

	listenerRel = heap_open(ListenerRelationId, ExclusiveLock);

	foreach(cell, pendingActions)
	{
		ListenAction *act = (ListenAction *) lfirst(cell);

		if (act->action == LISTEN_LISTEN)
			Exec_Listen(listenerRel, act->condname);
		else if (act->action == LISTEN_UNLISTEN)
			Exec_Unlisten(listenerRel, act->condname);
		else if (act->action == LISTEN_UNLISTEN_ALL)
			Exec_UnlistenAll(listenerRel);

		/* make each action visible to the next, in case they conflict */
		CommandCounterIncrement();
	}

	if (pendingNotifies != NIL)
		Send_Notify(listenerRel);

	/*
	 * Keep the ExclusiveLock until end of transaction (imminent anyway).
	 * Notified backends must see our tuple updates when they look, or they
	 * could ignore the signal.  Holding the lock also closes race windows
	 * around a freshly inserted listener tuple.
	 */
	heap_close(listenerRel, NoLock);

	ClearPendingActionsAndNotifies();

	if (Trace_notify)
		elog(DEBUG1, "AtCommit_Notify: done");
}
/*
 * Exec_Listen --- commit-time worker for AtCommit_Notify
 *
 * Ensures pg_listener holds a (relname, MyProcPid) row, inserting one with
 * no pending notification if absent.  The first insertion also registers
 * Async_UnlistenOnExit, so that this backend's rows are removed when it
 * exits.
 */
static void
Exec_Listen(Relation lRel, const char *relname)
{
	HeapScanDesc scan;
	HeapTuple	tup;
	Datum		values[Natts_pg_listener];
	bool		nulls[Natts_pg_listener];
	NameData	condname;
	bool		found = false;

	if (Trace_notify)
		elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);

	/* Look for an existing entry of ours; stop at the first hit. */
	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
	while (!found &&
		   (tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_listener entry = (Form_pg_listener) GETSTRUCT(tup);

		found = (entry->listenerpid == MyProcPid &&
				 strncmp(NameStr(entry->relname), relname, NAMEDATALEN) == 0);
	}
	heap_endscan(scan);

	if (found)
		return;					/* already listening: nothing to insert */

	/* Build the new row: our PID, no notification pending (0). */
	namestrcpy(&condname, relname);
	memset(nulls, false, sizeof(nulls));
	values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
	values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid);
	values[Anum_pg_listener_notification - 1] = Int32GetDatum(0);

	tup = heap_form_tuple(RelationGetDescr(lRel), values, nulls);
	simple_heap_insert(lRel, tup);
#ifdef NOT_USED					/* currently there are no indexes */
	CatalogUpdateIndexes(lRel, tup);
#endif
	heap_freetuple(tup);

	/* Now that we listen on something, arrange to unlisten at exit. */
	if (!unlistenExitRegistered)
	{
		on_shmem_exit(Async_UnlistenOnExit, 0);
		unlistenExitRegistered = true;
	}
}
/*
 * Exec_Unlisten --- commit-time worker for AtCommit_Notify
 *
 * Deletes this backend's pg_listener row for relname, if one exists.
 * Unlistening a name we are not listening on is silently accepted
 * (perhaps it should be reported?).
 */
static void
Exec_Unlisten(Relation lRel, const char *relname)
{
	HeapScanDesc scan;
	HeapTuple	tup;

	if (Trace_notify)
		elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);

	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
	for (;;)
	{
		Form_pg_listener entry;

		tup = heap_getnext(scan, ForwardScanDirection);
		if (tup == NULL)
			break;				/* end of table, nothing (more) to delete */

		entry = (Form_pg_listener) GETSTRUCT(tup);
		if (entry->listenerpid != MyProcPid ||
			strncmp(NameStr(entry->relname), relname, NAMEDATALEN) != 0)
			continue;

		/*
		 * Assumed to be the only match (Exec_Listen does not insert
		 * duplicates), so the rest of the table need not be scanned.
		 */
		simple_heap_delete(lRel, &tup->t_self);
		break;
	}
	heap_endscan(scan);
}
/*
 * Exec_UnlistenAll --- commit-time worker for AtCommit_Notify
 *
 * Deletes every pg_listener row whose listenerpid is this backend's PID.
 */
static void
Exec_UnlistenAll(Relation lRel)
{
	ScanKeyData pidkey;
	HeapScanDesc scan;
	HeapTuple	tup;

	if (Trace_notify)
		elog(DEBUG1, "Exec_UnlistenAll");

	/* Restrict the scan to rows with listenerpid = MyProcPid. */
	ScanKeyInit(&pidkey,
				Anum_pg_listener_listenerpid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(MyProcPid));

	scan = heap_beginscan(lRel, SnapshotNow, 1, &pidkey);
	for (tup = heap_getnext(scan, ForwardScanDirection);
		 tup != NULL;
		 tup = heap_getnext(scan, ForwardScanDirection))
		simple_heap_delete(lRel, &tup->t_self);
	heap_endscan(scan);
}
/*
 * Send_Notify --- subroutine for AtCommit_Notify
 *
 * Scan pg_listener for tuples matching our pending notifies, and
 * either signal the other backend or send a message to our own frontend.
 *
 * For another backend's tuple, we send SIGUSR2.  Its notification column is
 * set to our PID only if it is currently 0.  If the signal cannot be
 * delivered, the tuple is presumed stale (its backend died without cleaning
 * up) and is deleted.
 */
static void
Send_Notify(Relation lRel)
{
	TupleDesc	tdesc = RelationGetDescr(lRel);
	HeapScanDesc scan;
	HeapTuple	lTuple,
				rTuple;
	Datum		value[Natts_pg_listener];
	bool		repl[Natts_pg_listener],
				nulls[Natts_pg_listener];

	/* preset data to update notify column to MyProcPid */
	memset(nulls, false, sizeof(nulls));
	memset(repl, false, sizeof(repl));
	repl[Anum_pg_listener_notification - 1] = true;
	memset(value, 0, sizeof(value));
	value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid);

	scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);

	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
		char	   *relname = NameStr(listener->relname);
		int32		listenerPID = listener->listenerpid;

		if (!AsyncExistsPendingNotify(relname))
			continue;

		if (listenerPID == MyProcPid)
		{
			/*
			 * Self-notify: no need to bother with table update.  Indeed, we
			 * *must not* clear the notification field in this path, or we
			 * could lose an outside notify, which'd be bad for applications
			 * that ignore self-notify messages.
			 */
			if (Trace_notify)
				elog(DEBUG1, "AtCommit_Notify: notifying self");

			NotifyMyFrontEnd(relname, listenerPID);
		}
		else
		{
			if (Trace_notify)
				elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
					 listenerPID);

			/*
			 * If someone has already notified this listener, we don't bother
			 * modifying the table, but we do still send a SIGUSR2 signal,
			 * just in case that backend missed the earlier signal for some
			 * reason.  It's OK to send the signal first, because the other
			 * guy can't read pg_listener until we unlock it.
			 */
			if (kill(listenerPID, SIGUSR2) < 0)
			{
				/*
				 * Get rid of pg_listener entry if it refers to a PID that no
				 * longer exists.  Presumably, that backend crashed without
				 * deleting its pg_listener entries.  This code used to only
				 * delete the entry if errno==ESRCH, but as far as I can see
				 * we should just do it for any failure (certainly at least
				 * for EPERM too...)
				 */
				simple_heap_delete(lRel, &lTuple->t_self);
			}
			else if (listener->notification == 0)
			{
				/* Rewrite the tuple with my PID in notification column */
				rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
				simple_heap_update(lRel, &lTuple->t_self, rTuple);

#ifdef NOT_USED					/* currently there are no indexes */
				CatalogUpdateIndexes(lRel, rTuple);
#endif

				/*
				 * Release the modified copy now, as Exec_Listen does after its
				 * insert, rather than accumulating one per listener until end
				 * of transaction.
				 */
				heap_freetuple(rTuple);
			}
		}
	}

	heap_endscan(scan);
}
/*
 * AtAbort_Notify
 *
 * Transaction-abort hook: discard every queued LISTEN/UNLISTEN action and
 * outbound NOTIFY, because none of them may take effect.
 */
void
AtAbort_Notify(void)
{
	/* resetting the list heads is all that is needed */
	ClearPendingActionsAndNotifies();
}
/*
 * AtSubStart_Notify() --- subtransaction-start hook.
 *
 * Saves the parent's pending lists on the upper* stacks and gives the new
 * subtransaction empty ones.
 */
void
AtSubStart_Notify(void)
{
	MemoryContext savedcxt;

	/* The stacks themselves live in TopTransactionContext for simplicity. */
	savedcxt = MemoryContextSwitchTo(TopTransactionContext);

	/* Save the parent's NOTIFY list, start a fresh one. */
	upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
	Assert(list_length(upperPendingNotifies) ==
		   GetCurrentTransactionNestLevel() - 1);
	pendingNotifies = NIL;

	/* Likewise for LISTEN/UNLISTEN actions. */
	upperPendingActions = lcons(pendingActions, upperPendingActions);
	Assert(list_length(upperPendingActions) ==
		   GetCurrentTransactionNestLevel() - 1);
	pendingActions = NIL;

	MemoryContextSwitchTo(savedcxt);
}
/*
* AtSubCommit_Notify() --- Take care of subtransaction commit.
*
* Reassign all items in the pending lists to the parent transaction.
*/
void
AtSubCommit_Notify(void)
{
List *parentPendingActions;
List *parentPendingNotifies;
parentPendingActions = (List *) linitial(upperPendingActions);
upperPendingActions = list_delete_first(upperPendingActions);
Assert(list_length(upperPendingActions) ==
GetCurrentTransactionNestLevel() - 2);
/*
* Mustn't try to eliminate duplicates here --- see queue_listen()
*/
pendingActions = list_concat(parentPendingActions, pendingActions);
parentPendingNotifies = (List *) linitial(upperPendingNotifies);
upperPendingNotifies = list_delete_first(upperPendingNotifies);
Assert(list_length(upperPendingNotifies) ==
GetCurrentTransactionNestLevel() - 2);
/*
* We could try to eliminate duplicates here, but it seems not worthwhile.
*/
pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
}
/*
 * AtSubAbort_Notify() --- subtransaction-abort hook.
 *
 * Restores the parent's pending lists by popping the upper* stacks.  The
 * aborted subtransaction's entries are simply dropped; their memory goes
 * away when its CurTransactionContext is recycled.
 *
 * This may run more than once at one nesting level if subxact abort itself
 * runs into trouble.  We therefore pop down to the depth implied by
 * GetCurrentTransactionNestLevel() rather than popping exactly once.
 */
void
AtSubAbort_Notify(void)
{
	int			targetDepth = GetCurrentTransactionNestLevel() - 2;

	while (list_length(upperPendingActions) > targetDepth)
	{
		pendingActions = (List *) linitial(upperPendingActions);
		upperPendingActions = list_delete_first(upperPendingActions);
	}

	while (list_length(upperPendingNotifies) > targetDepth)
	{
		pendingNotifies = (List *) linitial(upperPendingNotifies);
		upperPendingNotifies = list_delete_first(upperPendingNotifies);
	}
}
/*
 * NotifyInterruptHandler
 *
 * This is the signal handler for SIGUSR2.
 *
 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
 * ProcessIncomingNotify directly.  Otherwise, just set a flag
 * to do it later.
 *
 * errno is saved on entry and restored before returning, because the work
 * done here (e.g. ProcessIncomingNotify) may change it.
 */
void
NotifyInterruptHandler(SIGNAL_ARGS)
{
	int			save_errno = errno;

	/*
	 * Note: this is a SIGNAL HANDLER.  You must be very wary what you do
	 * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
	 * would likely lead to corruption of stdio buffers if they were ever
	 * turned on.
	 */

	/* Don't joggle the elbow of proc_exit */
	/* (errno has not been touched yet, so nothing needs restoring) */
	if (proc_exit_inprogress)
		return;

	if (notifyInterruptEnabled)
	{
		bool		save_ImmediateInterruptOK = ImmediateInterruptOK;

		/*
		 * We may be called while ImmediateInterruptOK is true; turn it off
		 * while messing with the NOTIFY state.  (We would have to save and
		 * restore it anyway, because PGSemaphore operations inside
		 * ProcessIncomingNotify() might reset it.)
		 */
		ImmediateInterruptOK = false;

		/*
		 * I'm not sure whether some flavors of Unix might allow another
		 * SIGUSR2 occurrence to recursively interrupt this routine. To cope
		 * with the possibility, we do the same sort of dance that
		 * EnableNotifyInterrupt must do --- see that routine for comments.
		 *
		 * The loop exits only right after notifyInterruptEnabled has been
		 * set back to 1 and no unserviced interrupt was seen.
		 */
		notifyInterruptEnabled = 0;		/* disable any recursive signal */
		notifyInterruptOccurred = 1;	/* do at least one iteration */
		for (;;)
		{
			notifyInterruptEnabled = 1;
			if (!notifyInterruptOccurred)
				break;
			notifyInterruptEnabled = 0;
			if (notifyInterruptOccurred)
			{
				/* Here, it is finally safe to do stuff. */
				if (Trace_notify)
					elog(DEBUG1, "NotifyInterruptHandler: perform async notify");

				/* ProcessIncomingNotify clears notifyInterruptOccurred */
				ProcessIncomingNotify();

				if (Trace_notify)
					elog(DEBUG1, "NotifyInterruptHandler: done");
			}
		}

		/*
		 * Restore ImmediateInterruptOK, and check for interrupts if needed.
		 */
		ImmediateInterruptOK = save_ImmediateInterruptOK;
		if (save_ImmediateInterruptOK)
			CHECK_FOR_INTERRUPTS();
	}
	else
	{
		/*
		 * In this path it is NOT SAFE to do much of anything, except this:
		 * record the signal so that EnableNotifyInterrupt (or a later
		 * enabled invocation of this handler) services it.
		 */
		notifyInterruptOccurred = 1;
	}

	errno = save_errno;
}
/*
 * EnableNotifyInterrupt
 *
 * This is called by the PostgresMain main loop just before waiting
 * for a frontend command.  If we are truly idle (ie, *not* inside
 * a transaction block), then process any pending inbound notifies,
 * and enable the signal handler to process future notifies directly.
 * If a transaction or transaction block is active, it returns without
 * changing any state.
 *
 * NOTE: the signal handler starts out disabled, and stays so until
 * PostgresMain calls this the first time.
 */
void
EnableNotifyInterrupt(void)
{
	if (IsTransactionOrTransactionBlock())
		return;					/* not really idle */

	/*
	 * This code is tricky because we are communicating with a signal handler
	 * that could interrupt us at any point.  If we just checked
	 * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
	 * fail to respond promptly to a signal that happens in between those two
	 * steps.  (A very small time window, perhaps, but Murphy's Law says you
	 * can hit it...)  Instead, we first set the enable flag, then test the
	 * occurred flag.  If we see an unserviced interrupt has occurred, we
	 * re-clear the enable flag before going off to do the service work.  (That
	 * prevents re-entrant invocation of ProcessIncomingNotify() if another
	 * interrupt occurs.)  If an interrupt comes in between the setting and
	 * clearing of notifyInterruptEnabled, then it will have done the service
	 * work and left notifyInterruptOccurred zero, so we have to check again
	 * after clearing enable.  The whole thing has to be in a loop in case
	 * another interrupt occurs while we're servicing the first.  Once we get
	 * out of the loop, enable is set and we know there is no unserviced
	 * interrupt.
	 *
	 * NB: an overenthusiastic optimizing compiler could easily break this
	 * code. Hopefully, they all understand what "volatile" means these days.
	 */
	for (;;)
	{
		notifyInterruptEnabled = 1;
		if (!notifyInterruptOccurred)
			break;				/* exit with enable set, nothing pending */
		notifyInterruptEnabled = 0;
		/* re-test: the handler may have serviced it in the window above */
		if (notifyInterruptOccurred)
		{
			if (Trace_notify)
				elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");

			ProcessIncomingNotify();

			if (Trace_notify)
				elog(DEBUG1, "EnableNotifyInterrupt: done");
		}
	}
}
/*
 * DisableNotifyInterrupt
 *
 * Called by the PostgresMain main loop right after a frontend command has
 * been received; from then until the next EnableNotifyInterrupt call, the
 * SIGUSR2 handler only records that a notify arrived.
 *
 * The SIGUSR1 handler calls this too, to keep the two signals from
 * stepping on each other, so the previous enable state is returned
 * for it to restore.
 */
bool
DisableNotifyInterrupt(void)
{
	bool		wasEnabled = false;

	if (notifyInterruptEnabled)
		wasEnabled = true;
	notifyInterruptEnabled = 0;

	return wasEnabled;
}
/*
 * ProcessIncomingNotify
 *
 * Deal with arriving NOTIFYs from other backends.
 * This is called either directly from the SIGUSR2 signal handler,
 * or the next time control reaches the outer idle loop.
 * Scan pg_listener for arriving notifies, report them to my front end,
 * and clear the notification field in pg_listener until next time.
 *
 * The catchup (SIGUSR1) and client-wait-timeout interrupts are disabled
 * while this runs.  Each is re-enabled afterwards only if it was enabled
 * on entry.
 *
 * NOTE: since we are outside any transaction, we must create our own.
 */
static void
ProcessIncomingNotify(void)
{
	Relation	lRel;
	TupleDesc	tdesc;
	ScanKeyData key[1];
	HeapScanDesc scan;
	HeapTuple	lTuple,
				rTuple;
	Datum		value[Natts_pg_listener];
	bool		repl[Natts_pg_listener],
				nulls[Natts_pg_listener];
	bool		catchup_enabled;
	bool		client_wait_timeout_enabled;

	/* Must prevent SIGUSR1 interrupt while I am running */
	catchup_enabled = DisableCatchupInterrupt();
	client_wait_timeout_enabled = DisableClientWaitTimeoutInterrupt();

	if (Trace_notify)
		elog(DEBUG1, "ProcessIncomingNotify");

	set_ps_display("notify interrupt", false);

	notifyInterruptOccurred = 0;

	StartTransactionCommand();

	lRel = heap_open(ListenerRelationId, ExclusiveLock);
	tdesc = RelationGetDescr(lRel);

	/* Scan only entries with my listenerPID */
	ScanKeyInit(&key[0],
				Anum_pg_listener_listenerpid,
				BTEqualStrategyNumber, F_INT4EQ,
				Int32GetDatum(MyProcPid));
	scan = heap_beginscan(lRel, SnapshotNow, 1, key);

	/* Prepare data for rewriting 0 into notification field */
	memset(nulls, false, sizeof(nulls));
	memset(repl, false, sizeof(repl));
	repl[Anum_pg_listener_notification - 1] = true;
	memset(value, 0, sizeof(value));
	value[Anum_pg_listener_notification - 1] = Int32GetDatum(0);

	while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
		char	   *relname = NameStr(listener->relname);
		int32		sourcePID = listener->notification;

		if (sourcePID != 0)
		{
			/* Notify the frontend */

			if (Trace_notify)
				elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
					 relname, (int) sourcePID);

			NotifyMyFrontEnd(relname, sourcePID);

			/*
			 * Rewrite the tuple with 0 in notification column.
			 */
			rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl);
			simple_heap_update(lRel, &lTuple->t_self, rTuple);

#ifdef NOT_USED					/* currently there are no indexes */
			CatalogUpdateIndexes(lRel, rTuple);
#endif

			/* Free the modified copy rather than keeping one per row */
			heap_freetuple(rTuple);
		}
	}
	heap_endscan(scan);

	/*
	 * We do NOT release the lock on pg_listener here; we need to hold it
	 * until end of transaction (which is about to happen, anyway) to ensure
	 * that other backends see our tuple updates when they look. Otherwise, a
	 * transaction started after this one might mistakenly think it doesn't
	 * need to send this backend a new NOTIFY.
	 */
	heap_close(lRel, NoLock);

	CommitTransactionCommand();

	/*
	 * Must flush the notify messages to ensure frontend gets them promptly.
	 */
	pq_flush();

	set_ps_display("idle", false);

	if (Trace_notify)
		elog(DEBUG1, "ProcessIncomingNotify: done");

	if (catchup_enabled)
		EnableCatchupInterrupt();
	if (client_wait_timeout_enabled)
		EnableClientWaitTimeoutInterrupt();
}
/*
 * NotifyMyFrontEnd
 *
 * Reports a NOTIFY on relname, originating from backend listenerPID, to our
 * client.  With a remote frontend this queues an 'A' protocol message.
 * Protocol 3+ also gets an empty payload string.  Otherwise the
 * notification is just logged at INFO.
 *
 * The message is not flushed here.  A self-notify is flushed at end of
 * transaction, and ProcessIncomingNotify flushes once after collecting all
 * incoming notifies.
 */
static void
NotifyMyFrontEnd(char *relname, int32 listenerPID)
{
	StringInfoData msg;

	if (whereToSendOutput != DestRemote)
	{
		elog(INFO, "NOTIFY for %s", relname);
		return;
	}

	pq_beginmessage(&msg, 'A');
	pq_sendint(&msg, listenerPID, sizeof(int32));
	pq_sendstring(&msg, relname);

	/* XXX Add parameter string here later */
	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
		pq_sendstring(&msg, "");

	pq_endmessage(&msg);
}
/*
 * AsyncExistsPendingNotify
 *
 * Returns true if relname (compared with strcmp) is already in the current
 * level's pendingNotifies list.
 */
static bool
AsyncExistsPendingNotify(const char *relname)
{
	bool		found = false;
	ListCell   *cell;

	foreach(cell, pendingNotifies)
	{
		if (strcmp((const char *) lfirst(cell), relname) == 0)
		{
			found = true;
			break;
		}
	}

	return found;
}
/*
 * ClearPendingActionsAndNotifies
 *
 * Forgets all queued LISTEN/UNLISTEN actions and outbound NOTIFYs.
 *
 * The list cells and their contents were palloc'd in CurTransactionContext
 * (see queue_listen and Async_Notify), so they are reclaimed automatically
 * when the transaction ends.  Once upon a time they were malloc'd and had
 * to be freed one by one; now only the list heads need resetting.
 */
static void
ClearPendingActionsAndNotifies(void)
{
	pendingNotifies = NIL;
	pendingActions = NIL;
}
/*
 * notify_twophase_postcommit
 *
 * Two-phase-commit callback for COMMIT PREPARED.  Each record holds one
 * NUL-terminated condition name, as written by AtPrepare_Notify.
 * ROLLBACK PREPARED needs no counterpart.
 */
void
notify_twophase_postcommit(TransactionId xid, uint16 info,
						   void *recdata, uint32 len)
{
	const char *condname = (const char *) recdata;

	/*
	 * Queue the NOTIFY to go out when the current transaction commits.
	 * (XXX that is questionable if this transaction later rolls back, or if
	 * its commit is much delayed.  It is acceptable for now only because
	 * COMMIT PREPARED is not allowed inside a transaction block.)
	 */
	Async_Notify(condname);
}