| /*------------------------------------------------------------------------- |
| * |
| * shm_mq.c |
| * single-reader, single-writer shared memory message queue |
| * |
| * Both the sender and the receiver must have a PGPROC; their respective |
| * process latches are used for synchronization. Only the sender may send, |
| * and only the receiver may receive. This is intended to allow a user |
| * backend to communicate with worker backends that it has registered. |
| * |
| * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * src/backend/storage/ipc/shm_mq.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "port/pg_bitutils.h" |
| #include "postmaster/bgworker.h" |
| #include "storage/procsignal.h" |
| #include "storage/shm_mq.h" |
| #include "storage/spin.h" |
| #include "utils/memutils.h" |
| |
| /* |
| * This structure represents the actual queue, stored in shared memory. |
| * |
| * Some notes on synchronization: |
| * |
| * mq_receiver and mq_bytes_read can only be changed by the receiver; and |
| * mq_sender and mq_bytes_written can only be changed by the sender. |
| * mq_receiver and mq_sender are protected by mq_mutex, although, importantly, |
| * they cannot change once set, and thus may be read without a lock once this |
| * is known to be the case. |
| * |
| * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead, |
| * they are written atomically using 8 byte loads and stores. Memory barriers |
| * must be carefully used to synchronize reads and writes of these values with |
| * reads and writes of the actual data in mq_ring. |
| * |
| * mq_detached needs no locking. It can be set by either the sender or the |
| * receiver, but only ever from false to true, so redundant writes don't |
| * matter. It is important that if we set mq_detached and then set the |
| * counterparty's latch, the counterparty must be certain to see the change |
| * after waking up. Since SetLatch begins with a memory barrier and ResetLatch |
| * ends with one, this should be OK. |
| * |
| * mq_ring_size and mq_ring_offset never change after initialization, and |
| * can therefore be read without the lock. |
| * |
| * Importantly, mq_ring can be safely read and written without a lock. |
| * At any given time, the difference between mq_bytes_read and |
| * mq_bytes_written defines the number of bytes within mq_ring that contain |
| * unread data, and mq_bytes_read defines the position where those bytes |
| * begin. The sender can increase the number of unread bytes at any time, |
| * but only the receiver can give license to overwrite those bytes, by |
| * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read |
| * the unread bytes it knows to be present without the lock. Conversely, |
| * the sender can write to the unused portion of the ring buffer without |
| * the lock, because nobody else can be reading or writing those bytes. The |
| * receiver could be making more bytes unused by incrementing mq_bytes_read, |
| * but that's OK. Note that it would be unsafe for the receiver to read any |
| * data it's already marked as read, or to write any data; and it would be |
| * unsafe for the sender to reread any data after incrementing |
| * mq_bytes_written, but fortunately there's no need for any of that. |
| */ |
| struct shm_mq |
| { |
| slock_t mq_mutex; |
| PGPROC *mq_receiver; |
| PGPROC *mq_sender; |
| pg_atomic_uint64 mq_bytes_read; |
| pg_atomic_uint64 mq_bytes_written; |
| Size mq_ring_size; |
| bool mq_detached; |
| uint8 mq_ring_offset; |
| char mq_ring[FLEXIBLE_ARRAY_MEMBER]; |
| }; |
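| |
| /* |
| * Worked example of the accounting above (all numbers invented for this |
| * sketch): suppose mq_ring_size is 1024, mq_bytes_read is 5000, and |
| * mq_bytes_written is 5600. Then 600 bytes of unread data are present in |
| * the ring. They begin at ring offset 5000 % 1024 = 904, occupy offsets |
| * 904..1023, wrap, and continue at offsets 0..479. The receiver may read |
| * those 600 bytes without any lock, and the sender may write into the |
| * remaining 424 bytes without any lock. |
| */ |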
| |
| /* |
| * This structure is a backend-private handle for access to a queue. |
| * |
| * mqh_queue is a pointer to the queue we've attached, and mqh_segment is |
| * an optional pointer to the dynamic shared memory segment that contains it. |
| * (If mqh_segment is provided, we register an on_dsm_detach callback to |
| * make sure we detach from the queue before detaching from DSM.) |
| * |
| * If this queue is intended to connect the current process with a background |
| * worker that started it, the user can pass a pointer to the worker handle |
| * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this |
| * is to allow us to begin sending to or receiving from that queue before the |
| * process we'll be communicating with has even been started. If it fails |
| * to start, the handle will allow us to notice that and fail cleanly, rather |
| * than waiting forever; see shm_mq_wait_internal. This is mostly useful in |
| * simple cases - e.g. where there are just 2 processes communicating; in |
| * more complex scenarios, every process may not have a BackgroundWorkerHandle |
| * available, or may need to watch for the failure of more than one other |
| * process at a time. |
| * |
| * When a message exists as a contiguous chunk of bytes in the queue - that is, |
| * it is smaller than the size of the ring buffer and does not wrap around |
| * the end - we return the message to the caller as a pointer into the buffer. |
| * For messages that are larger or happen to wrap, we reassemble the message |
| * locally by copying the chunks into a backend-local buffer. mqh_buffer is |
| * the buffer, and mqh_buflen is the number of bytes allocated for it. |
| * |
| * mqh_send_pending is the number of bytes that have been written to the |
| * queue but not yet reflected in the shared mq_bytes_written counter. We |
| * don't update that counter until the pending data amounts to more than |
| * 1/4th of the ring size or the queue is full. This reduces CPU cache |
| * misses and avoids frequent SetLatch() calls, which are quite expensive. |
| * |
| * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete |
| * are used to track the state of non-blocking operations. When the caller |
| * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they |
| * are expected to retry the call at a later time with the same arguments; |
| * we need to retain enough state to pick up where we left off. |
| * mqh_length_word_complete tracks whether we are done sending or receiving |
| * (whichever we're doing) the entire length word. mqh_partial_bytes tracks |
| * the number of bytes read or written for either the length word or the |
| * message itself, and mqh_expected_bytes - which is used only for reads - |
| * tracks the expected total size of the payload. |
| * |
| * mqh_counterparty_attached tracks whether we know the counterparty to have |
| * attached to the queue at some previous point. This lets us avoid some |
| * mutex acquisitions. |
| * |
| * mqh_context is the memory context in effect at the time we attached to |
| * the shm_mq. The shm_mq_handle itself is allocated in this context, and |
| * we make sure any other allocations we do happen in this context as well, |
| * to avoid nasty surprises. |
| */ |
| struct shm_mq_handle |
| { |
| shm_mq *mqh_queue; |
| dsm_segment *mqh_segment; |
| BackgroundWorkerHandle *mqh_handle; |
| char *mqh_buffer; |
| Size mqh_buflen; |
| Size mqh_consume_pending; |
| Size mqh_send_pending; |
| Size mqh_partial_bytes; |
| Size mqh_expected_bytes; |
| bool mqh_length_word_complete; |
| bool mqh_counterparty_attached; |
| MemoryContext mqh_context; |
| }; |
| |
| static void shm_mq_detach_internal(shm_mq *mq); |
| static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, |
| const void *data, bool nowait, Size *bytes_written); |
| static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, |
| Size bytes_needed, bool nowait, Size *nbytesp, |
| void **datap); |
| static bool shm_mq_counterparty_gone(shm_mq *mq, |
| BackgroundWorkerHandle *handle); |
| static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, |
| BackgroundWorkerHandle *handle); |
| static void shm_mq_inc_bytes_read(shm_mq *mq, Size n); |
| static void shm_mq_inc_bytes_written(shm_mq *mq, Size n); |
| static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); |
| |
| /* Minimum queue size is enough for header and at least one chunk of data. */ |
| const Size shm_mq_minimum_size = |
| MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; |
| |
| #define MQH_INITIAL_BUFSIZE 8192 |
| |
| /* |
| * Initialize a new shared message queue. |
| */ |
| shm_mq * |
| shm_mq_create(void *address, Size size) |
| { |
| shm_mq *mq = address; |
| Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); |
| |
| /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ |
| size = MAXALIGN_DOWN(size); |
| |
| /* Queue size must be large enough to hold some data. */ |
| Assert(size > data_offset); |
| |
| /* Initialize queue header. */ |
| SpinLockInit(&mq->mq_mutex); |
| mq->mq_receiver = NULL; |
| mq->mq_sender = NULL; |
| pg_atomic_init_u64(&mq->mq_bytes_read, 0); |
| pg_atomic_init_u64(&mq->mq_bytes_written, 0); |
| mq->mq_ring_size = size - data_offset; |
| mq->mq_detached = false; |
| mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring); |
| |
| return mq; |
| } |
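| |
| /* |
| * A minimal setup sketch (illustrative only, not part of this file). The |
| * queue size and the choice to place the queue at the very start of the |
| * DSM segment are arbitrary assumptions made for this example; real users |
| * typically carve queues out of a larger segment with a shm_toc, as in |
| * parallel.c and test_shm_mq: |
| * |
| * #define MY_QUEUE_SIZE 16384 |
| * |
| * dsm_segment *seg = dsm_create(MY_QUEUE_SIZE, 0); |
| * shm_mq *mq = shm_mq_create(dsm_segment_address(seg), MY_QUEUE_SIZE); |
| * |
| * shm_mq_set_receiver(mq, MyProc); |
| * shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL); |
| * |
| * The dsm_handle obtained from dsm_segment_handle(seg) would then be |
| * passed to the counterparty (e.g. via bgw_main_arg), which sets itself |
| * as sender and attaches from its side. |
| */ |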
| |
| /* |
| * Set the identity of the process that will receive from a shared message |
| * queue. |
| */ |
| void |
| shm_mq_set_receiver(shm_mq *mq, PGPROC *proc) |
| { |
| PGPROC *sender; |
| |
| SpinLockAcquire(&mq->mq_mutex); |
| Assert(mq->mq_receiver == NULL); |
| mq->mq_receiver = proc; |
| sender = mq->mq_sender; |
| SpinLockRelease(&mq->mq_mutex); |
| |
| if (sender != NULL) |
| SetLatch(&sender->procLatch); |
| } |
| |
| /* |
| * Set the identity of the process that will send to a shared message queue. |
| */ |
| void |
| shm_mq_set_sender(shm_mq *mq, PGPROC *proc) |
| { |
| PGPROC *receiver; |
| |
| SpinLockAcquire(&mq->mq_mutex); |
| Assert(mq->mq_sender == NULL); |
| mq->mq_sender = proc; |
| receiver = mq->mq_receiver; |
| SpinLockRelease(&mq->mq_mutex); |
| |
| if (receiver != NULL) |
| SetLatch(&receiver->procLatch); |
| } |
| |
| /* |
| * Get the configured receiver. |
| */ |
| PGPROC * |
| shm_mq_get_receiver(shm_mq *mq) |
| { |
| PGPROC *receiver; |
| |
| SpinLockAcquire(&mq->mq_mutex); |
| receiver = mq->mq_receiver; |
| SpinLockRelease(&mq->mq_mutex); |
| |
| return receiver; |
| } |
| |
| /* |
| * Get the configured sender. |
| */ |
| PGPROC * |
| shm_mq_get_sender(shm_mq *mq) |
| { |
| PGPROC *sender; |
| |
| SpinLockAcquire(&mq->mq_mutex); |
| sender = mq->mq_sender; |
| SpinLockRelease(&mq->mq_mutex); |
| |
| return sender; |
| } |
| |
| /* |
| * Attach to a shared message queue so we can send or receive messages. |
| * |
| * The memory context in effect at the time this function is called should |
| * be one which will last for at least as long as the message queue itself. |
| * We'll allocate the handle in that context, and future allocations that |
| * are needed to buffer incoming data will happen in that context as well. |
| * |
| * If seg != NULL, the queue will be automatically detached when that dynamic |
| * shared memory segment is detached. |
| * |
| * If handle != NULL, the queue can be read or written even before the |
| * other process has attached. We'll wait for it to do so if needed. The |
| * handle must be for a background worker initialized with bgw_notify_pid |
| * equal to our PID. |
| * |
| * shm_mq_detach() should be called when done. This will free the |
| * shm_mq_handle and mark the queue itself as detached, so that our |
| * counterpart won't get stuck waiting for us to fill or drain the queue |
| * after we've already lost interest. |
| */ |
| shm_mq_handle * |
| shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) |
| { |
| shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); |
| |
| Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); |
| mqh->mqh_queue = mq; |
| mqh->mqh_segment = seg; |
| mqh->mqh_handle = handle; |
| mqh->mqh_buffer = NULL; |
| mqh->mqh_buflen = 0; |
| mqh->mqh_consume_pending = 0; |
| mqh->mqh_send_pending = 0; |
| mqh->mqh_partial_bytes = 0; |
| mqh->mqh_expected_bytes = 0; |
| mqh->mqh_length_word_complete = false; |
| mqh->mqh_counterparty_attached = false; |
| mqh->mqh_context = CurrentMemoryContext; |
| |
| if (seg != NULL) |
| on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); |
| |
| return mqh; |
| } |
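| |
| /* |
| * A worker-side counterpart to the sketch above (illustrative only; how |
| * the worker learns the dsm_handle, here 'handle', is an assumption left |
| * to the caller): |
| * |
| * dsm_segment *seg = dsm_attach(handle); |
| * shm_mq *mq = dsm_segment_address(seg); |
| * |
| * shm_mq_set_sender(mq, MyProc); |
| * shm_mq_handle *mqh = shm_mq_attach(mq, seg, NULL); |
| * |
| * The worker must not call shm_mq_create() here; the queue was already |
| * initialized by its creator, and re-creating it would wipe its state. |
| */ |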
| |
| /* |
| * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had |
| * been passed to shm_mq_attach. |
| */ |
| void |
| shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) |
| { |
| Assert(mqh->mqh_handle == NULL); |
| mqh->mqh_handle = handle; |
| } |
| |
| /* |
| * Write a message into a shared message queue. |
| */ |
| shm_mq_result |
| shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, |
| bool force_flush) |
| { |
| shm_mq_iovec iov; |
| |
| iov.data = data; |
| iov.len = nbytes; |
| |
| return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush); |
| } |
| |
| /* |
| * Write a message into a shared message queue, gathered from multiple |
| * addresses. |
| * |
| * When nowait = false, we'll wait on our process latch when the ring buffer |
| * fills up, and then continue writing once the receiver has drained some data. |
| * The process latch is reset after each wait. |
| * |
| * When nowait = true, we do not manipulate the state of the process latch; |
| * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In |
| * this case, the caller should call this function again, with the same |
| * arguments, each time the process latch is set. (Once begun, the sending |
| * of a message cannot be aborted except by detaching from the queue; changing |
| * the length or payload will corrupt the queue.) |
| * |
| * When force_flush = true, we immediately update the shm_mq's mq_bytes_written |
| * and notify the receiver (if it is already attached). Otherwise, we don't |
| * update it until we have written an amount of data greater than 1/4th of the |
| * ring size. |
| */ |
| shm_mq_result |
| shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, |
| bool force_flush) |
| { |
| shm_mq_result res; |
| shm_mq *mq = mqh->mqh_queue; |
| PGPROC *receiver; |
| Size nbytes = 0; |
| Size bytes_written; |
| int i; |
| int which_iov = 0; |
| Size offset; |
| |
| Assert(mq->mq_sender == MyProc); |
| |
| /* Compute total size of write. */ |
| for (i = 0; i < iovcnt; ++i) |
| nbytes += iov[i].len; |
| |
| /* Prevent writing messages that would overwhelm the receiver. */ |
| if (nbytes > MaxAllocSize) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("cannot send a message of size %zu via shared memory queue", |
| nbytes))); |
| |
| /* Try to write, or finish writing, the length word into the buffer. */ |
| while (!mqh->mqh_length_word_complete) |
| { |
| Assert(mqh->mqh_partial_bytes < sizeof(Size)); |
| res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, |
| ((char *) &nbytes) + mqh->mqh_partial_bytes, |
| nowait, &bytes_written); |
| |
| if (res == SHM_MQ_DETACHED) |
| { |
| /* Reset state in case caller tries to send another message. */ |
| mqh->mqh_partial_bytes = 0; |
| mqh->mqh_length_word_complete = false; |
| return res; |
| } |
| mqh->mqh_partial_bytes += bytes_written; |
| |
| if (mqh->mqh_partial_bytes >= sizeof(Size)) |
| { |
| Assert(mqh->mqh_partial_bytes == sizeof(Size)); |
| |
| mqh->mqh_partial_bytes = 0; |
| mqh->mqh_length_word_complete = true; |
| } |
| |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| |
| /* Length word can't be split unless bigger than required alignment. */ |
| Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF); |
| } |
| |
| /* Write the actual data bytes into the buffer. */ |
| Assert(mqh->mqh_partial_bytes <= nbytes); |
| offset = mqh->mqh_partial_bytes; |
| do |
| { |
| Size chunksize; |
| |
| /* Figure out which bytes need to be sent next. */ |
| if (offset >= iov[which_iov].len) |
| { |
| offset -= iov[which_iov].len; |
| ++which_iov; |
| if (which_iov >= iovcnt) |
| break; |
| continue; |
| } |
| |
| /* |
| * We want to avoid copying the data if at all possible, but every |
| * chunk of bytes we write into the queue has to be MAXALIGN'd, except |
| * the last. Thus, if a chunk other than the last one ends on a |
| * non-MAXALIGN'd boundary, we have to combine the tail end of its |
| * data with data from one or more following chunks until we either |
| * reach the last chunk or accumulate a number of bytes which is |
| * MAXALIGN'd. |
| */ |
| if (which_iov + 1 < iovcnt && |
| offset + MAXIMUM_ALIGNOF > iov[which_iov].len) |
| { |
| char tmpbuf[MAXIMUM_ALIGNOF]; |
| int j = 0; |
| |
| for (;;) |
| { |
| if (offset < iov[which_iov].len) |
| { |
| tmpbuf[j] = iov[which_iov].data[offset]; |
| j++; |
| offset++; |
| if (j == MAXIMUM_ALIGNOF) |
| break; |
| } |
| else |
| { |
| offset -= iov[which_iov].len; |
| which_iov++; |
| if (which_iov >= iovcnt) |
| break; |
| } |
| } |
| |
| res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written); |
| |
| if (res == SHM_MQ_DETACHED) |
| { |
| /* Reset state in case caller tries to send another message. */ |
| mqh->mqh_partial_bytes = 0; |
| mqh->mqh_length_word_complete = false; |
| return res; |
| } |
| |
| mqh->mqh_partial_bytes += bytes_written; |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| continue; |
| } |
| |
| /* |
| * If this is the last chunk, we can write all the data, even if it |
| * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to |
| * MAXALIGN_DOWN the write size. |
| */ |
| chunksize = iov[which_iov].len - offset; |
| if (which_iov + 1 < iovcnt) |
| chunksize = MAXALIGN_DOWN(chunksize); |
| res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset], |
| nowait, &bytes_written); |
| |
| if (res == SHM_MQ_DETACHED) |
| { |
| /* Reset state in case caller tries to send another message. */ |
| mqh->mqh_length_word_complete = false; |
| mqh->mqh_partial_bytes = 0; |
| return res; |
| } |
| |
| mqh->mqh_partial_bytes += bytes_written; |
| offset += bytes_written; |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| } while (mqh->mqh_partial_bytes < nbytes); |
| |
| /* Reset for next message. */ |
| mqh->mqh_partial_bytes = 0; |
| mqh->mqh_length_word_complete = false; |
| |
| /* If queue has been detached, let caller know. */ |
| if (mq->mq_detached) |
| return SHM_MQ_DETACHED; |
| |
| /* |
| * If the counterparty is known to have attached, we can read mq_receiver |
| * without acquiring the spinlock. Otherwise, more caution is needed. |
| */ |
| if (mqh->mqh_counterparty_attached) |
| receiver = mq->mq_receiver; |
| else |
| { |
| SpinLockAcquire(&mq->mq_mutex); |
| receiver = mq->mq_receiver; |
| SpinLockRelease(&mq->mq_mutex); |
| if (receiver != NULL) |
| mqh->mqh_counterparty_attached = true; |
| } |
| |
| /* |
| * If the caller has requested force flush or we have written more than |
| * 1/4 of the ring size, mark it as written in shared memory and notify |
| * the receiver. |
| */ |
| if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2)) |
| { |
| shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending); |
| if (receiver != NULL) |
| SetLatch(&receiver->procLatch); |
| mqh->mqh_send_pending = 0; |
| } |
| |
| return SHM_MQ_SUCCESS; |
| } |
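| |
| /* |
| * Illustrative sketch of the iovec interface and the nowait retry contract |
| * described above ('hdr', 'payload', and 'payload_len' are hypothetical |
| * variables belonging to the caller). Passing force_flush = true makes |
| * sure the receiver is notified as soon as the message is complete: |
| * |
| * shm_mq_iovec iov[2]; |
| * shm_mq_result res; |
| * |
| * iov[0].data = (char *) &hdr; |
| * iov[0].len = sizeof(hdr); |
| * iov[1].data = payload; |
| * iov[1].len = payload_len; |
| * |
| * for (;;) |
| * { |
| * res = shm_mq_sendv(mqh, iov, 2, true, true); |
| * if (res != SHM_MQ_WOULD_BLOCK) |
| * break; |
| * (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| * WAIT_EVENT_MQ_SEND); |
| * ResetLatch(MyLatch); |
| * CHECK_FOR_INTERRUPTS(); |
| * } |
| * |
| * A result of SHM_MQ_DETACHED means the receiver is gone and the message |
| * cannot be delivered; SHM_MQ_SUCCESS means the whole message was queued. |
| */ |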
| |
| /* |
| * Receive a message from a shared message queue. |
| * |
| * We set *nbytes to the message length and *data to point to the message |
| * payload. If the entire message exists in the queue as a single, |
| * contiguous chunk, *data will point directly into shared memory; otherwise, |
| * it will point to a temporary buffer. This mostly avoids data copying in |
| * the hoped-for case where messages are short compared to the buffer size, |
| * while still allowing longer messages. In either case, the return value |
| * remains valid until the next receive operation is performed on the queue. |
| * |
| * When nowait = false, we'll wait on our process latch when the ring buffer |
| * is empty and we have not yet received a full message. The sender will |
| * set our process latch after more data has been written, and we'll resume |
| * processing. Each call will therefore return a complete message |
| * (unless the sender detaches the queue). |
| * |
| * When nowait = true, we do not manipulate the state of the process latch; |
| * instead, whenever the buffer is empty and we need to read from it, we |
| * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this |
| * function again after the process latch has been set. |
| */ |
| shm_mq_result |
| shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) |
| { |
| shm_mq *mq = mqh->mqh_queue; |
| shm_mq_result res; |
| Size rb = 0; |
| Size nbytes; |
| void *rawdata; |
| |
| Assert(mq->mq_receiver == MyProc); |
| |
| /* We can't receive data until the sender has attached. */ |
| if (!mqh->mqh_counterparty_attached) |
| { |
| if (nowait) |
| { |
| bool counterparty_gone; |
| |
| /* |
| * We shouldn't return at this point at all unless the sender |
| * hasn't attached yet. However, the correct return value depends |
| * on whether the sender is still attached. If we first test |
| * whether the sender has ever attached and then test whether the |
| * sender has detached, there's a race condition: a sender that |
| * attaches and detaches very quickly might fool us into thinking |
| * the sender never attached at all. So, test whether our |
| * counterparty is definitively gone first, and only afterwards |
| * check whether the sender ever attached in the first place. |
| */ |
| counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle); |
| if (shm_mq_get_sender(mq) == NULL) |
| { |
| if (counterparty_gone) |
| return SHM_MQ_DETACHED; |
| else |
| return SHM_MQ_WOULD_BLOCK; |
| } |
| } |
| else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle) |
| && shm_mq_get_sender(mq) == NULL) |
| { |
| mq->mq_detached = true; |
| return SHM_MQ_DETACHED; |
| } |
| mqh->mqh_counterparty_attached = true; |
| } |
| |
| /* |
| * If we've consumed an amount of data greater than 1/4th of the ring |
| * size, mark it consumed in shared memory. We try to avoid doing this |
| * unnecessarily when only a small amount of data has been consumed, |
| * because SetLatch() is fairly expensive and we don't want to do it too |
| * often. |
| */ |
| if (mqh->mqh_consume_pending > mq->mq_ring_size / 4) |
| { |
| shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); |
| mqh->mqh_consume_pending = 0; |
| } |
| |
| /* Try to read, or finish reading, the length word from the buffer. */ |
| while (!mqh->mqh_length_word_complete) |
| { |
| /* Try to receive the message length word. */ |
| Assert(mqh->mqh_partial_bytes < sizeof(Size)); |
| res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, |
| nowait, &rb, &rawdata); |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| |
| /* |
| * Hopefully, we'll receive the entire message length word at once. |
| * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over |
| * multiple reads. |
| */ |
| if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size)) |
| { |
| Size needed; |
| |
| nbytes = *(Size *) rawdata; |
| |
| /* If we've already got the whole message, we're done. */ |
| needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes); |
| if (rb >= needed) |
| { |
| mqh->mqh_consume_pending += needed; |
| *nbytesp = nbytes; |
| *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size)); |
| return SHM_MQ_SUCCESS; |
| } |
| |
| /* |
| * We don't have the whole message, but we at least have the whole |
| * length word. |
| */ |
| mqh->mqh_expected_bytes = nbytes; |
| mqh->mqh_length_word_complete = true; |
| mqh->mqh_consume_pending += MAXALIGN(sizeof(Size)); |
| rb -= MAXALIGN(sizeof(Size)); |
| } |
| else |
| { |
| Size lengthbytes; |
| |
| /* Can't be split unless bigger than required alignment. */ |
| Assert(sizeof(Size) > MAXIMUM_ALIGNOF); |
| |
| /* Message word is split; need buffer to reassemble. */ |
| if (mqh->mqh_buffer == NULL) |
| { |
| mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, |
| MQH_INITIAL_BUFSIZE); |
| mqh->mqh_buflen = MQH_INITIAL_BUFSIZE; |
| } |
| Assert(mqh->mqh_buflen >= sizeof(Size)); |
| |
| /* Copy partial length word; remember to consume it. */ |
| if (mqh->mqh_partial_bytes + rb > sizeof(Size)) |
| lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes; |
| else |
| lengthbytes = rb; |
| memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, |
| lengthbytes); |
| mqh->mqh_partial_bytes += lengthbytes; |
| mqh->mqh_consume_pending += MAXALIGN(lengthbytes); |
| rb -= lengthbytes; |
| |
| /* If we now have the whole word, we're ready to read payload. */ |
| if (mqh->mqh_partial_bytes >= sizeof(Size)) |
| { |
| Assert(mqh->mqh_partial_bytes == sizeof(Size)); |
| mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer; |
| mqh->mqh_length_word_complete = true; |
| mqh->mqh_partial_bytes = 0; |
| } |
| } |
| } |
| nbytes = mqh->mqh_expected_bytes; |
| |
| /* |
| * Should be disallowed on the sending side already, but better check and |
| * error out on the receiver side as well rather than trying to read a |
| * prohibitively large message. |
| */ |
| if (nbytes > MaxAllocSize) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("invalid message size %zu in shared memory queue", |
| nbytes))); |
| |
| if (mqh->mqh_partial_bytes == 0) |
| { |
| /* |
| * Try to obtain the whole message in a single chunk. If this works, |
| * we need not copy the data and can return a pointer directly into |
| * shared memory. |
| */ |
| res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata); |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| if (rb >= nbytes) |
| { |
| mqh->mqh_length_word_complete = false; |
| mqh->mqh_consume_pending += MAXALIGN(nbytes); |
| *nbytesp = nbytes; |
| *datap = rawdata; |
| return SHM_MQ_SUCCESS; |
| } |
| |
| /* |
| * The message has wrapped the buffer. We'll need to copy it in order |
| * to return it to the client in one chunk. First, make sure we have |
| * a large enough buffer available. |
| */ |
| if (mqh->mqh_buflen < nbytes) |
| { |
| Size newbuflen; |
| |
| /* |
| * Increase size to the next power of 2 that's >= nbytes, but |
| * limit to MaxAllocSize. |
| */ |
| newbuflen = pg_nextpower2_size_t(nbytes); |
| newbuflen = Min(newbuflen, MaxAllocSize); |
| |
| if (mqh->mqh_buffer != NULL) |
| { |
| pfree(mqh->mqh_buffer); |
| mqh->mqh_buffer = NULL; |
| mqh->mqh_buflen = 0; |
| } |
| mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen); |
| mqh->mqh_buflen = newbuflen; |
| } |
| } |
| |
| /* Loop until we've copied the entire message. */ |
| for (;;) |
| { |
| Size still_needed; |
| |
| /* Copy as much as we can. */ |
| Assert(mqh->mqh_partial_bytes + rb <= nbytes); |
| if (rb > 0) |
| { |
| memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb); |
| mqh->mqh_partial_bytes += rb; |
| } |
| |
| /* |
| * Update count of bytes that can be consumed, accounting for |
| * alignment padding. Note that this will never actually insert any |
| * padding except at the end of a message, because the buffer size is |
| * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well. |
| */ |
| Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb)); |
| mqh->mqh_consume_pending += MAXALIGN(rb); |
| |
| /* If we got all the data, exit the loop. */ |
| if (mqh->mqh_partial_bytes >= nbytes) |
| break; |
| |
| /* Wait for some more data. */ |
| still_needed = nbytes - mqh->mqh_partial_bytes; |
| res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata); |
| if (res != SHM_MQ_SUCCESS) |
| return res; |
| if (rb > still_needed) |
| rb = still_needed; |
| } |
| |
| /* Return the complete message, and reset for next message. */ |
| *nbytesp = nbytes; |
| *datap = mqh->mqh_buffer; |
| mqh->mqh_length_word_complete = false; |
| mqh->mqh_partial_bytes = 0; |
| return SHM_MQ_SUCCESS; |
| } |
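| |
| /* |
| * Illustrative blocking receive loop (sketch only), draining messages |
| * until the sender detaches. Note that 'data' points into the ring (or |
| * into mqh_buffer) and is only valid until the next call: |
| * |
| * for (;;) |
| * { |
| * Size len; |
| * void *data; |
| * shm_mq_result res; |
| * |
| * res = shm_mq_receive(mqh, &len, &data, false); |
| * if (res == SHM_MQ_DETACHED) |
| * break; |
| * Assert(res == SHM_MQ_SUCCESS); |
| * |
| * ... process 'len' bytes at 'data' before the next call ... |
| * } |
| */ |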
| |
| /* |
| * Wait for the other process that's supposed to use this queue to attach |
| * to it. |
| * |
| * The return value is SHM_MQ_DETACHED if the worker has already detached or |
| * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached. |
| * Note that we will only be able to detect that the worker has died before |
| * attaching if a background worker handle was passed to shm_mq_attach(). |
| */ |
| shm_mq_result |
| shm_mq_wait_for_attach(shm_mq_handle *mqh) |
| { |
| shm_mq *mq = mqh->mqh_queue; |
| PGPROC **victim; |
| |
| if (shm_mq_get_receiver(mq) == MyProc) |
| victim = &mq->mq_sender; |
| else |
| { |
| Assert(shm_mq_get_sender(mq) == MyProc); |
| victim = &mq->mq_receiver; |
| } |
| |
| if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle)) |
| return SHM_MQ_SUCCESS; |
| else |
| return SHM_MQ_DETACHED; |
| } |
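| |
| /* |
| * Illustrative use (sketch only): a leader that prefers to fail up front |
| * if the worker never comes up, rather than discovering it later inside |
| * shm_mq_send() or shm_mq_receive(). Assumes a BackgroundWorkerHandle was |
| * supplied to shm_mq_attach() or shm_mq_set_handle(): |
| * |
| * if (shm_mq_wait_for_attach(mqh) != SHM_MQ_SUCCESS) |
| * ereport(ERROR, |
| * (errcode(ERRCODE_INTERNAL_ERROR), |
| * errmsg("background worker failed to attach to shared message queue"))); |
| */ |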
| |
| /* |
| * Detach from a shared message queue, and destroy the shm_mq_handle. |
| */ |
| void |
| shm_mq_detach(shm_mq_handle *mqh) |
| { |
| /* Before detaching, notify the receiver about any already-written data. */ |
| if (mqh->mqh_send_pending > 0) |
| { |
| shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending); |
| mqh->mqh_send_pending = 0; |
| } |
| |
| /* Notify counterparty that we're outta here. */ |
| shm_mq_detach_internal(mqh->mqh_queue); |
| |
| /* Cancel on_dsm_detach callback, if any. */ |
| if (mqh->mqh_segment) |
| cancel_on_dsm_detach(mqh->mqh_segment, |
| shm_mq_detach_callback, |
| PointerGetDatum(mqh->mqh_queue)); |
| |
| /* Release local memory associated with handle. */ |
| if (mqh->mqh_buffer != NULL) |
| pfree(mqh->mqh_buffer); |
| pfree(mqh); |
| } |
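| |
| /* |
| * Typical teardown sketch (illustrative only): |
| * |
| * shm_mq_detach(mqh); |
| * dsm_detach(seg); |
| * |
| * Detaching the DSM segment alone would also mark the queue detached via |
| * the on_dsm_detach callback registered in shm_mq_attach(), but it does |
| * not pfree the backend-local handle or buffer (those are released only |
| * when mqh_context is reset), so calling shm_mq_detach() explicitly is |
| * preferable while the handle is still around. |
| */ |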
| |
| /* |
| * Notify counterparty that we're detaching from shared message queue. |
| * |
| * The purpose of this function is to make sure that the process |
| * with which we're communicating doesn't block forever waiting for us to |
| * fill or drain the queue once we've lost interest. When the sender |
| * detaches, the receiver can read any messages remaining in the queue; |
| * further reads will return SHM_MQ_DETACHED. If the receiver detaches, |
| * further attempts to send messages will likewise return SHM_MQ_DETACHED. |
| * |
| * This is separated out from shm_mq_detach() because if the on_dsm_detach |
| * callback fires, we only want to do this much. We do not try to touch |
| * the local shm_mq_handle, as it may have been pfree'd already. |
| */ |
| static void |
| shm_mq_detach_internal(shm_mq *mq) |
| { |
| PGPROC *victim; |
| |
| SpinLockAcquire(&mq->mq_mutex); |
| if (mq->mq_sender == MyProc) |
| victim = mq->mq_receiver; |
| else |
| { |
| Assert(mq->mq_receiver == MyProc); |
| victim = mq->mq_sender; |
| } |
| mq->mq_detached = true; |
| SpinLockRelease(&mq->mq_mutex); |
| |
| if (victim != NULL) |
| SetLatch(&victim->procLatch); |
| } |
| |
| /* |
| * Get the shm_mq from handle. |
| */ |
| shm_mq * |
| shm_mq_get_queue(shm_mq_handle *mqh) |
| { |
| return mqh->mqh_queue; |
| } |
| |
| /* |
| * Write bytes into a shared message queue. |
| */ |
| static shm_mq_result |
| shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, |
| bool nowait, Size *bytes_written) |
| { |
| shm_mq *mq = mqh->mqh_queue; |
| Size sent = 0; |
| uint64 used; |
| Size ringsize = mq->mq_ring_size; |
| Size available; |
| |
| while (sent < nbytes) |
| { |
| uint64 rb; |
| uint64 wb; |
| |
| /* Compute number of ring buffer bytes used and available. */ |
| rb = pg_atomic_read_u64(&mq->mq_bytes_read); |
| wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending; |
| Assert(wb >= rb); |
| used = wb - rb; |
| Assert(used <= ringsize); |
| available = Min(ringsize - used, nbytes - sent); |
| |
| /* |
| * Bail out if the queue has been detached. Note that we would be in |
| * trouble if the compiler decided to cache the value of |
| * mq->mq_detached in a register or on the stack across loop |
| * iterations. It probably shouldn't do that anyway since we'll |
| * always return, call an external function that performs a system |
| * call, or reach a memory barrier at some point later in the loop, |
| * but just to be sure, insert a compiler barrier here. |
| */ |
| pg_compiler_barrier(); |
| if (mq->mq_detached) |
| { |
| *bytes_written = sent; |
| return SHM_MQ_DETACHED; |
| } |
| |
| if (available == 0 && !mqh->mqh_counterparty_attached) |
| { |
| /* |
| * The queue is full, so if the receiver isn't yet known to be |
| * attached, we must wait for that to happen. |
| */ |
| if (nowait) |
| { |
| if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) |
| { |
| *bytes_written = sent; |
| return SHM_MQ_DETACHED; |
| } |
| if (shm_mq_get_receiver(mq) == NULL) |
| { |
| *bytes_written = sent; |
| return SHM_MQ_WOULD_BLOCK; |
| } |
| } |
| else if (!shm_mq_wait_internal(mq, &mq->mq_receiver, |
| mqh->mqh_handle)) |
| { |
| mq->mq_detached = true; |
| *bytes_written = sent; |
| return SHM_MQ_DETACHED; |
| } |
| mqh->mqh_counterparty_attached = true; |
| |
| /* |
| * The receiver may have read some data after attaching, so we |
| * must not wait without rechecking the queue state. |
| */ |
| } |
| else if (available == 0) |
| { |
| /* Update the pending send bytes in the shared memory. */ |
| shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending); |
| |
| /* |
| * Since mqh->mqh_counterparty_attached is known to be true at this |
| * point, mq_receiver has been set, and it can't change once set. |
| * Therefore, we can read it without acquiring the spinlock. |
| */ |
| Assert(mqh->mqh_counterparty_attached); |
| SetLatch(&mq->mq_receiver->procLatch); |
| |
| /* |
| * We have just updated the mqh_send_pending bytes in the shared |
| * memory so reset it. |
| */ |
| mqh->mqh_send_pending = 0; |
| |
| /* Skip manipulation of our latch if nowait = true. */ |
| if (nowait) |
| { |
| *bytes_written = sent; |
| return SHM_MQ_WOULD_BLOCK; |
| } |
| |
| /* |
| * Wait for our latch to be set. It might already be set for some |
| * unrelated reason, but that'll just result in one extra trip |
| * through the loop. It's worth it to avoid resetting the latch |
| * at top of loop, because setting an already-set latch is much |
| * cheaper than setting one that has been reset. |
| */ |
| (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| WAIT_EVENT_MQ_SEND); |
| |
| /* Reset the latch so we don't spin. */ |
| ResetLatch(MyLatch); |
| |
| /* An interrupt may have occurred while we were waiting. */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| else |
| { |
| Size offset; |
| Size sendnow; |
| |
| offset = wb % (uint64) ringsize; |
| sendnow = Min(available, ringsize - offset); |
| |
| /* |
| * Write as much data as we can via a single memcpy(). Make sure |
| * these writes happen after the read of mq_bytes_read, above. |
| * This barrier pairs with the one in shm_mq_inc_bytes_read. |
| * (Since we're separating the read of mq_bytes_read from a |
| * subsequent write to mq_ring, we need a full barrier here.) |
| */ |
| pg_memory_barrier(); |
| memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], |
| (char *) data + sent, sendnow); |
| sent += sendnow; |
| |
| /* |
| * Update count of bytes written, with alignment padding. Note |
| * that this will never actually insert any padding except at the |
| * end of a run of bytes, because the buffer size is a multiple of |
| * MAXIMUM_ALIGNOF, and each read and write is as well. |
| */ |
| Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); |
| |
| /* |
| * For efficiency, we don't update the bytes written in the shared |
| * memory and also don't set the reader's latch here. Refer to |
| * the comments atop the shm_mq_handle structure for more |
| * information. |
| */ |
| mqh->mqh_send_pending += MAXALIGN(sendnow); |
| } |
| } |
| |
| *bytes_written = sent; |
| return SHM_MQ_SUCCESS; |
| } |
| |
| /* |
| * Wait until at least bytes_needed bytes are available to be read from the |
| * shared message queue, or until the buffer wraps around. If the queue is |
| * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait |
| * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set |
| * to the location at which data bytes can be read, *nbytesp is set to the |
| * number of bytes which can be read at that address, and the return value |
| * is SHM_MQ_SUCCESS. |
| */ |
| static shm_mq_result |
| shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, |
| Size *nbytesp, void **datap) |
| { |
| shm_mq *mq = mqh->mqh_queue; |
| Size ringsize = mq->mq_ring_size; |
| uint64 used; |
| uint64 written; |
| |
| for (;;) |
| { |
| Size offset; |
| uint64 read; |
| |
| /* Get bytes written, so we can compute what's available to read. */ |
| written = pg_atomic_read_u64(&mq->mq_bytes_written); |
| |
| /* |
| * Get bytes read. Include bytes we could consume but have not yet |
| * consumed. |
| */ |
| read = pg_atomic_read_u64(&mq->mq_bytes_read) + |
| mqh->mqh_consume_pending; |
| used = written - read; |
| Assert(used <= ringsize); |
| offset = read % (uint64) ringsize; |
| |
| /* If we have enough data or buffer has wrapped, we're done. */ |
| if (used >= bytes_needed || offset + used >= ringsize) |
| { |
| *nbytesp = Min(used, ringsize - offset); |
| *datap = &mq->mq_ring[mq->mq_ring_offset + offset]; |
| |
| /* |
| * Separate the read of mq_bytes_written, above, from caller's |
| * attempt to read the data itself. Pairs with the barrier in |
| * shm_mq_inc_bytes_written. |
| */ |
| pg_read_barrier(); |
| return SHM_MQ_SUCCESS; |
| } |
| |
| /* |
| * Fall out before waiting if the queue has been detached. |
| * |
| * Note that we don't check for this until *after* considering whether |
| * the data already available is enough, since the receiver can finish |
| * receiving a message stored in the buffer even after the sender has |
| * detached. |
| */ |
| if (mq->mq_detached) |
| { |
| /* |
| * If the writer advanced mq_bytes_written and then set |
| * mq_detached, we might not have read the final value of |
| * mq_bytes_written above. Insert a read barrier and then check |
| * again if mq_bytes_written has advanced. |
| */ |
| pg_read_barrier(); |
| if (written != pg_atomic_read_u64(&mq->mq_bytes_written)) |
| continue; |
| |
| return SHM_MQ_DETACHED; |
| } |
| |
| /* |
| * We didn't get enough data to satisfy the request, so mark any data |
| * previously-consumed as read to make more buffer space. |
| */ |
| if (mqh->mqh_consume_pending > 0) |
| { |
| shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); |
| mqh->mqh_consume_pending = 0; |
| } |
| |
| /* Skip manipulation of our latch if nowait = true. */ |
| if (nowait) |
| return SHM_MQ_WOULD_BLOCK; |
| |
| /* |
| * Wait for our latch to be set. It might already be set for some |
| * unrelated reason, but that'll just result in one extra trip through |
| * the loop. It's worth it to avoid resetting the latch at top of |
| * loop, because setting an already-set latch is much cheaper than |
| * setting one that has been reset. |
| */ |
| (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| WAIT_EVENT_MQ_RECEIVE); |
| |
| /* Reset the latch so we don't spin. */ |
| ResetLatch(MyLatch); |
| |
| /* An interrupt may have occurred while we were waiting. */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| } |
| |
| /* |
| * Test whether a counterparty who may not even be alive yet is definitely gone. |
| */ |
| static bool |
| shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) |
| { |
| pid_t pid; |
| |
| /* If the queue has been detached, counterparty is definitely gone. */ |
| if (mq->mq_detached) |
| return true; |
| |
| /* If there's a handle, check worker status. */ |
| if (handle != NULL) |
| { |
| BgwHandleStatus status; |
| |
| /* Check for unexpected worker death. */ |
| status = GetBackgroundWorkerPid(handle, &pid); |
| if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
| { |
| /* Mark it detached, just to make it official. */ |
| mq->mq_detached = true; |
| return true; |
| } |
| } |
| |
| /* Counterparty is not definitively gone. */ |
| return false; |
| } |
| |
| /* |
| * This is used when a process is waiting for its counterpart to attach to the |
| * queue. We exit when the other process attaches as expected, or, if |
| * handle != NULL, when the referenced background process or the postmaster |
| * dies. Note that if handle == NULL, and the process fails to attach, we'll |
| * potentially get stuck here forever waiting for a process that may never |
| * start. We do check for interrupts, though. |
| * |
| * ptr is a pointer to the memory address that we're expecting to become |
| * non-NULL when our counterpart attaches to the queue. |
| */ |
| static bool |
| shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) |
| { |
| bool result = false; |
| |
| for (;;) |
| { |
| BgwHandleStatus status; |
| pid_t pid; |
| |
| /* Acquire the lock just long enough to check the pointer. */ |
| SpinLockAcquire(&mq->mq_mutex); |
| result = (*ptr != NULL); |
| SpinLockRelease(&mq->mq_mutex); |
| |
| /* Fail if detached; else succeed if initialized. */ |
| if (mq->mq_detached) |
| { |
| result = false; |
| break; |
| } |
| if (result) |
| break; |
| |
| if (handle != NULL) |
| { |
| /* Check for unexpected worker death. */ |
| status = GetBackgroundWorkerPid(handle, &pid); |
| if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
| { |
| result = false; |
| break; |
| } |
| } |
| |
| /* Wait to be signaled. */ |
| (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| WAIT_EVENT_MQ_INTERNAL); |
| |
| /* Reset the latch so we don't spin. */ |
| ResetLatch(MyLatch); |
| |
| /* An interrupt may have occurred while we were waiting. */ |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| return result; |
| } |
| |
| /* |
| * Increment the number of bytes read. |
| */ |
| static void |
| shm_mq_inc_bytes_read(shm_mq *mq, Size n) |
| { |
| PGPROC *sender; |
| |
| /* |
| * Separate prior reads of mq_ring from the increment of mq_bytes_read |
| * which follows. This pairs with the full barrier in |
| * shm_mq_send_bytes(). We only need a read barrier here because the |
| * increment of mq_bytes_read is actually a read followed by a dependent |
| * write. |
| */ |
| pg_read_barrier(); |
| |
| /* |
| * There's no need to use pg_atomic_fetch_add_u64 here, because nobody |
| * else can be changing this value. This method should be cheaper. |
| */ |
| pg_atomic_write_u64(&mq->mq_bytes_read, |
| pg_atomic_read_u64(&mq->mq_bytes_read) + n); |
| |
| /* |
| * We shouldn't have any bytes to read without a sender, so we can read |
| * mq_sender here without a lock. Once it's initialized, it can't change. |
| */ |
| sender = mq->mq_sender; |
| Assert(sender != NULL); |
| SetLatch(&sender->procLatch); |
| } |
| |
| /* |
| * Increment the number of bytes written. |
| */ |
| static void |
| shm_mq_inc_bytes_written(shm_mq *mq, Size n) |
| { |
| /* |
| * Separate prior writes of mq_ring from the write of mq_bytes_written |
| * which we're about to do. Pairs with the read barrier found in |
| * shm_mq_receive_bytes. |
| */ |
| pg_write_barrier(); |
| |
| /* |
| * There's no need to use pg_atomic_fetch_add_u64 here, because nobody |
| * else can be changing this value. This method avoids taking the bus |
| * lock unnecessarily. |
| */ |
| pg_atomic_write_u64(&mq->mq_bytes_written, |
| pg_atomic_read_u64(&mq->mq_bytes_written) + n); |
| } |
| |
| /* Shim for on_dsm_detach callback. */ |
| static void |
| shm_mq_detach_callback(dsm_segment *seg, Datum arg) |
| { |
| shm_mq *mq = (shm_mq *) DatumGetPointer(arg); |
| |
| shm_mq_detach_internal(mq); |
| } |