| /*------------------------------------------------------------------------- |
| * |
| * walreceiver.c |
| * |
| * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It |
| * is the process in the standby server that takes charge of receiving |
| * XLOG records from a primary server during streaming replication. |
| * |
| * When the startup process determines that it's time to start streaming, |
| * it instructs postmaster to start walreceiver. Walreceiver first connects |
| * to the primary server (it will be served by a walsender process |
| * in the primary server), and then keeps receiving XLOG records and |
| * writing them to the disk as long as the connection is alive. As XLOG |
| * records are received and flushed to disk, it updates the |
| * WalRcv->flushedUpto variable in shared memory, to inform the startup |
| * process of how far it can proceed with XLOG replay. |
| * |
| * A WAL receiver cannot directly load GUC parameters used when establishing |
| * its connection to the primary. Instead it relies on parameter values |
| * that are passed down by the startup process when streaming is requested. |
| * This applies, for example, to the replication slot and the connection |
| * string to be used for the connection with the primary. |
| * |
| * If the primary server ends streaming, but doesn't disconnect, walreceiver |
| * goes into "waiting" mode, and waits for the startup process to give new |
| * instructions. The startup process will treat that the same as |
| * disconnection, and will rescan the archive/pg_wal directory. But when the |
| * startup process wants to try streaming replication again, it will just |
| * nudge the existing walreceiver process that's waiting, instead of launching |
| * a new one. |
| * |
| * Normal termination is by SIGTERM, which instructs the walreceiver to |
| * exit(0). Emergency termination is by SIGQUIT; like any postmaster child |
| * process, the walreceiver will simply abort and exit on SIGQUIT. A close |
| * of the connection and a FATAL error are treated not as a crash but as |
| * normal operation. |
| * |
| * This file contains the server-facing parts of walreceiver. The libpq- |
| * specific parts are in the libpqwalreceiver module. It's loaded |
| * dynamically to avoid linking the server with libpq. |
| * |
| * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/replication/walreceiver.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <unistd.h> |
| |
| #include "access/htup_details.h" |
| #include "access/timeline.h" |
| #include "access/transam.h" |
| #include "access/xlog_internal.h" |
| #include "access/xlogarchive.h" |
| #include "catalog/pg_authid.h" |
| #include "catalog/pg_type.h" |
| #include "common/ip.h" |
| #include "funcapi.h" |
| #include "libpq/pqformat.h" |
| #include "libpq/pqsignal.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "postmaster/interrupt.h" |
| #include "replication/walreceiver.h" |
| #include "replication/walsender.h" |
| #include "storage/ipc.h" |
| #include "storage/pmsignal.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "storage/procsignal.h" |
| #include "utils/acl.h" |
| #include "utils/builtins.h" |
| #include "utils/guc.h" |
| #include "utils/pg_lsn.h" |
| #include "utils/ps_status.h" |
| #include "utils/resowner.h" |
| #include "utils/timestamp.h" |
| #include "utils/faultinjector.h" |
| #include "storage/fd.h" |
| |
| |
| /* |
| * GUC variables. (Other variables that affect walreceiver are in xlog.c |
| * because they're passed down from the startup process, for better |
| * synchronization.) |
| */ |
| int wal_receiver_status_interval; |
| int wal_receiver_timeout; |
| bool hot_standby_feedback; |
| |
| /* libpqwalreceiver connection */ |
| static WalReceiverConn *wrconn = NULL; |
| WalReceiverFunctionsType *WalReceiverFunctions = NULL; |
| |
| #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ |
| |
| /* |
| * These variables are used similarly to openLogFile/SegNo, |
| * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID |
| * corresponding the filename of recvFile. |
| */ |
| static int recvFile = -1; |
| static TimeLineID recvFileTLI = 0; |
| static XLogSegNo recvSegNo = 0; |
| |
| /* |
| * LogstreamResult indicates the byte positions that we have already |
| * written/fsynced. |
| */ |
| static struct |
| { |
| XLogRecPtr Write; /* last byte + 1 written out in the standby */ |
| XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ |
| } LogstreamResult; |
| |
| static StringInfoData reply_message; |
| static StringInfoData incoming_message; |
| |
| /* Prototypes for private functions */ |
| static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); |
| static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); |
| static void WalRcvDie(int code, Datum arg); |
| static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); |
| static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); |
| static void XLogWalRcvFlush(bool dying); |
| static void XLogWalRcvClose(XLogRecPtr recptr); |
| static void XLogWalRcvSendReply(bool force, bool requestReply); |
| static void XLogWalRcvSendHSFeedback(bool immed); |
| static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); |
| |
| /* |
| * Process any interrupts the walreceiver process may have received. |
| * This should be called any time the process's latch has become set. |
| * |
| * Currently, only SIGTERM is of interest. We can't just exit(1) within the |
| * SIGTERM signal handler, because the signal might arrive in the middle of |
| * some critical operation, like while we're holding a spinlock. Instead, the |
| * signal handler sets a flag variable as well as setting the process's latch. |
| * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the |
| * latch has become set. Operations that could block for a long time, such as |
| * reading from a remote server, must pay attention to the latch too; see |
| * libpqrcv_PQgetResult for example. |
| */ |
| void |
| ProcessWalRcvInterrupts(void) |
| { |
| /* |
| * Although walreceiver interrupt handling doesn't use the same scheme as |
| * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive |
| * any incoming signals on Win32, and also to make sure we process any |
| * barrier events. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| if (ShutdownRequestPending) |
| { |
| ereport(FATAL, |
| (errcode(ERRCODE_ADMIN_SHUTDOWN), |
| errmsg("terminating walreceiver process due to administrator command"))); |
| } |
| } |
| |
| |
| /* Main entry point for walreceiver process */ |
| void |
| WalReceiverMain(void) |
| { |
| char conninfo[MAXCONNINFO]; |
| char *tmp_conninfo; |
| char slotname[NAMEDATALEN]; |
| bool is_temp_slot; |
| XLogRecPtr startpoint; |
| TimeLineID startpointTLI; |
| TimeLineID primaryTLI; |
| bool first_stream; |
| WalRcvData *walrcv = WalRcv; |
| TimestampTz last_recv_timestamp; |
| TimestampTz now; |
| bool ping_sent; |
| char *err; |
| char *sender_host = NULL; |
| int sender_port = 0; |
| |
| /* |
| * WalRcv should be set up already (if we are a backend, we inherit this |
| * by fork() or EXEC_BACKEND mechanism from the postmaster). |
| */ |
| Assert(walrcv != NULL); |
| |
| now = GetCurrentTimestamp(); |
| |
| /* |
| * Mark walreceiver as running in shared memory. |
| * |
| * Do this as early as possible, so that if we fail later on, we'll set |
| * state to STOPPED. If we die before this, the startup process will keep |
| * waiting for us to start up, until it times out. |
| */ |
| SpinLockAcquire(&walrcv->mutex); |
| Assert(walrcv->pid == 0); |
| switch (walrcv->walRcvState) |
| { |
| case WALRCV_STOPPING: |
| /* If we've already been requested to stop, don't start up. */ |
| walrcv->walRcvState = WALRCV_STOPPED; |
| /* fall through */ |
| |
| case WALRCV_STOPPED: |
| SpinLockRelease(&walrcv->mutex); |
| ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
| proc_exit(1); |
| break; |
| |
| case WALRCV_STARTING: |
| /* The usual case */ |
| break; |
| |
| case WALRCV_WAITING: |
| case WALRCV_STREAMING: |
| case WALRCV_RESTARTING: |
| default: |
| /* Shouldn't happen */ |
| SpinLockRelease(&walrcv->mutex); |
| elog(PANIC, "walreceiver still running according to shared memory state"); |
| } |
| /* Advertise our PID so that the startup process can kill us */ |
| walrcv->pid = MyProcPid; |
| walrcv->walRcvState = WALRCV_STREAMING; |
| |
| elogif(debug_walrepl_rcv, LOG, |
| "WAL receiver state is set to '%s'", |
| WalRcvGetStateString(walrcv->walRcvState)); |
| |
| /* Fetch information required to start streaming */ |
| walrcv->ready_to_display = false; |
| strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); |
| strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); |
| is_temp_slot = walrcv->is_temp_slot; |
| startpoint = walrcv->receiveStart; |
| startpointTLI = walrcv->receiveStartTLI; |
| |
| /* |
| * At most one of is_temp_slot and slotname can be set; otherwise, |
| * RequestXLogStreaming messed up. |
| */ |
| Assert(!is_temp_slot || (slotname[0] == '\0')); |
| |
| /* Initialise to a sanish value */ |
| walrcv->lastMsgSendTime = |
| walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; |
| |
| /* Report the latch to use to awaken this process */ |
| walrcv->latch = &MyProc->procLatch; |
| |
| SpinLockRelease(&walrcv->mutex); |
| |
| pg_atomic_write_u64(&WalRcv->writtenUpto, 0); |
| |
| /* Arrange to clean up at walreceiver exit */ |
| on_shmem_exit(WalRcvDie, 0); |
| |
| /* Properly accept or ignore signals the postmaster might send us */ |
| pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config |
| * file */ |
| pqsignal(SIGINT, SIG_IGN); |
| pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */ |
| /* SIGQUIT handler was already set up by InitPostmasterChild */ |
| pqsignal(SIGALRM, SIG_IGN); |
| pqsignal(SIGPIPE, SIG_IGN); |
| pqsignal(SIGUSR1, procsignal_sigusr1_handler); |
| pqsignal(SIGUSR2, SIG_IGN); |
| |
| /* Reset some signals that are accepted by postmaster but not here */ |
| pqsignal(SIGCHLD, SIG_DFL); |
| |
| /* Load the libpq-specific functions */ |
| libpqwalreceiver_PG_init(); |
| if (WalReceiverFunctions == NULL) |
| elog(ERROR, "libpqwalreceiver didn't initialize correctly"); |
| |
| /* Unblock signals (they were blocked when the postmaster forked us) */ |
| PG_SETMASK(&UnBlockSig); |
| |
| /* Establish the connection to the primary for XLOG streaming */ |
| wrconn = walrcv_connect(conninfo, false, |
| cluster_name[0] ? cluster_name : "walreceiver", |
| &err); |
| if (!wrconn) |
| ereport(ERROR, |
| (errcode(ERRCODE_CONNECTION_FAILURE), |
| errmsg("could not connect to the primary server: %s", err))); |
| |
| /* |
| * Save user-visible connection string. This clobbers the original |
| * conninfo, for security. Also save host and port of the sender server |
| * this walreceiver is connected to. |
| */ |
| tmp_conninfo = walrcv_get_conninfo(wrconn); |
| walrcv_get_senderinfo(wrconn, &sender_host, &sender_port); |
| SpinLockAcquire(&walrcv->mutex); |
| memset(walrcv->conninfo, 0, MAXCONNINFO); |
| if (tmp_conninfo) |
| strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO); |
| |
| memset(walrcv->sender_host, 0, NI_MAXHOST); |
| if (sender_host) |
| strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST); |
| |
| walrcv->sender_port = sender_port; |
| walrcv->ready_to_display = true; |
| SpinLockRelease(&walrcv->mutex); |
| |
| if (tmp_conninfo) |
| pfree(tmp_conninfo); |
| |
| if (sender_host) |
| pfree(sender_host); |
| |
| first_stream = true; |
| for (;;) |
| { |
| char *primary_sysid; |
| char standby_sysid[32]; |
| WalRcvStreamOptions options; |
| |
| /* |
| * Check that we're connected to a valid server using the |
| * IDENTIFY_SYSTEM replication command. |
| */ |
| primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); |
| |
| snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, |
| GetSystemIdentifier()); |
| if (strcmp(primary_sysid, standby_sysid) != 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("database system identifier differs between the primary and standby"), |
| errdetail("The primary's identifier is %s, the standby's identifier is %s.", |
| primary_sysid, standby_sysid))); |
| } |
| |
| /* |
| * Confirm that the current timeline of the primary is the same or |
| * ahead of ours. |
| */ |
| if (primaryTLI < startpointTLI) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("highest timeline %u of the primary is behind recovery timeline %u", |
| primaryTLI, startpointTLI))); |
| |
| /* |
| * Get any missing history files. We do this always, even when we're |
| * not interested in that timeline, so that if we're promoted to |
| * become the primary later on, we don't select the same timeline that |
| * was already used in the current primary. This isn't bullet-proof - |
| * you'll need some external software to manage your cluster if you |
| * need to ensure that a unique timeline id is chosen in every case, |
| * but let's avoid the confusion of timeline id collisions where we |
| * can. |
| */ |
| WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); |
| |
| /* |
| * Create temporary replication slot if requested, and update slot |
| * name in shared memory. (Note the slot name cannot already be set |
| * in this case.) |
| */ |
| if (is_temp_slot) |
| { |
| snprintf(slotname, sizeof(slotname), |
| "pg_walreceiver_%lld", |
| (long long int) walrcv_get_backend_pid(wrconn)); |
| |
| walrcv_create_slot(wrconn, slotname, true, 0, NULL); |
| |
| SpinLockAcquire(&walrcv->mutex); |
| strlcpy(walrcv->slotname, slotname, NAMEDATALEN); |
| SpinLockRelease(&walrcv->mutex); |
| } |
| |
| /* |
| * Start streaming. |
| * |
| * We'll try to start at the requested starting point and timeline, |
| * even if it's different from the server's latest timeline. In case |
| * we've already reached the end of the old timeline, the server will |
| * finish the streaming immediately, and we will go back to await |
| * orders from the startup process. If recovery_target_timeline is |
| * 'latest', the startup process will scan pg_wal and find the new |
| * history file, bump recovery target timeline, and ask us to restart |
| * on the new timeline. |
| */ |
| options.logical = false; |
| options.startpoint = startpoint; |
| options.slotname = slotname[0] != '\0' ? slotname : NULL; |
| options.proto.physical.startpointTLI = startpointTLI; |
| ThisTimeLineID = startpointTLI; |
| if (walrcv_startstreaming(wrconn, &options)) |
| { |
| if (first_stream) |
| ereport(LOG, |
| (errmsg("started streaming WAL from primary at %X/%X on timeline %u", |
| LSN_FORMAT_ARGS(startpoint), startpointTLI))); |
| else |
| ereport(LOG, |
| (errmsg("restarted WAL streaming at %X/%X on timeline %u", |
| LSN_FORMAT_ARGS(startpoint), startpointTLI))); |
| first_stream = false; |
| |
| /* Initialize LogstreamResult and buffers for processing messages */ |
| LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); |
| initStringInfo(&reply_message); |
| initStringInfo(&incoming_message); |
| |
| /* Initialize the last recv timestamp */ |
| last_recv_timestamp = GetCurrentTimestamp(); |
| ping_sent = false; |
| |
| /* Loop until end-of-streaming or error */ |
| for (;;) |
| { |
| char *buf; |
| int len; |
| bool endofwal = false; |
| pgsocket wait_fd = PGINVALID_SOCKET; |
| int rc; |
| |
| /* |
| * Exit walreceiver if we're not in recovery. This should not |
| * happen, but cross-check the status here. |
| */ |
| if (!RecoveryInProgress()) |
| ereport(FATAL, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("cannot continue WAL streaming, recovery has already ended"))); |
| |
| /* Process any requests or signals received recently */ |
| ProcessWalRcvInterrupts(); |
| |
| if (ConfigReloadPending) |
| { |
| ConfigReloadPending = false; |
| ProcessConfigFile(PGC_SIGHUP); |
| XLogWalRcvSendHSFeedback(true); |
| } |
| |
| /* See if we can read data immediately */ |
| len = walrcv_receive(wrconn, &buf, &wait_fd); |
| if (len != 0) |
| { |
| /* |
| * Process the received data, and any subsequent data we |
| * can read without blocking. |
| */ |
| for (;;) |
| { |
| if (len > 0) |
| { |
| /* |
| * Something was received from primary, so reset |
| * timeout |
| */ |
| last_recv_timestamp = GetCurrentTimestamp(); |
| ping_sent = false; |
| XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); |
| } |
| else if (len == 0) |
| break; |
| else if (len < 0) |
| { |
| ereport(LOG, |
| (errmsg("replication terminated by primary server"), |
| errdetail("End of WAL reached on timeline %u at %X/%X.", |
| startpointTLI, |
| LSN_FORMAT_ARGS(LogstreamResult.Write)))); |
| endofwal = true; |
| break; |
| } |
| len = walrcv_receive(wrconn, &buf, &wait_fd); |
| } |
| |
| /* Let the primary know that we received some data. */ |
| XLogWalRcvSendReply(false, false); |
| |
| /* |
| * If we've written some records, flush them to disk and |
| * let the startup process and primary server know about |
| * them. |
| */ |
| XLogWalRcvFlush(false); |
| } |
| |
| /* Check if we need to exit the streaming loop. */ |
| if (endofwal) |
| break; |
| |
| /* |
| * Ideally we would reuse a WaitEventSet object repeatedly |
| * here to avoid the overheads of WaitLatchOrSocket on epoll |
| * systems, but we can't be sure that libpq (or any other |
| * walreceiver implementation) has the same socket (even if |
| * the fd is the same number, it may have been closed and |
| * reopened since the last time). In future, if there is a |
| * function for removing sockets from WaitEventSet, then we |
| * could add and remove just the socket each time, potentially |
| * avoiding some system calls. |
| */ |
| Assert(wait_fd != PGINVALID_SOCKET); |
| rc = WaitLatchOrSocket(MyLatch, |
| WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | |
| WL_TIMEOUT | WL_LATCH_SET, |
| wait_fd, |
| NAPTIME_PER_CYCLE, |
| WAIT_EVENT_WAL_RECEIVER_MAIN); |
| if (rc & WL_LATCH_SET) |
| { |
| ResetLatch(MyLatch); |
| ProcessWalRcvInterrupts(); |
| |
| if (walrcv->force_reply) |
| { |
| /* |
| * The recovery process has asked us to send apply |
| * feedback now. Make sure the flag is really set to |
| * false in shared memory before sending the reply, so |
| * we don't miss a new request for a reply. |
| */ |
| walrcv->force_reply = false; |
| pg_memory_barrier(); |
| XLogWalRcvSendReply(true, false); |
| } |
| } |
| if (rc & WL_TIMEOUT) |
| { |
| /* |
| * We didn't receive anything new. If we haven't heard |
| * anything from the server for more than |
| * wal_receiver_timeout / 2, ping the server. Also, if |
| * it's been longer than wal_receiver_status_interval |
| * since the last update we sent, send a status update to |
| * the primary anyway, to report any progress in applying |
| * WAL. |
| */ |
| bool requestReply = false; |
| |
| /* |
| * Check if time since last receive from primary has |
| * reached the configured limit. |
| */ |
| if (wal_receiver_timeout > 0) |
| { |
| TimestampTz now = GetCurrentTimestamp(); |
| TimestampTz timeout; |
| |
| timeout = |
| TimestampTzPlusMilliseconds(last_recv_timestamp, |
| wal_receiver_timeout); |
| |
| if (now >= timeout) |
| ereport(ERROR, |
| (errcode(ERRCODE_CONNECTION_FAILURE), |
| errmsg("terminating walreceiver due to timeout"))); |
| |
| /* |
| * We didn't receive anything new, for half of |
| * receiver replication timeout. Ping the server. |
| */ |
| if (!ping_sent) |
| { |
| timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, |
| (wal_receiver_timeout / 2)); |
| if (now >= timeout) |
| { |
| requestReply = true; |
| ping_sent = true; |
| } |
| } |
| } |
| |
| XLogWalRcvSendReply(requestReply, requestReply); |
| XLogWalRcvSendHSFeedback(false); |
| } |
| } |
| |
| /* |
| * The backend finished streaming. Exit streaming COPY-mode from |
| * our side, too. |
| */ |
| walrcv_endstreaming(wrconn, &primaryTLI); |
| |
| /* |
| * If the server had switched to a new timeline that we didn't |
| * know about when we began streaming, fetch its timeline history |
| * file now. |
| */ |
| WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); |
| } |
| else |
| ereport(LOG, |
| (errmsg("primary server contains no more WAL on requested timeline %u", |
| startpointTLI))); |
| |
| /* |
| * End of WAL reached on the requested timeline. Close the last |
| * segment, and await for new orders from the startup process. |
| */ |
| if (recvFile >= 0) |
| { |
| char xlogfname[MAXFNAMELEN]; |
| |
| XLogWalRcvFlush(false); |
| XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); |
| if (close(recvFile) != 0) |
| ereport(PANIC, |
| (errcode_for_file_access(), |
| errmsg("could not close log segment %s: %m", |
| xlogfname))); |
| |
| /* |
| * Create .done file forcibly to prevent the streamed segment from |
| * being archived later. |
| */ |
| if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) |
| XLogArchiveForceDone(xlogfname); |
| else |
| XLogArchiveNotify(xlogfname); |
| } |
| recvFile = -1; |
| |
| elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); |
| WalRcvWaitForStartPosition(&startpoint, &startpointTLI); |
| } |
| /* not reached */ |
| } |
| |
| /* |
| * Wait for startup process to set receiveStart and receiveStartTLI. |
| */ |
| static void |
| WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) |
| { |
| WalRcvData *walrcv = WalRcv; |
| int state; |
| |
| SpinLockAcquire(&walrcv->mutex); |
| state = walrcv->walRcvState; |
| if (state != WALRCV_STREAMING) |
| { |
| SpinLockRelease(&walrcv->mutex); |
| if (state == WALRCV_STOPPING) |
| proc_exit(0); |
| else |
| elog(FATAL, "unexpected walreceiver state"); |
| } |
| walrcv->walRcvState = WALRCV_WAITING; |
| walrcv->receiveStart = InvalidXLogRecPtr; |
| walrcv->receiveStartTLI = 0; |
| SpinLockRelease(&walrcv->mutex); |
| |
| set_ps_display("idle"); |
| |
| /* |
| * nudge startup process to notice that we've stopped streaming and are |
| * now waiting for instructions. |
| */ |
| WakeupRecovery(); |
| for (;;) |
| { |
| ResetLatch(MyLatch); |
| |
| ProcessWalRcvInterrupts(); |
| |
| SpinLockAcquire(&walrcv->mutex); |
| Assert(walrcv->walRcvState == WALRCV_RESTARTING || |
| walrcv->walRcvState == WALRCV_WAITING || |
| walrcv->walRcvState == WALRCV_STOPPING); |
| if (walrcv->walRcvState == WALRCV_RESTARTING) |
| { |
| /* |
| * No need to handle changes in primary_conninfo or |
| * primary_slotname here. Startup process will signal us to |
| * terminate in case those change. |
| */ |
| *startpoint = walrcv->receiveStart; |
| *startpointTLI = walrcv->receiveStartTLI; |
| walrcv->walRcvState = WALRCV_STREAMING; |
| SpinLockRelease(&walrcv->mutex); |
| break; |
| } |
| if (walrcv->walRcvState == WALRCV_STOPPING) |
| { |
| /* |
| * We should've received SIGTERM if the startup process wants us |
| * to die, but might as well check it here too. |
| */ |
| SpinLockRelease(&walrcv->mutex); |
| exit(1); |
| } |
| SpinLockRelease(&walrcv->mutex); |
| |
| (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| WAIT_EVENT_WAL_RECEIVER_WAIT_START); |
| } |
| |
| if (update_process_title) |
| { |
| char activitymsg[50]; |
| |
| snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X", |
| LSN_FORMAT_ARGS(*startpoint)); |
| set_ps_display(activitymsg); |
| } |
| } |
| |
| /* |
| * Fetch any missing timeline history files between 'first' and 'last' |
| * (inclusive) from the server. |
| */ |
| static void |
| WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) |
| { |
| TimeLineID tli; |
| |
| for (tli = first; tli <= last; tli++) |
| { |
| /* there's no history file for timeline 1 */ |
| if (tli != 1 && !existsTimeLineHistory(tli)) |
| { |
| char *fname; |
| char *content; |
| int len; |
| char expectedfname[MAXFNAMELEN]; |
| |
| ereport(LOG, |
| (errmsg("fetching timeline history file for timeline %u from primary server", |
| tli))); |
| |
| walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); |
| |
| /* |
| * Check that the filename on the primary matches what we |
| * calculated ourselves. This is just a sanity check, it should |
| * always match. |
| */ |
| TLHistoryFileName(expectedfname, tli); |
| if (strcmp(fname, expectedfname) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u", |
| tli))); |
| |
| /* |
| * Write the file to pg_wal. |
| */ |
| writeTimeLineHistoryFile(tli, content, len); |
| |
| /* |
| * Mark the streamed history file as ready for archiving if |
| * archive_mode is always. |
| */ |
| if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) |
| XLogArchiveForceDone(fname); |
| else |
| XLogArchiveNotify(fname); |
| |
| pfree(fname); |
| pfree(content); |
| } |
| } |
| } |
| |
| /* |
| * Mark us as STOPPED in shared memory at exit. |
| */ |
| static void |
| WalRcvDie(int code, Datum arg) |
| { |
| WalRcvData *walrcv = WalRcv; |
| |
| /* Ensure that all WAL records received are flushed to disk */ |
| XLogWalRcvFlush(true); |
| |
| /* Mark ourselves inactive in shared memory */ |
| SpinLockAcquire(&walrcv->mutex); |
| Assert(walrcv->walRcvState == WALRCV_STREAMING || |
| walrcv->walRcvState == WALRCV_RESTARTING || |
| walrcv->walRcvState == WALRCV_STARTING || |
| walrcv->walRcvState == WALRCV_WAITING || |
| walrcv->walRcvState == WALRCV_STOPPING); |
| Assert(walrcv->pid == MyProcPid); |
| walrcv->walRcvState = WALRCV_STOPPED; |
| walrcv->pid = 0; |
| walrcv->ready_to_display = false; |
| walrcv->latch = NULL; |
| SpinLockRelease(&walrcv->mutex); |
| |
| ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
| |
| /* Terminate the connection gracefully. */ |
| if (wrconn != NULL) |
| walrcv_disconnect(wrconn); |
| |
| /* Wake up the startup process to notice promptly that we're gone */ |
| WakeupRecovery(); |
| } |
| |
| /* |
| * Accept the message from XLOG stream, and process it. |
| */ |
| static void |
| XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) |
| { |
| int hdrlen; |
| XLogRecPtr dataStart; |
| XLogRecPtr walEnd; |
| TimestampTz sendTime; |
| bool replyRequested; |
| |
| resetStringInfo(&incoming_message); |
| |
| switch (type) |
| { |
| case 'w': /* WAL records */ |
| { |
| /* copy message to StringInfo */ |
| hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); |
| if (len < hdrlen) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("invalid WAL message received from primary"))); |
| appendBinaryStringInfo(&incoming_message, buf, hdrlen); |
| |
| /* read the fields */ |
| dataStart = pq_getmsgint64(&incoming_message); |
| walEnd = pq_getmsgint64(&incoming_message); |
| sendTime = pq_getmsgint64(&incoming_message); |
| ProcessWalSndrMessage(walEnd, sendTime); |
| |
| buf += hdrlen; |
| len -= hdrlen; |
| XLogWalRcvWrite(buf, len, dataStart); |
| break; |
| } |
| case 'k': /* Keepalive */ |
| { |
| /* copy message to StringInfo */ |
| hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); |
| if (len != hdrlen) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("invalid keepalive message received from primary"))); |
| appendBinaryStringInfo(&incoming_message, buf, hdrlen); |
| |
| /* read the fields */ |
| walEnd = pq_getmsgint64(&incoming_message); |
| sendTime = pq_getmsgint64(&incoming_message); |
| replyRequested = pq_getmsgbyte(&incoming_message); |
| |
| ProcessWalSndrMessage(walEnd, sendTime); |
| |
| /* If the primary requested a reply, send one immediately */ |
| if (replyRequested) |
| XLogWalRcvSendReply(true, false); |
| break; |
| } |
| default: |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("invalid replication message type %d", |
| type))); |
| } |
| } |
| |
| /* |
| * Write XLOG data to disk. |
| */ |
| static void |
| XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) |
| { |
| int startoff; |
| int byteswritten; |
| |
| while (nbytes > 0) |
| { |
| int segbytes; |
| char xlogfname[MAXFNAMELEN]; |
| |
| /* Close the current segment if it's completed */ |
| if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) |
| XLogWalRcvClose(recptr); |
| |
| if (recvFile < 0) |
| { |
| bool use_existent = true; |
| |
| /* Create/use new log file */ |
| XLByteToSeg(recptr, recvSegNo, wal_segment_size); |
| recvFile = XLogFileInit(recvSegNo, &use_existent, true); |
| recvFileTLI = ThisTimeLineID; |
| } |
| |
| /* Calculate the start offset of the received logs */ |
| startoff = XLogSegmentOffset(recptr, wal_segment_size); |
| |
| if (startoff + nbytes > wal_segment_size) |
| segbytes = wal_segment_size - startoff; |
| else |
| segbytes = nbytes; |
| |
| /* OK to write the logs */ |
| errno = 0; |
| |
| byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); |
| if (byteswritten <= 0) |
| { |
| int save_errno; |
| |
| /* if write didn't set errno, assume no disk space */ |
| if (errno == 0) |
| errno = ENOSPC; |
| |
| save_errno = errno; |
| XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); |
| errno = save_errno; |
| ereport(PANIC, |
| (errcode_for_file_access(), |
| errmsg("could not write to log segment %s " |
| "at offset %u, length %lu: %m", |
| xlogfname, startoff, (unsigned long) segbytes))); |
| } |
| |
| /* Update state for write */ |
| recptr += byteswritten; |
| |
| nbytes -= byteswritten; |
| buf += byteswritten; |
| |
| LogstreamResult.Write = recptr; |
| |
| XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); |
| elogif(debug_walrepl_rcv, LOG, |
| "walrcv write -- Wrote %d bytes in file %s, offset %d." |
| "Latest write location is %X/%X.", |
| byteswritten, |
| xlogfname, |
| startoff, |
| (uint32) (LogstreamResult.Write >> 32), |
| (uint32) LogstreamResult.Write); |
| } |
| |
| /* Update shared-memory status */ |
| pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); |
| |
| /* |
| * Close the current segment if it's fully written up in the last cycle of |
| * the loop, to create its archive notification file soon. Otherwise WAL |
| * archiving of the segment will be delayed until any data in the next |
| * segment is received and written. |
| */ |
| if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) |
| XLogWalRcvClose(recptr); |
| } |
| |
| /* |
| * Flush the log to disk. |
| * |
| * If we're in the midst of dying, it's unwise to do anything that might throw |
| * an error, so we skip sending a reply in that case. |
| */ |
| static void |
| XLogWalRcvFlush(bool dying) |
| { |
| if (LogstreamResult.Flush < LogstreamResult.Write) |
| { |
| #ifdef FAULT_INJECTOR |
| /* Simulate the case that the standby / mirror is lagging behind. */ |
| if (SIMPLE_FAULT_INJECTOR("walrecv_skip_flush") == FaultInjectorTypeSkip) |
| return; |
| #endif |
| |
| WalRcvData *walrcv = WalRcv; |
| |
| issue_xlog_fsync(recvFile, recvSegNo); |
| |
| LogstreamResult.Flush = LogstreamResult.Write; |
| |
| /* Update shared-memory status */ |
| SpinLockAcquire(&walrcv->mutex); |
| if (walrcv->flushedUpto < LogstreamResult.Flush) |
| { |
| walrcv->latestChunkStart = walrcv->flushedUpto; |
| walrcv->flushedUpto = LogstreamResult.Flush; |
| walrcv->receivedTLI = ThisTimeLineID; |
| } |
| SpinLockRelease(&walrcv->mutex); |
| |
| /* Signal the startup process and walsender that new WAL has arrived */ |
| WakeupRecovery(); |
| if (AllowCascadeReplication()) |
| WalSndWakeup(); |
| |
| /* Report XLOG streaming progress in PS display */ |
| if (update_process_title) |
| { |
| char activitymsg[50]; |
| |
| snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", |
| LSN_FORMAT_ARGS(LogstreamResult.Write)); |
| set_ps_display(activitymsg); |
| } |
| |
| /* Also let the primary know that we made some progress */ |
| if (!dying) |
| { |
| XLogWalRcvSendReply(false, false); |
| XLogWalRcvSendHSFeedback(false); |
| } |
| } |
| } |
| |
| /* |
| * Close the current segment. |
| * |
| * Flush the segment to disk before closing it. Otherwise we have to |
| * reopen and fsync it later. |
| * |
| * Create an archive notification file since the segment is known completed. |
| */ |
| static void |
| XLogWalRcvClose(XLogRecPtr recptr) |
| { |
| char xlogfname[MAXFNAMELEN]; |
| |
| Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); |
| |
| /* |
| * fsync() and close current file before we switch to next one. We would |
| * otherwise have to reopen this file to fsync it later |
| */ |
| XLogWalRcvFlush(false); |
| |
| XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); |
| |
| /* |
| * XLOG segment files will be re-read by recovery in startup process soon, |
| * so we don't advise the OS to release cache pages associated with the |
| * file like XLogFileClose() does. |
| */ |
| if (close(recvFile) != 0) |
| ereport(PANIC, |
| (errcode_for_file_access(), |
| errmsg("could not close log segment %s: %m", |
| xlogfname))); |
| |
| /* |
| * Create .done file forcibly to prevent the streamed segment from being |
| * archived later. |
| */ |
| if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) |
| XLogArchiveForceDone(xlogfname); |
| else |
| XLogArchiveNotify(xlogfname); |
| |
| recvFile = -1; |
| } |
| |
| /* |
| * Send reply message to primary, indicating our current WAL locations, oldest |
| * xmin and the current time. |
| * |
| * If 'force' is not set, the message is only sent if enough time has |
| * passed since last status update to reach wal_receiver_status_interval. |
| * If wal_receiver_status_interval is disabled altogether and 'force' is |
| * false, this is a no-op. |
| * |
| * If 'requestReply' is true, requests the server to reply immediately upon |
| * receiving this message. This is used for heartbeats, when approaching |
| * wal_receiver_timeout. |
| */ |
| static void |
| XLogWalRcvSendReply(bool force, bool requestReply) |
| { |
| static XLogRecPtr writePtr = 0; |
| static XLogRecPtr flushPtr = 0; |
| XLogRecPtr applyPtr; |
| static TimestampTz sendTime = 0; |
| TimestampTz now; |
| |
| /* |
| * If the user doesn't want status to be reported to the primary, be sure |
| * to exit before doing anything at all. |
| */ |
| if (!force && wal_receiver_status_interval <= 0) |
| return; |
| |
| /* Get current timestamp. */ |
| now = GetCurrentTimestamp(); |
| |
| /* |
| * We can compare the write and flush positions to the last message we |
| * sent without taking any lock, but the apply position requires a spin |
| * lock, so we don't check that unless something else has changed or 10 |
| * seconds have passed. This means that the apply WAL location will |
| * appear, from the primary's point of view, to lag slightly, but since |
| * this is only for reporting purposes and only on idle systems, that's |
| * probably OK. |
| */ |
| if (!force |
| && writePtr == LogstreamResult.Write |
| && flushPtr == LogstreamResult.Flush |
| && !TimestampDifferenceExceeds(sendTime, now, |
| wal_receiver_status_interval * 1000)) |
| return; |
| sendTime = now; |
| |
| /* Construct a new message */ |
| writePtr = LogstreamResult.Write; |
| flushPtr = LogstreamResult.Flush; |
| applyPtr = GetXLogReplayRecPtr(NULL); |
| |
| resetStringInfo(&reply_message); |
| pq_sendbyte(&reply_message, 'r'); |
| pq_sendint64(&reply_message, writePtr); |
| pq_sendint64(&reply_message, flushPtr); |
| pq_sendint64(&reply_message, applyPtr); |
| pq_sendint64(&reply_message, GetCurrentTimestamp()); |
| pq_sendbyte(&reply_message, requestReply ? 1 : 0); |
| |
| /* Send it */ |
| elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", |
| LSN_FORMAT_ARGS(writePtr), |
| LSN_FORMAT_ARGS(flushPtr), |
| LSN_FORMAT_ARGS(applyPtr), |
| requestReply ? " (reply requested)" : ""); |
| |
| walrcv_send(wrconn, reply_message.data, reply_message.len); |
| } |
| |
| /* |
| * Send hot standby feedback message to primary, plus the current time, |
| * in case they don't have a watch. |
| * |
| * If the user disables feedback, send one final message to tell sender |
| * to forget about the xmin on this standby. We also send this message |
| * on first connect because a previous connection might have set xmin |
| * on a replication slot. (If we're not using a slot it's harmless to |
| * send a feedback message explicitly setting InvalidTransactionId). |
| */ |
| static void |
| XLogWalRcvSendHSFeedback(bool immed) |
| { |
| TimestampTz now; |
| FullTransactionId nextFullXid; |
| TransactionId nextXid; |
| uint32 xmin_epoch, |
| catalog_xmin_epoch; |
| TransactionId xmin, |
| catalog_xmin; |
| static TimestampTz sendTime = 0; |
| |
| /* initially true so we always send at least one feedback message */ |
| static bool primary_has_standby_xmin = true; |
| |
| /* |
| * If the user doesn't want status to be reported to the primary, be sure |
| * to exit before doing anything at all. |
| */ |
| if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && |
| !primary_has_standby_xmin) |
| return; |
| |
| /* Get current timestamp. */ |
| now = GetCurrentTimestamp(); |
| |
| if (!immed) |
| { |
| /* |
| * Send feedback at most once per wal_receiver_status_interval. |
| */ |
| if (!TimestampDifferenceExceeds(sendTime, now, |
| wal_receiver_status_interval * 1000)) |
| return; |
| sendTime = now; |
| } |
| |
| /* |
| * If Hot Standby is not yet accepting connections there is nothing to |
| * send. Check this after the interval has expired to reduce number of |
| * calls. |
| * |
| * Bailing out here also ensures that we don't send feedback until we've |
| * read our own replication slot state, so we don't tell the primary to |
| * discard needed xmin or catalog_xmin from any slots that may exist on |
| * this replica. |
| */ |
| if (!HotStandbyActive()) |
| return; |
| |
| /* |
| * Make the expensive call to get the oldest xmin once we are certain |
| * everything else has been checked. |
| */ |
| if (hot_standby_feedback) |
| { |
| GetReplicationHorizons(&xmin, &catalog_xmin); |
| } |
| else |
| { |
| xmin = InvalidTransactionId; |
| catalog_xmin = InvalidTransactionId; |
| } |
| |
| /* |
| * Get epoch and adjust if nextXid and oldestXmin are different sides of |
| * the epoch boundary. |
| */ |
| nextFullXid = ReadNextFullTransactionId(); |
| nextXid = XidFromFullTransactionId(nextFullXid); |
| xmin_epoch = EpochFromFullTransactionId(nextFullXid); |
| catalog_xmin_epoch = xmin_epoch; |
| if (nextXid < xmin) |
| xmin_epoch--; |
| if (nextXid < catalog_xmin) |
| catalog_xmin_epoch--; |
| |
| elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", |
| xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); |
| |
| /* Construct the message and send it. */ |
| resetStringInfo(&reply_message); |
| pq_sendbyte(&reply_message, 'h'); |
| pq_sendint64(&reply_message, GetCurrentTimestamp()); |
| pq_sendint32(&reply_message, xmin); |
| pq_sendint32(&reply_message, xmin_epoch); |
| pq_sendint32(&reply_message, catalog_xmin); |
| pq_sendint32(&reply_message, catalog_xmin_epoch); |
| walrcv_send(wrconn, reply_message.data, reply_message.len); |
| if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) |
| primary_has_standby_xmin = true; |
| else |
| primary_has_standby_xmin = false; |
| } |
| |
| /* |
| * Update shared memory status upon receiving a message from primary. |
| * |
| * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest |
| * message, reported by primary. |
| */ |
| static void |
| ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) |
| { |
| WalRcvData *walrcv = WalRcv; |
| |
| TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); |
| |
| /* Update shared-memory status */ |
| SpinLockAcquire(&walrcv->mutex); |
| if (walrcv->latestWalEnd < walEnd) |
| walrcv->latestWalEndTime = sendTime; |
| walrcv->latestWalEnd = walEnd; |
| walrcv->lastMsgSendTime = sendTime; |
| walrcv->lastMsgReceiptTime = lastMsgReceiptTime; |
| SpinLockRelease(&walrcv->mutex); |
| |
| if (message_level_is_interesting(DEBUG2)) |
| { |
| char *sendtime; |
| char *receipttime; |
| int applyDelay; |
| |
| /* Copy because timestamptz_to_str returns a static buffer */ |
| sendtime = pstrdup(timestamptz_to_str(sendTime)); |
| receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime)); |
| applyDelay = GetReplicationApplyDelay(); |
| |
| /* apply delay is not available */ |
| if (applyDelay == -1) |
| elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms", |
| sendtime, |
| receipttime, |
| GetReplicationTransferLatency()); |
| else |
| elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms", |
| sendtime, |
| receipttime, |
| applyDelay, |
| GetReplicationTransferLatency()); |
| |
| pfree(sendtime); |
| pfree(receipttime); |
| } |
| } |
| |
| /* |
| * Wake up the walreceiver main loop. |
| * |
| * This is called by the startup process whenever interesting xlog records |
| * are applied, so that walreceiver can check if it needs to send an apply |
| * notification back to the primary which may be waiting in a COMMIT with |
| * synchronous_commit = remote_apply. |
| */ |
| void |
| WalRcvForceReply(void) |
| { |
| Latch *latch; |
| |
| WalRcv->force_reply = true; |
| /* fetching the latch pointer might not be atomic, so use spinlock */ |
| SpinLockAcquire(&WalRcv->mutex); |
| latch = WalRcv->latch; |
| SpinLockRelease(&WalRcv->mutex); |
| if (latch) |
| SetLatch(latch); |
| } |
| |
| /* |
| * Return a string constant representing the state. This is used |
| * in system functions and views, and should *not* be translated. |
| */ |
| const char * |
| WalRcvGetStateString(WalRcvState state) |
| { |
| switch (state) |
| { |
| case WALRCV_STOPPED: |
| return "stopped"; |
| case WALRCV_STARTING: |
| return "starting"; |
| case WALRCV_STREAMING: |
| return "streaming"; |
| case WALRCV_WAITING: |
| return "waiting"; |
| case WALRCV_RESTARTING: |
| return "restarting"; |
| case WALRCV_STOPPING: |
| return "stopping"; |
| } |
| return "UNKNOWN"; |
| } |
| |
| /* |
| * Returns activity of WAL receiver, including pid, state and xlog locations |
| * received from the WAL sender of another server. |
| */ |
| Datum |
| pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) |
| { |
| TupleDesc tupdesc; |
| Datum *values; |
| bool *nulls; |
| int pid; |
| bool ready_to_display; |
| WalRcvState state; |
| XLogRecPtr receive_start_lsn; |
| TimeLineID receive_start_tli; |
| XLogRecPtr written_lsn; |
| XLogRecPtr flushed_lsn; |
| TimeLineID received_tli; |
| TimestampTz last_send_time; |
| TimestampTz last_receipt_time; |
| XLogRecPtr latest_end_lsn; |
| TimestampTz latest_end_time; |
| char sender_host[NI_MAXHOST]; |
| int sender_port = 0; |
| char slotname[NAMEDATALEN]; |
| char conninfo[MAXCONNINFO]; |
| |
| /* Take a lock to ensure value consistency */ |
| SpinLockAcquire(&WalRcv->mutex); |
| pid = (int) WalRcv->pid; |
| ready_to_display = WalRcv->ready_to_display; |
| state = WalRcv->walRcvState; |
| receive_start_lsn = WalRcv->receiveStart; |
| receive_start_tli = WalRcv->receiveStartTLI; |
| flushed_lsn = WalRcv->flushedUpto; |
| received_tli = WalRcv->receivedTLI; |
| last_send_time = WalRcv->lastMsgSendTime; |
| last_receipt_time = WalRcv->lastMsgReceiptTime; |
| latest_end_lsn = WalRcv->latestWalEnd; |
| latest_end_time = WalRcv->latestWalEndTime; |
| strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname)); |
| strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host)); |
| sender_port = WalRcv->sender_port; |
| strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo)); |
| SpinLockRelease(&WalRcv->mutex); |
| |
| /* |
| * No WAL receiver (or not ready yet), just return a tuple with NULL |
| * values |
| */ |
| if (pid == 0 || !ready_to_display) |
| PG_RETURN_NULL(); |
| |
| /* |
| * Read "writtenUpto" without holding a spinlock. Note that it may not be |
| * consistent with the other shared variables of the WAL receiver |
| * protected by a spinlock, but this should not be used for data integrity |
| * checks. |
| */ |
| written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto); |
| |
| /* determine result type */ |
| if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| elog(ERROR, "return type must be a row type"); |
| |
| values = palloc0(sizeof(Datum) * tupdesc->natts); |
| nulls = palloc0(sizeof(bool) * tupdesc->natts); |
| |
| /* Fetch values */ |
| values[0] = Int32GetDatum(pid); |
| |
| if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS)) |
| { |
| /* |
| * Only superusers and members of pg_read_all_stats can see details. |
| * Other users only get the pid value to know whether it is a WAL |
| * receiver, but no details. |
| */ |
| MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1)); |
| } |
| else |
| { |
| values[1] = CStringGetTextDatum(WalRcvGetStateString(state)); |
| |
| if (XLogRecPtrIsInvalid(receive_start_lsn)) |
| nulls[2] = true; |
| else |
| values[2] = LSNGetDatum(receive_start_lsn); |
| values[3] = Int32GetDatum(receive_start_tli); |
| if (XLogRecPtrIsInvalid(written_lsn)) |
| nulls[4] = true; |
| else |
| values[4] = LSNGetDatum(written_lsn); |
| if (XLogRecPtrIsInvalid(flushed_lsn)) |
| nulls[5] = true; |
| else |
| values[5] = LSNGetDatum(flushed_lsn); |
| values[6] = Int32GetDatum(received_tli); |
| if (last_send_time == 0) |
| nulls[7] = true; |
| else |
| values[7] = TimestampTzGetDatum(last_send_time); |
| if (last_receipt_time == 0) |
| nulls[8] = true; |
| else |
| values[8] = TimestampTzGetDatum(last_receipt_time); |
| if (XLogRecPtrIsInvalid(latest_end_lsn)) |
| nulls[9] = true; |
| else |
| values[9] = LSNGetDatum(latest_end_lsn); |
| if (latest_end_time == 0) |
| nulls[10] = true; |
| else |
| values[10] = TimestampTzGetDatum(latest_end_time); |
| if (*slotname == '\0') |
| nulls[11] = true; |
| else |
| values[11] = CStringGetTextDatum(slotname); |
| if (*sender_host == '\0') |
| nulls[12] = true; |
| else |
| values[12] = CStringGetTextDatum(sender_host); |
| if (sender_port == 0) |
| nulls[13] = true; |
| else |
| values[13] = Int32GetDatum(sender_port); |
| if (*conninfo == '\0') |
| nulls[14] = true; |
| else |
| values[14] = CStringGetTextDatum(conninfo); |
| } |
| |
| /* Returns the record as Datum */ |
| PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); |
| } |