| /*------------------------------------------------------------------------- |
| * |
| * async.c |
| * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN |
| * |
| * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.147 2009/06/11 14:48:55 momjian Exp $ |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * New Async Notification Model: |
| * 1. Multiple backends on same machine. Multiple backends listening on |
| * one relation. (Note: "listening on a relation" is not really the |
| * right way to think about it, since the notify names need not have |
| * anything to do with the names of relations actually in the database. |
| * But this terminology is all over the code and docs, and I don't feel |
| * like trying to replace it.) |
| * |
| * 2. There is a tuple in relation "pg_listener" for each active LISTEN, |
| * ie, each relname/listenerPID pair. The "notification" field of the |
| * tuple is zero when no NOTIFY is pending for that listener, or the PID |
| * of the originating backend when a cross-backend NOTIFY is pending. |
| * (We skip writing to pg_listener when doing a self-NOTIFY, so the |
| * notification field should never be equal to the listenerPID field.) |
| * |
| * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target |
| * relname to a list of outstanding NOTIFY requests. Actual processing |
| * happens if and only if we reach transaction commit. At that time (in |
| * routine AtCommit_Notify) we scan pg_listener for matching relnames. |
| * If the listenerPID in a matching tuple is ours, we just send a notify |
| * message to our own front end. If it is not ours, we send a SIGUSR2 |
| * signal to the receiving process (indicated by listenerPID) and, unless |
| * "notification" is already nonzero, set notification to our own PID. |
| * BTW: if the signal operation fails, we presume that the listener backend |
| * crashed without removing this tuple, and remove the tuple for it. |
| * |
| * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound- |
| * notify processing immediately if this backend is idle (ie, it is |
| * waiting for a frontend command and is not within a transaction block). |
| * Otherwise the handler may only set a flag, which will cause the |
| * processing to occur just before we next go idle. |
| * |
| * 5. Inbound-notify processing consists of scanning pg_listener for tuples |
| * matching our own listenerPID and having nonzero notification fields. |
| * For each such tuple, we send a message to our frontend and clear the |
| * notification field. BTW: this routine has to start/commit its own |
| * transaction, since by assumption it is only called from outside any |
| * transaction. |
| * |
| * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list |
| * of pending actions. If we reach transaction commit, the changes are |
| * applied to pg_listener just before executing any pending NOTIFYs. This |
| * method is necessary because to avoid race conditions, we must hold lock |
| * on pg_listener from when we insert a new listener tuple until we commit. |
| * To do that and not create undue hazard of deadlock, we don't want to |
| * touch pg_listener until we are otherwise done with the transaction; |
| * in particular it'd be uncool to still be taking user-commanded locks |
| * while holding the pg_listener lock. |
| * |
| * Although we grab ExclusiveLock on pg_listener for any operation, |
| * the lock is never held very long, so it shouldn't cause too much of |
| * a performance problem. (Previously we used AccessExclusiveLock, but |
| * there's no real reason to forbid concurrent reads.) |
| * |
| * An application that listens on the same relname it notifies will get |
| * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, |
| * by comparing be_pid in the NOTIFY message to the application's own backend's |
| * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the |
| * frontend during startup.) The above design guarantees that notifies from |
| * other backends will never be missed by ignoring self-notifies. Note, |
| * however, that we do *not* guarantee that a separate frontend message will |
| * be sent for every outside NOTIFY. Since there is only room for one |
| * originating PID in pg_listener, outside notifies occurring at about the |
| * same time may be collapsed into a single message bearing the PID of the |
| * first outside backend to perform the NOTIFY. |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <unistd.h> |
| #include <signal.h> |
| |
| #include "access/heapam.h" |
| #include "access/twophase_rmgr.h" |
| #include "access/xact.h" |
| #include "catalog/pg_listener.h" |
| #include "commands/async.h" |
| #include "libpq/libpq.h" |
| #include "libpq/pqformat.h" |
| #include "miscadmin.h" |
| #include "storage/ipc.h" |
| #include "storage/proc.h" |
| #include "storage/sinval.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/builtins.h" |
| #include "utils/fmgroids.h" |
| #include "utils/memutils.h" |
| #include "utils/ps_status.h" |
| #include "utils/tqual.h" |
| |
| |
| /* |
| * State for pending LISTEN/UNLISTEN actions consists of an ordered list of |
| * all actions requested in the current transaction. As explained above, |
| * we don't actually modify pg_listener until we reach transaction commit. |
| * |
| * The list is kept in CurTransactionContext. In subtransactions, each |
| * subtransaction has its own list in its own CurTransactionContext, but |
| * successful subtransactions attach their lists to their parent's list. |
| * Failed subtransactions simply discard their lists. |
| */ |
| typedef enum |
| { |
| LISTEN_LISTEN, /* queued by Async_Listen; applied by Exec_Listen */ |
| LISTEN_UNLISTEN, /* queued by Async_Unlisten; applied by Exec_Unlisten */ |
| LISTEN_UNLISTEN_ALL /* queued by Async_UnlistenAll; applied by Exec_UnlistenAll */ |
| } ListenActionKind; |
| |
| typedef struct |
| { |
| ListenActionKind action; /* what AtCommit_Notify must do for this entry */ |
| char condname[1]; /* condition name (empty string for UNLISTEN ALL); actually, as long as needed --- queue_listen allocates the extra bytes */ |
| } ListenAction; |
| |
| static List *pendingActions = NIL; /* list of ListenAction, in the order queue_listen appended them */ |
| |
| static List *upperPendingActions = NIL; /* list of upper-xact lists: pushed by AtSubStart_Notify, popped by AtSubCommit_Notify and AtSubAbort_Notify */ |
| |
| /* |
| * State for outbound notifies consists of a list of all relnames NOTIFYed |
| * in the current transaction. We do not actually perform a NOTIFY until |
| * and unless the transaction commits. pendingNotifies is NIL if no |
| * NOTIFYs have been done in the current transaction. |
| * |
| * The list is kept in CurTransactionContext. In subtransactions, each |
| * subtransaction has its own list in its own CurTransactionContext, but |
| * successful subtransactions attach their lists to their parent's list. |
| * Failed subtransactions simply discard their lists. |
| * |
| * Note: the action and notify lists do not interact within a transaction. |
| * In particular, if a transaction does NOTIFY and then LISTEN on the same |
| * condition name, it will get a self-notify at commit. This is a bit odd |
| * but is consistent with our historical behavior. |
| */ |
| static List *pendingNotifies = NIL; /* list of C strings; Async_Notify prepends pstrdup'd names */ |
| |
| static List *upperPendingNotifies = NIL; /* list of upper-xact lists: pushed by AtSubStart_Notify, popped by AtSubCommit_Notify and AtSubAbort_Notify */ |
| |
| /* |
| * State for inbound notifies consists of two flags: one saying whether |
| * the signal handler is currently allowed to call ProcessIncomingNotify |
| * directly, and one saying whether the signal has occurred but the handler |
| * was not allowed to call ProcessIncomingNotify at the time. |
| * |
| * NB: the "volatile" on these declarations is critical! If your compiler |
| * does not grok "volatile", you'd be best advised to compile this file |
| * with all optimization turned off. |
| */ |
| static volatile sig_atomic_t notifyInterruptEnabled = 0; /* nonzero: the SIGUSR2 handler may call ProcessIncomingNotify itself */ |
| static volatile sig_atomic_t notifyInterruptOccurred = 0; /* nonzero: a SIGUSR2 arrived and has not been serviced yet */ |
| |
| /* True if we've registered an on_shmem_exit cleanup (set once by Exec_Listen, never cleared in the code shown here) */ |
| static bool unlistenExitRegistered = false; |
| |
| bool Trace_notify = false; /* when true, the routines in this file emit DEBUG1 trace messages via elog */ |
| |
| |
| static void queue_listen(ListenActionKind action, const char *condname); /* append one entry to pendingActions */ |
| static void Async_UnlistenOnExit(int code, Datum arg); /* on_shmem_exit callback registered by Exec_Listen */ |
| static void Exec_Listen(Relation lRel, const char *relname); /* commit-time LISTEN, called from AtCommit_Notify */ |
| static void Exec_Unlisten(Relation lRel, const char *relname); /* commit-time UNLISTEN, called from AtCommit_Notify */ |
| static void Exec_UnlistenAll(Relation lRel); /* commit-time UNLISTEN ALL, called from AtCommit_Notify */ |
| static void Send_Notify(Relation lRel); /* commit-time delivery of pendingNotifies, called from AtCommit_Notify */ |
| static void ProcessIncomingNotify(void); /* inbound notify processing; called by NotifyInterruptHandler and EnableNotifyInterrupt */ |
| static void NotifyMyFrontEnd(char *relname, int32 listenerPID); /* NOTE(review): defined outside this view; presumably sends the notify message to our own frontend */ |
| static bool AsyncExistsPendingNotify(const char *relname); /* NOTE(review): defined outside this view; presumably tests whether relname is already in pendingNotifies */ |
| static void ClearPendingActionsAndNotifies(void); /* NOTE(review): defined outside this view; presumably resets the pending-action and pending-notify state */ |
| |
| |
| /* |
| * Async_Notify |
| * |
| * This is executed by the SQL notify command. |
| * |
| * Adds the relation to the list of pending notifies. |
| * Actual notification happens during transaction commit. |
| * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| */ |
| void |
| Async_Notify(const char *relname) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_Notify(%s)", relname); |
| |
| /* no point in making duplicate entries in the list ... */ |
| if (!AsyncExistsPendingNotify(relname)) |
| { |
| /* |
| * The name list needs to live until end of transaction, so store it |
| * in the transaction context. |
| */ |
| MemoryContext oldcontext; |
| |
| oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
| |
| /* |
| * Ordering of the list isn't important. We choose to put new entries |
| * on the front, as this might make duplicate-elimination a tad faster |
| * when the same condition is signaled many times in a row. |
| */ |
| pendingNotifies = lcons(pstrdup(relname), pendingNotifies); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| } |
| |
| /* |
| * queue_listen |
| * Common code for listen, unlisten, unlisten all commands. |
| * |
| * Adds the request to the list of pending actions. |
| * Actual update of pg_listener happens during transaction commit. |
| * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| */ |
| static void |
| queue_listen(ListenActionKind action, const char *condname) |
| { |
| MemoryContext oldcontext; |
| ListenAction *actrec; |
| |
| /* |
| * Unlike Async_Notify, we don't try to collapse out duplicates. It would |
| * be too complicated to ensure we get the right interactions of |
| * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there |
| * would be any performance benefit anyway in sane applications. |
| */ |
| oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
| |
| /* space for terminating null is included in sizeof(ListenAction) */ |
| actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); |
| actrec->action = action; |
| strcpy(actrec->condname, condname); |
| |
| pendingActions = lappend(pendingActions, actrec); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Async_Listen |
| * |
| * This is executed by the SQL listen command. |
| */ |
| void |
| Async_Listen(const char *relname) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); |
| |
| queue_listen(LISTEN_LISTEN, relname); |
| } |
| |
| /* |
| * Async_Unlisten |
| * |
| * This is executed by the SQL unlisten command. |
| */ |
| void |
| Async_Unlisten(const char *relname) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); |
| |
| /* If we couldn't possibly be listening, no need to queue anything */ |
| if (pendingActions == NIL && !unlistenExitRegistered) |
| return; |
| |
| queue_listen(LISTEN_UNLISTEN, relname); |
| } |
| |
| /* |
| * Async_UnlistenAll |
| * |
| * This is invoked by UNLISTEN * command, and also at backend exit. |
| */ |
| void |
| Async_UnlistenAll(void) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); |
| |
| /* If we couldn't possibly be listening, no need to queue anything */ |
| if (pendingActions == NIL && !unlistenExitRegistered) |
| return; |
| |
| queue_listen(LISTEN_UNLISTEN_ALL, ""); |
| } |
| |
| /* |
| * Async_UnlistenOnExit |
| * |
| * Clean up the pg_listener table at backend exit. |
| * |
| * This is executed if we have done any LISTENs in this backend. |
| * It might not be necessary anymore, if the user UNLISTENed everything, |
| * but we don't try to detect that case. |
| */ |
| static void |
| Async_UnlistenOnExit(int code, Datum arg) |
| { |
| /* |
| * We need to start/commit a transaction for the unlisten, but if there is |
| * already an active transaction we had better abort that one first. |
| * Otherwise we'd end up committing changes that probably ought to be |
| * discarded. |
| */ |
| AbortOutOfAnyTransaction(); |
| /* Now we can do the unlisten */ |
| StartTransactionCommand(); |
| Async_UnlistenAll(); |
| CommitTransactionCommand(); |
| } |
| |
| /* |
| * AtPrepare_Notify |
| * |
| * This is called at the prepare phase of a two-phase |
| * transaction. Save the state for possible commit later. |
| */ |
| void |
| AtPrepare_Notify(void) |
| { |
| ListCell *p; |
| |
| /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ |
| if (pendingActions) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); |
| |
| /* We can deal with pending NOTIFY though */ |
| foreach(p, pendingNotifies) |
| { |
| const char *relname = (const char *) lfirst(p); |
| |
| RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, |
| relname, strlen(relname) + 1); |
| } |
| |
| /* |
| * We can clear the state immediately, rather than needing a separate |
| * PostPrepare call, because if the transaction fails we'd just discard |
| * the state anyway. |
| */ |
| ClearPendingActionsAndNotifies(); |
| } |
| |
| /* |
| * AtCommit_Notify |
| * |
| * This is called at transaction commit. |
| * |
| * If there are pending LISTEN/UNLISTEN actions, insert or delete |
| * tuples in pg_listener accordingly. |
| * |
| * If there are outbound notify requests in the pendingNotifies list, |
| * scan pg_listener for matching tuples, and either signal the other |
| * backend or send a message to our own frontend. |
| * |
| * NOTE: we are still inside the current transaction, therefore can |
| * piggyback on its committing of changes. |
| */ |
| void |
| AtCommit_Notify(void) |
| { |
| Relation lRel; |
| ListCell *p; |
| |
| if (pendingActions == NIL && pendingNotifies == NIL) |
| return; /* no relevant statements in this xact */ |
| |
| /* |
| * NOTIFY is disabled if not normal processing mode. This test used to be |
| * in xact.c, but it seems cleaner to do it here. |
| */ |
| if (!IsNormalProcessingMode()) |
| { |
| ClearPendingActionsAndNotifies(); |
| return; |
| } |
| |
| if (Trace_notify) |
| elog(DEBUG1, "AtCommit_Notify"); |
| |
| /* Acquire ExclusiveLock on pg_listener */ |
| lRel = heap_open(ListenerRelationId, ExclusiveLock); |
| |
| /* Perform any pending listen/unlisten actions */ |
| foreach(p, pendingActions) |
| { |
| ListenAction *actrec = (ListenAction *) lfirst(p); |
| |
| switch (actrec->action) |
| { |
| case LISTEN_LISTEN: |
| Exec_Listen(lRel, actrec->condname); |
| break; |
| case LISTEN_UNLISTEN: |
| Exec_Unlisten(lRel, actrec->condname); |
| break; |
| case LISTEN_UNLISTEN_ALL: |
| Exec_UnlistenAll(lRel); |
| break; |
| } |
| |
| /* We must CCI after each action in case of conflicting actions */ |
| CommandCounterIncrement(); |
| } |
| |
| /* Perform any pending notifies */ |
| if (pendingNotifies) |
| Send_Notify(lRel); |
| |
| /* |
| * We do NOT release the lock on pg_listener here; we need to hold it |
| * until end of transaction (which is about to happen, anyway) to ensure |
| * that notified backends see our tuple updates when they look. Else they |
| * might disregard the signal, which would make the application programmer |
| * very unhappy. Also, this prevents race conditions when we have just |
| * inserted a listening tuple. |
| */ |
| heap_close(lRel, NoLock); |
| |
| ClearPendingActionsAndNotifies(); |
| |
| if (Trace_notify) |
| elog(DEBUG1, "AtCommit_Notify: done"); |
| } |
| |
| /* |
| * Exec_Listen --- subroutine for AtCommit_Notify |
| * |
| * Register the current backend as listening on the specified relation. |
| */ |
| static void |
| Exec_Listen(Relation lRel, const char *relname) |
| { |
| HeapScanDesc scan; |
| HeapTuple tuple; |
| Datum values[Natts_pg_listener]; |
| bool nulls[Natts_pg_listener]; |
| NameData condname; |
| bool alreadyListener = false; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); |
| |
| /* Detect whether we are already listening on this relname */ |
| scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); |
| while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| { |
| Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); |
| |
| if (listener->listenerpid == MyProcPid && |
| strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) |
| { |
| alreadyListener = true; |
| /* No need to scan the rest of the table */ |
| break; |
| } |
| } |
| heap_endscan(scan); |
| |
| if (alreadyListener) |
| return; |
| |
| /* |
| * OK to insert a new tuple |
| */ |
| memset(nulls, false, sizeof(nulls)); |
| |
| namestrcpy(&condname, relname); |
| values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); |
| values[Anum_pg_listener_listenerpid - 1] = Int32GetDatum(MyProcPid); |
| values[Anum_pg_listener_notification - 1] = Int32GetDatum(0); /* no notifies pending */ |
| |
| tuple = heap_form_tuple(RelationGetDescr(lRel), values, nulls); |
| |
| simple_heap_insert(lRel, tuple); |
| |
| #ifdef NOT_USED /* currently there are no indexes */ |
| CatalogUpdateIndexes(lRel, tuple); |
| #endif |
| |
| heap_freetuple(tuple); |
| |
| /* |
| * now that we are listening, make sure we will unlisten before dying. |
| */ |
| if (!unlistenExitRegistered) |
| { |
| on_shmem_exit(Async_UnlistenOnExit, 0); |
| unlistenExitRegistered = true; |
| } |
| } |
| |
| /* |
| * Exec_Unlisten --- subroutine for AtCommit_Notify |
| * |
| * Remove the current backend from the list of listening backends |
| * for the specified relation. |
| */ |
| static void |
| Exec_Unlisten(Relation lRel, const char *relname) |
| { |
| HeapScanDesc scan; |
| HeapTuple tuple; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); |
| |
| scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); |
| while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| { |
| Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple); |
| |
| if (listener->listenerpid == MyProcPid && |
| strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0) |
| { |
| /* Found the matching tuple, delete it */ |
| simple_heap_delete(lRel, &tuple->t_self); |
| |
| /* |
| * We assume there can be only one match, so no need to scan the |
| * rest of the table |
| */ |
| break; |
| } |
| } |
| heap_endscan(scan); |
| |
| /* |
| * We do not complain about unlistening something not being listened; |
| * should we? |
| */ |
| } |
| |
| /* |
| * Exec_UnlistenAll --- subroutine for AtCommit_Notify |
| * |
| * Update pg_listener to unlisten all relations for this backend. |
| */ |
| static void |
| Exec_UnlistenAll(Relation lRel) |
| { |
| HeapScanDesc scan; |
| HeapTuple lTuple; |
| ScanKeyData key[1]; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_UnlistenAll"); |
| |
| /* Find and delete all entries with my listenerPID */ |
| ScanKeyInit(&key[0], |
| Anum_pg_listener_listenerpid, |
| BTEqualStrategyNumber, F_INT4EQ, |
| Int32GetDatum(MyProcPid)); |
| scan = heap_beginscan(lRel, SnapshotNow, 1, key); |
| |
| while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| simple_heap_delete(lRel, &lTuple->t_self); |
| |
| heap_endscan(scan); |
| } |
| |
| /* |
| * Send_Notify --- subroutine for AtCommit_Notify |
| * |
| * Scan pg_listener for tuples matching our pending notifies, and |
| * either signal the other backend or send a message to our own frontend. |
| */ |
| static void |
| Send_Notify(Relation lRel) |
| { |
| TupleDesc tdesc = RelationGetDescr(lRel); |
| HeapScanDesc scan; |
| HeapTuple lTuple, |
| rTuple; |
| Datum value[Natts_pg_listener]; |
| bool repl[Natts_pg_listener], |
| nulls[Natts_pg_listener]; |
| |
| /* preset data to update notify column to MyProcPid */ |
| memset(nulls, false, sizeof(nulls)); |
| memset(repl, false, sizeof(repl)); |
| repl[Anum_pg_listener_notification - 1] = true; |
| memset(value, 0, sizeof(value)); |
| value[Anum_pg_listener_notification - 1] = Int32GetDatum(MyProcPid); |
| |
| scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); |
| |
| while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| { |
| Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); |
| char *relname = NameStr(listener->relname); |
| int32 listenerPID = listener->listenerpid; |
| |
| if (!AsyncExistsPendingNotify(relname)) |
| continue; |
| |
| if (listenerPID == MyProcPid) |
| { |
| /* |
| * Self-notify: no need to bother with table update. Indeed, we |
| * *must not* clear the notification field in this path, or we |
| * could lose an outside notify, which'd be bad for applications |
| * that ignore self-notify messages. |
| */ |
| if (Trace_notify) |
| elog(DEBUG1, "AtCommit_Notify: notifying self"); |
| |
| NotifyMyFrontEnd(relname, listenerPID); |
| } |
| else |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "AtCommit_Notify: notifying pid %d", |
| listenerPID); |
| |
| /* |
| * If someone has already notified this listener, we don't bother |
| * modifying the table, but we do still send a SIGUSR2 signal, |
| * just in case that backend missed the earlier signal for some |
| * reason. It's OK to send the signal first, because the other |
| * guy can't read pg_listener until we unlock it. |
| */ |
| if (kill(listenerPID, SIGUSR2) < 0) |
| { |
| /* |
| * Get rid of pg_listener entry if it refers to a PID that no |
| * longer exists. Presumably, that backend crashed without |
| * deleting its pg_listener entries. This code used to only |
| * delete the entry if errno==ESRCH, but as far as I can see |
| * we should just do it for any failure (certainly at least |
| * for EPERM too...) |
| */ |
| simple_heap_delete(lRel, &lTuple->t_self); |
| } |
| else if (listener->notification == 0) |
| { |
| /* Rewrite the tuple with my PID in notification column */ |
| rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); |
| simple_heap_update(lRel, &lTuple->t_self, rTuple); |
| |
| #ifdef NOT_USED /* currently there are no indexes */ |
| CatalogUpdateIndexes(lRel, rTuple); |
| #endif |
| } |
| } |
| } |
| |
| heap_endscan(scan); |
| } |
| |
/*
 * AtAbort_Notify
 *
 *      Called at transaction abort.
 *
 *      Throws away the queued LISTEN/UNLISTEN actions and outbound
 *      notifies; they would only have been executed had the transaction
 *      committed.
 */
void
AtAbort_Notify(void)
{
    ClearPendingActionsAndNotifies();
}
| |
| /* |
| * AtSubStart_Notify() --- Take care of subtransaction start. |
| * |
| * Push empty state for the new subtransaction. |
| */ |
| void |
| AtSubStart_Notify(void) |
| { |
| MemoryContext old_cxt; |
| |
| /* Keep the list-of-lists in TopTransactionContext for simplicity */ |
| old_cxt = MemoryContextSwitchTo(TopTransactionContext); |
| |
| upperPendingActions = lcons(pendingActions, upperPendingActions); |
| |
| Assert(list_length(upperPendingActions) == |
| GetCurrentTransactionNestLevel() - 1); |
| |
| pendingActions = NIL; |
| |
| upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); |
| |
| Assert(list_length(upperPendingNotifies) == |
| GetCurrentTransactionNestLevel() - 1); |
| |
| pendingNotifies = NIL; |
| |
| MemoryContextSwitchTo(old_cxt); |
| } |
| |
| /* |
| * AtSubCommit_Notify() --- Take care of subtransaction commit. |
| * |
| * Reassign all items in the pending lists to the parent transaction. |
| */ |
| void |
| AtSubCommit_Notify(void) |
| { |
| List *parentPendingActions; |
| List *parentPendingNotifies; |
| |
| parentPendingActions = (List *) linitial(upperPendingActions); |
| upperPendingActions = list_delete_first(upperPendingActions); |
| |
| Assert(list_length(upperPendingActions) == |
| GetCurrentTransactionNestLevel() - 2); |
| |
| /* |
| * Mustn't try to eliminate duplicates here --- see queue_listen() |
| */ |
| pendingActions = list_concat(parentPendingActions, pendingActions); |
| |
| parentPendingNotifies = (List *) linitial(upperPendingNotifies); |
| upperPendingNotifies = list_delete_first(upperPendingNotifies); |
| |
| Assert(list_length(upperPendingNotifies) == |
| GetCurrentTransactionNestLevel() - 2); |
| |
| /* |
| * We could try to eliminate duplicates here, but it seems not worthwhile. |
| */ |
| pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies); |
| } |
| |
| /* |
| * AtSubAbort_Notify() --- Take care of subtransaction abort. |
| */ |
| void |
| AtSubAbort_Notify(void) |
| { |
| int my_level = GetCurrentTransactionNestLevel(); |
| |
| /* |
| * All we have to do is pop the stack --- the actions/notifies made in |
| * this subxact are no longer interesting, and the space will be freed |
| * when CurTransactionContext is recycled. |
| * |
| * This routine could be called more than once at a given nesting level if |
| * there is trouble during subxact abort. Avoid dumping core by using |
| * GetCurrentTransactionNestLevel as the indicator of how far we need to |
| * prune the list. |
| */ |
| while (list_length(upperPendingActions) > my_level - 2) |
| { |
| pendingActions = (List *) linitial(upperPendingActions); |
| upperPendingActions = list_delete_first(upperPendingActions); |
| } |
| |
| while (list_length(upperPendingNotifies) > my_level - 2) |
| { |
| pendingNotifies = (List *) linitial(upperPendingNotifies); |
| upperPendingNotifies = list_delete_first(upperPendingNotifies); |
| } |
| } |
| |
| /* |
| * NotifyInterruptHandler |
| * |
| * This is the signal handler for SIGUSR2. |
| * |
| * If we are idle (notifyInterruptEnabled is set), we can safely invoke |
| * ProcessIncomingNotify directly. Otherwise, just set a flag |
| * (notifyInterruptOccurred) so the work is done later. |
| * |
| * Returns at once, touching neither flag, if proc_exit is in progress. |
| * On the other paths errno is saved on entry and put back before return. |
| */ |
| void |
| NotifyInterruptHandler(SIGNAL_ARGS) |
| { |
| int save_errno = errno; /* written back to errno at the bottom */ |
| |
| /* |
| * Note: this is a SIGNAL HANDLER. You must be very wary what you do |
| * here. Some helpful soul had this routine sprinkled with TPRINTFs, which |
| * would likely lead to corruption of stdio buffers if they were ever |
| * turned on. |
| */ |
| |
| /* Don't joggle the elbow of proc_exit: do nothing while exit is underway */ |
| if (proc_exit_inprogress) |
| return; |
| |
| if (notifyInterruptEnabled) |
| { |
| bool save_ImmediateInterruptOK = ImmediateInterruptOK; /* restored after the loop */ |
| |
| /* |
| * We may be called while ImmediateInterruptOK is true; turn it off |
| * while messing with the NOTIFY state. (We would have to save and |
| * restore it anyway, because PGSemaphore operations inside |
| * ProcessIncomingNotify() might reset it.) |
| */ |
| ImmediateInterruptOK = false; |
| |
| /* |
| * I'm not sure whether some flavors of Unix might allow another |
| * SIGUSR2 occurrence to recursively interrupt this routine. To cope |
| * with the possibility, we do the same sort of dance that |
| * EnableNotifyInterrupt must do --- see that routine for comments. |
| * |
| * The order of the flag assignments and tests below is significant; |
| * do not rearrange them. |
| */ |
| notifyInterruptEnabled = 0; /* disable any recursive signal */ |
| notifyInterruptOccurred = 1; /* do at least one iteration */ |
| for (;;) |
| { |
| notifyInterruptEnabled = 1; /* enable first ... */ |
| if (!notifyInterruptOccurred) /* ... then test: nothing unserviced, leave enabled */ |
| break; |
| notifyInterruptEnabled = 0; /* disable again before doing the work */ |
| if (notifyInterruptOccurred) /* re-test: a nested handler may have serviced it meanwhile */ |
| { |
| /* Here, it is finally safe to do stuff. */ |
| if (Trace_notify) |
| elog(DEBUG1, "NotifyInterruptHandler: perform async notify"); |
| |
| ProcessIncomingNotify(); /* resets notifyInterruptOccurred to 0 */ |
| |
| if (Trace_notify) |
| elog(DEBUG1, "NotifyInterruptHandler: done"); |
| } |
| } |
| |
| /* |
| * Restore ImmediateInterruptOK, and check for interrupts if needed. |
| */ |
| ImmediateInterruptOK = save_ImmediateInterruptOK; |
| if (save_ImmediateInterruptOK) |
| CHECK_FOR_INTERRUPTS(); |
| } |
| else |
| { |
| /* |
| * In this path it is NOT SAFE to do much of anything, except this: |
| * record the signal so that EnableNotifyInterrupt services it later. |
| */ |
| notifyInterruptOccurred = 1; |
| } |
| |
| errno = save_errno; |
| } |
| |
| /* |
| * EnableNotifyInterrupt |
| * |
| * This is called by the PostgresMain main loop just before waiting |
| * for a frontend command. If we are truly idle (ie, *not* inside |
| * a transaction block), then process any pending inbound notifies, |
| * and enable the signal handler to process future notifies directly. |
| * |
| * If IsTransactionOrTransactionBlock() is true the function returns |
| * without touching either flag. Otherwise, on return |
| * notifyInterruptEnabled is 1 and the last test of |
| * notifyInterruptOccurred found it zero. |
| * |
| * NOTE: the signal handler starts out disabled, and stays so until |
| * PostgresMain calls this the first time. |
| */ |
| void |
| EnableNotifyInterrupt(void) |
| { |
| if (IsTransactionOrTransactionBlock()) |
| return; /* not really idle */ |
| |
| /* |
| * This code is tricky because we are communicating with a signal handler |
| * that could interrupt us at any point. If we just checked |
| * notifyInterruptOccurred and then set notifyInterruptEnabled, we could |
| * fail to respond promptly to a signal that happens in between those two |
| * steps. (A very small time window, perhaps, but Murphy's Law says you |
| * can hit it...) Instead, we first set the enable flag, then test the |
| * occurred flag. If we see an unserviced interrupt has occurred, we |
| * re-clear the enable flag before going off to do the service work. (That |
| * prevents re-entrant invocation of ProcessIncomingNotify() if another |
| * interrupt occurs.) If an interrupt comes in between the setting and |
| * clearing of notifyInterruptEnabled, then it will have done the service |
| * work and left notifyInterruptOccurred zero, so we have to check again |
| * after clearing enable. The whole thing has to be in a loop in case |
| * another interrupt occurs while we're servicing the first. Once we get |
| * out of the loop, enable is set and we know there is no unserviced |
| * interrupt. |
| * |
| * NB: an overenthusiastic optimizing compiler could easily break this |
| * code. Hopefully, they all understand what "volatile" means these days. |
| */ |
| for (;;) |
| { |
| notifyInterruptEnabled = 1; /* step 1: let the handler do the work itself */ |
| if (!notifyInterruptOccurred) /* step 2: nothing unserviced, exit with enable set */ |
| break; |
| notifyInterruptEnabled = 0; /* step 3: block the handler while we service */ |
| if (notifyInterruptOccurred) /* step 4: re-test; the handler may have run between steps 1 and 3 */ |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "EnableNotifyInterrupt: perform async notify"); |
| |
| ProcessIncomingNotify(); /* resets notifyInterruptOccurred to 0 */ |
| |
| if (Trace_notify) |
| elog(DEBUG1, "EnableNotifyInterrupt: done"); |
| } |
| } |
| } |
| |
| /* |
| * DisableNotifyInterrupt |
| * |
| * This is called by the PostgresMain main loop just after receiving |
| * a frontend command. Signal handler execution of inbound notifies |
| * is disabled until the next EnableNotifyInterrupt call. |
| * |
| * The SIGUSR1 signal handler also needs to call this, so as to |
| * prevent conflicts if one signal interrupts the other. So we |
| * must return the previous state of the flag. |
| */ |
| bool |
| DisableNotifyInterrupt(void) |
| { |
| bool result = (notifyInterruptEnabled != 0); |
| |
| notifyInterruptEnabled = 0; |
| |
| return result; |
| } |
| |
| /* |
| * ProcessIncomingNotify |
| * |
| * Deal with arriving NOTIFYs from other backends. |
| * This is called either directly from the SIGUSR2 signal handler, |
| * or the next time control reaches the outer idle loop. |
| * Scan pg_listener for arriving notifies, report them to my front end, |
| * and clear the notification field in pg_listener until next time. |
| * |
| * NOTE: since we are outside any transaction, we must create our own. |
| */ |
| static void |
| ProcessIncomingNotify(void) |
| { |
| Relation lRel; |
| TupleDesc tdesc; |
| ScanKeyData key[1]; |
| HeapScanDesc scan; |
| HeapTuple lTuple, |
| rTuple; |
| Datum value[Natts_pg_listener]; |
| bool repl[Natts_pg_listener], |
| nulls[Natts_pg_listener]; |
| bool catchup_enabled; |
| bool client_wait_timeout_enabled; |
| |
| /* Must prevent SIGUSR1 interrupt while I am running */ |
| catchup_enabled = DisableCatchupInterrupt(); |
| client_wait_timeout_enabled = DisableClientWaitTimeoutInterrupt(); |
| |
| if (Trace_notify) |
| elog(DEBUG1, "ProcessIncomingNotify"); |
| |
| set_ps_display("notify interrupt", false); |
| |
| notifyInterruptOccurred = 0; |
| |
| StartTransactionCommand(); |
| |
| lRel = heap_open(ListenerRelationId, ExclusiveLock); |
| tdesc = RelationGetDescr(lRel); |
| |
| /* Scan only entries with my listenerPID */ |
| ScanKeyInit(&key[0], |
| Anum_pg_listener_listenerpid, |
| BTEqualStrategyNumber, F_INT4EQ, |
| Int32GetDatum(MyProcPid)); |
| scan = heap_beginscan(lRel, SnapshotNow, 1, key); |
| |
| /* Prepare data for rewriting 0 into notification field */ |
| memset(nulls, false, sizeof(nulls)); |
| memset(repl, false, sizeof(repl)); |
| repl[Anum_pg_listener_notification - 1] = true; |
| memset(value, 0, sizeof(value)); |
| value[Anum_pg_listener_notification - 1] = Int32GetDatum(0); |
| |
| while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
| { |
| Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple); |
| char *relname = NameStr(listener->relname); |
| int32 sourcePID = listener->notification; |
| |
| if (sourcePID != 0) |
| { |
| /* Notify the frontend */ |
| |
| if (Trace_notify) |
| elog(DEBUG1, "ProcessIncomingNotify: received %s from %d", |
| relname, (int) sourcePID); |
| |
| NotifyMyFrontEnd(relname, sourcePID); |
| |
| /* |
| * Rewrite the tuple with 0 in notification column. |
| */ |
| rTuple = heap_modify_tuple(lTuple, tdesc, value, nulls, repl); |
| simple_heap_update(lRel, &lTuple->t_self, rTuple); |
| |
| #ifdef NOT_USED /* currently there are no indexes */ |
| CatalogUpdateIndexes(lRel, rTuple); |
| #endif |
| } |
| } |
| heap_endscan(scan); |
| |
| /* |
| * We do NOT release the lock on pg_listener here; we need to hold it |
| * until end of transaction (which is about to happen, anyway) to ensure |
| * that other backends see our tuple updates when they look. Otherwise, a |
| * transaction started after this one might mistakenly think it doesn't |
| * need to send this backend a new NOTIFY. |
| */ |
| heap_close(lRel, NoLock); |
| |
| CommitTransactionCommand(); |
| |
| /* |
| * Must flush the notify messages to ensure frontend gets them promptly. |
| */ |
| pq_flush(); |
| |
| set_ps_display("idle", false); |
| |
| if (Trace_notify) |
| elog(DEBUG1, "ProcessIncomingNotify: done"); |
| |
| if (catchup_enabled) |
| EnableCatchupInterrupt(); |
| |
| if (client_wait_timeout_enabled) |
| EnableClientWaitTimeoutInterrupt(); |
| } |
| |
| /* |
| * Send NOTIFY message to my front end. |
| */ |
| static void |
| NotifyMyFrontEnd(char *relname, int32 listenerPID) |
| { |
| if (whereToSendOutput == DestRemote) |
| { |
| StringInfoData buf; |
| |
| pq_beginmessage(&buf, 'A'); |
| pq_sendint(&buf, listenerPID, sizeof(int32)); |
| pq_sendstring(&buf, relname); |
| if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) |
| { |
| /* XXX Add parameter string here later */ |
| pq_sendstring(&buf, ""); |
| } |
| pq_endmessage(&buf); |
| |
| /* |
| * NOTE: we do not do pq_flush() here. For a self-notify, it will |
| * happen at the end of the transaction, and for incoming notifies |
| * ProcessIncomingNotify will do it after finding all the notifies. |
| */ |
| } |
| else |
| elog(INFO, "NOTIFY for %s", relname); |
| } |
| |
| /* Does pendingNotifies include the given relname? */ |
| static bool |
| AsyncExistsPendingNotify(const char *relname) |
| { |
| ListCell *p; |
| |
| foreach(p, pendingNotifies) |
| { |
| const char *prelname = (const char *) lfirst(p); |
| |
| if (strcmp(prelname, relname) == 0) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* Clear the pendingActions and pendingNotifies lists. */ |
| static void |
| ClearPendingActionsAndNotifies(void) |
| { |
| /* |
| * We used to have to explicitly deallocate the list members and nodes, |
| * because they were malloc'd. Now, since we know they are palloc'd in |
| * CurTransactionContext, we need not do that --- they'll go away |
| * automatically at transaction exit. We need only reset the list head |
| * pointers. |
| */ |
| pendingActions = NIL; |
| pendingNotifies = NIL; |
| } |
| |
| /* |
| * 2PC processing routine for COMMIT PREPARED case. |
| * |
| * (We don't have to do anything for ROLLBACK PREPARED.) |
| */ |
| void |
| notify_twophase_postcommit(TransactionId xid, uint16 info, |
| void *recdata, uint32 len) |
| { |
| /* |
| * Set up to issue the NOTIFY at the end of my own current transaction. |
| * (XXX this has some issues if my own transaction later rolls back, or if |
| * there is any significant delay before I commit. OK for now because we |
| * disallow COMMIT PREPARED inside a transaction block.) |
| */ |
| Async_Notify((char *) recdata); |
| } |