| /*------------------------------------------------------------------------- |
| * launcher.c |
| * PostgreSQL logical replication worker launcher process |
| * |
| * Copyright (c) 2016-2021, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/replication/logical/launcher.c |
| * |
| * NOTES |
| * This module contains the logical replication worker launcher which |
| * uses the background worker infrastructure to start the logical |
| * replication workers for every enabled subscription. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/htup.h" |
| #include "access/htup_details.h" |
| #include "access/tableam.h" |
| #include "access/xact.h" |
| #include "catalog/pg_subscription.h" |
| #include "catalog/pg_subscription_rel.h" |
| #include "funcapi.h" |
| #include "libpq/pqsignal.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "postmaster/bgworker.h" |
| #include "postmaster/fork_process.h" |
| #include "postmaster/interrupt.h" |
| #include "postmaster/postmaster.h" |
| #include "replication/logicallauncher.h" |
| #include "replication/logicalworker.h" |
| #include "replication/slot.h" |
| #include "replication/walreceiver.h" |
| #include "replication/worker_internal.h" |
| #include "storage/ipc.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "storage/procsignal.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/memutils.h" |
| #include "utils/pg_lsn.h" |
| #include "utils/ps_status.h" |
| #include "utils/snapmgr.h" |
| #include "utils/timeout.h" |
| |
| /* max sleep time between cycles (3min) */ |
| #define DEFAULT_NAPTIME_PER_CYCLE 180000L |
| |
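| /* GUC variables controlling how many workers may run. */ |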
| int max_logical_replication_workers = 4; |
| int max_sync_workers_per_subscription = 2; |
| |
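| /* This process's slot in the shared workers array; set in logicalrep_worker_attach(). */ |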
| LogicalRepWorker *MyLogicalRepWorker = NULL; |
| |
| typedef struct LogicalRepCtxStruct |
| { |
| /* Supervisor process. */ |
| pid_t launcher_pid; |
| |
| /* Background workers. */ |
| LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; |
| } LogicalRepCtxStruct; |
| |
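| /* Pointer to the shared-memory state; set up in ApplyLauncherShmemInit(). */ |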
| LogicalRepCtxStruct *LogicalRepCtx; |
| |
| static void ApplyLauncherWakeup(void); |
| static void logicalrep_launcher_onexit(int code, Datum arg); |
| static void logicalrep_worker_onexit(int code, Datum arg); |
| static void logicalrep_worker_detach(void); |
| static void logicalrep_worker_cleanup(LogicalRepWorker *worker); |
| |
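| /* Whether the current transaction should wake the launcher at commit; see ApplyLauncherWakeupAtCommit(). */ |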
| static bool on_commit_launcher_wakeup = false; |
| |
| Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); |
| |
| |
| /* |
| * Load the list of subscriptions. |
| * |
| * Only the fields interesting for worker start/stop functions are filled for |
| * each subscription. |
| */ |
| static List * |
| get_subscription_list(void) |
| { |
| List *res = NIL; |
| Relation rel; |
| TableScanDesc scan; |
| HeapTuple tup; |
| MemoryContext resultcxt; |
| |
| /* This is the context that we will allocate our output data in */ |
| resultcxt = CurrentMemoryContext; |
| |
| /* |
| * Start a transaction so we can access pg_subscription, and get a snapshot. |
| * We don't have a use for the snapshot itself, but we're interested in |
| * the secondary effect that it sets RecentGlobalXmin. (This is critical |
| * for anything that reads heap pages, because HOT may decide to prune |
| * them even if the process doesn't attempt to modify any tuples.) |
| * |
| * FIXME: This comment is inaccurate / the code buggy. A snapshot that is |
| * not pushed/active does not reliably prevent HOT pruning (->xmin could |
| * e.g. be cleared when cache invalidations are processed). |
| */ |
| StartTransactionCommand(); |
| (void) GetTransactionSnapshot(); |
| |
| rel = table_open(SubscriptionRelationId, AccessShareLock); |
| scan = table_beginscan_catalog(rel, 0, NULL); |
| |
| while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) |
| { |
| Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); |
| Subscription *sub; |
| MemoryContext oldcxt; |
| |
| /* |
| * Allocate our results in the caller's context, not the |
| * transaction's. We do this inside the loop, and restore the original |
| * context at the end, so that leaky things like heap_getnext() are |
| * not called in a potentially long-lived context. |
| */ |
| oldcxt = MemoryContextSwitchTo(resultcxt); |
| |
| sub = (Subscription *) palloc0(sizeof(Subscription)); |
| sub->oid = subform->oid; |
| sub->dbid = subform->subdbid; |
| sub->owner = subform->subowner; |
| sub->enabled = subform->subenabled; |
| sub->name = pstrdup(NameStr(subform->subname)); |
| /* We don't fill fields we are not interested in. */ |
| |
| res = lappend(res, sub); |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| table_endscan(scan); |
| table_close(rel, AccessShareLock); |
| |
| CommitTransactionCommand(); |
| |
| return res; |
| } |
| |
| /* |
| * Wait for a background worker to start up and attach to the shmem context. |
| * |
| * This is only needed for cleaning up the shared memory in case the worker |
| * fails to attach. |
| */ |
| static void |
| WaitForReplicationWorkerAttach(LogicalRepWorker *worker, |
| uint16 generation, |
| BackgroundWorkerHandle *handle) |
| { |
| BgwHandleStatus status; |
| int rc; |
| |
| for (;;) |
| { |
| pid_t pid; |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| |
| /* Worker either died or has started; no need to do anything. */ |
| if (!worker->in_use || worker->proc) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| return; |
| } |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| /* Check if worker has died before attaching, and clean up after it. */ |
| status = GetBackgroundWorkerPid(handle, &pid); |
| |
| if (status == BGWH_STOPPED) |
| { |
| LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
| /* Ensure that this was indeed the worker we waited for. */ |
| if (generation == worker->generation) |
| logicalrep_worker_cleanup(worker); |
| LWLockRelease(LogicalRepWorkerLock); |
| return; |
| } |
| |
| /* |
| * We need a timeout because we generally don't get notified via latch |
| * about the worker attaching. But we don't expect to have to wait long. |
| */ |
| rc = WaitLatch(MyLatch, |
| WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
| 10L, WAIT_EVENT_BGWORKER_STARTUP); |
| |
| if (rc & WL_LATCH_SET) |
| { |
| ResetLatch(MyLatch); |
| CHECK_FOR_INTERRUPTS(); |
| } |
| } |
| } |
| |
| /* |
| * Walks the workers array and searches for one that matches the given |
| * subscription id and relid. |
| */ |
| LogicalRepWorker * |
| logicalrep_worker_find(Oid subid, Oid relid, bool only_running) |
| { |
| int i; |
| LogicalRepWorker *res = NULL; |
| |
| Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
| |
| /* Search for attached worker for a given subscription id. */ |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
| |
| if (w->in_use && w->subid == subid && w->relid == relid && |
| (!only_running || w->proc)) |
| { |
| res = w; |
| break; |
| } |
| } |
| |
| return res; |
| } |
| |
| /* |
| * Similar to logicalrep_worker_find(), but returns a list of all workers |
| * for the subscription, instead of just one. |
| */ |
| List * |
| logicalrep_workers_find(Oid subid, bool only_running) |
| { |
| int i; |
| List *res = NIL; |
| |
| Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
| |
| /* Search for attached worker for a given subscription id. */ |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
| |
| if (w->in_use && w->subid == subid && (!only_running || w->proc)) |
| res = lappend(res, w); |
| } |
| |
| return res; |
| } |
| |
| /* |
| * Start a new apply background worker, if possible. |
| */ |
| void |
| logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, |
| Oid relid) |
| { |
| BackgroundWorker bgw; |
| BackgroundWorkerHandle *bgw_handle; |
| uint16 generation; |
| int i; |
| int slot = 0; |
| LogicalRepWorker *worker = NULL; |
| int nsyncworkers; |
| TimestampTz now; |
| |
| ereport(DEBUG1, |
| (errmsg_internal("starting logical replication worker for subscription \"%s\"", |
| subname))); |
| |
| /* Report this after the initial starting message for consistency. */ |
| if (max_replication_slots == 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
| errmsg("cannot start logical replication workers when max_replication_slots = 0"))); |
| |
| /* |
| * We need to modify the shared memory under lock so that we have a |
| * consistent view. |
| */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
| |
| retry: |
| /* Find unused worker slot. */ |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
| |
| if (!w->in_use) |
| { |
| worker = w; |
| slot = i; |
| break; |
| } |
| } |
| |
| nsyncworkers = logicalrep_sync_worker_count(subid); |
| |
| now = GetCurrentTimestamp(); |
| |
| /* |
| * If we didn't find a free slot, try to do garbage collection. We do |
| * this because if some worker failed to start up and its parent crashed |
| * while waiting, the in_use state was never cleared. |
| */ |
| if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) |
| { |
| bool did_cleanup = false; |
| |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
| |
| /* |
| * If the worker was marked in use but didn't manage to attach in |
| * time, clean it up. |
| */ |
| if (w->in_use && !w->proc && |
| TimestampDifferenceExceeds(w->launch_time, now, |
| wal_receiver_timeout)) |
| { |
| elog(WARNING, |
| "logical replication worker for subscription %u took too long to start; canceled", |
| w->subid); |
| |
| logicalrep_worker_cleanup(w); |
| did_cleanup = true; |
| } |
| } |
| |
| if (did_cleanup) |
| goto retry; |
| } |
| |
| /* |
| * We don't allow more sync workers once we have reached the sync worker |
| * limit per subscription, so just return silently; we might get here |
| * because of an otherwise harmless race condition. |
| */ |
| if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| return; |
| } |
| |
| /* |
| * However, if there are no free worker slots, inform the user about it |
| * before exiting. |
| */ |
| if (worker == NULL) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| ereport(WARNING, |
| (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
| errmsg("out of logical replication worker slots"), |
| errhint("You might need to increase max_logical_replication_workers."))); |
| return; |
| } |
| |
| /* Prepare the worker slot. */ |
| worker->launch_time = now; |
| worker->in_use = true; |
| worker->generation++; |
| worker->proc = NULL; |
| worker->dbid = dbid; |
| worker->userid = userid; |
| worker->subid = subid; |
| worker->relid = relid; |
| worker->relstate = SUBREL_STATE_UNKNOWN; |
| worker->relstate_lsn = InvalidXLogRecPtr; |
| worker->last_lsn = InvalidXLogRecPtr; |
| TIMESTAMP_NOBEGIN(worker->last_send_time); |
| TIMESTAMP_NOBEGIN(worker->last_recv_time); |
| worker->reply_lsn = InvalidXLogRecPtr; |
| TIMESTAMP_NOBEGIN(worker->reply_time); |
| |
| /* Before releasing lock, remember generation for future identification. */ |
| generation = worker->generation; |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| /* Register the new dynamic worker. */ |
| memset(&bgw, 0, sizeof(bgw)); |
| bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | |
| BGWORKER_BACKEND_DATABASE_CONNECTION; |
| bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; |
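| /* The entry point lives in the core server binary, not in a loadable library. */ |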
| snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); |
| snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); |
| if (OidIsValid(relid)) |
| snprintf(bgw.bgw_name, BGW_MAXLEN, |
| "logical replication worker for subscription %u sync %u", subid, relid); |
| else |
| snprintf(bgw.bgw_name, BGW_MAXLEN, |
| "logical replication worker for subscription %u", subid); |
| snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); |
| |
| bgw.bgw_restart_time = BGW_NEVER_RESTART; |
| bgw.bgw_notify_pid = MyProcPid; |
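| /* Pass the slot index so the new worker can attach to it via logicalrep_worker_attach(). */ |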
| bgw.bgw_main_arg = Int32GetDatum(slot); |
| |
| if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) |
| { |
| /* Failed to start worker, so clean up the worker slot. */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
| Assert(generation == worker->generation); |
| logicalrep_worker_cleanup(worker); |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| ereport(WARNING, |
| (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
| errmsg("out of background worker slots"), |
| errhint("You might need to increase max_worker_processes."))); |
| return; |
| } |
| |
| /* Now wait until it attaches. */ |
| WaitForReplicationWorkerAttach(worker, generation, bgw_handle); |
| } |
| |
| /* |
| * Stop the logical replication worker for subid/relid, if any, and wait until |
| * it detaches from the slot. |
| */ |
| void |
| logicalrep_worker_stop(Oid subid, Oid relid) |
| { |
| LogicalRepWorker *worker; |
| uint16 generation; |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| |
| worker = logicalrep_worker_find(subid, relid, false); |
| |
| /* No worker, nothing to do. */ |
| if (!worker) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| return; |
| } |
| |
| /* |
| * Remember which generation our worker was so we can later check whether |
| * what we see is still the same one. |
| */ |
| generation = worker->generation; |
| |
| /* |
| * If we found a worker but it does not have proc set then it is still |
| * starting up; wait for it to finish starting and then kill it. |
| */ |
| while (worker->in_use && !worker->proc) |
| { |
| int rc; |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| /* Wait a bit --- we don't expect to have to wait long. */ |
| rc = WaitLatch(MyLatch, |
| WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
| 10L, WAIT_EVENT_BGWORKER_STARTUP); |
| |
| if (rc & WL_LATCH_SET) |
| { |
| ResetLatch(MyLatch); |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| /* Recheck worker status. */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| |
| /* |
| * Check whether the worker slot is no longer used, which would mean |
| * that the worker has exited, or whether the worker generation is |
| * different, meaning that a different worker has taken the slot. |
| */ |
| if (!worker->in_use || worker->generation != generation) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| return; |
| } |
| |
| /* Worker has a proc assigned, so it has started. */ |
| if (worker->proc) |
| break; |
| } |
| |
| /* Now terminate the worker ... */ |
| kill(worker->proc->pid, SIGTERM); |
| |
| /* ... and wait for it to die. */ |
| for (;;) |
| { |
| int rc; |
| |
| /* is it gone? */ |
| if (!worker->proc || worker->generation != generation) |
| break; |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| /* Wait a bit --- we don't expect to have to wait long. */ |
| rc = WaitLatch(MyLatch, |
| WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
| 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); |
| |
| if (rc & WL_LATCH_SET) |
| { |
| ResetLatch(MyLatch); |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| } |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| } |
| |
| /* |
| * Wake up (using latch) any logical replication worker for the specified sub/rel. |
| */ |
| void |
| logicalrep_worker_wakeup(Oid subid, Oid relid) |
| { |
| LogicalRepWorker *worker; |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| |
| worker = logicalrep_worker_find(subid, relid, true); |
| |
| if (worker) |
| logicalrep_worker_wakeup_ptr(worker); |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| } |
| |
| /* |
| * Wake up (using latch) the specified logical replication worker. |
| * |
| * Caller must hold lock, else worker->proc could change under us. |
| */ |
| void |
| logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) |
| { |
| Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
| |
| SetLatch(&worker->proc->procLatch); |
| } |
| |
| /* |
| * Attach to a slot. |
| */ |
| void |
| logicalrep_worker_attach(int slot) |
| { |
| /* Block concurrent access. */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
| |
| Assert(slot >= 0 && slot < max_logical_replication_workers); |
| MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; |
| |
| if (!MyLogicalRepWorker->in_use) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication worker slot %d is empty, cannot attach", |
| slot))); |
| } |
| |
| if (MyLogicalRepWorker->proc) |
| { |
| LWLockRelease(LogicalRepWorkerLock); |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication worker slot %d is already used by " |
| "another worker, cannot attach", slot))); |
| } |
| |
| MyLogicalRepWorker->proc = MyProc; |
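| /* Make sure the slot is released again when this process exits. */ |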
| before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| } |
| |
| /* |
| * Detach the worker (cleans up the worker info). |
| */ |
| static void |
| logicalrep_worker_detach(void) |
| { |
| /* Block concurrent access. */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
| |
| logicalrep_worker_cleanup(MyLogicalRepWorker); |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| } |
| |
| /* |
| * Clean up worker info. |
| */ |
| static void |
| logicalrep_worker_cleanup(LogicalRepWorker *worker) |
| { |
| Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); |
| |
| worker->in_use = false; |
| worker->proc = NULL; |
| worker->dbid = InvalidOid; |
| worker->userid = InvalidOid; |
| worker->subid = InvalidOid; |
| worker->relid = InvalidOid; |
| } |
| |
| /* |
| * Cleanup function for logical replication launcher. |
| * |
| * Called on logical replication launcher exit. |
| */ |
| static void |
| logicalrep_launcher_onexit(int code, Datum arg) |
| { |
| LogicalRepCtx->launcher_pid = 0; |
| } |
| |
| /* |
| * Cleanup function. |
| * |
| * Called on logical replication worker exit. |
| */ |
| static void |
| logicalrep_worker_onexit(int code, Datum arg) |
| { |
| /* Disconnect gracefully from the remote side. */ |
| if (LogRepWorkerWalRcvConn) |
| walrcv_disconnect(LogRepWorkerWalRcvConn); |
| |
| logicalrep_worker_detach(); |
| |
| ApplyLauncherWakeup(); |
| } |
| |
| /* |
| * Count the number of registered (not necessarily running) sync workers |
| * for a subscription. |
| */ |
| int |
| logicalrep_sync_worker_count(Oid subid) |
| { |
| int i; |
| int res = 0; |
| |
| Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
| |
| /* Search for attached worker for a given subscription id. */ |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
| |
| if (w->subid == subid && OidIsValid(w->relid)) |
| res++; |
| } |
| |
| return res; |
| } |
| |
| /* |
| * ApplyLauncherShmemSize |
| * Compute space needed for replication launcher shared memory |
| */ |
| Size |
| ApplyLauncherShmemSize(void) |
| { |
| Size size; |
| |
| /* |
| * Need the fixed struct and the array of LogicalRepWorker. |
| */ |
| size = sizeof(LogicalRepCtxStruct); |
| size = MAXALIGN(size); |
| size = add_size(size, mul_size(max_logical_replication_workers, |
| sizeof(LogicalRepWorker))); |
| return size; |
| } |
| |
| /* |
| * ApplyLauncherRegister |
| * Register a background worker running the logical replication launcher. |
| */ |
| void |
| ApplyLauncherRegister(void) |
| { |
| BackgroundWorker bgw; |
| |
| if (max_logical_replication_workers == 0) |
| return; |
| |
| memset(&bgw, 0, sizeof(bgw)); |
| bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | |
| BGWORKER_BACKEND_DATABASE_CONNECTION; |
| bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; |
| snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); |
| snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); |
| snprintf(bgw.bgw_name, BGW_MAXLEN, |
| "logical replication launcher"); |
| snprintf(bgw.bgw_type, BGW_MAXLEN, |
| "logical replication launcher"); |
| bgw.bgw_restart_time = 5; |
| bgw.bgw_notify_pid = 0; |
| bgw.bgw_main_arg = (Datum) 0; |
| |
| RegisterBackgroundWorker(&bgw); |
| } |
| |
| /* |
| * ApplyLauncherShmemInit |
| * Allocate and initialize replication launcher shared memory |
| */ |
| void |
| ApplyLauncherShmemInit(void) |
| { |
| bool found; |
| |
| LogicalRepCtx = (LogicalRepCtxStruct *) |
| ShmemInitStruct("Logical Replication Launcher Data", |
| ApplyLauncherShmemSize(), |
| &found); |
| |
| if (!found) |
| { |
| int slot; |
| |
| memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); |
| |
| /* Initialize memory and spin locks for each worker slot. */ |
| for (slot = 0; slot < max_logical_replication_workers; slot++) |
| { |
| LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; |
| |
| memset(worker, 0, sizeof(LogicalRepWorker)); |
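| /* relmutex guards the per-relation sync state fields; cf. LogicalRepWorker in worker_internal.h. */ |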
| SpinLockInit(&worker->relmutex); |
| } |
| } |
| } |
| |
| /* |
| * Wake up the launcher on commit if requested. |
| */ |
| void |
| AtEOXact_ApplyLauncher(bool isCommit) |
| { |
| if (isCommit) |
| { |
| if (on_commit_launcher_wakeup) |
| ApplyLauncherWakeup(); |
| } |
| |
| on_commit_launcher_wakeup = false; |
| } |
| |
| /* |
| * Request wakeup of the launcher on commit of the transaction. |
| * |
| * This is used to signal the launcher to stop sleeping and process the |
| * subscriptions when the current transaction commits. It should be used |
| * when a new tuple is added to the pg_subscription catalog. |
| */ |
| void |
| ApplyLauncherWakeupAtCommit(void) |
| { |
| if (!on_commit_launcher_wakeup) |
| on_commit_launcher_wakeup = true; |
| } |
| |
| static void |
| ApplyLauncherWakeup(void) |
| { |
| if (LogicalRepCtx->launcher_pid != 0) |
| kill(LogicalRepCtx->launcher_pid, SIGUSR1); |
| } |
| |
| /* |
| * Main loop for the apply launcher process. |
| */ |
| void |
| ApplyLauncherMain(Datum main_arg) |
| { |
| TimestampTz last_start_time = 0; |
| |
| ereport(DEBUG1, |
| (errmsg_internal("logical replication launcher started"))); |
| |
| before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); |
| |
| Assert(LogicalRepCtx->launcher_pid == 0); |
| LogicalRepCtx->launcher_pid = MyProcPid; |
| |
| /* Establish signal handlers. */ |
| pqsignal(SIGHUP, SignalHandlerForConfigReload); |
| pqsignal(SIGTERM, die); |
| BackgroundWorkerUnblockSignals(); |
| |
| /* |
| * Establish connection to nailed catalogs (we only ever access |
| * pg_subscription). |
| */ |
| BackgroundWorkerInitializeConnection(NULL, NULL, 0); |
| |
| /* Enter main loop */ |
| for (;;) |
| { |
| int rc; |
| List *sublist; |
| ListCell *lc; |
| MemoryContext subctx; |
| MemoryContext oldctx; |
| TimestampTz now; |
| long wait_time = DEFAULT_NAPTIME_PER_CYCLE; |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| now = GetCurrentTimestamp(); |
| |
| /* Limit the start retry to once per wal_retrieve_retry_interval */ |
| if (TimestampDifferenceExceeds(last_start_time, now, |
| wal_retrieve_retry_interval)) |
| { |
| /* Use a temporary context for the subscription list and worker info. */ |
| subctx = AllocSetContextCreate(TopMemoryContext, |
| "Logical Replication Launcher sublist", |
| ALLOCSET_DEFAULT_SIZES); |
| oldctx = MemoryContextSwitchTo(subctx); |
| |
| /* Search for subscriptions to start or stop. */ |
| sublist = get_subscription_list(); |
| |
| /* Start the missing workers for enabled subscriptions. */ |
| foreach(lc, sublist) |
| { |
| Subscription *sub = (Subscription *) lfirst(lc); |
| LogicalRepWorker *w; |
| |
| if (!sub->enabled) |
| continue; |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| w = logicalrep_worker_find(sub->oid, InvalidOid, false); |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| if (w == NULL) |
| { |
| last_start_time = now; |
| wait_time = wal_retrieve_retry_interval; |
| |
| logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, |
| sub->owner, InvalidOid); |
| } |
| } |
| |
| /* Switch back to original memory context. */ |
| MemoryContextSwitchTo(oldctx); |
| /* Clean the temporary memory. */ |
| MemoryContextDelete(subctx); |
| } |
| else |
| { |
| /* |
| * The wait in the previous cycle was interrupted in less than |
| * wal_retrieve_retry_interval since the last worker was started. This |
| * usually means the worker crashed, so we should retry after |
| * wal_retrieve_retry_interval again. |
| */ |
| wait_time = wal_retrieve_retry_interval; |
| } |
| |
| /* Wait for more work. */ |
| rc = WaitLatch(MyLatch, |
| WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
| wait_time, |
| WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); |
| |
| if (rc & WL_LATCH_SET) |
| { |
| ResetLatch(MyLatch); |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| if (ConfigReloadPending) |
| { |
| ConfigReloadPending = false; |
| ProcessConfigFile(PGC_SIGHUP); |
| } |
| } |
| |
| /* Not reachable */ |
| } |
| |
| /* |
| * Is current process the logical replication launcher? |
| */ |
| bool |
| IsLogicalLauncher(void) |
| { |
| return LogicalRepCtx->launcher_pid == MyProcPid; |
| } |
| |
| /* |
| * Returns the state of the subscriptions. |
| */ |
| Datum |
| pg_stat_get_subscription(PG_FUNCTION_ARGS) |
| { |
| #define PG_STAT_GET_SUBSCRIPTION_COLS 8 |
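| /* One row per active worker: subid, relid, pid, last_lsn, last_send_time, last_recv_time, reply_lsn, reply_time. */ |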
| Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); |
| int i; |
| ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
| TupleDesc tupdesc; |
| Tuplestorestate *tupstore; |
| MemoryContext per_query_ctx; |
| MemoryContext oldcontext; |
| |
| /* check to see if caller supports us returning a tuplestore */ |
| if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("set-valued function called in context that cannot accept a set"))); |
| if (!(rsinfo->allowedModes & SFRM_Materialize)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("materialize mode required, but it is not allowed in this context"))); |
| |
| /* Build a tuple descriptor for our result type */ |
| if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| elog(ERROR, "return type must be a row type"); |
| |
| per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
| oldcontext = MemoryContextSwitchTo(per_query_ctx); |
| |
| tupstore = tuplestore_begin_heap(true, false, work_mem); |
| rsinfo->returnMode = SFRM_Materialize; |
| rsinfo->setResult = tupstore; |
| rsinfo->setDesc = tupdesc; |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* Make sure we get a consistent view of the workers. */ |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| |
| for (i = 0; i < max_logical_replication_workers; i++) |
| { |
| /* for each row */ |
| Datum values[PG_STAT_GET_SUBSCRIPTION_COLS]; |
| bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS]; |
| int worker_pid; |
| LogicalRepWorker worker; |
| |
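| /* Take a local copy of the slot while holding the lock. */ |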
| memcpy(&worker, &LogicalRepCtx->workers[i], |
| sizeof(LogicalRepWorker)); |
| if (!worker.proc || !IsBackendPid(worker.proc->pid)) |
| continue; |
| |
| if (OidIsValid(subid) && worker.subid != subid) |
| continue; |
| |
| worker_pid = worker.proc->pid; |
| |
| MemSet(values, 0, sizeof(values)); |
| MemSet(nulls, 0, sizeof(nulls)); |
| |
| values[0] = ObjectIdGetDatum(worker.subid); |
| if (OidIsValid(worker.relid)) |
| values[1] = ObjectIdGetDatum(worker.relid); |
| else |
| nulls[1] = true; |
| values[2] = Int32GetDatum(worker_pid); |
| if (XLogRecPtrIsInvalid(worker.last_lsn)) |
| nulls[3] = true; |
| else |
| values[3] = LSNGetDatum(worker.last_lsn); |
| if (worker.last_send_time == 0) |
| nulls[4] = true; |
| else |
| values[4] = TimestampTzGetDatum(worker.last_send_time); |
| if (worker.last_recv_time == 0) |
| nulls[5] = true; |
| else |
| values[5] = TimestampTzGetDatum(worker.last_recv_time); |
| if (XLogRecPtrIsInvalid(worker.reply_lsn)) |
| nulls[6] = true; |
| else |
| values[6] = LSNGetDatum(worker.reply_lsn); |
| if (worker.reply_time == 0) |
| nulls[7] = true; |
| else |
| values[7] = TimestampTzGetDatum(worker.reply_time); |
| |
| tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
| |
| /* |
| * If only a single subscription was requested, and we found it, |
| * break. |
| */ |
| if (OidIsValid(subid)) |
| break; |
| } |
| |
| LWLockRelease(LogicalRepWorkerLock); |
| |
| /* clean up and return the tuplestore */ |
| tuplestore_donestoring(tupstore); |
| |
| return (Datum) 0; |
| } |