| /*------------------------------------------------------------------------- |
| * |
| * async.c |
| * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * src/backend/commands/async.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * Async Notification Model as of 9.0: |
| * |
| * 1. Multiple backends on same machine. Multiple backends listening on |
| * several channels. (Channels are also called "conditions" in other |
| * parts of the code.) |
| * |
| * 2. There is one central queue in disk-based storage (directory pg_notify/), |
| * with actively-used pages mapped into shared memory by the slru.c module. |
| * All notification messages are placed in the queue and later read out |
| * by listening backends. |
| * |
| * There is no central knowledge of which backend listens on which channel; |
| * every backend has its own list of interesting channels. |
| * |
| * Although there is only one queue, notifications are treated as being |
| * database-local; this is done by including the sender's database OID |
| * in each notification message. Listening backends ignore messages |
| * that don't match their database OID. This is important because it |
| * ensures senders and receivers have the same database encoding and won't |
| * misinterpret non-ASCII text in the channel name or payload string. |
| * |
| * Since notifications are not expected to survive database crashes, |
| * we can simply clean out the pg_notify data at any reboot, and there |
| * is no need for WAL support or fsync'ing. |
| * |
| * 3. Every backend that is listening on at least one channel registers by |
| * entering its PID into the array in AsyncQueueControl. It then scans all |
| * incoming notifications in the central queue and first compares the |
| * database OID of the notification with its own database OID and then |
| * compares the notified channel with the list of channels that it listens |
| * to. In case there is a match it delivers the notification event to its |
| * frontend. Non-matching events are simply skipped. |
| * |
| * 4. The NOTIFY statement (routine Async_Notify) stores the notification in |
| * a backend-local list which will not be processed until transaction end. |
| * |
| * Duplicate notifications from the same transaction are sent out as one |
| * notification only. This is done to save work when for example a trigger |
| * on a 2 million row table fires a notification for each row that has been |
| * changed. If the application needs to receive every single notification |
| * that has been sent, it can easily add some unique string into the extra |
| * payload parameter. |
| * |
| * When the transaction is ready to commit, PreCommit_Notify() adds the |
| * pending notifications to the head of the queue. The head pointer of the |
| * queue always points to the next free position and a position is just a |
| * page number and the offset in that page. This is done before marking the |
| * transaction as committed in clog. If we run into problems writing the |
| * notifications, we can still call elog(ERROR, ...) and the transaction |
| * will roll back. |
| * |
| * Once we have put all of the notifications into the queue, we return to |
| * CommitTransaction() which will then do the actual transaction commit. |
| * |
| * After commit we are called another time (AtCommit_Notify()). Here we |
| * make any actual updates to the effective listen state (listenChannels). |
| * Then we signal any backends that may be interested in our messages |
| * (including our own backend, if listening). This is done by |
| * SignalBackends(), which scans the list of listening backends and sends a |
| * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't |
| * know which backend is listening on which channel so we must signal them |
| * all). We can exclude backends that are already up to date, though, and |
| * we can also exclude backends that are in other databases (unless they |
| * are way behind and should be kicked to make them advance their |
| * pointers). |
| * |
| * Finally, after we are out of the transaction altogether and about to go |
| * idle, we scan the queue for messages that need to be sent to our |
| * frontend (which might be notifies from other backends, or self-notifies |
| * from our own). This step is not part of the CommitTransaction sequence |
| * for two important reasons. First, we could get errors while sending |
| * data to our frontend, and it's really bad for errors to happen in |
| * post-commit cleanup. Second, in cases where a procedure issues commits |
| * within a single frontend command, we don't want to send notifies to our |
| * frontend until the command is done; but notifies to other backends |
| * should go out immediately after each commit. |
| * |
| * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler |
| * sets the process's latch, which triggers the event to be processed |
| * immediately if this backend is idle (i.e., it is waiting for a frontend |
| * command and is not within a transaction block. C.f. |
| * ProcessClientReadInterrupt()). Otherwise the handler may only set a |
| * flag, which will cause the processing to occur just before we next go |
| * idle. |
| * |
| * Inbound-notify processing consists of reading all of the notifications |
| * that have arrived since scanning last time. We read every notification |
| * until we reach either a notification from an uncommitted transaction or |
| * the head pointer's position. |
| * |
| * 6. To avoid SLRU wraparound and limit disk space consumption, the tail |
| * pointer needs to be advanced so that old pages can be truncated. |
| * This is relatively expensive (notably, it requires an exclusive lock), |
| * so we don't want to do it often. We make sending backends do this work |
| * if they advanced the queue head into a new page, but only once every |
| * QUEUE_CLEANUP_DELAY pages. |
| * |
| * An application that listens on the same channel it notifies will get |
| * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, |
| * by comparing be_pid in the NOTIFY message to the application's own backend's |
| * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the |
| * frontend during startup.) The above design guarantees that notifies from |
| * other backends will never be missed by ignoring self-notifies. |
| * |
| * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS) |
| * can be varied without affecting anything but performance. The maximum |
| * amount of notification data that can be queued at one time is determined |
| * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <limits.h> |
| #include <unistd.h> |
| #include <signal.h> |
| |
| #include "access/parallel.h" |
| #include "access/slru.h" |
| #include "access/transam.h" |
| #include "access/xact.h" |
| #include "catalog/pg_database.h" |
| #include "commands/async.h" |
| #include "common/hashfn.h" |
| #include "funcapi.h" |
| #include "libpq/libpq.h" |
| #include "libpq/pqformat.h" |
| #include "miscadmin.h" |
| #include "storage/ipc.h" |
| #include "storage/lmgr.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "storage/procsignal.h" |
| #include "storage/sinval.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/builtins.h" |
| #include "utils/memutils.h" |
| #include "utils/ps_status.h" |
| #include "utils/snapmgr.h" |
| #include "utils/timestamp.h" |
| |
| #include "tcop/idle_resource_cleaner.h" |
| |
| |
/*
 * Maximum size of a NOTIFY payload, including terminating NULL.  This
 * must be kept small enough so that a notification message fits on one
 * SLRU page.  The magic fudge factor here is noncritical as long as it's
 * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
 * than that, so changes in that data structure won't affect user-visible
 * restrictions.
 */
#define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)

/*
 * Struct representing an entry in the global notify queue
 *
 * This struct declaration has the maximal length, but in a real queue entry
 * the data area is only big enough for the actual channel and payload strings
 * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
 * entry size, if both channel and payload strings are empty (but note it
 * doesn't include alignment padding).
 *
 * The "length" field should always be rounded up to the next QUEUEALIGN
 * multiple so that all fields are properly aligned.
 */
typedef struct AsyncQueueEntry
{
	int			length;			/* total allocated length of entry */
	Oid			dboid;			/* sender's database OID */
	TransactionId xid;			/* sender's XID */
	int32		srcPid;			/* sender's PID */
	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Currently, no field of AsyncQueueEntry requires more than int alignment */
#define QUEUEALIGN(len)		INTALIGN(len)

/* header plus one NUL apiece for an empty channel name and empty payload */
#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
| |
/*
 * Struct describing a queue position, and assorted macros for working with it
 */
typedef struct QueuePosition
{
	int			page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} QueuePosition;

#define QUEUE_POS_PAGE(x)		((x).page)
#define QUEUE_POS_OFFSET(x)		((x).offset)

#define SET_QUEUE_POS(x,y,z) \
	do { \
		(x).page = (y); \
		(x).offset = (z); \
	} while (0)

#define QUEUE_POS_EQUAL(x,y) \
	((x).page == (y).page && (x).offset == (y).offset)

#define QUEUE_POS_IS_ZERO(x) \
	((x).page == 0 && (x).offset == 0)

/*
 * choose logically smaller QueuePosition; page comparison goes through
 * asyncQueuePagePrecedes() so that page-number wraparound is handled
 */
#define QUEUE_POS_MIN(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
	 (x).page != (y).page ? (y) : \
	 (x).offset < (y).offset ? (x) : (y))

/* choose logically larger QueuePosition (same wraparound-aware comparison) */
#define QUEUE_POS_MAX(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
	 (x).page != (y).page ? (x) : \
	 (x).offset > (y).offset ? (x) : (y))

/*
 * Parameter determining how often we try to advance the tail pointer:
 * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
 * also the distance by which a backend in another database needs to be
 * behind before we'll decide we need to wake it up to advance its pointer.
 *
 * Resist the temptation to make this really large.  While that would save
 * work in some places, it would add cost in others.  In particular, this
 * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
 * catch up before the pages they'll need to read fall out of SLRU cache.
 */
#define QUEUE_CLEANUP_DELAY 4
| |
/*
 * Struct describing a listening backend's status; one per backend slot in
 * AsyncQueueControl->backend[].
 */
typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid */
	Oid			dboid;			/* backend's database OID, or InvalidOid */
	BackendId	nextListener;	/* id of next listener, or InvalidBackendId */
	QueuePosition pos;			/* backend has read queue up to here */
} QueueBackendStatus;
| |
/*
 * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
 *
 * The AsyncQueueControl structure is protected by the NotifyQueueLock and
 * NotifyQueueTailLock.
 *
 * When holding NotifyQueueLock in SHARED mode, backends may only inspect
 * their own entries as well as the head and tail pointers.  Consequently we
 * can allow a backend to update its own record while holding only SHARED lock
 * (since no other backend will inspect it).
 *
 * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
 * entries of other backends and also change the head pointer.  When holding
 * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
 * can change the tail pointers.
 *
 * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
 * In order to avoid deadlocks, whenever we need multiple locks, we first get
 * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
 *
 * Each backend uses the backend[] array entry with index equal to its
 * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
 * SendProcSignal fast.
 *
 * The backend[] array entries for actively-listening backends are threaded
 * together using firstListener and the nextListener links, so that we can
 * scan them without having to iterate over inactive entries.  We keep this
 * list in order by BackendId so that the scan is cache-friendly when there
 * are many active entries.
 */
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
	QueuePosition tail;			/* tail must be <= the queue position of every
								 * listening backend */
	int			stopPage;		/* oldest unrecycled page; must be <=
								 * tail.page */
	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

/* convenience accessors for the shared structure above */
#define QUEUE_HEAD					(asyncQueueControl->head)
#define QUEUE_TAIL					(asyncQueueControl->tail)
#define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)

/*
 * The SLRU buffer area through which we access the notification queue
 */
static SlruCtlData NotifyCtlData;

#define NotifyCtl					(&NotifyCtlData)
#define QUEUE_PAGESIZE				BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
| |
/*
 * Use segments 0000 through FFFF.  Each contains SLRU_PAGES_PER_SEGMENT pages
 * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
 * We could use as many segments as SlruScanDirectory() allows, but this gives
 * us so much space already that it doesn't seem worth the trouble.
 *
 * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
 * pages, because more than that would confuse slru.c into thinking there
 * was a wraparound condition.  With the default BLCKSZ this means there
 * can be up to 8GB of queued-and-not-read data.
 *
 * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
 * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
 */
#define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)

/*
 * listenChannels identifies the channels we are actually listening to
 * (ie, have committed a LISTEN on).  It is a simple list of channel names,
 * allocated in TopMemoryContext.
 */
static List *listenChannels = NIL;	/* list of C strings */
| |
/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction.  As explained above,
 * we don't actually change listenChannels until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum
{
	LISTEN_LISTEN,
	LISTEN_UNLISTEN,
	LISTEN_UNLISTEN_ALL
} ListenActionKind;

typedef struct
{
	ListenActionKind action;
	char		channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;

typedef struct ActionList
{
	int			nestingLevel;	/* current transaction nesting depth */
	List	   *actions;		/* list of ListenAction structs */
	struct ActionList *upper;	/* details for upper transaction levels */
} ActionList;

/* head of the stack of per-nesting-level action lists; NULL if none */
static ActionList *pendingActions = NULL;

/*
 * State for outbound notifies consists of a list of all channels+payloads
 * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
 * until and unless the transaction commits.  pendingNotifies is NULL if no
 * NOTIFYs have been done in the current (sub) transaction.
 *
 * We discard duplicate notify events issued in the same transaction.
 * Hence, in addition to the list proper (which we need to track the order
 * of the events, since we guarantee to deliver them in order), we build a
 * hash table which we can probe to detect duplicates.  Since building the
 * hash table is somewhat expensive, we do so only once we have at least
 * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
 * before that we just scan the events linearly.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions add their entries to their parent's list.
 * Failed subtransactions simply discard their lists.  Since these lists
 * are independent, there may be notify events in a subtransaction's list
 * that duplicate events in some ancestor (sub) transaction; we get rid of
 * the dups when merging the subtransaction's list into its parent's.
 *
 * Note: the action and notify lists do not interact within a transaction.
 * In particular, if a transaction does NOTIFY and then LISTEN on the same
 * condition name, it will get a self-notify at commit.  This is a bit odd
 * but is consistent with our historical behavior.
 */
typedef struct Notification
{
	uint16		channel_len;	/* length of channel-name string */
	uint16		payload_len;	/* length of payload string */
	/* null-terminated channel name, then null-terminated payload follow */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} Notification;

typedef struct NotificationList
{
	int			nestingLevel;	/* current transaction nesting depth */
	List	   *events;			/* list of Notification structs */
	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
	struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;

#define MIN_HASHABLE_NOTIFIES 16	/* threshold to build hashtab */

typedef struct NotificationHash
{
	Notification *event;		/* => the actual Notification struct */
} NotificationHash;

/* head of the stack of per-nesting-level notify lists; NULL if none */
static NotificationList *pendingNotifies = NULL;

/*
 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
 * called from inside a signal handler.  That just sets the
 * notifyInterruptPending flag and sets the process
 * latch.  ProcessNotifyInterrupt() will then be called whenever it's safe to
 * actually deal with the interrupt.
 */
volatile sig_atomic_t notifyInterruptPending = false;

/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;

/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;

/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

/* GUC parameter */
bool		Trace_notify = false;
| |
/* local function prototypes (definitions appear in the order listed here) */
static int	asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
										 QueuePosition stop,
										 char *page_buffer,
										 Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static bool AsyncExistsPendingNotify(Notification *n);
static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int	notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
| |
| /* |
| * Compute the difference between two queue page numbers (i.e., p - q), |
| * accounting for wraparound. |
| */ |
| static int |
| asyncQueuePageDiff(int p, int q) |
| { |
| int diff; |
| |
| /* |
| * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be |
| * in the range 0..QUEUE_MAX_PAGE. |
| */ |
| Assert(p >= 0 && p <= QUEUE_MAX_PAGE); |
| Assert(q >= 0 && q <= QUEUE_MAX_PAGE); |
| |
| diff = p - q; |
| if (diff >= ((QUEUE_MAX_PAGE + 1) / 2)) |
| diff -= QUEUE_MAX_PAGE + 1; |
| else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) |
| diff += QUEUE_MAX_PAGE + 1; |
| return diff; |
| } |
| |
| /* |
| * Is p < q, accounting for wraparound? |
| * |
| * Since asyncQueueIsFull() blocks creation of a page that could precede any |
| * extant page, we need not assess entries within a page. |
| */ |
| static bool |
| asyncQueuePagePrecedes(int p, int q) |
| { |
| return asyncQueuePageDiff(p, q) < 0; |
| } |
| |
| /* |
| * Report space needed for our shared memory area |
| */ |
| Size |
| AsyncShmemSize(void) |
| { |
| Size size; |
| |
| /* This had better match AsyncShmemInit */ |
| size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); |
| size = add_size(size, offsetof(AsyncQueueControl, backend)); |
| |
| size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0)); |
| |
| return size; |
| } |
| |
/*
 * Initialize our shared memory area
 *
 * Called once per cluster at postmaster startup (and again in each process
 * when attaching to already-created shared memory, in which case "found"
 * is true and the initialization steps are skipped).
 */
void
AsyncShmemInit(void)
{
	bool		found;
	Size		size;

	/*
	 * Create or attach to the AsyncQueueControl structure.
	 *
	 * The used entries in the backend[] array run from 1 to MaxBackends; the
	 * zero'th entry is unused but must be allocated.
	 */
	size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
	size = add_size(size, offsetof(AsyncQueueControl, backend));

	asyncQueueControl = (AsyncQueueControl *)
		ShmemInitStruct("Async Queue Control", size, &found);

	if (!found)
	{
		/* First time through, so initialize it */
		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
		QUEUE_STOP_PAGE = 0;
		QUEUE_FIRST_LISTENER = InvalidBackendId;
		asyncQueueControl->lastQueueFillWarn = 0;
		/* zero'th entry won't be used, but let's initialize it anyway */
		for (int i = 0; i <= MaxBackends; i++)
		{
			QUEUE_BACKEND_PID(i) = InvalidPid;
			QUEUE_BACKEND_DBOID(i) = InvalidOid;
			QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
		}
	}

	/*
	 * Set up SLRU management of the pg_notify data.
	 */
	NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
	SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
				  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
				  SYNC_HANDLER_NONE);

	if (!found)
	{
		/*
		 * During start or reboot, clean out the pg_notify directory.
		 * Notifications are not expected to survive a crash (see the file
		 * header comments), so no old data needs to be preserved.
		 */
		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
	}
}
| |
| |
| /* |
| * pg_notify - |
| * SQL function to send a notification event |
| */ |
| Datum |
| pg_notify(PG_FUNCTION_ARGS) |
| { |
| const char *channel; |
| const char *payload; |
| |
| if (PG_ARGISNULL(0)) |
| channel = ""; |
| else |
| channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); |
| |
| if (PG_ARGISNULL(1)) |
| payload = ""; |
| else |
| payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); |
| |
| /* For NOTIFY as a statement, this is checked in ProcessUtility */ |
| PreventCommandDuringRecovery("NOTIFY"); |
| |
| Async_Notify(channel, payload); |
| |
| PG_RETURN_VOID(); |
| } |
| |
| |
/*
 * Async_Notify
 *
 *		This is executed by the SQL notify command.
 *
 *		Adds the message to the list of pending notifies.
 *		Actual notification happens during transaction commit.
 */
void
Async_Notify(const char *channel, const char *payload)
{
	int			my_level = GetCurrentTransactionNestLevel();
	size_t		channel_len;
	size_t		payload_len;
	Notification *n;
	MemoryContext oldcontext;

	/* notifies cannot be queued from workers; only the leader may send them */
	if (IsParallelWorker())
		elog(ERROR, "cannot send notifications from a parallel worker");

	if (Trace_notify)
		elog(DEBUG1, "Async_Notify(%s)", channel);

	/* treat NULL pointers the same as empty strings */
	channel_len = channel ? strlen(channel) : 0;
	payload_len = payload ? strlen(payload) : 0;

	/* a channel name must be specified */
	if (channel_len == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("channel name cannot be empty")));

	/* enforce length limits */
	if (channel_len >= NAMEDATALEN)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("channel name too long")));

	if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("payload string too long")));

	/*
	 * We must construct the Notification entry, even if we end up not using
	 * it, in order to compare it cheaply to existing list entries.
	 *
	 * The notification list needs to live until end of transaction, so store
	 * it in the transaction context.
	 */
	oldcontext = MemoryContextSwitchTo(CurTransactionContext);

	/* +2 allows for the null terminators of both strings */
	n = (Notification *) palloc(offsetof(Notification, data) +
								channel_len + payload_len + 2);
	n->channel_len = channel_len;
	n->payload_len = payload_len;
	/* data[] holds the channel string followed by the payload string */
	strcpy(n->data, channel);
	if (payload)
		strcpy(n->data + channel_len + 1, payload);
	else
		n->data[channel_len + 1] = '\0';

	if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
	{
		NotificationList *notifies;

		/*
		 * First notify event in current (sub)xact. Note that we allocate the
		 * NotificationList in TopTransactionContext; the nestingLevel might
		 * get changed later by AtSubCommit_Notify.
		 */
		notifies = (NotificationList *)
			MemoryContextAlloc(TopTransactionContext,
							   sizeof(NotificationList));
		notifies->nestingLevel = my_level;
		notifies->events = list_make1(n);
		/* We certainly don't need a hashtable yet */
		notifies->hashtab = NULL;
		notifies->upper = pendingNotifies;
		pendingNotifies = notifies;
	}
	else
	{
		/* Now check for duplicates */
		if (AsyncExistsPendingNotify(n))
		{
			/* It's a dup, so forget it */
			pfree(n);
			MemoryContextSwitchTo(oldcontext);
			return;
		}

		/* Append more events to existing list */
		AddEventToPendingNotifies(n);
	}

	MemoryContextSwitchTo(oldcontext);
}
| |
| /* |
| * queue_listen |
| * Common code for listen, unlisten, unlisten all commands. |
| * |
| * Adds the request to the list of pending actions. |
| * Actual update of the listenChannels list happens during transaction |
| * commit. |
| */ |
| static void |
| queue_listen(ListenActionKind action, const char *channel) |
| { |
| MemoryContext oldcontext; |
| ListenAction *actrec; |
| int my_level = GetCurrentTransactionNestLevel(); |
| |
| /* |
| * Unlike Async_Notify, we don't try to collapse out duplicates. It would |
| * be too complicated to ensure we get the right interactions of |
| * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there |
| * would be any performance benefit anyway in sane applications. |
| */ |
| oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
| |
| /* space for terminating null is included in sizeof(ListenAction) */ |
| actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + |
| strlen(channel) + 1); |
| actrec->action = action; |
| strcpy(actrec->channel, channel); |
| |
| if (pendingActions == NULL || my_level > pendingActions->nestingLevel) |
| { |
| ActionList *actions; |
| |
| /* |
| * First action in current sub(xact). Note that we allocate the |
| * ActionList in TopTransactionContext; the nestingLevel might get |
| * changed later by AtSubCommit_Notify. |
| */ |
| actions = (ActionList *) |
| MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); |
| actions->nestingLevel = my_level; |
| actions->actions = list_make1(actrec); |
| actions->upper = pendingActions; |
| pendingActions = actions; |
| } |
| else |
| pendingActions->actions = lappend(pendingActions->actions, actrec); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Async_Listen |
| * |
| * This is executed by the SQL listen command. |
| */ |
| void |
| Async_Listen(const char *channel) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); |
| |
| queue_listen(LISTEN_LISTEN, channel); |
| } |
| |
| /* |
| * Async_Unlisten |
| * |
| * This is executed by the SQL unlisten command. |
| */ |
| void |
| Async_Unlisten(const char *channel) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); |
| |
| /* If we couldn't possibly be listening, no need to queue anything */ |
| if (pendingActions == NULL && !unlistenExitRegistered) |
| return; |
| |
| queue_listen(LISTEN_UNLISTEN, channel); |
| } |
| |
| /* |
| * Async_UnlistenAll |
| * |
| * This is invoked by UNLISTEN * command, and also at backend exit. |
| */ |
| void |
| Async_UnlistenAll(void) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); |
| |
| /* If we couldn't possibly be listening, no need to queue anything */ |
| if (pendingActions == NULL && !unlistenExitRegistered) |
| return; |
| |
| queue_listen(LISTEN_UNLISTEN_ALL, ""); |
| } |
| |
/*
 * SQL function: return a set of the channel names this backend is actively
 * listening to.
 *
 * Note: this coding relies on the fact that the listenChannels list cannot
 * change within a transaction.
 */
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
	FuncCallContext *funcctx;

	/* stuff done only on the first call of the function */
	if (SRF_IS_FIRSTCALL())
	{
		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	/* emit the next channel name, using call_cntr as the list index */
	if (funcctx->call_cntr < list_length(listenChannels))
	{
		char	   *channel = (char *) list_nth(listenChannels,
												funcctx->call_cntr);

		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
	}

	/* all entries returned */
	SRF_RETURN_DONE(funcctx);
}
| |
| /* |
| * Async_UnlistenOnExit |
| * |
| * This is executed at backend exit if we have done any LISTENs in this |
| * backend. It might not be necessary anymore, if the user UNLISTENed |
| * everything, but we don't try to detect that case. |
| */ |
| static void |
| Async_UnlistenOnExit(int code, Datum arg) |
| { |
| Exec_UnlistenAllCommit(); |
| asyncQueueUnregister(); |
| } |
| |
| /* |
| * AtPrepare_Notify |
| * |
| * This is called at the prepare phase of a two-phase |
| * transaction. Save the state for possible commit later. |
| */ |
| void |
| AtPrepare_Notify(void) |
| { |
| /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ |
| if (pendingActions || pendingNotifies) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY"))); |
| } |
| |
| /* |
| * PreCommit_Notify |
| * |
| * This is called at transaction commit, before actually committing to |
| * clog. |
| * |
| * If there are pending LISTEN actions, make sure we are listed in the |
| * shared-memory listener array. This must happen before commit to |
| * ensure we don't miss any notifies from transactions that commit |
| * just after ours. |
| * |
| * If there are outbound notify requests in the pendingNotifies list, |
| * add them to the global queue. We do that before commit so that |
| * we can still throw error if we run out of queue space. |
| */ |
| void |
| PreCommit_Notify(void) |
| { |
| ListCell *p; |
| |
| if (!pendingActions && !pendingNotifies) |
| return; /* no relevant statements in this xact */ |
| |
| if (Trace_notify) |
| elog(DEBUG1, "PreCommit_Notify"); |
| |
| /* Preflight for any pending listen/unlisten actions */ |
| if (pendingActions != NULL) |
| { |
| foreach(p, pendingActions->actions) |
| { |
| ListenAction *actrec = (ListenAction *) lfirst(p); |
| |
| switch (actrec->action) |
| { |
| case LISTEN_LISTEN: |
| Exec_ListenPreCommit(); |
| break; |
| case LISTEN_UNLISTEN: |
| /* there is no Exec_UnlistenPreCommit() */ |
| break; |
| case LISTEN_UNLISTEN_ALL: |
| /* there is no Exec_UnlistenAllPreCommit() */ |
| break; |
| } |
| } |
| } |
| |
| /* Queue any pending notifies (must happen after the above) */ |
| if (pendingNotifies) |
| { |
| ListCell *nextNotify; |
| |
| /* |
| * Make sure that we have an XID assigned to the current transaction. |
| * GetCurrentTransactionId is cheap if we already have an XID, but not |
| * so cheap if we don't, and we'd prefer not to do that work while |
| * holding NotifyQueueLock. |
| */ |
| (void) GetCurrentTransactionId(); |
| |
| /* |
| * Serialize writers by acquiring a special lock that we hold till |
| * after commit. This ensures that queue entries appear in commit |
| * order, and in particular that there are never uncommitted queue |
| * entries ahead of committed ones, so an uncommitted transaction |
| * can't block delivery of deliverable notifications. |
| * |
| * We use a heavyweight lock so that it'll automatically be released |
| * after either commit or abort. This also allows deadlocks to be |
| * detected, though really a deadlock shouldn't be possible here. |
| * |
| * The lock is on "database 0", which is pretty ugly but it doesn't |
| * seem worth inventing a special locktag category just for this. |
| * (Historical note: before PG 9.0, a similar lock on "database 0" was |
| * used by the flatfiles mechanism.) |
| */ |
| LockSharedObject(DatabaseRelationId, InvalidOid, 0, |
| AccessExclusiveLock); |
| |
| /* Now push the notifications into the queue */ |
| nextNotify = list_head(pendingNotifies->events); |
| while (nextNotify != NULL) |
| { |
| /* |
| * Add the pending notifications to the queue. We acquire and |
| * release NotifyQueueLock once per page, which might be overkill |
| * but it does allow readers to get in while we're doing this. |
| * |
| * A full queue is very uncommon and should really not happen, |
| * given that we have so much space available in the SLRU pages. |
| * Nevertheless we need to deal with this possibility. Note that |
| * when we get here we are in the process of committing our |
| * transaction, but we have not yet committed to clog, so at this |
| * point in time we can still roll the transaction back. |
| */ |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| asyncQueueFillWarning(); |
| if (asyncQueueIsFull()) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("too many notifications in the NOTIFY queue"))); |
| nextNotify = asyncQueueAddEntries(nextNotify); |
| LWLockRelease(NotifyQueueLock); |
| } |
| |
| /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ |
| } |
| } |
| |
| /* |
| * AtCommit_Notify |
| * |
| * This is called at transaction commit, after committing to clog. |
| * |
| * Update listenChannels and clear transaction-local state. |
| * |
| * If we issued any notifications in the transaction, send signals to |
| * listening backends (possibly including ourselves) to process them. |
| * Also, if we filled enough queue pages with new notifies, try to |
| * advance the queue tail pointer. |
| */ |
| void |
| AtCommit_Notify(void) |
| { |
| ListCell *p; |
| |
| /* |
| * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to |
| * return as soon as possible |
| */ |
| if (!pendingActions && !pendingNotifies) |
| return; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "AtCommit_Notify"); |
| |
| /* Perform any pending listen/unlisten actions */ |
| if (pendingActions != NULL) |
| { |
| foreach(p, pendingActions->actions) |
| { |
| ListenAction *actrec = (ListenAction *) lfirst(p); |
| |
| switch (actrec->action) |
| { |
| case LISTEN_LISTEN: |
| Exec_ListenCommit(actrec->channel); |
| break; |
| case LISTEN_UNLISTEN: |
| Exec_UnlistenCommit(actrec->channel); |
| break; |
| case LISTEN_UNLISTEN_ALL: |
| Exec_UnlistenAllCommit(); |
| break; |
| } |
| } |
| } |
| |
| /* If no longer listening to anything, get out of listener array */ |
| if (amRegisteredListener && listenChannels == NIL) |
| asyncQueueUnregister(); |
| |
| /* |
| * Send signals to listening backends. We need do this only if there are |
| * pending notifies, which were previously added to the shared queue by |
| * PreCommit_Notify(). |
| */ |
| if (pendingNotifies != NULL) |
| SignalBackends(); |
| |
| /* |
| * If it's time to try to advance the global tail pointer, do that. |
| * |
| * (It might seem odd to do this in the sender, when more than likely the |
| * listeners won't yet have read the messages we just sent. However, |
| * there's less contention if only the sender does it, and there is little |
| * need for urgency in advancing the global tail. So this typically will |
| * be clearing out messages that were sent some time ago.) |
| */ |
| if (tryAdvanceTail) |
| { |
| tryAdvanceTail = false; |
| asyncQueueAdvanceTail(); |
| } |
| |
| /* And clean up */ |
| ClearPendingActionsAndNotifies(); |
| } |
| |
| /* |
| * Exec_ListenPreCommit --- subroutine for PreCommit_Notify |
| * |
| * This function must make sure we are ready to catch any incoming messages. |
| */ |
| static void |
| Exec_ListenPreCommit(void) |
| { |
| QueuePosition head; |
| QueuePosition max; |
| BackendId prevListener; |
| |
| /* |
| * Nothing to do if we are already listening to something, nor if we |
| * already ran this routine in this transaction. |
| */ |
| if (amRegisteredListener) |
| return; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); |
| |
| /* |
| * Before registering, make sure we will unlisten before dying. (Note: |
| * this action does not get undone if we abort later.) |
| */ |
| if (!unlistenExitRegistered) |
| { |
| before_shmem_exit(Async_UnlistenOnExit, 0); |
| unlistenExitRegistered = true; |
| } |
| |
| /* |
| * This is our first LISTEN, so establish our pointer. |
| * |
| * We set our pointer to the global tail pointer and then move it forward |
| * over already-committed notifications. This ensures we cannot miss any |
| * not-yet-committed notifications. We might get a few more but that |
| * doesn't hurt. |
| * |
| * In some scenarios there might be a lot of committed notifications that |
| * have not yet been pruned away (because some backend is being lazy about |
| * reading them). To reduce our startup time, we can look at other |
| * backends and adopt the maximum "pos" pointer of any backend that's in |
| * our database; any notifications it's already advanced over are surely |
| * committed and need not be re-examined by us. (We must consider only |
| * backends connected to our DB, because others will not have bothered to |
| * check committed-ness of notifications in our DB.) |
| * |
| * We need exclusive lock here so we can look at other backends' entries |
| * and manipulate the list links. |
| */ |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| head = QUEUE_HEAD; |
| max = QUEUE_TAIL; |
| prevListener = InvalidBackendId; |
| for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) |
| { |
| if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) |
| max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); |
| /* Also find last listening backend before this one */ |
| if (i < MyBackendId) |
| prevListener = i; |
| } |
| QUEUE_BACKEND_POS(MyBackendId) = max; |
| QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; |
| QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; |
| /* Insert backend into list of listeners at correct position */ |
| if (prevListener > 0) |
| { |
| QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener); |
| QUEUE_NEXT_LISTENER(prevListener) = MyBackendId; |
| } |
| else |
| { |
| QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER; |
| QUEUE_FIRST_LISTENER = MyBackendId; |
| } |
| LWLockRelease(NotifyQueueLock); |
| |
| /* Now we are listed in the global array, so remember we're listening */ |
| amRegisteredListener = true; |
| |
| /* |
| * Try to move our pointer forward as far as possible. This will skip |
| * over already-committed notifications, which we want to do because they |
| * might be quite stale. Note that we are not yet listening on anything, |
| * so we won't deliver such notifications to our frontend. Also, although |
| * our transaction might have executed NOTIFY, those message(s) aren't |
| * queued yet so we won't skip them here. |
| */ |
| if (!QUEUE_POS_EQUAL(max, head)) |
| asyncQueueReadAllNotifications(); |
| } |
| |
| /* |
| * Exec_ListenCommit --- subroutine for AtCommit_Notify |
| * |
| * Add the channel to the list of channels we are listening on. |
| */ |
| static void |
| Exec_ListenCommit(const char *channel) |
| { |
| MemoryContext oldcontext; |
| |
| /* Do nothing if we are already listening on this channel */ |
| if (IsListeningOn(channel)) |
| return; |
| |
| /* |
| * Add the new channel name to listenChannels. |
| * |
| * XXX It is theoretically possible to get an out-of-memory failure here, |
| * which would be bad because we already committed. For the moment it |
| * doesn't seem worth trying to guard against that, but maybe improve this |
| * later. |
| */ |
| oldcontext = MemoryContextSwitchTo(TopMemoryContext); |
| listenChannels = lappend(listenChannels, pstrdup(channel)); |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Exec_UnlistenCommit --- subroutine for AtCommit_Notify |
| * |
| * Remove the specified channel name from listenChannels. |
| */ |
| static void |
| Exec_UnlistenCommit(const char *channel) |
| { |
| ListCell *q; |
| |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); |
| |
| foreach(q, listenChannels) |
| { |
| char *lchan = (char *) lfirst(q); |
| |
| if (strcmp(lchan, channel) == 0) |
| { |
| listenChannels = foreach_delete_current(listenChannels, q); |
| pfree(lchan); |
| break; |
| } |
| } |
| |
| /* |
| * We do not complain about unlistening something not being listened; |
| * should we? |
| */ |
| } |
| |
| /* |
| * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify |
| * |
| * Unlisten on all channels for this backend. |
| */ |
| static void |
| Exec_UnlistenAllCommit(void) |
| { |
| if (Trace_notify) |
| elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); |
| |
| list_free_deep(listenChannels); |
| listenChannels = NIL; |
| } |
| |
| /* |
| * ProcessCompletedNotifies --- nowadays this does nothing |
| * |
| * This routine used to send signals and handle self-notifies, |
| * but that functionality has been moved elsewhere. |
| * We'd delete it entirely, except that the documentation used to instruct |
| * background-worker authors to call it. To avoid an ABI break in stable |
| * branches, keep it as a no-op routine. |
| */ |
| void |
| ProcessCompletedNotifies(void) |
| { |
| } |
| |
| /* |
| * Test whether we are actively listening on the given channel name. |
| * |
| * Note: this function is executed for every notification found in the queue. |
| * Perhaps it is worth further optimization, eg convert the list to a sorted |
| * array so we can binary-search it. In practice the list is likely to be |
| * fairly short, though. |
| */ |
| static bool |
| IsListeningOn(const char *channel) |
| { |
| ListCell *p; |
| |
| foreach(p, listenChannels) |
| { |
| char *lchan = (char *) lfirst(p); |
| |
| if (strcmp(lchan, channel) == 0) |
| return true; |
| } |
| return false; |
| } |
| |
| /* |
| * Remove our entry from the listeners array when we are no longer listening |
| * on any channel. NB: must not fail if we're already not listening. |
| */ |
| static void |
| asyncQueueUnregister(void) |
| { |
| Assert(listenChannels == NIL); /* else caller error */ |
| |
| if (!amRegisteredListener) /* nothing to do */ |
| return; |
| |
| /* |
| * Need exclusive lock here to manipulate list links. |
| */ |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| /* Mark our entry as invalid */ |
| QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; |
| QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; |
| /* and remove it from the list */ |
| if (QUEUE_FIRST_LISTENER == MyBackendId) |
| QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId); |
| else |
| { |
| for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) |
| { |
| if (QUEUE_NEXT_LISTENER(i) == MyBackendId) |
| { |
| QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId); |
| break; |
| } |
| } |
| } |
| QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId; |
| LWLockRelease(NotifyQueueLock); |
| |
| /* mark ourselves as no longer listed in the global array */ |
| amRegisteredListener = false; |
| } |
| |
| /* |
| * Test whether there is room to insert more notification messages. |
| * |
| * Caller must hold at least shared NotifyQueueLock. |
| */ |
| static bool |
| asyncQueueIsFull(void) |
| { |
| int nexthead; |
| int boundary; |
| |
| /* |
| * The queue is full if creating a new head page would create a page that |
| * logically precedes the current global tail pointer, ie, the head |
| * pointer would wrap around compared to the tail. We cannot create such |
| * a head page for fear of confusing slru.c. For safety we round the tail |
| * pointer back to a segment boundary (truncation logic in |
| * asyncQueueAdvanceTail does not do this, so doing it here is optional). |
| * |
| * Note that this test is *not* dependent on how much space there is on |
| * the current head page. This is necessary because asyncQueueAddEntries |
| * might try to create the next head page in any case. |
| */ |
| nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1; |
| if (nexthead > QUEUE_MAX_PAGE) |
| nexthead = 0; /* wrap around */ |
| boundary = QUEUE_STOP_PAGE; |
| boundary -= boundary % SLRU_PAGES_PER_SEGMENT; |
| return asyncQueuePagePrecedes(nexthead, boundary); |
| } |
| |
| /* |
| * Advance the QueuePosition to the next entry, assuming that the current |
| * entry is of length entryLength. If we jump to a new page the function |
| * returns true, else false. |
| */ |
| static bool |
| asyncQueueAdvance(volatile QueuePosition *position, int entryLength) |
| { |
| int pageno = QUEUE_POS_PAGE(*position); |
| int offset = QUEUE_POS_OFFSET(*position); |
| bool pageJump = false; |
| |
| /* |
| * Move to the next writing position: First jump over what we have just |
| * written or read. |
| */ |
| offset += entryLength; |
| Assert(offset <= QUEUE_PAGESIZE); |
| |
| /* |
| * In a second step check if another entry can possibly be written to the |
| * page. If so, stay here, we have reached the next position. If not, then |
| * we need to move on to the next page. |
| */ |
| if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) |
| { |
| pageno++; |
| if (pageno > QUEUE_MAX_PAGE) |
| pageno = 0; /* wrap around */ |
| offset = 0; |
| pageJump = true; |
| } |
| |
| SET_QUEUE_POS(*position, pageno, offset); |
| return pageJump; |
| } |
| |
| /* |
| * Fill the AsyncQueueEntry at *qe with an outbound notification message. |
| */ |
| static void |
| asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) |
| { |
| size_t channellen = n->channel_len; |
| size_t payloadlen = n->payload_len; |
| int entryLength; |
| |
| Assert(channellen < NAMEDATALEN); |
| Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); |
| |
| /* The terminators are already included in AsyncQueueEntryEmptySize */ |
| entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; |
| entryLength = QUEUEALIGN(entryLength); |
| qe->length = entryLength; |
| qe->dboid = MyDatabaseId; |
| qe->xid = GetCurrentTransactionId(); |
| qe->srcPid = MyProcPid; |
| memcpy(qe->data, n->data, channellen + payloadlen + 2); |
| } |
| |
| /* |
| * Add pending notifications to the queue. |
| * |
| * We go page by page here, i.e. we stop once we have to go to a new page but |
| * we will be called again and then fill that next page. If an entry does not |
| * fit into the current page, we write a dummy entry with an InvalidOid as the |
| * database OID in order to fill the page. So every page is always used up to |
| * the last byte which simplifies reading the page later. |
| * |
| * We are passed the list cell (in pendingNotifies->events) containing the next |
| * notification to write and return the first still-unwritten cell back. |
| * Eventually we will return NULL indicating all is done. |
| * |
| * We are holding NotifyQueueLock already from the caller and grab |
| * NotifySLRULock locally in this function. |
| */ |
| static ListCell * |
| asyncQueueAddEntries(ListCell *nextNotify) |
| { |
| AsyncQueueEntry qe; |
| QueuePosition queue_head; |
| int pageno; |
| int offset; |
| int slotno; |
| |
| /* We hold both NotifyQueueLock and NotifySLRULock during this operation */ |
| LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); |
| |
| /* |
| * We work with a local copy of QUEUE_HEAD, which we write back to shared |
| * memory upon exiting. The reason for this is that if we have to advance |
| * to a new page, SimpleLruZeroPage might fail (out of disk space, for |
| * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, |
| * subsequent insertions would try to put entries into a page that slru.c |
| * thinks doesn't exist yet.) So, use a local position variable. Note |
| * that if we do fail, any already-inserted queue entries are forgotten; |
| * this is okay, since they'd be useless anyway after our transaction |
| * rolls back. |
| */ |
| queue_head = QUEUE_HEAD; |
| |
| /* |
| * If this is the first write since the postmaster started, we need to |
| * initialize the first page of the async SLRU. Otherwise, the current |
| * page should be initialized already, so just fetch it. |
| * |
| * (We could also take the first path when the SLRU position has just |
| * wrapped around, but re-zeroing the page is harmless in that case.) |
| */ |
| pageno = QUEUE_POS_PAGE(queue_head); |
| if (QUEUE_POS_IS_ZERO(queue_head)) |
| slotno = SimpleLruZeroPage(NotifyCtl, pageno); |
| else |
| slotno = SimpleLruReadPage(NotifyCtl, pageno, true, |
| InvalidTransactionId); |
| |
| /* Note we mark the page dirty before writing in it */ |
| NotifyCtl->shared->page_dirty[slotno] = true; |
| |
| while (nextNotify != NULL) |
| { |
| Notification *n = (Notification *) lfirst(nextNotify); |
| |
| /* Construct a valid queue entry in local variable qe */ |
| asyncQueueNotificationToEntry(n, &qe); |
| |
| offset = QUEUE_POS_OFFSET(queue_head); |
| |
| /* Check whether the entry really fits on the current page */ |
| if (offset + qe.length <= QUEUE_PAGESIZE) |
| { |
| /* OK, so advance nextNotify past this item */ |
| nextNotify = lnext(pendingNotifies->events, nextNotify); |
| } |
| else |
| { |
| /* |
| * Write a dummy entry to fill up the page. Actually readers will |
| * only check dboid and since it won't match any reader's database |
| * OID, they will ignore this entry and move on. |
| */ |
| qe.length = QUEUE_PAGESIZE - offset; |
| qe.dboid = InvalidOid; |
| qe.data[0] = '\0'; /* empty channel */ |
| qe.data[1] = '\0'; /* empty payload */ |
| } |
| |
| /* Now copy qe into the shared buffer page */ |
| memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, |
| &qe, |
| qe.length); |
| |
| /* Advance queue_head appropriately, and detect if page is full */ |
| if (asyncQueueAdvance(&(queue_head), qe.length)) |
| { |
| /* |
| * Page is full, so we're done here, but first fill the next page |
| * with zeroes. The reason to do this is to ensure that slru.c's |
| * idea of the head page is always the same as ours, which avoids |
| * boundary problems in SimpleLruTruncate. The test in |
| * asyncQueueIsFull() ensured that there is room to create this |
| * page without overrunning the queue. |
| */ |
| slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); |
| |
| /* |
| * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, |
| * set flag to remember that we should try to advance the tail |
| * pointer (we don't want to actually do that right here). |
| */ |
| if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) |
| tryAdvanceTail = true; |
| |
| /* And exit the loop */ |
| break; |
| } |
| } |
| |
| /* Success, so update the global QUEUE_HEAD */ |
| QUEUE_HEAD = queue_head; |
| |
| LWLockRelease(NotifySLRULock); |
| |
| return nextNotify; |
| } |
| |
| /* |
| * SQL function to return the fraction of the notification queue currently |
| * occupied. |
| */ |
| Datum |
| pg_notification_queue_usage(PG_FUNCTION_ARGS) |
| { |
| double usage; |
| |
| /* Advance the queue tail so we don't report a too-large result */ |
| asyncQueueAdvanceTail(); |
| |
| LWLockAcquire(NotifyQueueLock, LW_SHARED); |
| usage = asyncQueueUsage(); |
| LWLockRelease(NotifyQueueLock); |
| |
| PG_RETURN_FLOAT8(usage); |
| } |
| |
| /* |
| * Return the fraction of the queue that is currently occupied. |
| * |
| * The caller must hold NotifyQueueLock in (at least) shared mode. |
| * |
| * Note: we measure the distance to the logical tail page, not the physical |
| * tail page. In some sense that's wrong, but the relative position of the |
| * physical tail is affected by details such as SLRU segment boundaries, |
| * so that a result based on that is unpleasantly unstable. |
| */ |
| static double |
| asyncQueueUsage(void) |
| { |
| int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); |
| int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); |
| int occupied; |
| |
| occupied = headPage - tailPage; |
| |
| if (occupied == 0) |
| return (double) 0; /* fast exit for common case */ |
| |
| if (occupied < 0) |
| { |
| /* head has wrapped around, tail not yet */ |
| occupied += QUEUE_MAX_PAGE + 1; |
| } |
| |
| return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); |
| } |
| |
| /* |
| * Check whether the queue is at least half full, and emit a warning if so. |
| * |
| * This is unlikely given the size of the queue, but possible. |
| * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. |
| * |
| * Caller must hold exclusive NotifyQueueLock. |
| */ |
| static void |
| asyncQueueFillWarning(void) |
| { |
| double fillDegree; |
| TimestampTz t; |
| |
| fillDegree = asyncQueueUsage(); |
| if (fillDegree < 0.5) |
| return; |
| |
| t = GetCurrentTimestamp(); |
| |
| if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, |
| t, QUEUE_FULL_WARN_INTERVAL)) |
| { |
| QueuePosition min = QUEUE_HEAD; |
| int32 minPid = InvalidPid; |
| |
| for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) |
| { |
| Assert(QUEUE_BACKEND_PID(i) != InvalidPid); |
| min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
| if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) |
| minPid = QUEUE_BACKEND_PID(i); |
| } |
| |
| ereport(WARNING, |
| (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100), |
| (minPid != InvalidPid ? |
| errdetail("The server process with PID %d is among those with the oldest transactions.", minPid) |
| : 0), |
| (minPid != InvalidPid ? |
| errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.") |
| : 0))); |
| |
| asyncQueueControl->lastQueueFillWarn = t; |
| } |
| } |
| |
| /* |
| * Send signals to listening backends. |
| * |
| * Normally we signal only backends in our own database, since only those |
| * backends could be interested in notifies we send. However, if there's |
| * notify traffic in our database but no traffic in another database that |
| * does have listener(s), those listeners will fall further and further |
| * behind. Waken them anyway if they're far enough behind, so that they'll |
| * advance their queue position pointers, allowing the global tail to advance. |
| * |
| * Since we know the BackendId and the Pid the signaling is quite cheap. |
| * |
| * This is called during CommitTransaction(), so it's important for it |
| * to have very low probability of failure. |
| */ |
| static void |
| SignalBackends(void) |
| { |
| int32 *pids; |
| BackendId *ids; |
| int count; |
| |
| /* |
| * Identify backends that we need to signal. We don't want to send |
| * signals while holding the NotifyQueueLock, so this loop just builds a |
| * list of target PIDs. |
| * |
| * XXX in principle these pallocs could fail, which would be bad. Maybe |
| * preallocate the arrays? They're not that large, though. |
| */ |
| pids = (int32 *) palloc(MaxBackends * sizeof(int32)); |
| ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); |
| count = 0; |
| |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) |
| { |
| int32 pid = QUEUE_BACKEND_PID(i); |
| QueuePosition pos; |
| |
| Assert(pid != InvalidPid); |
| pos = QUEUE_BACKEND_POS(i); |
| if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) |
| { |
| /* |
| * Always signal listeners in our own database, unless they're |
| * already caught up (unlikely, but possible). |
| */ |
| if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) |
| continue; |
| } |
| else |
| { |
| /* |
| * Listeners in other databases should be signaled only if they |
| * are far behind. |
| */ |
| if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), |
| QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) |
| continue; |
| } |
| /* OK, need to signal this one */ |
| pids[count] = pid; |
| ids[count] = i; |
| count++; |
| } |
| LWLockRelease(NotifyQueueLock); |
| |
| /* Now send signals */ |
| for (int i = 0; i < count; i++) |
| { |
| int32 pid = pids[i]; |
| |
| /* |
| * If we are signaling our own process, no need to involve the kernel; |
| * just set the flag directly. |
| */ |
| if (pid == MyProcPid) |
| { |
| notifyInterruptPending = true; |
| continue; |
| } |
| |
| /* |
| * Note: assuming things aren't broken, a signal failure here could |
| * only occur if the target backend exited since we released |
| * NotifyQueueLock; which is unlikely but certainly possible. So we |
| * just log a low-level debug message if it happens. |
| */ |
| if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) |
| elog(DEBUG3, "could not signal backend with PID %d: %m", pid); |
| } |
| |
| pfree(pids); |
| pfree(ids); |
| } |
| |
| /* |
| * AtAbort_Notify |
| * |
| * This is called at transaction abort. |
| * |
| * Gets rid of pending actions and outbound notifies that we would have |
| * executed if the transaction got committed. |
| */ |
| void |
| AtAbort_Notify(void) |
| { |
| /* |
| * If we LISTEN but then roll back the transaction after PreCommit_Notify, |
| * we have registered as a listener but have not made any entry in |
| * listenChannels. In that case, deregister again. |
| */ |
| if (amRegisteredListener && listenChannels == NIL) |
| asyncQueueUnregister(); |
| |
| /* And clean up */ |
| ClearPendingActionsAndNotifies(); |
| } |
| |
| /* |
| * AtSubCommit_Notify() --- Take care of subtransaction commit. |
| * |
| * Reassign all items in the pending lists to the parent transaction. |
| */ |
| void |
| AtSubCommit_Notify(void) |
| { |
| int my_level = GetCurrentTransactionNestLevel(); |
| |
| /* If there are actions at our nesting level, we must reparent them. */ |
| if (pendingActions != NULL && |
| pendingActions->nestingLevel >= my_level) |
| { |
| if (pendingActions->upper == NULL || |
| pendingActions->upper->nestingLevel < my_level - 1) |
| { |
| /* nothing to merge; give the whole thing to the parent */ |
| --pendingActions->nestingLevel; |
| } |
| else |
| { |
| ActionList *childPendingActions = pendingActions; |
| |
| pendingActions = pendingActions->upper; |
| |
| /* |
| * Mustn't try to eliminate duplicates here --- see queue_listen() |
| */ |
| pendingActions->actions = |
| list_concat(pendingActions->actions, |
| childPendingActions->actions); |
| pfree(childPendingActions); |
| } |
| } |
| |
| /* If there are notifies at our nesting level, we must reparent them. */ |
| if (pendingNotifies != NULL && |
| pendingNotifies->nestingLevel >= my_level) |
| { |
| Assert(pendingNotifies->nestingLevel == my_level); |
| |
| if (pendingNotifies->upper == NULL || |
| pendingNotifies->upper->nestingLevel < my_level - 1) |
| { |
| /* nothing to merge; give the whole thing to the parent */ |
| --pendingNotifies->nestingLevel; |
| } |
| else |
| { |
| /* |
| * Formerly, we didn't bother to eliminate duplicates here, but |
| * now we must, else we fall foul of "Assert(!found)", either here |
| * or during a later attempt to build the parent-level hashtable. |
| */ |
| NotificationList *childPendingNotifies = pendingNotifies; |
| ListCell *l; |
| |
| pendingNotifies = pendingNotifies->upper; |
| /* Insert all the subxact's events into parent, except for dups */ |
| foreach(l, childPendingNotifies->events) |
| { |
| Notification *childn = (Notification *) lfirst(l); |
| |
| if (!AsyncExistsPendingNotify(childn)) |
| AddEventToPendingNotifies(childn); |
| } |
| pfree(childPendingNotifies); |
| } |
| } |
| } |
| |
| /* |
| * AtSubAbort_Notify() --- Take care of subtransaction abort. |
| */ |
| void |
| AtSubAbort_Notify(void) |
| { |
| int my_level = GetCurrentTransactionNestLevel(); |
| |
| /* |
| * All we have to do is pop the stack --- the actions/notifies made in |
| * this subxact are no longer interesting, and the space will be freed |
| * when CurTransactionContext is recycled. We still have to free the |
| * ActionList and NotificationList objects themselves, though, because |
| * those are allocated in TopTransactionContext. |
| * |
| * Note that there might be no entries at all, or no entries for the |
| * current subtransaction level, either because none were ever created, or |
| * because we reentered this routine due to trouble during subxact abort. |
| */ |
| while (pendingActions != NULL && |
| pendingActions->nestingLevel >= my_level) |
| { |
| ActionList *childPendingActions = pendingActions; |
| |
| pendingActions = pendingActions->upper; |
| pfree(childPendingActions); |
| } |
| |
| while (pendingNotifies != NULL && |
| pendingNotifies->nestingLevel >= my_level) |
| { |
| NotificationList *childPendingNotifies = pendingNotifies; |
| |
| pendingNotifies = pendingNotifies->upper; |
| pfree(childPendingNotifies); |
| } |
| } |
| |
| /* |
| * HandleNotifyInterrupt |
| * |
| * Signal handler portion of interrupt handling. Let the backend know |
| * that there's a pending notify interrupt. If we're currently reading |
| * from the client, this will interrupt the read and |
| * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). |
| */ |
| void |
| HandleNotifyInterrupt(void) |
| { |
| /* |
| * Note: this is called by a SIGNAL HANDLER. You must be very wary what |
| * you do here. |
| */ |
| |
| /* signal that work needs to be done */ |
| notifyInterruptPending = true; |
| |
| /* make sure the event is processed in due course */ |
| SetLatch(MyLatch); |
| } |
| |
| /* |
| * ProcessNotifyInterrupt |
| * |
| * This is called if we see notifyInterruptPending set, just before |
| * transmitting ReadyForQuery at the end of a frontend command, and |
| * also if a notify signal occurs while reading from the frontend. |
| * HandleNotifyInterrupt() will cause the read to be interrupted |
| * via the process's latch, and this routine will get called. |
| * If we are truly idle (ie, *not* inside a transaction block), |
| * process the incoming notifies. |
| * |
| * If "flush" is true, force any frontend messages out immediately. |
| * This can be false when being called at the end of a frontend command, |
| * since we'll flush after sending ReadyForQuery. |
| */ |
| void |
| ProcessNotifyInterrupt(bool flush) |
| { |
| if (IsTransactionOrTransactionBlock()) |
| return; /* not really idle */ |
| |
| /* Loop in case another signal arrives while sending messages */ |
| while (notifyInterruptPending) |
| ProcessIncomingNotify(flush); |
| } |
| |
| |
| /* |
| * Read all pending notifications from the queue, and deliver appropriate |
| * ones to my frontend. Stop when we reach queue head or an uncommitted |
| * notification. |
| */ |
| static void |
| asyncQueueReadAllNotifications(void) |
| { |
| volatile QueuePosition pos; |
| QueuePosition head; |
| Snapshot snapshot; |
| |
| /* page_buffer must be adequately aligned, so use a union */ |
| union |
| { |
| char buf[QUEUE_PAGESIZE]; |
| AsyncQueueEntry align; |
| } page_buffer; |
| |
| /* Fetch current state */ |
| LWLockAcquire(NotifyQueueLock, LW_SHARED); |
| /* Assert checks that we have a valid state entry */ |
| Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); |
| pos = QUEUE_BACKEND_POS(MyBackendId); |
| head = QUEUE_HEAD; |
| LWLockRelease(NotifyQueueLock); |
| |
| if (QUEUE_POS_EQUAL(pos, head)) |
| { |
| /* Nothing to do, we have read all notifications already. */ |
| return; |
| } |
| |
| /*---------- |
| * Get snapshot we'll use to decide which xacts are still in progress. |
| * This is trickier than it might seem, because of race conditions. |
| * Consider the following example: |
| * |
| * Backend 1: Backend 2: |
| * |
| * transaction starts |
| * UPDATE foo SET ...; |
| * NOTIFY foo; |
| * commit starts |
| * queue the notify message |
| * transaction starts |
| * LISTEN foo; -- first LISTEN in session |
| * SELECT * FROM foo WHERE ...; |
| * commit to clog |
| * commit starts |
| * add backend 2 to array of listeners |
| * advance to queue head (this code) |
| * commit to clog |
| * |
| * Transaction 2's SELECT has not seen the UPDATE's effects, since that |
| * wasn't committed yet. Ideally we'd ensure that client 2 would |
| * eventually get transaction 1's notify message, but there's no way |
| * to do that; until we're in the listener array, there's no guarantee |
| * that the notify message doesn't get removed from the queue. |
| * |
| * Therefore the coding technique transaction 2 is using is unsafe: |
| * applications must commit a LISTEN before inspecting database state, |
| * if they want to ensure they will see notifications about subsequent |
| * changes to that state. |
| * |
| * What we do guarantee is that we'll see all notifications from |
| * transactions committing after the snapshot we take here. |
| * Exec_ListenPreCommit has already added us to the listener array, |
| * so no not-yet-committed messages can be removed from the queue |
| * before we see them. |
| *---------- |
| */ |
| snapshot = RegisterSnapshot(GetLatestSnapshot()); |
| |
| /* |
| * It is possible that we fail while trying to send a message to our |
| * frontend (for example, because of encoding conversion failure). If |
| * that happens it is critical that we not try to send the same message |
| * over and over again. Therefore, we place a PG_TRY block here that will |
| * forcibly advance our queue position before we lose control to an error. |
| * (We could alternatively retake NotifyQueueLock and move the position |
| * before handling each individual message, but that seems like too much |
| * lock traffic.) |
| */ |
| PG_TRY(); |
| { |
| bool reachedStop; |
| |
| do |
| { |
| int curpage = QUEUE_POS_PAGE(pos); |
| int curoffset = QUEUE_POS_OFFSET(pos); |
| int slotno; |
| int copysize; |
| |
| /* |
| * We copy the data from SLRU into a local buffer, so as to avoid |
| * holding the NotifySLRULock while we are examining the entries |
| * and possibly transmitting them to our frontend. Copy only the |
| * part of the page we will actually inspect. |
| */ |
| slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, |
| InvalidTransactionId); |
| if (curpage == QUEUE_POS_PAGE(head)) |
| { |
| /* we only want to read as far as head */ |
| copysize = QUEUE_POS_OFFSET(head) - curoffset; |
| if (copysize < 0) |
| copysize = 0; /* just for safety */ |
| } |
| else |
| { |
| /* fetch all the rest of the page */ |
| copysize = QUEUE_PAGESIZE - curoffset; |
| } |
| memcpy(page_buffer.buf + curoffset, |
| NotifyCtl->shared->page_buffer[slotno] + curoffset, |
| copysize); |
| /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ |
| LWLockRelease(NotifySLRULock); |
| |
| /* |
| * Process messages up to the stop position, end of page, or an |
| * uncommitted message. |
| * |
| * Our stop position is what we found to be the head's position |
| * when we entered this function. It might have changed already. |
| * But if it has, we will receive (or have already received and |
| * queued) another signal and come here again. |
| * |
| * We are not holding NotifyQueueLock here! The queue can only |
| * extend beyond the head pointer (see above) and we leave our |
| * backend's pointer where it is so nobody will truncate or |
| * rewrite pages under us. Especially we don't want to hold a lock |
| * while sending the notifications to the frontend. |
| */ |
| reachedStop = asyncQueueProcessPageEntries(&pos, head, |
| page_buffer.buf, |
| snapshot); |
| } while (!reachedStop); |
| } |
| PG_FINALLY(); |
| { |
| /* Update shared state */ |
| LWLockAcquire(NotifyQueueLock, LW_SHARED); |
| QUEUE_BACKEND_POS(MyBackendId) = pos; |
| LWLockRelease(NotifyQueueLock); |
| } |
| PG_END_TRY(); |
| |
| /* Done with snapshot */ |
| UnregisterSnapshot(snapshot); |
| } |
| |
| /* |
| * Fetch notifications from the shared queue, beginning at position current, |
| * and deliver relevant ones to my frontend. |
| * |
| * The current page must have been fetched into page_buffer from shared |
| * memory. (We could access the page right in shared memory, but that |
| * would imply holding the NotifySLRULock throughout this routine.) |
| * |
| * We stop if we reach the "stop" position, or reach a notification from an |
| * uncommitted transaction, or reach the end of the page. |
| * |
| * The function returns true once we have reached the stop position or an |
| * uncommitted notification, and false if we have finished with the page. |
| * In other words: once it returns true there is no need to look further. |
| * The QueuePosition *current is advanced past all processed messages. |
| */ |
| static bool |
| asyncQueueProcessPageEntries(volatile QueuePosition *current, |
| QueuePosition stop, |
| char *page_buffer, |
| Snapshot snapshot) |
| { |
| bool reachedStop = false; |
| bool reachedEndOfPage; |
| AsyncQueueEntry *qe; |
| |
| do |
| { |
| QueuePosition thisentry = *current; |
| |
| if (QUEUE_POS_EQUAL(thisentry, stop)) |
| break; |
| |
| qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); |
| |
| /* |
| * Advance *current over this message, possibly to the next page. As |
| * noted in the comments for asyncQueueReadAllNotifications, we must |
| * do this before possibly failing while processing the message. |
| */ |
| reachedEndOfPage = asyncQueueAdvance(current, qe->length); |
| |
| /* Ignore messages destined for other databases */ |
| if (qe->dboid == MyDatabaseId) |
| { |
| if (XidInMVCCSnapshot_Local(qe->xid, snapshot)) |
| { |
| /* |
| * The source transaction is still in progress, so we can't |
| * process this message yet. Break out of the loop, but first |
| * back up *current so we will reprocess the message next |
| * time. (Note: it is unlikely but not impossible for |
| * TransactionIdDidCommit to fail, so we can't really avoid |
| * this advance-then-back-up behavior when dealing with an |
| * uncommitted message.) |
| * |
| * Note that we must test XidInMVCCSnapshot before we test |
| * TransactionIdDidCommit, else we might return a message from |
| * a transaction that is not yet visible to snapshots; compare |
| * the comments at the head of heapam_visibility.c. |
| * |
| * Also, while our own xact won't be listed in the snapshot, |
| * we need not check for TransactionIdIsCurrentTransactionId |
| * because our transaction cannot (yet) have queued any |
| * messages. |
| */ |
| *current = thisentry; |
| reachedStop = true; |
| break; |
| } |
| else if (TransactionIdDidCommit(qe->xid)) |
| { |
| /* qe->data is the null-terminated channel name */ |
| char *channel = qe->data; |
| |
| if (IsListeningOn(channel)) |
| { |
| /* payload follows channel name */ |
| char *payload = qe->data + strlen(channel) + 1; |
| |
| NotifyMyFrontEnd(channel, payload, qe->srcPid); |
| } |
| } |
| else |
| { |
| /* |
| * The source transaction aborted or crashed, so we just |
| * ignore its notifications. |
| */ |
| } |
| } |
| |
| /* Loop back if we're not at end of page */ |
| } while (!reachedEndOfPage); |
| |
| if (QUEUE_POS_EQUAL(*current, stop)) |
| reachedStop = true; |
| |
| return reachedStop; |
| } |
| |
| /* |
| * Advance the shared queue tail variable to the minimum of all the |
| * per-backend tail pointers. Truncate pg_notify space if possible. |
| * |
| * This is (usually) called during CommitTransaction(), so it's important for |
| * it to have very low probability of failure. |
| */ |
| static void |
| asyncQueueAdvanceTail(void) |
| { |
| QueuePosition min; |
| int oldtailpage; |
| int newtailpage; |
| int boundary; |
| |
| /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */ |
| LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE); |
| |
| /* |
| * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact |
| * (ie, exactly match at least one backend's queue position), so it must |
| * be updated atomically with the actual computation. Since v13, we could |
| * get away with not doing it like that, but it seems prudent to keep it |
| * so. |
| * |
| * Also, because incoming backends will scan forward from QUEUE_TAIL, that |
| * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is |
| * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest |
| * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL), |
| * there are pages we can truncate but haven't yet finished doing so. |
| * |
| * For concurrency's sake, we don't want to hold NotifyQueueLock while |
| * performing SimpleLruTruncate. This is OK because no backend will try |
| * to access the pages we are in the midst of truncating. |
| */ |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| min = QUEUE_HEAD; |
| for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) |
| { |
| Assert(QUEUE_BACKEND_PID(i) != InvalidPid); |
| min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
| } |
| QUEUE_TAIL = min; |
| oldtailpage = QUEUE_STOP_PAGE; |
| LWLockRelease(NotifyQueueLock); |
| |
| /* |
| * We can truncate something if the global tail advanced across an SLRU |
| * segment boundary. |
| * |
| * XXX it might be better to truncate only once every several segments, to |
| * reduce the number of directory scans. |
| */ |
| newtailpage = QUEUE_POS_PAGE(min); |
| boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); |
| if (asyncQueuePagePrecedes(oldtailpage, boundary)) |
| { |
| /* |
| * SimpleLruTruncate() will ask for NotifySLRULock but will also |
| * release the lock again. |
| */ |
| SimpleLruTruncate(NotifyCtl, newtailpage); |
| |
| /* |
| * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict |
| * for the segment immediately prior to the old tail, allowing fresh |
| * data into that segment. |
| */ |
| LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
| QUEUE_STOP_PAGE = newtailpage; |
| LWLockRelease(NotifyQueueLock); |
| } |
| |
| LWLockRelease(NotifyQueueTailLock); |
| } |
| |
| /* |
| * ProcessIncomingNotify |
| * |
| * Scan the queue for arriving notifications and report them to the front |
| * end. The notifications might be from other sessions, or our own; |
| * there's no need to distinguish here. |
| * |
| * If "flush" is true, force any frontend messages out immediately. |
| * |
| * NOTE: since we are outside any transaction, we must create our own. |
| */ |
| static void |
| ProcessIncomingNotify(bool flush) |
| { |
| bool client_wait_timeout_enabled; |
| |
| /* We *must* reset the flag */ |
| notifyInterruptPending = false; |
| |
| /* Do nothing else if we aren't actively listening */ |
| if (listenChannels == NIL) |
| return; |
| |
| client_wait_timeout_enabled = DisableClientWaitTimeoutInterrupt(); |
| |
| if (Trace_notify) |
| elog(DEBUG1, "ProcessIncomingNotify"); |
| |
| set_ps_display("notify interrupt"); |
| |
| /* |
| * We must run asyncQueueReadAllNotifications inside a transaction, else |
| * bad things happen if it gets an error. |
| */ |
| StartTransactionCommand(); |
| |
| asyncQueueReadAllNotifications(); |
| |
| CommitTransactionCommand(); |
| |
| /* |
| * If this isn't an end-of-command case, we must flush the notify messages |
| * to ensure frontend gets them promptly. |
| */ |
| if (flush) |
| pq_flush(); |
| |
| set_ps_display("idle"); |
| |
| if (Trace_notify) |
| elog(DEBUG1, "ProcessIncomingNotify: done"); |
| |
| if (client_wait_timeout_enabled) |
| EnableClientWaitTimeoutInterrupt(); |
| } |
| |
| /* |
| * Send NOTIFY message to my front end. |
| * |
| * GPDB: We have exposed this function globally for our dispatch-notify |
| * mechanism. We overload the srcPid field to pass in the gp_session_id |
| * from GPDB specific callsites. |
| */ |
| void |
| NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) |
| { |
| if (whereToSendOutput == DestRemote) |
| { |
| StringInfoData buf; |
| |
| pq_beginmessage(&buf, 'A'); |
| pq_sendint32(&buf, srcPid); |
| pq_sendstring(&buf, channel); |
| pq_sendstring(&buf, payload); |
| pq_endmessage(&buf); |
| |
| /* |
| * NOTE: we do not do pq_flush() here. Some level of caller will |
| * handle it later, allowing this message to be combined into a packet |
| * with other ones. |
| */ |
| } |
| else |
| elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); |
| } |
| |
| /* Does pendingNotifies include a match for the given event? */ |
| static bool |
| AsyncExistsPendingNotify(Notification *n) |
| { |
| if (pendingNotifies == NULL) |
| return false; |
| |
| if (pendingNotifies->hashtab != NULL) |
| { |
| /* Use the hash table to probe for a match */ |
| if (hash_search(pendingNotifies->hashtab, |
| &n, |
| HASH_FIND, |
| NULL)) |
| return true; |
| } |
| else |
| { |
| /* Must scan the event list */ |
| ListCell *l; |
| |
| foreach(l, pendingNotifies->events) |
| { |
| Notification *oldn = (Notification *) lfirst(l); |
| |
| if (n->channel_len == oldn->channel_len && |
| n->payload_len == oldn->payload_len && |
| memcmp(n->data, oldn->data, |
| n->channel_len + n->payload_len + 2) == 0) |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /* |
| * Add a notification event to a pre-existing pendingNotifies list. |
| * |
| * Because pendingNotifies->events is already nonempty, this works |
| * correctly no matter what CurrentMemoryContext is. |
| */ |
| static void |
| AddEventToPendingNotifies(Notification *n) |
| { |
| Assert(pendingNotifies->events != NIL); |
| |
| /* Create the hash table if it's time to */ |
| if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && |
| pendingNotifies->hashtab == NULL) |
| { |
| HASHCTL hash_ctl; |
| ListCell *l; |
| |
| /* Create the hash table */ |
| hash_ctl.keysize = sizeof(Notification *); |
| hash_ctl.entrysize = sizeof(NotificationHash); |
| hash_ctl.hash = notification_hash; |
| hash_ctl.match = notification_match; |
| hash_ctl.hcxt = CurTransactionContext; |
| pendingNotifies->hashtab = |
| hash_create("Pending Notifies", |
| 256L, |
| &hash_ctl, |
| HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); |
| |
| /* Insert all the already-existing events */ |
| foreach(l, pendingNotifies->events) |
| { |
| Notification *oldn = (Notification *) lfirst(l); |
| NotificationHash *hentry; |
| bool found; |
| |
| hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, |
| &oldn, |
| HASH_ENTER, |
| &found); |
| Assert(!found); |
| hentry->event = oldn; |
| } |
| } |
| |
| /* Add new event to the list, in order */ |
| pendingNotifies->events = lappend(pendingNotifies->events, n); |
| |
| /* Add event to the hash table if needed */ |
| if (pendingNotifies->hashtab != NULL) |
| { |
| NotificationHash *hentry; |
| bool found; |
| |
| hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, |
| &n, |
| HASH_ENTER, |
| &found); |
| Assert(!found); |
| hentry->event = n; |
| } |
| } |
| |
| /* |
| * notification_hash: hash function for notification hash table |
| * |
| * The hash "keys" are pointers to Notification structs. |
| */ |
| static uint32 |
| notification_hash(const void *key, Size keysize) |
| { |
| const Notification *k = *(const Notification *const *) key; |
| |
| Assert(keysize == sizeof(Notification *)); |
| /* We don't bother to include the payload's trailing null in the hash */ |
| return DatumGetUInt32(hash_any((const unsigned char *) k->data, |
| k->channel_len + k->payload_len + 1)); |
| } |
| |
| /* |
| * notification_match: match function to use with notification_hash |
| */ |
| static int |
| notification_match(const void *key1, const void *key2, Size keysize) |
| { |
| const Notification *k1 = *(const Notification *const *) key1; |
| const Notification *k2 = *(const Notification *const *) key2; |
| |
| Assert(keysize == sizeof(Notification *)); |
| if (k1->channel_len == k2->channel_len && |
| k1->payload_len == k2->payload_len && |
| memcmp(k1->data, k2->data, |
| k1->channel_len + k1->payload_len + 2) == 0) |
| return 0; /* equal */ |
| return 1; /* not equal */ |
| } |
| |
| /* Clear the pendingActions and pendingNotifies lists. */ |
| static void |
| ClearPendingActionsAndNotifies(void) |
| { |
| /* |
| * Everything's allocated in either TopTransactionContext or the context |
| * for the subtransaction to which it corresponds. So, there's nothing to |
| * do here except reset the pointers; the space will be reclaimed when the |
| * contexts are deleted. |
| */ |
| pendingActions = NULL; |
| pendingNotifies = NULL; |
| } |