| /*------------------------------------------------------------------------- |
| * worker.c |
| * PostgreSQL logical replication worker (apply) |
| * |
| * Copyright (c) 2016-2023, PostgreSQL Global Development Group |
| * |
| * IDENTIFICATION |
| * src/backend/replication/logical/worker.c |
| * |
| * NOTES |
| * This file contains the worker which applies logical changes as they come |
| * from remote logical replication stream. |
| * |
| * The main worker (apply) is started by logical replication worker |
| * launcher for every enabled subscription in a database. It uses |
| * walsender protocol to communicate with publisher. |
| * |
| * This module includes server facing code and shares libpqwalreceiver |
| * module with walreceiver for providing the libpq specific functionality. |
| * |
| * |
| * STREAMED TRANSACTIONS |
| * --------------------- |
| * Streamed transactions (large transactions exceeding a memory limit on the |
| * upstream) are applied using one of two approaches: |
| * |
| * 1) Write to temporary files and apply when the final commit arrives |
| * |
| * This approach is used when the user has set the subscription's streaming |
| * option as on. |
| * |
| * Unlike the regular (non-streamed) case, handling streamed transactions has |
| * to handle aborts of both the toplevel transaction and subtransactions. This |
| * is achieved by tracking offsets for subtransactions, which is then used |
| * to truncate the file with serialized changes. |
| * |
| * The files are placed in tmp file directory by default, and the filenames |
| * include both the XID of the toplevel transaction and OID of the |
| * subscription. This is necessary so that different workers processing a |
 * remote transaction with the same XID don't interfere.
| * |
| * We use BufFiles instead of using normal temporary files because (a) the |
| * BufFile infrastructure supports temporary files that exceed the OS file size |
 * limit, (b) provides a way for automatic cleanup on error, and (c) provides
 * a way for these files to survive across local transactions and to be opened
 * and closed at stream start and stop. We decided to use FileSet
| * infrastructure as without that it deletes the files on the closure of the |
| * file and if we decide to keep stream files open across the start/stop stream |
| * then it will consume a lot of memory (more than 8K for each BufFile and |
| * there could be multiple such BufFiles as the subscriber could receive |
| * multiple start/stop streams for different transactions before getting the |
| * commit). Moreover, if we don't use FileSet then we also need to invent |
| * a new way to pass filenames to BufFile APIs so that we are allowed to open |
| * the file we desired across multiple stream-open calls for the same |
| * transaction. |
| * |
| * 2) Parallel apply workers. |
| * |
| * This approach is used when the user has set the subscription's streaming |
| * option as parallel. See logical/applyparallelworker.c for information about |
| * this approach. |
| * |
| * TWO_PHASE TRANSACTIONS |
| * ---------------------- |
| * Two phase transactions are replayed at prepare and then committed or |
| * rolled back at commit prepared and rollback prepared respectively. It is |
| * possible to have a prepared transaction that arrives at the apply worker |
| * when the tablesync is busy doing the initial copy. In this case, the apply |
| * worker skips all the prepared operations [e.g. inserts] while the tablesync |
| * is still busy (see the condition of should_apply_changes_for_rel). The |
| * tablesync worker might not get such a prepared transaction because say it |
| * was prior to the initial consistent point but might have got some later |
| * commits. Now, the tablesync worker will exit without doing anything for the |
| * prepared transaction skipped by the apply worker as the sync location for it |
| * will be already ahead of the apply worker's current location. This would lead |
| * to an "empty prepare", because later when the apply worker does the commit |
| * prepare, there is nothing in it (the inserts were skipped earlier). |
| * |
 * To avoid this and similar prepare confusions, the subscription's two_phase
| * commit is enabled only after the initial sync is over. The two_phase option |
| * has been implemented as a tri-state with values DISABLED, PENDING, and |
| * ENABLED. |
| * |
| * Even if the user specifies they want a subscription with two_phase = on, |
| * internally it will start with a tri-state of PENDING which only becomes |
| * ENABLED after all tablesync initializations are completed - i.e. when all |
| * tablesync workers have reached their READY state. In other words, the value |
| * PENDING is only a temporary state for subscription start-up. |
| * |
| * Until the two_phase is properly available (ENABLED) the subscription will |
| * behave as if two_phase = off. When the apply worker detects that all |
| * tablesyncs have become READY (while the tri-state was PENDING) it will |
| * restart the apply worker process. This happens in |
| * process_syncing_tables_for_apply. |
| * |
| * When the (re-started) apply worker finds that all tablesyncs are READY for a |
 * two_phase tri-state of PENDING, it starts streaming messages with the
| * two_phase option which in turn enables the decoding of two-phase commits at |
| * the publisher. Then, it updates the tri-state value from PENDING to ENABLED. |
| * Now, it is possible that during the time we have not enabled two_phase, the |
| * publisher (replication server) would have skipped some prepares but we |
| * ensure that such prepares are sent along with commit prepare, see |
| * ReorderBufferFinishPrepared. |
| * |
| * If the subscription has no tables then a two_phase tri-state PENDING is |
| * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH |
| * PUBLICATION which might otherwise be disallowed (see below). |
| * |
| * If ever a user needs to be aware of the tri-state value, they can fetch it |
| * from the pg_subscription catalog (see column subtwophasestate). |
| * |
 * We don't allow toggling the two_phase option of a subscription because it
 * can lead to an inconsistent replica. Consider that it was initially on and
 * we have received some prepares; if we then turn it off, at commit time the
 * server will send the entire transaction data along with the commit. With
 * some more analysis, we could allow changing this option from off to on, but
 * it is not clear that this alone would be useful.
| * |
| * Finally, to avoid problems mentioned in previous paragraphs from any |
| * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on' |
| * to 'off' and then again back to 'on') there is a restriction for |
| * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when |
| * the two_phase tri-state is ENABLED, except when copy_data = false. |
| * |
| * We can get prepare of the same GID more than once for the genuine cases |
| * where we have defined multiple subscriptions for publications on the same |
| * server and prepared transaction has operations on tables subscribed to those |
| * subscriptions. For such cases, if we use the GID sent by publisher one of |
| * the prepares will be successful and others will fail, in which case the |
| * server will send them again. Now, this can lead to a deadlock if user has |
| * set synchronous_standby_names for all the subscriptions on subscriber. To |
| * avoid such deadlocks, we generate a unique GID (consisting of the |
| * subscription oid and the xid of the prepared transaction) for each prepare |
| * transaction on the subscriber. |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <sys/stat.h> |
| #include <unistd.h> |
| |
| #include "access/genam.h" |
| #include "access/table.h" |
| #include "access/tableam.h" |
| #include "access/twophase.h" |
| #include "access/xact.h" |
| #include "access/xlog_internal.h" |
| #include "catalog/catalog.h" |
| #include "catalog/indexing.h" |
| #include "catalog/namespace.h" |
| #include "catalog/partition.h" |
| #include "catalog/pg_inherits.h" |
| #include "catalog/pg_subscription.h" |
| #include "catalog/pg_subscription_rel.h" |
| #include "catalog/pg_tablespace.h" |
| #include "commands/tablecmds.h" |
| #include "commands/tablespace.h" |
| #include "commands/trigger.h" |
| #include "executor/executor.h" |
| #include "executor/execPartition.h" |
| #include "executor/nodeModifyTable.h" |
| #include "funcapi.h" |
| #include "libpq/pqformat.h" |
| #include "libpq/pqsignal.h" |
| #include "mb/pg_wchar.h" |
| #include "miscadmin.h" |
| #include "nodes/makefuncs.h" |
| #include "optimizer/optimizer.h" |
| #include "parser/parse_relation.h" |
| #include "pgstat.h" |
| #include "postmaster/bgworker.h" |
| #include "postmaster/interrupt.h" |
| #include "postmaster/postmaster.h" |
| #include "postmaster/walwriter.h" |
| #include "replication/decode.h" |
| #include "replication/logical.h" |
| #include "replication/logicallauncher.h" |
| #include "replication/logicalproto.h" |
| #include "replication/logicalrelation.h" |
| #include "replication/logicalworker.h" |
| #include "replication/origin.h" |
| #include "replication/reorderbuffer.h" |
| #include "replication/snapbuild.h" |
| #include "replication/walreceiver.h" |
| #include "replication/worker_internal.h" |
| #include "rewrite/rewriteHandler.h" |
| #include "storage/buffile.h" |
| #include "storage/bufmgr.h" |
| #include "storage/fd.h" |
| #include "storage/ipc.h" |
| #include "storage/lmgr.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/acl.h" |
| #include "utils/builtins.h" |
| #include "utils/catcache.h" |
| #include "utils/dynahash.h" |
| #include "utils/datum.h" |
| #include "utils/fmgroids.h" |
| #include "utils/guc.h" |
| #include "utils/inval.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/pg_lsn.h" |
| #include "utils/rel.h" |
| #include "utils/rls.h" |
| #include "utils/syscache.h" |
| #include "utils/timeout.h" |
| #include "utils/usercontext.h" |
| |
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * One entry of the lsn_mapping list below, pairing a local LSN with a remote
 * LSN.
 *
 * NOTE(review): presumably local_end is the end LSN of a locally committed
 * transaction and remote_end the corresponding remote LSN, used to report
 * flush progress to the publisher -- confirm against the users of
 * lsn_mapping, which are outside this part of the file.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* list link (presumably in lsn_mapping) */
	XLogRecPtr	local_end;
	XLogRecPtr	remote_end;
} FlushPosition;

/* Doubly-linked list head, statically initialized as empty. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/*
 * Executor-related state needed to apply changes to one target relation.
 * Built by create_edata_for_relation() and released by finish_edata().
 */
typedef struct ApplyExecutionData
{
	EState	   *estate;			/* executor state, used to track resources */

	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

	/* These fields are used when the target relation is partitioned: */
	ModifyTableState *mtstate;	/* dummy ModifyTable state */
	PartitionTupleRouting *proute;	/* partition routing info */
} ApplyExecutionData;
| |
/*
 * Struct for saving and restoring apply errcontext information.
 *
 * The global instance (apply_error_callback_arg) starts with every field at
 * its "invalid" value (0, NULL, -1, InvalidTransactionId, InvalidXLogRecPtr),
 * meaning that piece of context is not currently known.
 */
typedef struct ApplyErrorCallbackArg
{
	LogicalRepMsgType command;	/* 0 if invalid */
	LogicalRepRelMapEntry *rel; /* NULL if invalid */

	/* Remote node information */
	int			remote_attnum;	/* -1 if invalid */
	TransactionId remote_xid;	/* InvalidTransactionId if invalid */
	XLogRecPtr	finish_lsn;		/* InvalidXLogRecPtr if invalid */
	char	   *origin_name;	/* NULL if invalid */
} ApplyErrorCallbackArg;
| |
/*
 * The action to be taken for the changes in the transaction, as decided by
 * get_transaction_apply_action().
 *
 * TRANS_LEADER_APPLY:
 * This action means that we are in the leader apply worker or table sync
 * worker. The changes of the transaction are either directly applied or
 * are read from temporary files (for streaming transactions) and then
 * applied by the worker.
 *
 * TRANS_LEADER_SERIALIZE:
 * This action means that we are in the leader apply worker or table sync
 * worker. Changes are written to temporary files and then applied when the
 * final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL:
 * This action means that we are in the leader apply worker and need to send
 * the changes to the parallel apply worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE:
 * This action means that we are in the leader apply worker and have sent some
 * changes directly to the parallel apply worker and the remaining changes are
 * serialized to a file, due to timeout while sending data. The parallel apply
 * worker will apply these serialized changes when the final commit arrives.
 *
 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
 * serializing changes, the leader worker also needs to serialize the
 * STREAM_XXX message to a file, and wait for the parallel apply worker to
 * finish the transaction when processing the transaction finish command. So
 * this new action was introduced to keep the code and logic clear.
 *
 * TRANS_PARALLEL_APPLY:
 * This action means that we are in the parallel apply worker and changes of
 * the transaction are applied directly by the worker.
 *
 * Note: the enumerator order fixes the numeric values printed in the
 * "unexpected apply action: %d" error, so do not reorder casually.
 */
typedef enum
{
	/* The action for non-streaming transactions. */
	TRANS_LEADER_APPLY,

	/* Actions for streaming transactions. */
	TRANS_LEADER_SERIALIZE,
	TRANS_LEADER_SEND_TO_PARALLEL,
	TRANS_LEADER_PARTIAL_SERIALIZE,
	TRANS_PARALLEL_APPLY
} TransApplyAction;
| |
/*
 * errcontext tracker: describes the change currently being applied, for use
 * in error context messages.  Every field starts at its "invalid" value.
 */
ApplyErrorCallbackArg apply_error_callback_arg =
{
	.command = 0,
	.rel = NULL,
	.remote_attnum = -1,
	.remote_xid = InvalidTransactionId,
	.finish_lsn = InvalidXLogRecPtr,
	.origin_name = NULL,
};

ErrorContextCallback *apply_error_context_stack = NULL;

/* begin_replication_step() switches into ApplyMessageContext */
MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;

WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;

static List *on_commit_wakeup_workers_subids = NIL;

bool		in_remote_transaction = false;

/*
 * Final LSN of the remote transaction being applied; the leader compares it
 * with a SYNCDONE relation's statelsn in should_apply_changes_for_rel().
 */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;

/* XID of the (toplevel) streamed transaction currently being processed */
static TransactionId stream_xid = InvalidTransactionId;

/*
 * The number of changes applied by parallel apply worker during one streaming
 * block.
 */
static uint32 parallel_stream_nchanges = 0;

/* Are we initializing an apply worker? */
bool		InitializingApplyWorker = false;

/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the
 * subskiplsn.  Once we start skipping changes, we don't stop it until we skip
 * all changes of the transaction even if pg_subscription is updated and
 * MySubscription->skiplsn gets changed or reset during that.  Also, in
 * streaming transaction cases (streaming = on), we don't skip receiving and
 * spooling the changes since we decide whether or not to skip applying the
 * changes when starting to apply changes.  The subskiplsn is cleared after
 * successfully skipping the transaction or applying non-empty transaction.
 * The latter prevents the mistakenly specified subskiplsn from being left.
 * Note that we cannot skip the streaming transactions when using parallel
 * apply workers because we cannot get the finish LSN before applying the
 * changes.  So, we don't start parallel apply worker when finish LSN is set
 * by the user.
 *
 * is_skipping_changes() is true whenever skip_xact_finish_lsn holds a valid
 * LSN.
 */
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))

/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;

/* Where in the changes file a given subtransaction's changes begin. */
typedef struct SubXactInfo
{
	TransactionId xid;			/* XID of the subxact */
	int			fileno;			/* file number in the buffile */
	off_t		offset;			/* offset in the file */
} SubXactInfo;

/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
	uint32		nsubxacts;		/* number of sub-transactions */
	uint32		nsubxacts_max;	/* current capacity of subxacts */
	TransactionId subxact_last; /* xid of the last sub-transaction */
	SubXactInfo *subxacts;		/* sub-xact offset in changes file */
} ApplySubXactData;

/* Positional initializer: keep in sync with the ApplySubXactData field order. */
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
| |
/*
 * Names of the per-transaction spool files.  NOTE(review): presumably these
 * write the subxact-info / changes file name for (subid, xid) into 'path';
 * confirm against their definitions later in the file.
 */
static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
static inline void changes_filename(char *path, Oid subid, TransactionId xid);

/*
 * Information about subtransactions of a given toplevel transaction.
 */
static void subxact_info_write(Oid subid, TransactionId xid);
static void subxact_info_read(Oid subid, TransactionId xid);
static void subxact_info_add(TransactionId xid);
static inline void cleanup_subxact_info(void);

/*
 * Serialize and deserialize changes for a toplevel transaction.
 */
static void stream_open_file(Oid subid, TransactionId xid,
							 bool first_segment);
static void stream_write_change(char action, StringInfo s);
static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
static void stream_close_file(void);

/* Report progress to the publisher. */
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void DisableSubscriptionAndExit(void);

/* Per-operation apply helpers. */
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot,
										 LogicalRepTupleData *newtup,
										 Oid localindexoid);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot,
										 Oid localindexoid);
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
									LogicalRepRelation *remoterel,
									Oid localidxoid,
									TupleTableSlot *remoteslot,
									TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
									   TupleTableSlot *remoteslot,
									   LogicalRepTupleData *newtup,
									   CmdType operation);

/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);

/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);

/* Functions for apply error callback */
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);

/* Decide how the changes of transaction 'xid' are to be handled. */
static TransApplyAction get_transaction_apply_action(TransactionId xid,
													 ParallelApplyWorkerInfo **winfo);

static void replorigin_reset(int code, Datum arg);
| |
| /* |
| * Form the origin name for the subscription. |
| * |
| * This is a common function for tablesync and other workers. Tablesync workers |
| * must pass a valid relid. Other callers must pass relid = InvalidOid. |
| * |
| * Return the name in the supplied buffer. |
| */ |
| void |
| ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, |
| char *originname, Size szoriginname) |
| { |
| if (OidIsValid(relid)) |
| { |
| /* Replication origin name for tablesync workers. */ |
| snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); |
| } |
| else |
| { |
| /* Replication origin name for non-tablesync workers. */ |
| snprintf(originname, szoriginname, "pg_%u", suboid); |
| } |
| } |
| |
| /* |
| * Should this worker apply changes for given relation. |
| * |
| * This is mainly needed for initial relation data sync as that runs in |
| * separate worker process running in parallel and we need some way to skip |
| * changes coming to the leader apply worker during the sync of a table. |
| * |
| * Note we need to do smaller or equals comparison for SYNCDONE state because |
| * it might hold position of end of initial slot consistent point WAL |
| * record + 1 (ie start of next record) and next record can be COMMIT of |
| * transaction we are now processing (which is what we set remote_final_lsn |
| * to in apply_handle_begin). |
| * |
| * Note that for streaming transactions that are being applied in the parallel |
| * apply worker, we disallow applying changes if the target table in the |
| * subscription is not in the READY state, because we cannot decide whether to |
| * apply the change as we won't know remote_final_lsn by that time. |
| * |
| * We already checked this in pa_can_start() before assigning the |
| * streaming transaction to the parallel worker, but it also needs to be |
| * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH |
| * PUBLICATION in parallel, the new table can be added to pg_subscription_rel |
| * while applying this transaction. |
| */ |
| static bool |
| should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) |
| { |
| if (am_tablesync_worker()) |
| return MyLogicalRepWorker->relid == rel->localreloid; |
| else if (am_parallel_apply_worker()) |
| { |
| /* We don't synchronize rel's that are in unknown state. */ |
| if (rel->state != SUBREL_STATE_READY && |
| rel->state != SUBREL_STATE_UNKNOWN) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", |
| MySubscription->name), |
| errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); |
| |
| return rel->state == SUBREL_STATE_READY; |
| } |
| else |
| return (rel->state == SUBREL_STATE_READY || |
| (rel->state == SUBREL_STATE_SYNCDONE && |
| rel->statelsn <= remote_final_lsn)); |
| } |
| |
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 *
 * Each call pushes an active snapshot, so it must be paired with a call to
 * end_replication_step(), which pops it.
 */
static void
begin_replication_step(void)
{
	/* Stamp the start time of this step as the current statement's start. */
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();

		/*
		 * NOTE(review): maybe_reread_subscription() is defined elsewhere;
		 * presumably it refreshes MySubscription if the catalog entry has
		 * changed -- confirm.
		 */
		maybe_reread_subscription();
	}

	PushActiveSnapshot(GetTransactionSnapshot());

	/*
	 * The previous context is deliberately not saved/restored; callers are
	 * expected to run in ApplyMessageContext from here on.
	 */
	MemoryContextSwitchTo(ApplyMessageContext);
}

/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Release the snapshot pushed by begin_replication_step(). */
	PopActiveSnapshot();

	/* Make this step's changes visible to subsequent steps. */
	CommandCounterIncrement();
}
| |
| /* |
| * Handle streamed transactions for both the leader apply worker and the |
| * parallel apply workers. |
| * |
| * In the streaming case (receiving a block of the streamed transaction), for |
| * serialize mode, simply redirect it to a file for the proper toplevel |
| * transaction, and for parallel mode, the leader apply worker will send the |
| * changes to parallel apply workers and the parallel apply worker will define |
| * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE |
| * messages will be applied by both leader apply worker and parallel apply |
| * workers). |
| * |
| * Returns true for streamed transactions (when the change is either serialized |
| * to file or sent to parallel apply worker), false otherwise (regular mode or |
| * needs to be processed by parallel apply worker). |
| * |
| * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION |
| * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent |
| * to a parallel apply worker. |
| */ |
| static bool |
| handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) |
| { |
| TransactionId current_xid; |
| ParallelApplyWorkerInfo *winfo; |
| TransApplyAction apply_action; |
| StringInfoData original_msg; |
| |
| apply_action = get_transaction_apply_action(stream_xid, &winfo); |
| |
| /* not in streaming mode */ |
| if (apply_action == TRANS_LEADER_APPLY) |
| return false; |
| |
| Assert(TransactionIdIsValid(stream_xid)); |
| |
| /* |
| * The parallel apply worker needs the xid in this message to decide |
| * whether to define a savepoint, so save the original message that has |
| * not moved the cursor after the xid. We will serialize this message to a |
| * file in PARTIAL_SERIALIZE mode. |
| */ |
| original_msg = *s; |
| |
| /* |
| * We should have received XID of the subxact as the first part of the |
| * message, so extract it. |
| */ |
| current_xid = pq_getmsgint(s, 4); |
| |
| if (!TransactionIdIsValid(current_xid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("invalid transaction ID in streamed replication transaction"))); |
| |
| switch (apply_action) |
| { |
| case TRANS_LEADER_SERIALIZE: |
| Assert(stream_fd); |
| |
| /* Add the new subxact to the array (unless already there). */ |
| subxact_info_add(current_xid); |
| |
| /* Write the change to the current file */ |
| stream_write_change(action, s); |
| return true; |
| |
| case TRANS_LEADER_SEND_TO_PARALLEL: |
| Assert(winfo); |
| |
| /* |
| * XXX The publisher side doesn't always send relation/type update |
| * messages after the streaming transaction, so also update the |
| * relation/type in leader apply worker. See function |
| * cleanup_rel_sync_cache. |
| */ |
| if (pa_send_data(winfo, s->len, s->data)) |
| return (action != LOGICAL_REP_MSG_RELATION && |
| action != LOGICAL_REP_MSG_TYPE); |
| |
| /* |
| * Switch to serialize mode when we are not able to send the |
| * change to parallel apply worker. |
| */ |
| pa_switch_to_partial_serialize(winfo, false); |
| |
| /* fall through */ |
| case TRANS_LEADER_PARTIAL_SERIALIZE: |
| stream_write_change(action, &original_msg); |
| |
| /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */ |
| return (action != LOGICAL_REP_MSG_RELATION && |
| action != LOGICAL_REP_MSG_TYPE); |
| |
| case TRANS_PARALLEL_APPLY: |
| parallel_stream_nchanges += 1; |
| |
| /* Define a savepoint for a subxact if needed. */ |
| pa_start_subtrans(current_xid, stream_xid); |
| return false; |
| |
| default: |
| elog(ERROR, "unexpected apply action: %d", (int) apply_action); |
| return false; /* silence compiler warning */ |
| } |
| } |
| |
| /* |
| * Executor state preparation for evaluation of constraint expressions, |
| * indexes and triggers for the specified relation. |
| * |
| * Note that the caller must open and close any indexes to be updated. |
| */ |
| static ApplyExecutionData * |
| create_edata_for_relation(LogicalRepRelMapEntry *rel) |
| { |
| ApplyExecutionData *edata; |
| EState *estate; |
| RangeTblEntry *rte; |
| List *perminfos = NIL; |
| ResultRelInfo *resultRelInfo; |
| |
| edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); |
| edata->targetRel = rel; |
| |
| edata->estate = estate = CreateExecutorState(); |
| |
| rte = makeNode(RangeTblEntry); |
| rte->rtekind = RTE_RELATION; |
| rte->relid = RelationGetRelid(rel->localrel); |
| rte->relkind = rel->localrel->rd_rel->relkind; |
| rte->rellockmode = AccessShareLock; |
| |
| addRTEPermissionInfo(&perminfos, rte); |
| |
| ExecInitRangeTable(estate, list_make1(rte), perminfos); |
| |
| edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo); |
| |
| /* |
| * Use Relation opened by logicalrep_rel_open() instead of opening it |
| * again. |
| */ |
| InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); |
| |
| /* |
| * We put the ResultRelInfo in the es_opened_result_relations list, even |
| * though we don't populate the es_result_relations array. That's a bit |
| * bogus, but it's enough to make ExecGetTriggerResultRel() find them. |
| * |
| * ExecOpenIndices() is not called here either, each execution path doing |
| * an apply operation being responsible for that. |
| */ |
| estate->es_opened_result_relations = |
| lappend(estate->es_opened_result_relations, resultRelInfo); |
| |
| estate->es_output_cid = GetCurrentCommandId(true); |
| |
| /* Prepare to catch AFTER triggers. */ |
| AfterTriggerBeginQuery(); |
| |
| /* other fields of edata remain NULL for now */ |
| |
| return edata; |
| } |
| |
/*
 * Finish any operations related to the executor state created by
 * create_edata_for_relation(), then free that state and 'edata' itself.
 * 'edata' must not be used after this returns.
 */
static void
finish_edata(ApplyExecutionData *edata)
{
	EState	   *estate = edata->estate;

	/* Handle any queued AFTER triggers (started by AfterTriggerBeginQuery()). */
	AfterTriggerEndQuery(estate);

	/* Shut down tuple routing, if any was done. */
	if (edata->proute)
		ExecCleanupTupleRouting(edata->mtstate, edata->proute);

	/*
	 * Cleanup. It might seem that we should call ExecCloseResultRelations()
	 * here, but we intentionally don't. It would close the rel that
	 * create_edata_for_relation() added to es_opened_result_relations, which
	 * is wrong because we took no corresponding refcount. We rely on
	 * ExecCleanupTupleRouting() to close any other relations opened during
	 * execution.
	 */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
	pfree(edata);
}
| |
| /* |
| * Executes default values for columns for which we can't map to remote |
| * relation columns. |
| * |
| * This allows us to support tables which have more columns on the downstream |
| * than on the upstream. |
| */ |
| static void |
| slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, |
| TupleTableSlot *slot) |
| { |
| TupleDesc desc = RelationGetDescr(rel->localrel); |
| int num_phys_attrs = desc->natts; |
| int i; |
| int attnum, |
| num_defaults = 0; |
| int *defmap; |
| ExprState **defexprs; |
| ExprContext *econtext; |
| |
| econtext = GetPerTupleExprContext(estate); |
| |
| /* We got all the data via replication, no need to evaluate anything. */ |
| if (num_phys_attrs == rel->remoterel.natts) |
| return; |
| |
| defmap = (int *) palloc(num_phys_attrs * sizeof(int)); |
| defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); |
| |
| Assert(rel->attrmap->maplen == num_phys_attrs); |
| for (attnum = 0; attnum < num_phys_attrs; attnum++) |
| { |
| Expr *defexpr; |
| |
| if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated) |
| continue; |
| |
| if (rel->attrmap->attnums[attnum] >= 0) |
| continue; |
| |
| defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1); |
| |
| if (defexpr != NULL) |
| { |
| /* Run the expression through planner */ |
| defexpr = expression_planner(defexpr); |
| |
| /* Initialize executable expression in copycontext */ |
| defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); |
| defmap[num_defaults] = attnum; |
| num_defaults++; |
| } |
| } |
| |
| for (i = 0; i < num_defaults; i++) |
| slot->tts_values[defmap[i]] = |
| ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]); |
| } |
| |
| /* |
| * Store tuple data into slot. |
| * |
| * Incoming data can be either text or binary format. |
| */ |
| static void |
| slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, |
| LogicalRepTupleData *tupleData) |
| { |
| int natts = slot->tts_tupleDescriptor->natts; |
| int i; |
| |
| ExecClearTuple(slot); |
| |
| /* Call the "in" function for each non-dropped, non-null attribute */ |
| Assert(natts == rel->attrmap->maplen); |
| for (i = 0; i < natts; i++) |
| { |
| Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); |
| int remoteattnum = rel->attrmap->attnums[i]; |
| |
| if (!att->attisdropped && remoteattnum >= 0) |
| { |
| StringInfo colvalue = &tupleData->colvalues[remoteattnum]; |
| |
| Assert(remoteattnum < tupleData->ncols); |
| |
| /* Set attnum for error callback */ |
| apply_error_callback_arg.remote_attnum = remoteattnum; |
| |
| if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) |
| { |
| Oid typinput; |
| Oid typioparam; |
| |
| getTypeInputInfo(att->atttypid, &typinput, &typioparam); |
| slot->tts_values[i] = |
| OidInputFunctionCall(typinput, colvalue->data, |
| typioparam, att->atttypmod); |
| slot->tts_isnull[i] = false; |
| } |
| else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) |
| { |
| Oid typreceive; |
| Oid typioparam; |
| |
| /* |
| * In some code paths we may be asked to re-parse the same |
| * tuple data. Reset the StringInfo's cursor so that works. |
| */ |
| colvalue->cursor = 0; |
| |
| getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); |
| slot->tts_values[i] = |
| OidReceiveFunctionCall(typreceive, colvalue, |
| typioparam, att->atttypmod); |
| |
| /* Trouble if it didn't eat the whole buffer */ |
| if (colvalue->cursor != colvalue->len) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), |
| errmsg("incorrect binary data format in logical replication column %d", |
| remoteattnum + 1))); |
| slot->tts_isnull[i] = false; |
| } |
| else |
| { |
| /* |
| * NULL value from remote. (We don't expect to see |
| * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as |
| * NULL.) |
| */ |
| slot->tts_values[i] = (Datum) 0; |
| slot->tts_isnull[i] = true; |
| } |
| |
| /* Reset attnum for error callback */ |
| apply_error_callback_arg.remote_attnum = -1; |
| } |
| else |
| { |
| /* |
| * We assign NULL to dropped attributes and missing values |
| * (missing values should be later filled using |
| * slot_fill_defaults). |
| */ |
| slot->tts_values[i] = (Datum) 0; |
| slot->tts_isnull[i] = true; |
| } |
| } |
| |
| ExecStoreVirtualTuple(slot); |
| } |
| |
| /* |
| * Replace updated columns with data from the LogicalRepTupleData struct. |
| * This is somewhat similar to heap_modify_tuple but also calls the type |
| * input functions on the user data. |
| * |
| * "slot" is filled with a copy of the tuple in "srcslot", replacing |
| * columns provided in "tupleData" and leaving others as-is. |
| * |
| * Caution: unreplaced pass-by-ref columns in "slot" will point into the |
| * storage for "srcslot". This is OK for current usage, but someday we may |
| * need to materialize "slot" at the end to make it independent of "srcslot". |
| */ |
| static void |
| slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, |
| LogicalRepRelMapEntry *rel, |
| LogicalRepTupleData *tupleData) |
| { |
| int natts = slot->tts_tupleDescriptor->natts; |
| int i; |
| |
| /* We'll fill "slot" with a virtual tuple, so we must start with ... */ |
| ExecClearTuple(slot); |
| |
| /* |
| * Copy all the column data from srcslot, so that we'll have valid values |
| * for unreplaced columns. |
| */ |
| Assert(natts == srcslot->tts_tupleDescriptor->natts); |
| slot_getallattrs(srcslot); |
| memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum)); |
| memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); |
| |
| /* Call the "in" function for each replaced attribute */ |
| Assert(natts == rel->attrmap->maplen); |
| for (i = 0; i < natts; i++) |
| { |
| Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); |
| int remoteattnum = rel->attrmap->attnums[i]; |
| |
| if (remoteattnum < 0) |
| continue; |
| |
| Assert(remoteattnum < tupleData->ncols); |
| |
| if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) |
| { |
| StringInfo colvalue = &tupleData->colvalues[remoteattnum]; |
| |
| /* Set attnum for error callback */ |
| apply_error_callback_arg.remote_attnum = remoteattnum; |
| |
| if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) |
| { |
| Oid typinput; |
| Oid typioparam; |
| |
| getTypeInputInfo(att->atttypid, &typinput, &typioparam); |
| slot->tts_values[i] = |
| OidInputFunctionCall(typinput, colvalue->data, |
| typioparam, att->atttypmod); |
| slot->tts_isnull[i] = false; |
| } |
| else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) |
| { |
| Oid typreceive; |
| Oid typioparam; |
| |
| /* |
| * In some code paths we may be asked to re-parse the same |
| * tuple data. Reset the StringInfo's cursor so that works. |
| */ |
| colvalue->cursor = 0; |
| |
| getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); |
| slot->tts_values[i] = |
| OidReceiveFunctionCall(typreceive, colvalue, |
| typioparam, att->atttypmod); |
| |
| /* Trouble if it didn't eat the whole buffer */ |
| if (colvalue->cursor != colvalue->len) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), |
| errmsg("incorrect binary data format in logical replication column %d", |
| remoteattnum + 1))); |
| slot->tts_isnull[i] = false; |
| } |
| else |
| { |
| /* must be LOGICALREP_COLUMN_NULL */ |
| slot->tts_values[i] = (Datum) 0; |
| slot->tts_isnull[i] = true; |
| } |
| |
| /* Reset attnum for error callback */ |
| apply_error_callback_arg.remote_attnum = -1; |
| } |
| } |
| |
| /* And finally, declare that "slot" contains a valid virtual tuple */ |
| ExecStoreVirtualTuple(slot); |
| } |
| |
/*
 * Handle BEGIN message.
 *
 * Starts tracking a new (non-streamed) remote transaction.  The final LSN
 * announced by BEGIN is used in two ways:
 *
 * - It is saved in remote_final_lsn so that the matching COMMIT can be
 *   validated against it (see apply_handle_commit).
 * - It is passed to maybe_start_skipping_changes(), which decides from that
 *   LSN whether this transaction's changes are to be skipped.
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin(s, &begin_data);

	/* Record the remote xid and final LSN as apply error context. */
	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);

	remote_final_lsn = begin_data.final_lsn;

	maybe_start_skipping_changes(begin_data.final_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
| |
/*
 * Handle COMMIT message.
 *
 * The commit LSN reported by the publisher must equal the final LSN that was
 * announced by the preceding BEGIN and saved in remote_final_lsn.  Anything
 * else is treated as a protocol violation.
 *
 * The local commit itself is done by apply_handle_commit_internal().  After
 * that, process_syncing_tables() is given the commit's end LSN and the apply
 * error context is reset.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* COMMIT must close the same remote transaction that BEGIN opened. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	apply_handle_commit_internal(&commit_data);

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
| |
/*
 * Handle BEGIN PREPARE message.
 *
 * Two-phase counterpart of apply_handle_begin(): the prepare LSN plays the
 * role of the final LSN.  It is saved in remote_final_lsn, to be checked by
 * apply_handle_prepare(), and passed to maybe_start_skipping_changes().
 * Receiving this message in a tablesync worker is a protocol violation.
 */
static void
apply_handle_begin_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData begin_data;

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin_prepare(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);

	/* PREPARE is validated against this LSN, as COMMIT is for BEGIN. */
	remote_final_lsn = begin_data.prepare_lsn;

	maybe_start_skipping_changes(begin_data.prepare_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
| |
/*
 * Common function to prepare the GID.
 *
 * Despite the short description above, this does more than build the GID.
 * It:
 *
 * 1. computes the local GID for the remote transaction;
 * 2. makes sure a transaction block is open;
 * 3. advances the replication origin state to the PREPARE's end LSN and
 *    prepare time;
 * 4. hands the GID to PrepareTransactionBlock().
 *
 * It is shared by the PREPARE and STREAM PREPARE handlers, which follow it
 * with CommitTransactionCommand().
 */
static void
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
{
	char		gid[GIDSIZE];

	/*
	 * Compute a unique GID for two_phase transactions from the subscription
	 * OID and the remote xid.  We don't use the GID of the prepared
	 * transaction sent by the server, as that can lead to deadlock when
	 * multiple subscriptions on this node point to publications on the same
	 * publisher node.  See comments atop worker.c
	 */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
						   gid, sizeof(gid));

	/*
	 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
	 * called within the PrepareTransactionBlock below.
	 */
	if (!IsTransactionBlock())
	{
		BeginTransactionBlock();
		CommitTransactionCommand(); /* Completes the preceding Begin command. */
	}

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_session_origin_lsn = prepare_data->end_lsn;
	replorigin_session_origin_timestamp = prepare_data->prepare_time;

	PrepareTransactionBlock(gid);
}
| |
/*
 * Handle PREPARE message.
 *
 * Prepares the local counterpart of a (non-streamed) remote transaction that
 * was started by BEGIN PREPARE.  The prepare LSN must match remote_final_lsn.
 *
 * Once the local prepare has been committed:
 *
 * - the end LSN is recorded with store_flush_position();
 * - the worker leaves the "in remote transaction" state;
 * - skipping is stopped and the subscription skip LSN is cleared.
 */
static void
apply_handle_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;

	logicalrep_read_prepare(s, &prepare_data);

	/* PREPARE must close the transaction opened by BEGIN PREPARE. */
	if (prepare_data.prepare_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	/*
	 * Unlike commit, here, we always prepare the transaction even though no
	 * change has happened in this transaction or all changes are skipped. It
	 * is done this way because at commit prepared time, we won't know whether
	 * we have skipped preparing a transaction because of those reasons.
	 *
	 * XXX, We can optimize such that at commit prepared time, we first check
	 * whether we have prepared the transaction or not but that doesn't seem
	 * worthwhile because such cases shouldn't be common.
	 */
	begin_replication_step();

	apply_handle_prepare_internal(&prepare_data);

	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	/* Pair the remote end LSN with the local commit end for flush tracking. */
	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(prepare_data.end_lsn);

	/*
	 * Since we have already prepared the transaction, in a case where the
	 * server crashes before clearing the subskiplsn, it will be left but the
	 * transaction won't be resent. But that's okay because it's a rare case
	 * and the subskiplsn will be cleared when finishing the next transaction.
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
| |
/*
 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
 *
 * Steps:
 *
 * 1. Rebuild the local GID from the subscription OID and the remote xid, the
 *    same way apply_handle_prepare_internal() builds it.
 * 2. Inside a replication step, advance the replication origin state to the
 *    message's end LSN and commit time.
 * 3. Commit the prepared transaction with FinishPreparedTransaction().
 * 4. Record the flush position and leave the "in remote transaction" state.
 * 5. Pass the end LSN to clear_subscription_skip_lsn().
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_commit_prepared(StringInfo s)
{
	LogicalRepCommitPreparedTxnData prepare_data;
	char		gid[GIDSIZE];

	logicalrep_read_commit_prepared(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
						   gid, sizeof(gid));

	/* There is no transaction when COMMIT PREPARED is called */
	begin_replication_step();

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_session_origin_lsn = prepare_data.end_lsn;
	replorigin_session_origin_timestamp = prepare_data.commit_time;

	/* Second argument true = commit (ROLLBACK PREPARED passes false). */
	FinishPreparedTransaction(gid, true, true);
	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(prepare_data.end_lsn);

	clear_subscription_skip_lsn(prepare_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
| |
/*
 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
 *
 * The local prepared transaction (identified by the GID rebuilt from the
 * subscription OID and remote xid) is rolled back only if LookupGXact()
 * finds it.  The flush position, the "in remote transaction" flag and the
 * table-sync processing are updated in either case.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_rollback_prepared(StringInfo s)
{
	LogicalRepRollbackPreparedTxnData rollback_data;
	char		gid[GIDSIZE];

	logicalrep_read_rollback_prepared(s, &rollback_data);
	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
						   gid, sizeof(gid));

	/*
	 * It is possible that we haven't received prepare because it occurred
	 * before walsender reached a consistent point or the two_phase was still
	 * not enabled by that time, so in such cases, we need to skip rollback
	 * prepared.
	 */
	if (LookupGXact(gid, rollback_data.prepare_end_lsn,
					rollback_data.prepare_time))
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
		replorigin_session_origin_timestamp = rollback_data.rollback_time;

		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
		begin_replication_step();
		/* Second argument false = roll back (COMMIT PREPARED passes true). */
		FinishPreparedTransaction(gid, false, true);
		end_replication_step();
		CommitTransactionCommand();

		clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
	}

	pgstat_report_stat(false);

	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(rollback_data.rollback_end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
| |
/*
 * Handle STREAM PREPARE.
 *
 * Prepares a transaction whose changes were streamed in chunks.  What
 * happens depends on get_transaction_apply_action() for the transaction:
 *
 * TRANS_LEADER_APPLY: the leader replays the changes it spooled to file,
 *   prepares the transaction itself and unlinks the spool files.
 * TRANS_LEADER_SEND_TO_PARALLEL: the raw message is forwarded to the
 *   parallel apply worker, and pa_xact_finish() is called.  If sending
 *   fails, the leader switches to partial serialization and continues as
 *   below.
 * TRANS_LEADER_PARTIAL_SERIALIZE: the message is appended to the spool file,
 *   the shared fileset state is set to FS_SERIALIZE_DONE, and
 *   pa_xact_finish() is called.
 * TRANS_PARALLEL_APPLY: we are the parallel apply worker.  Close any open
 *   spool file, prepare the transaction, store XactLastCommitEnd in
 *   MyParallelShared->last_commit_end, mark the transaction finished and
 *   release the transaction lock.
 *
 * In every case the stats, table-sync, skip-LSN and activity bookkeeping is
 * done afterwards.
 */
static void
apply_handle_stream_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/* Save the message before it is consumed. */
	StringInfoData original_msg = *s;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));

	logicalrep_read_stream_prepare(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);

	apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_APPLY:

			/*
			 * The transaction has been serialized to file, so replay all the
			 * spooled operations.
			 */
			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
								   prepare_data.xid, prepare_data.prepare_lsn);

			/* Mark the transaction as prepared. */
			apply_handle_prepare_internal(&prepare_data);

			CommitTransactionCommand();

			store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);

			in_remote_transaction = false;

			/* Unlink the files with serialized changes and subxact info. */
			stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);

			elog(DEBUG1, "finished processing the STREAM PREPARE command");
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/* Forward the whole raw message to the parallel apply worker. */
			if (pa_send_data(winfo, s->len, s->data))
			{
				/* Finish processing the streaming transaction. */
				pa_xact_finish(winfo, prepare_data.end_lsn);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			/* fall through */
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/*
			 * Spool the copy saved before parsing, so that its read position
			 * still points at the start of the message.
			 */
			stream_open_and_write_change(prepare_data.xid,
										 LOGICAL_REP_MSG_STREAM_PREPARE,
										 &original_msg);

			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);

			/* Finish processing the streaming transaction. */
			pa_xact_finish(winfo, prepare_data.end_lsn);
			break;

		case TRANS_PARALLEL_APPLY:

			/*
			 * If the parallel apply worker is applying spooled messages then
			 * close the file before preparing.
			 */
			if (stream_fd)
				stream_close_file();

			begin_replication_step();

			/* Mark the transaction as prepared. */
			apply_handle_prepare_internal(&prepare_data);

			end_replication_step();

			CommitTransactionCommand();

			/* Publish our commit end position in the shared state. */
			MyParallelShared->last_commit_end = XactLastCommitEnd;

			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);

			pa_reset_subtrans();

			elog(DEBUG1, "finished processing the STREAM PREPARE command");
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	pgstat_report_stat(false);

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(prepare_data.end_lsn);

	/*
	 * Similar to prepare case, the subskiplsn could be left in a case of
	 * server crash but it's okay. See the comments in apply_handle_prepare().
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);

	reset_apply_error_context_info();
}
| |
| /* |
| * Handle ORIGIN message. |
| * |
| * TODO, support tracking of multiple origins |
| */ |
| static void |
| apply_handle_origin(StringInfo s) |
| { |
| /* |
| * ORIGIN message can only come inside streaming transaction or inside |
| * remote transaction and before any actual writes. |
| */ |
| if (!in_streamed_transaction && |
| (!in_remote_transaction || |
| (IsTransactionState() && !am_tablesync_worker()))) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("ORIGIN message sent out of order"))); |
| } |
| |
/*
 * Initialize fileset (if not already done).
 *
 * Create a new file when first_segment is true, otherwise open the existing
 * file.
 *
 * This prepares the leader to serialize one chunk of a streamed transaction:
 *
 * - it begins a replication step;
 * - on first use, it creates the worker-wide stream_fileset in ApplyContext;
 * - it opens the spool file for "xid";
 * - for a non-first segment, it reads back the subxact info saved by earlier
 *   chunks.
 *
 * The transaction started here is left open; stream_stop_internal() commits
 * it.
 */
void
stream_start_internal(TransactionId xid, bool first_segment)
{
	begin_replication_step();

	/*
	 * Initialize the worker's stream_fileset if we haven't yet. This will be
	 * used for the entire duration of the worker so create it in a permanent
	 * context. We create this on the very first streaming message from any
	 * transaction and then use it for this and other streaming transactions.
	 * Now, we could create a fileset at the start of the worker as well but
	 * then we won't be sure that it will ever be used.
	 */
	if (!MyLogicalRepWorker->stream_fileset)
	{
		MemoryContext oldctx;

		oldctx = MemoryContextSwitchTo(ApplyContext);

		MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
		FileSetInit(MyLogicalRepWorker->stream_fileset);

		MemoryContextSwitchTo(oldctx);
	}

	/* Open the spool file for this transaction. */
	stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);

	/* If this is not the first segment, open existing subxact file. */
	if (!first_segment)
		subxact_info_read(MyLogicalRepWorker->subid, xid);

	end_replication_step();
}
| |
/*
 * Handle STREAM START message.
 *
 * Begins a chunk of a streamed transaction.  No other streamed chunk may be
 * open, and the toplevel xid carried by the message must be valid.  For the
 * first segment, pa_allocate_worker() is asked for a parallel apply worker.
 * Then, depending on get_transaction_apply_action():
 *
 * TRANS_LEADER_SERIALIZE: open the spool file for the chunk
 *   (stream_start_internal).
 * TRANS_LEADER_SEND_TO_PARALLEL: forward the message to the parallel apply
 *   worker.  For non-first segments, also release the stream lock taken at
 *   the previous STREAM STOP.  Then bump pending_stream_count.  If sending
 *   fails, switch to partial serialization and continue as below.
 * TRANS_LEADER_PARTIAL_SERIALIZE: write the message to the spool file.
 * TRANS_PARALLEL_APPLY: we are the parallel apply worker.  On the first
 *   segment, take the transaction lock, mark the transaction started and
 *   wake the leader.  Always reset the per-chunk change counter.
 */
static void
apply_handle_stream_start(StringInfo s)
{
	bool		first_segment;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/* Save the message before it is consumed. */
	StringInfoData original_msg = *s;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("duplicate STREAM START message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	/* notify handle methods we're processing a remote transaction */
	in_streamed_transaction = true;

	/* extract XID of the top-level transaction */
	stream_xid = logicalrep_read_stream_start(s, &first_segment);

	if (!TransactionIdIsValid(stream_xid))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);

	/* Try to allocate a worker for the streaming transaction. */
	if (first_segment)
		pa_allocate_worker(stream_xid);

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:

			/*
			 * Function stream_start_internal starts a transaction. This
			 * transaction will be committed on the stream stop unless it is a
			 * tablesync worker in which case it will be committed after
			 * processing all the messages. We need this transaction for
			 * handling the BufFile, used for serializing the streaming data
			 * and subxact info.
			 */
			stream_start_internal(stream_xid, first_segment);
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * Once we start serializing the changes, the parallel apply
			 * worker will wait for the leader to release the stream lock
			 * until the end of the transaction. So, we don't need to release
			 * the lock or increment the stream count in that case.
			 */
			if (pa_send_data(winfo, s->len, s->data))
			{
				/*
				 * Unlock the shared object lock so that the parallel apply
				 * worker can continue to receive changes.
				 */
				if (!first_segment)
					pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

				/*
				 * Increment the number of streaming blocks waiting to be
				 * processed by parallel apply worker.
				 */
				pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);

				/* Cache the parallel apply worker for this transaction. */
				pa_set_stream_apply_worker(winfo);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, !first_segment);

			/* fall through */
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/*
			 * Open the spool file unless it was already opened when switching
			 * to serialize mode. The transaction started in
			 * stream_start_internal will be committed on the stream stop.
			 */
			if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
				stream_start_internal(stream_xid, first_segment);

			stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);

			/* Cache the parallel apply worker for this transaction. */
			pa_set_stream_apply_worker(winfo);
			break;

		case TRANS_PARALLEL_APPLY:
			if (first_segment)
			{
				/* Hold the lock until the end of the transaction. */
				pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
				pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);

				/*
				 * Signal the leader apply worker, as it may be waiting for
				 * us.
				 */
				logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
			}

			/* Count changes per chunk; reported at STREAM STOP. */
			parallel_stream_nchanges = 0;
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	pgstat_report_activity(STATE_RUNNING, NULL);
}
| |
/*
 * Update the information about subxacts and close the file.
 *
 * This function should be called when the stream_start_internal function has
 * been called.  It does four things:
 *
 * - writes the subxact info for "xid";
 * - closes the spool file;
 * - commits the transaction that stream_start_internal() left open;
 * - resets LogicalStreamingContext.
 */
void
stream_stop_internal(TransactionId xid)
{
	/*
	 * Serialize information about subxacts for the toplevel transaction, then
	 * close the stream messages spool file.
	 */
	subxact_info_write(MyLogicalRepWorker->subid, xid);
	stream_close_file();

	/* We must be in a valid transaction state */
	Assert(IsTransactionState());

	/* Commit the per-stream transaction */
	CommitTransactionCommand();

	/* Reset per-stream context */
	MemoryContextReset(LogicalStreamingContext);
}
| |
/*
 * Handle STREAM STOP message.
 *
 * Ends the current chunk of a streamed transaction:
 *
 * TRANS_LEADER_SERIALIZE: finish the chunk in the spool file
 *   (stream_stop_internal).
 * TRANS_LEADER_SEND_TO_PARALLEL: take the stream lock first, then forward
 *   the message.  If sending fails, fall back to partial serialization as
 *   below.
 * TRANS_LEADER_PARTIAL_SERIALIZE: write the message to the spool file and
 *   finish the chunk.
 * TRANS_PARALLEL_APPLY: we are the parallel apply worker.  Log the number of
 *   changes applied in this chunk, then call
 *   pa_decr_and_wait_stream_block(), which may wait on the stream lock.
 *
 * Afterwards no streamed chunk is open.  A parallel apply worker can still
 * be inside the (unfinished) transaction, which is why the reported activity
 * state is chosen by IsTransactionOrTransactionBlock().
 */
static void
apply_handle_stream_stop(StringInfo s)
{
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	if (!in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM STOP message without STREAM START")));

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:
			stream_stop_internal(stream_xid);
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * Lock before sending the STREAM_STOP message so that the leader
			 * can hold the lock first and the parallel apply worker will wait
			 * for leader to release the lock. See Locking Considerations atop
			 * applyparallelworker.c.
			 */
			pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

			if (pa_send_data(winfo, s->len, s->data))
			{
				pa_set_stream_apply_worker(NULL);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			/* fall through */
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
			stream_stop_internal(stream_xid);
			pa_set_stream_apply_worker(NULL);
			break;

		case TRANS_PARALLEL_APPLY:
			elog(DEBUG1, "applied %u changes in the streaming chunk",
				 parallel_stream_nchanges);

			/*
			 * By the time the parallel apply worker is processing the changes
			 * in the current streaming block, the leader apply worker may
			 * have sent multiple streaming blocks. This can make the parallel
			 * apply worker start waiting even when more streaming chunks are
			 * still in the queue. So, try to lock only if there is no message
			 * left in the queue. See Locking Considerations atop
			 * applyparallelworker.c.
			 *
			 * Note that here we have a race condition where we can start
			 * waiting even when there are pending streaming chunks. This can
			 * happen if the leader sends another streaming block and acquires
			 * the stream lock again after the parallel apply worker checks
			 * that there is no pending streaming block and before it actually
			 * starts waiting on a lock. We can handle this case by not
			 * allowing the leader to increment the stream block count during
			 * the time parallel apply worker acquires the lock but it is not
			 * clear whether that is worth the complexity.
			 *
			 * Now, if this missed chunk contains rollback to savepoint, then
			 * there is a risk of deadlock which probably shouldn't happen
			 * after restart.
			 */
			pa_decr_and_wait_stream_block();
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	in_streamed_transaction = false;
	stream_xid = InvalidTransactionId;

	/*
	 * The parallel apply worker could be in a transaction in which case we
	 * need to report the state as STATE_IDLEINTRANSACTION.
	 */
	if (IsTransactionOrTransactionBlock())
		pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
	else
		pgstat_report_activity(STATE_IDLE, NULL);

	reset_apply_error_context_info();
}
| |
| /* |
| * Helper function to handle STREAM ABORT message when the transaction was |
| * serialized to file. |
| */ |
| static void |
| stream_abort_internal(TransactionId xid, TransactionId subxid) |
| { |
| /* |
| * If the two XIDs are the same, it's in fact abort of toplevel xact, so |
| * just delete the files with serialized info. |
| */ |
| if (xid == subxid) |
| stream_cleanup_files(MyLogicalRepWorker->subid, xid); |
| else |
| { |
| /* |
| * OK, so it's a subxact. We need to read the subxact file for the |
| * toplevel transaction, determine the offset tracked for the subxact, |
| * and truncate the file with changes. We also remove the subxacts |
| * with higher offsets (or rather higher XIDs). |
| * |
| * We intentionally scan the array from the tail, because we're likely |
| * aborting a change for the most recent subtransactions. |
| * |
| * We can't use the binary search here as subxact XIDs won't |
| * necessarily arrive in sorted order, consider the case where we have |
| * released the savepoint for multiple subtransactions and then |
| * performed rollback to savepoint for one of the earlier |
| * sub-transaction. |
| */ |
| int64 i; |
| int64 subidx; |
| BufFile *fd; |
| bool found = false; |
| char path[MAXPGPATH]; |
| |
| subidx = -1; |
| begin_replication_step(); |
| subxact_info_read(MyLogicalRepWorker->subid, xid); |
| |
| for (i = subxact_data.nsubxacts; i > 0; i--) |
| { |
| if (subxact_data.subxacts[i - 1].xid == subxid) |
| { |
| subidx = (i - 1); |
| found = true; |
| break; |
| } |
| } |
| |
| /* |
| * If it's an empty sub-transaction then we will not find the subxid |
| * here so just cleanup the subxact info and return. |
| */ |
| if (!found) |
| { |
| /* Cleanup the subxact info */ |
| cleanup_subxact_info(); |
| end_replication_step(); |
| CommitTransactionCommand(); |
| return; |
| } |
| |
| /* open the changes file */ |
| changes_filename(path, MyLogicalRepWorker->subid, xid); |
| fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, |
| O_RDWR, false); |
| |
| /* OK, truncate the file at the right offset */ |
| BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, |
| subxact_data.subxacts[subidx].offset); |
| BufFileClose(fd); |
| |
| /* discard the subxacts added later */ |
| subxact_data.nsubxacts = subidx; |
| |
| /* write the updated subxact list */ |
| subxact_info_write(MyLogicalRepWorker->subid, xid); |
| |
| end_replication_step(); |
| CommitTransactionCommand(); |
| } |
| } |
| |
| /* |
| * Handle STREAM ABORT message. |
| * |
| * The message aborts either the whole streamed transaction (when the remote |
| * subxid equals the toplevel xid) or only one of its subtransactions. The |
| * work is dispatched on get_transaction_apply_action(): |
| * |
| * - TRANS_LEADER_APPLY: the leader serialized the changes to file itself, |
| * so stream_abort_internal() performs the abort on that file. |
| * - TRANS_LEADER_SEND_TO_PARALLEL: forward the message to the parallel |
| * apply worker; if it cannot be sent, switch to partial-serialize mode |
| * and fall through to the next case. |
| * - TRANS_LEADER_PARTIAL_SERIALIZE: write the message out with |
| * stream_open_and_write_change() so the parallel apply worker can |
| * process it later. |
| * - TRANS_PARALLEL_APPLY: we are the parallel apply worker; perform the |
| * abort via pa_stream_abort(). |
| * |
| * Raises a protocol-violation ERROR if received while a streamed chunk is |
| * still open, i.e. without a preceding STREAM STOP. |
| */ |
| static void |
| apply_handle_stream_abort(StringInfo s) |
| { |
| TransactionId xid; |
| TransactionId subxid; |
| LogicalRepStreamAbortData abort_data; |
| ParallelApplyWorkerInfo *winfo; |
| TransApplyAction apply_action; |
| |
| /* Save the message before it is consumed. */ |
| StringInfoData original_msg = *s; |
| bool toplevel_xact; |
| |
| if (in_streamed_transaction) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("STREAM ABORT message without STREAM STOP"))); |
| |
| /* We receive abort information only when we can apply in parallel. */ |
| logicalrep_read_stream_abort(s, &abort_data, |
| MyLogicalRepWorker->parallel_apply); |
| |
| xid = abort_data.xid; |
| subxid = abort_data.subxid; |
| /* subxid == xid means the toplevel transaction itself is being aborted. */ |
| toplevel_xact = (xid == subxid); |
| |
| set_apply_error_context_xact(subxid, abort_data.abort_lsn); |
| |
| apply_action = get_transaction_apply_action(xid, &winfo); |
| |
| switch (apply_action) |
| { |
| case TRANS_LEADER_APPLY: |
| |
| /* |
| * We are in the leader apply worker and the transaction has been |
| * serialized to file. |
| */ |
| stream_abort_internal(xid, subxid); |
| |
| elog(DEBUG1, "finished processing the STREAM ABORT command"); |
| break; |
| |
| case TRANS_LEADER_SEND_TO_PARALLEL: |
| Assert(winfo); |
| |
| /* |
| * For the case of aborting the subtransaction, we increment the |
| * number of streaming blocks and take the lock again before |
| * sending the STREAM_ABORT to ensure that the parallel apply |
| * worker will wait on the lock for the next set of changes after |
| * processing the STREAM_ABORT message if it is not already |
| * waiting for STREAM_STOP message. |
| * |
| * It is important to perform this locking before sending the |
| * STREAM_ABORT message so that the leader can hold the lock first |
| * and the parallel apply worker will wait for the leader to |
| * release the lock. This is the same as what we do in |
| * apply_handle_stream_stop. See Locking Considerations atop |
| * applyparallelworker.c. |
| */ |
| if (!toplevel_xact) |
| { |
| pa_unlock_stream(xid, AccessExclusiveLock); |
| pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); |
| pa_lock_stream(xid, AccessExclusiveLock); |
| } |
| |
| if (pa_send_data(winfo, s->len, s->data)) |
| { |
| /* |
| * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to |
| * wait here for the parallel apply worker to finish as that |
| * is not required to maintain the commit order and won't have |
| * the risk of failures due to transaction dependencies and |
| * deadlocks. However, it is possible that before the parallel |
| * worker finishes and we clear the worker info, the xid |
| * wraparound happens on the upstream and a new transaction |
| * with the same xid can appear and that can lead to duplicate |
| * entries in ParallelApplyTxnHash. Yet another problem could |
| * be that we may have serialized the changes in partial |
| * serialize mode and the file containing xact changes may |
| * already exist, and after xid wraparound trying to create |
| * the file for the same xid can lead to an error. To avoid |
| * these problems, we decide to wait for the aborts to finish. |
| * |
| * Note, it is okay to not update the flush location position |
| * for aborts as in worst case that means such a transaction |
| * won't be sent again after restart. |
| */ |
| if (toplevel_xact) |
| pa_xact_finish(winfo, InvalidXLogRecPtr); |
| |
| break; |
| } |
| |
| /* |
| * Switch to serialize mode when we are not able to send the |
| * change to parallel apply worker. |
| */ |
| pa_switch_to_partial_serialize(winfo, true); |
| |
| /* fall through */ |
| case TRANS_LEADER_PARTIAL_SERIALIZE: |
| Assert(winfo); |
| |
| /* |
| * Parallel apply worker might have applied some changes, so write |
| * the STREAM_ABORT message so that it can rollback the |
| * subtransaction if needed. |
| */ |
| stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT, |
| &original_msg); |
| |
| /* |
| * A toplevel abort ends the transaction: mark the serialized file |
| * as complete and wait for the parallel worker to finish it. |
| */ |
| if (toplevel_xact) |
| { |
| pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); |
| pa_xact_finish(winfo, InvalidXLogRecPtr); |
| } |
| break; |
| |
| case TRANS_PARALLEL_APPLY: |
| |
| /* |
| * If the parallel apply worker is applying spooled messages then |
| * close the file before aborting. |
| */ |
| if (toplevel_xact && stream_fd) |
| stream_close_file(); |
| |
| pa_stream_abort(&abort_data); |
| |
| /* |
| * We need to wait after processing rollback to savepoint for the |
| * next set of changes. |
| * |
| * We have a race condition here due to which we can start waiting |
| * here when there are more chunks of streams in the queue. See |
| * apply_handle_stream_stop. |
| */ |
| if (!toplevel_xact) |
| pa_decr_and_wait_stream_block(); |
| |
| elog(DEBUG1, "finished processing the STREAM ABORT command"); |
| break; |
| |
| default: |
| elog(ERROR, "unexpected apply action: %d", (int) apply_action); |
| break; |
| } |
| |
| reset_apply_error_context_info(); |
| } |
| |
| /* |
| * Verify that (fileno, offset) is the end of the changes file for 'xid'. |
| * |
| * Opens the changes file in 'stream_fileset' inside its own replication |
| * step, finds where it really ends, and raises an ERROR naming the file if |
| * that position differs from the one the caller supplied -- i.e. if more |
| * data follows the message the caller treated as the last one. Must be |
| * called outside a transaction. |
| */ |
| static void |
| ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, |
| off_t offset) |
| { |
| char changes_path[MAXPGPATH]; |
| BufFile *changes_file; |
| int end_fileno; |
| off_t end_offset; |
| bool at_end; |
| |
| Assert(!IsTransactionState()); |
| |
| begin_replication_step(); |
| |
| changes_filename(changes_path, MyLogicalRepWorker->subid, xid); |
| changes_file = BufFileOpenFileSet(stream_fileset, changes_path, |
| O_RDONLY, false); |
| |
| /* Seek to the very end and record where that is. */ |
| BufFileSeek(changes_file, 0, 0, SEEK_END); |
| BufFileTell(changes_file, &end_fileno, &end_offset); |
| BufFileClose(changes_file); |
| |
| end_replication_step(); |
| |
| at_end = (end_fileno == fileno && end_offset == offset); |
| if (!at_end) |
| elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"", |
| changes_path); |
| } |
| |
| /* |
| * Common spoolfile processing. |
| * |
| * Replays every change spooled for remote transaction 'xid' in the changes |
| * file belonging to 'stream_fileset', passing each record through |
| * apply_dispatch(). 'lsn' becomes remote_final_lsn for the replay; outside a |
| * parallel apply worker it is also passed to maybe_start_skipping_changes(). |
| * |
| * Each on-disk record is read as an int length followed by that many bytes |
| * of message data; a non-positive length raises an ERROR. Replay stops at |
| * end of file, or earlier if a dispatched message closed stream_fd (e.g. a |
| * transaction-end message such as stream commit). In that case |
| * ensure_last_message() verifies that nothing follows it in the file. |
| */ |
| void |
| apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, |
| XLogRecPtr lsn) |
| { |
| StringInfoData s2; |
| int nchanges; |
| char path[MAXPGPATH]; |
| char *buffer = NULL; |
| MemoryContext oldcxt; |
| ResourceOwner oldowner; |
| int fileno; |
| off_t offset; |
| |
| if (!am_parallel_apply_worker()) |
| maybe_start_skipping_changes(lsn); |
| |
| /* Make sure we have an open transaction */ |
| begin_replication_step(); |
| |
| /* |
| * Allocate file handle and memory required to process all the messages in |
| * TopTransactionContext to avoid them getting reset after each message is |
| * processed. |
| */ |
| oldcxt = MemoryContextSwitchTo(TopTransactionContext); |
| |
| /* Open the spool file for the committed/prepared transaction */ |
| changes_filename(path, MyLogicalRepWorker->subid, xid); |
| elog(DEBUG1, "replaying changes from file \"%s\"", path); |
| |
| /* |
| * Make sure the file is owned by the toplevel transaction so that the |
| * file will not be accidentally closed when aborting a subtransaction. |
| */ |
| oldowner = CurrentResourceOwner; |
| CurrentResourceOwner = TopTransactionResourceOwner; |
| |
| stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); |
| |
| CurrentResourceOwner = oldowner; |
| |
| buffer = palloc(BLCKSZ); |
| initStringInfo(&s2); |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| remote_final_lsn = lsn; |
| |
| /* |
| * Make sure the apply_dispatch handler methods are aware we're in a remote |
| * transaction. |
| */ |
| in_remote_transaction = true; |
| pgstat_report_activity(STATE_RUNNING, NULL); |
| |
| end_replication_step(); |
| |
| /* |
| * Read the entries one by one and pass them through the same logic as in |
| * apply_dispatch. |
| */ |
| nchanges = 0; |
| while (true) |
| { |
| size_t nbytes; |
| int len; |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* read length of the on-disk record */ |
| nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true); |
| |
| /* have we reached end of the file? */ |
| if (nbytes == 0) |
| break; |
| |
| /* do we have a correct length? */ |
| if (len <= 0) |
| elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"", |
| len, path); |
| |
| /* |
| * make sure we have sufficiently large buffer (repalloc keeps the |
| * chunk in the context it was allocated in, TopTransactionContext) |
| */ |
| buffer = repalloc(buffer, len); |
| |
| /* and finally read the data into the buffer */ |
| BufFileReadExact(stream_fd, buffer, len); |
| |
| /* |
| * Remember the position just past this record; needed below if the |
| * message turns out to be the transaction's last one. |
| */ |
| BufFileTell(stream_fd, &fileno, &offset); |
| |
| /* copy the buffer to the stringinfo and call apply_dispatch */ |
| resetStringInfo(&s2); |
| appendBinaryStringInfo(&s2, buffer, len); |
| |
| /* Ensure we are reading the data into our memory context. */ |
| oldcxt = MemoryContextSwitchTo(ApplyMessageContext); |
| |
| apply_dispatch(&s2); |
| |
| MemoryContextReset(ApplyMessageContext); |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| nchanges++; |
| |
| /* |
| * It is possible the file has been closed because we have processed |
| * the transaction end message like stream_commit in which case that |
| * must be the last message. |
| */ |
| if (!stream_fd) |
| { |
| ensure_last_message(stream_fileset, xid, fileno, offset); |
| break; |
| } |
| |
| if (nchanges % 1000 == 0) |
| elog(DEBUG1, "replayed %d changes from file \"%s\"", |
| nchanges, path); |
| } |
| |
| if (stream_fd) |
| stream_close_file(); |
| |
| elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", |
| nchanges, path); |
| |
| return; |
| } |
| |
| /* |
| * Handle STREAM COMMIT message. |
| * |
| * Commits a streamed transaction. Depending on get_transaction_apply_action() |
| * the leader either: |
| * |
| * - replays the spooled changes and commits them itself (TRANS_LEADER_APPLY); |
| * - hands the message to a parallel apply worker |
| * (TRANS_LEADER_SEND_TO_PARALLEL), falling back to partial serialization |
| * if it cannot be sent; or |
| * - writes it to the serialized changes (TRANS_LEADER_PARTIAL_SERIALIZE). |
| * |
| * In a parallel apply worker (TRANS_PARALLEL_APPLY), the transaction is |
| * committed and marked finished in shared state. |
| * |
| * Afterwards process_syncing_tables() is called with the commit's end LSN. |
| * |
| * Raises a protocol-violation ERROR if received without a preceding |
| * STREAM STOP. |
| */ |
| static void |
| apply_handle_stream_commit(StringInfo s) |
| { |
| TransactionId xid; |
| LogicalRepCommitData commit_data; |
| ParallelApplyWorkerInfo *winfo; |
| TransApplyAction apply_action; |
| |
| /* Save the message before it is consumed. */ |
| StringInfoData original_msg = *s; |
| |
| if (in_streamed_transaction) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("STREAM COMMIT message without STREAM STOP"))); |
| |
| xid = logicalrep_read_stream_commit(s, &commit_data); |
| set_apply_error_context_xact(xid, commit_data.commit_lsn); |
| |
| apply_action = get_transaction_apply_action(xid, &winfo); |
| |
| switch (apply_action) |
| { |
| case TRANS_LEADER_APPLY: |
| |
| /* |
| * The transaction has been serialized to file, so replay all the |
| * spooled operations. |
| */ |
| apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, |
| commit_data.commit_lsn); |
| |
| apply_handle_commit_internal(&commit_data); |
| |
| /* Unlink the files with serialized changes and subxact info. */ |
| stream_cleanup_files(MyLogicalRepWorker->subid, xid); |
| |
| elog(DEBUG1, "finished processing the STREAM COMMIT command"); |
| break; |
| |
| case TRANS_LEADER_SEND_TO_PARALLEL: |
| Assert(winfo); |
| |
| if (pa_send_data(winfo, s->len, s->data)) |
| { |
| /* Finish processing the streaming transaction. */ |
| pa_xact_finish(winfo, commit_data.end_lsn); |
| break; |
| } |
| |
| /* |
| * Switch to serialize mode when we are not able to send the |
| * change to parallel apply worker. |
| */ |
| pa_switch_to_partial_serialize(winfo, true); |
| |
| /* fall through */ |
| case TRANS_LEADER_PARTIAL_SERIALIZE: |
| Assert(winfo); |
| |
| /* |
| * Use the saved copy: 's' has already been consumed by |
| * logicalrep_read_stream_commit() above. |
| */ |
| stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT, |
| &original_msg); |
| |
| pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); |
| |
| /* Finish processing the streaming transaction. */ |
| pa_xact_finish(winfo, commit_data.end_lsn); |
| break; |
| |
| case TRANS_PARALLEL_APPLY: |
| |
| /* |
| * If the parallel apply worker is applying spooled messages then |
| * close the file before committing. |
| */ |
| if (stream_fd) |
| stream_close_file(); |
| |
| apply_handle_commit_internal(&commit_data); |
| |
| /* Publish our commit end position in the shared state. */ |
| MyParallelShared->last_commit_end = XactLastCommitEnd; |
| |
| /* |
| * It is important to set the transaction state as finished before |
| * releasing the lock. See pa_wait_for_xact_finish. |
| */ |
| pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); |
| pa_unlock_transaction(xid, AccessExclusiveLock); |
| |
| pa_reset_subtrans(); |
| |
| elog(DEBUG1, "finished processing the STREAM COMMIT command"); |
| break; |
| |
| default: |
| elog(ERROR, "unexpected apply action: %d", (int) apply_action); |
| break; |
| } |
| |
| /* Process any tables that are being synchronized in parallel. */ |
| process_syncing_tables(commit_data.end_lsn); |
| |
| pgstat_report_activity(STATE_IDLE, NULL); |
| |
| reset_apply_error_context_info(); |
| } |
| |
| /* |
| * Helper function for apply_handle_commit and apply_handle_stream_commit. |
| * |
| * Finishes the local side of the remote commit described by 'commit_data'. |
| * |
| * If changes were being skipped, skipping is stopped and a transaction is |
| * started if none is open, so that the subscription's skip LSN can be |
| * cleared. When a transaction is open, this function: |
| * - clears the skip LSN; |
| * - advances the replication origin to the remote end LSN and commit time; |
| * - commits the transaction, plus any transaction block still open; |
| * - records the flush position. |
| * |
| * When no transaction is open (presumably nothing was applied), pending |
| * invalidation messages are accepted and maybe_reread_subscription() is |
| * called instead. |
| * |
| * Always clears in_remote_transaction. |
| */ |
| static void |
| apply_handle_commit_internal(LogicalRepCommitData *commit_data) |
| { |
| if (is_skipping_changes()) |
| { |
| stop_skipping_changes(); |
| |
| /* |
| * Start a new transaction to clear the subskiplsn, if not started |
| * yet. |
| */ |
| if (!IsTransactionState()) |
| StartTransactionCommand(); |
| } |
| |
| if (IsTransactionState()) |
| { |
| /* |
| * The transaction is either non-empty or skipped, so we clear the |
| * subskiplsn. |
| */ |
| clear_subscription_skip_lsn(commit_data->commit_lsn); |
| |
| /* |
| * Update origin state so we can restart streaming from correct |
| * position in case of crash. |
| */ |
| replorigin_session_origin_lsn = commit_data->end_lsn; |
| replorigin_session_origin_timestamp = commit_data->committime; |
| |
| CommitTransactionCommand(); |
| |
| /* |
| * If an explicit transaction block is still open after committing |
| * the command, end it and commit that as well. |
| */ |
| if (IsTransactionBlock()) |
| { |
| EndTransactionBlock(false); |
| CommitTransactionCommand(); |
| } |
| |
| pgstat_report_stat(false); |
| |
| store_flush_position(commit_data->end_lsn, XactLastCommitEnd); |
| } |
| else |
| { |
| /* Process any invalidation messages that might have accumulated. */ |
| AcceptInvalidationMessages(); |
| maybe_reread_subscription(); |
| } |
| |
| in_remote_transaction = false; |
| } |
| |
| /* |
| * Handle RELATION message. |
| * |
| * The remote relation description is only recorded in the relation map |
| * here. Validation against the local schema is deliberately postponed until |
| * the first change for the relation arrives: we only need it when applying |
| * changes for it anyway, and deferring it means less locking. |
| */ |
| static void |
| apply_handle_relation(StringInfo s) |
| { |
| /* |
| * When handle_streamed_transaction() returns true the message has been |
| * dealt with by the streamed-transaction path; otherwise apply it now. |
| */ |
| if (!handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) |
| { |
| LogicalRepRelation *remoterel = logicalrep_read_rel(s); |
| |
| logicalrep_relmap_update(remoterel); |
| |
| /* Partition-map entries referring to this remote relation must be reset too. */ |
| logicalrep_partmap_reset_relmap(remoterel); |
| } |
| } |
| |
| /* |
| * Handle TYPE message. |
| * |
| * TYPE messages carry no information we act on: the user is expected to |
| * have arranged for incoming data to be acceptable to the input functions |
| * of the locally subscribed tables. The message is therefore read and |
| * thrown away. |
| */ |
| static void |
| apply_handle_type(StringInfo s) |
| { |
| if (!handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) |
| { |
| LogicalRepTyp discarded_typ; |
| |
| /* Consume the message body; its contents are intentionally unused. */ |
| logicalrep_read_typ(s, &discarded_typ); |
| } |
| } |
| |
| /* |
| * Check that we (the subscription owner) have sufficient privileges on the |
| * target relation to perform the given operation. |
| * |
| * Reports an ACL error via aclcheck_error() when the current user lacks |
| * 'mode' on 'rel', and raises FEATURE_NOT_SUPPORTED when row-level security |
| * is enabled for the relation. |
| */ |
| static void |
| TargetPrivilegesCheck(Relation rel, AclMode mode) |
| { |
| Oid relid = RelationGetRelid(rel); |
| Oid userid = GetUserId(); |
| AclResult aclresult = pg_class_aclcheck(relid, userid, mode); |
| |
| if (aclresult != ACLCHECK_OK) |
| aclcheck_error(aclresult, |
| get_relkind_objtype(rel->rd_rel->relkind), |
| get_rel_name(relid)); |
| |
| /* |
| * RLS policies cannot be honored here (tablesync workers lack the |
| * infrastructure as well). RLS does not normally apply to TRUNCATE, but |
| * replicating a TRUNCATE and then refusing later INSERTs would be |
| * dangerous, so every command is refused alike. |
| */ |
| if (check_enable_rls(relid, InvalidOid, false) != RLS_ENABLED) |
| return; |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"", |
| GetUserNameFromId(userid, true), |
| RelationGetRelationName(rel)))); |
| } |
| |
| /* |
| * Handle INSERT message. |
| * |
| * Decodes the new tuple, fills in local defaults and inserts it into the |
| * target relation. When the target is a partitioned table, the tuple is |
| * routed to the proper partition. |
| * |
| * Returns early without doing anything when changes are being skipped, when |
| * the streamed-transaction path handled the message, or when |
| * should_apply_changes_for_rel() rejects the relation. |
| * |
| * Unless the subscription's runasowner option is set, the work runs as the |
| * table owner (SwitchToUntrustedUser / RestoreUserContext). |
| */ |
| |
| static void |
| apply_handle_insert(StringInfo s) |
| { |
| LogicalRepRelMapEntry *rel; |
| LogicalRepTupleData newtup; |
| LogicalRepRelId relid; |
| UserContext ucxt; |
| ApplyExecutionData *edata; |
| EState *estate; |
| TupleTableSlot *remoteslot; |
| MemoryContext oldctx; |
| bool run_as_owner; |
| |
| /* |
| * Quick return if we are skipping data modification changes or handling |
| * streamed transactions. |
| */ |
| if (is_skipping_changes() || |
| handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) |
| return; |
| |
| begin_replication_step(); |
| |
| relid = logicalrep_read_insert(s, &newtup); |
| rel = logicalrep_rel_open(relid, RowExclusiveLock); |
| if (!should_apply_changes_for_rel(rel)) |
| { |
| /* |
| * The relation can't become interesting in the middle of the |
| * transaction so it's safe to unlock it. |
| */ |
| logicalrep_rel_close(rel, RowExclusiveLock); |
| end_replication_step(); |
| return; |
| } |
| |
| /* |
| * Make sure that any user-supplied code runs as the table owner, unless |
| * the user has opted out of that behavior. |
| */ |
| run_as_owner = MySubscription->runasowner; |
| if (!run_as_owner) |
| SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); |
| |
| /* Set relation for error callback */ |
| apply_error_callback_arg.rel = rel; |
| |
| /* Initialize the executor state. */ |
| edata = create_edata_for_relation(rel); |
| estate = edata->estate; |
| remoteslot = ExecInitExtraTupleSlot(estate, |
| RelationGetDescr(rel->localrel), |
| &TTSOpsVirtual); |
| |
| /* Process and store remote tuple in the slot */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| slot_store_data(remoteslot, rel, &newtup); |
| slot_fill_defaults(rel, estate, remoteslot); |
| MemoryContextSwitchTo(oldctx); |
| |
| /* For a partitioned table, insert the tuple into a partition. */ |
| if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| apply_handle_tuple_routing(edata, |
| remoteslot, NULL, CMD_INSERT); |
| else |
| apply_handle_insert_internal(edata, edata->targetRelInfo, |
| remoteslot); |
| |
| finish_edata(edata); |
| |
| /* Reset relation for error callback */ |
| apply_error_callback_arg.rel = NULL; |
| |
| if (!run_as_owner) |
| RestoreUserContext(&ucxt); |
| |
| /* |
| * Close without releasing the lock; it is kept until the end of the |
| * transaction. |
| */ |
| logicalrep_rel_close(rel, NoLock); |
| |
| end_replication_step(); |
| } |
| |
| /* |
| * Workhorse for apply_handle_insert(). |
| * |
| * Inserts the tuple held in 'remoteslot' into the relation of 'relinfo', |
| * which may be a child partition of edata->targetRelInfo. The relation's |
| * indexes are opened for the duration of the insert and closed afterwards. |
| * The caller must hold INSERT privilege (checked here). |
| */ |
| static void |
| apply_handle_insert_internal(ApplyExecutionData *edata, |
| ResultRelInfo *relinfo, |
| TupleTableSlot *remoteslot) |
| { |
| Relation targetrel = relinfo->ri_RelationDesc; |
| |
| /* Indexes have to be opened here, before the insert. */ |
| ExecOpenIndices(relinfo, false); |
| |
| TargetPrivilegesCheck(targetrel, ACL_INSERT); |
| ExecSimpleRelationInsert(relinfo, edata->estate, remoteslot); |
| |
| ExecCloseIndices(relinfo); |
| } |
| |
| /* |
| * Check if the logical replication relation is updatable and throw |
| * appropriate error if it isn't. |
| * |
| * Partitioned tables are never rejected here: only the target leaf partition |
| * needs a PK or replica identity, and it is checked separately. When the |
| * entry is not updatable, the error message distinguishes whether a local |
| * identity/PK exists (the publisher failed to send its columns) or not. |
| */ |
| static void |
| check_relation_updatable(LogicalRepRelMapEntry *rel) |
| { |
| bool has_local_identity; |
| |
| if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || |
| rel->updatable) |
| return; |
| |
| /* |
| * Only reached on the error path, so an extra catalog lookup to choose |
| * the most accurate message is acceptable. |
| */ |
| has_local_identity = OidIsValid(GetRelationIdentityOrPK(rel->localrel)); |
| |
| if (has_local_identity) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("publisher did not send replica identity column " |
| "expected by the logical replication target relation \"%s.%s\"", |
| rel->remoterel.nspname, rel->remoterel.relname))); |
| else |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("logical replication target relation \"%s.%s\" has " |
| "neither REPLICA IDENTITY index nor PRIMARY " |
| "KEY and published relation does not have " |
| "REPLICA IDENTITY FULL", |
| rel->remoterel.nspname, rel->remoterel.relname))); |
| } |
| |
| /* |
| * Handle UPDATE message. |
| * |
| * Decodes the old (if sent) and new tuples, builds a search tuple from the |
| * old tuple when present and from the new tuple otherwise, and applies the |
| * update to the target relation, or routes it for a partitioned table. The |
| * relation must pass check_relation_updatable(). |
| * |
| * Returns early without doing anything when changes are being skipped, when |
| * the streamed-transaction path handled the message, or when |
| * should_apply_changes_for_rel() rejects the relation. |
| * |
| * Unless the subscription's runasowner option is set, the work runs as the |
| * table owner. |
| * |
| * TODO: FDW support |
| */ |
| static void |
| apply_handle_update(StringInfo s) |
| { |
| LogicalRepRelMapEntry *rel; |
| LogicalRepRelId relid; |
| UserContext ucxt; |
| ApplyExecutionData *edata; |
| EState *estate; |
| LogicalRepTupleData oldtup; |
| LogicalRepTupleData newtup; |
| bool has_oldtup; |
| TupleTableSlot *remoteslot; |
| RTEPermissionInfo *target_perminfo; |
| MemoryContext oldctx; |
| bool run_as_owner; |
| |
| /* |
| * Quick return if we are skipping data modification changes or handling |
| * streamed transactions. |
| */ |
| if (is_skipping_changes() || |
| handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) |
| return; |
| |
| begin_replication_step(); |
| |
| relid = logicalrep_read_update(s, &has_oldtup, &oldtup, |
| &newtup); |
| rel = logicalrep_rel_open(relid, RowExclusiveLock); |
| if (!should_apply_changes_for_rel(rel)) |
| { |
| /* |
| * The relation can't become interesting in the middle of the |
| * transaction so it's safe to unlock it. |
| */ |
| logicalrep_rel_close(rel, RowExclusiveLock); |
| end_replication_step(); |
| return; |
| } |
| |
| /* Set relation for error callback */ |
| apply_error_callback_arg.rel = rel; |
| |
| /* Check if we can do the update. */ |
| check_relation_updatable(rel); |
| |
| /* |
| * Make sure that any user-supplied code runs as the table owner, unless |
| * the user has opted out of that behavior. |
| */ |
| run_as_owner = MySubscription->runasowner; |
| if (!run_as_owner) |
| SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); |
| |
| /* Initialize the executor state. */ |
| edata = create_edata_for_relation(rel); |
| estate = edata->estate; |
| remoteslot = ExecInitExtraTupleSlot(estate, |
| RelationGetDescr(rel->localrel), |
| &TTSOpsVirtual); |
| |
| /* |
| * Populate updatedCols so that per-column triggers can fire, and so |
| * executor can correctly pass down indexUnchanged hint. This could |
| * include more columns than were actually changed on the publisher |
| * because the logical replication protocol doesn't contain that |
| * information. But it would for example exclude columns that only exist |
| * on the subscriber, since we are not touching those. |
| */ |
| target_perminfo = list_nth(estate->es_rteperminfos, 0); |
| for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) |
| { |
| Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i); |
| int remoteattnum = rel->attrmap->attnums[i]; |
| |
| /* A negative remoteattnum means the local column has no remote counterpart. */ |
| if (!att->attisdropped && remoteattnum >= 0) |
| { |
| Assert(remoteattnum < newtup.ncols); |
| if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) |
| target_perminfo->updatedCols = |
| bms_add_member(target_perminfo->updatedCols, |
| i + 1 - FirstLowInvalidHeapAttributeNumber); |
| } |
| } |
| |
| /* Build the search tuple. */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| slot_store_data(remoteslot, rel, |
| has_oldtup ? &oldtup : &newtup); |
| MemoryContextSwitchTo(oldctx); |
| |
| /* For a partitioned table, apply update to correct partition. */ |
| if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| apply_handle_tuple_routing(edata, |
| remoteslot, &newtup, CMD_UPDATE); |
| else |
| apply_handle_update_internal(edata, edata->targetRelInfo, |
| remoteslot, &newtup, rel->localindexoid); |
| |
| finish_edata(edata); |
| |
| /* Reset relation for error callback */ |
| apply_error_callback_arg.rel = NULL; |
| |
| if (!run_as_owner) |
| RestoreUserContext(&ucxt); |
| |
| logicalrep_rel_close(rel, NoLock); |
| |
| end_replication_step(); |
| } |
| |
| /* |
| * Workhorse for apply_handle_update() |
| * relinfo is for the relation we're actually updating in |
| * (could be a child partition of edata->targetRelInfo) |
| * |
| * On entry 'remoteslot' holds the search tuple. It is cleared after the |
| * local lookup and, if a matching row is found, refilled with that local |
| * row as modified by the columns present in 'newtup'. 'localindexoid' is |
| * passed to FindReplTupleInLocalRel() to choose the lookup method. A row |
| * that cannot be found is only reported at DEBUG1. |
| */ |
| static void |
| apply_handle_update_internal(ApplyExecutionData *edata, |
| ResultRelInfo *relinfo, |
| TupleTableSlot *remoteslot, |
| LogicalRepTupleData *newtup, |
| Oid localindexoid) |
| { |
| EState *estate = edata->estate; |
| LogicalRepRelMapEntry *relmapentry = edata->targetRel; |
| Relation localrel = relinfo->ri_RelationDesc; |
| EPQState epqstate; |
| TupleTableSlot *localslot; |
| bool found; |
| MemoryContext oldctx; |
| |
| EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); |
| ExecOpenIndices(relinfo, false); |
| |
| found = FindReplTupleInLocalRel(edata, localrel, |
| &relmapentry->remoterel, |
| localindexoid, |
| remoteslot, &localslot); |
| /* The search tuple is no longer needed; the slot is reused below. */ |
| ExecClearTuple(remoteslot); |
| |
| /* |
| * Tuple found. |
| * |
| * Note this will fail if there are other conflicting unique indexes. |
| */ |
| if (found) |
| { |
| /* Process and store remote tuple in the slot */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| slot_modify_data(remoteslot, localslot, relmapentry, newtup); |
| MemoryContextSwitchTo(oldctx); |
| |
| EvalPlanQualSetSlot(&epqstate, remoteslot); |
| |
| /* Do the actual update. */ |
| TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE); |
| ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, |
| remoteslot); |
| } |
| else |
| { |
| /* |
| * The tuple to be updated could not be found. Do nothing except for |
| * emitting a log message. |
| * |
| * XXX should this be promoted to ereport(LOG) perhaps? |
| */ |
| elog(DEBUG1, |
| "logical replication did not find row to be updated " |
| "in replication target relation \"%s\"", |
| RelationGetRelationName(localrel)); |
| } |
| |
| /* Cleanup. */ |
| ExecCloseIndices(relinfo); |
| EvalPlanQualEnd(&epqstate); |
| } |
| |
| /* |
| * Handle DELETE message. |
| * |
| * Decodes the old tuple, uses it as the search tuple and deletes the |
| * matching local row, routing through partitions for a partitioned table. |
| * The relation must pass check_relation_updatable(). |
| * |
| * Returns early without doing anything when changes are being skipped, when |
| * the streamed-transaction path handled the message, or when |
| * should_apply_changes_for_rel() rejects the relation. |
| * |
| * Unless the subscription's runasowner option is set, the work runs as the |
| * table owner. |
| * |
| * TODO: FDW support |
| */ |
| static void |
| apply_handle_delete(StringInfo s) |
| { |
| LogicalRepRelMapEntry *rel; |
| LogicalRepTupleData oldtup; |
| LogicalRepRelId relid; |
| UserContext ucxt; |
| ApplyExecutionData *edata; |
| EState *estate; |
| TupleTableSlot *remoteslot; |
| MemoryContext oldctx; |
| bool run_as_owner; |
| |
| /* |
| * Quick return if we are skipping data modification changes or handling |
| * streamed transactions. |
| */ |
| if (is_skipping_changes() || |
| handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) |
| return; |
| |
| begin_replication_step(); |
| |
| relid = logicalrep_read_delete(s, &oldtup); |
| rel = logicalrep_rel_open(relid, RowExclusiveLock); |
| if (!should_apply_changes_for_rel(rel)) |
| { |
| /* |
| * The relation can't become interesting in the middle of the |
| * transaction so it's safe to unlock it. |
| */ |
| logicalrep_rel_close(rel, RowExclusiveLock); |
| end_replication_step(); |
| return; |
| } |
| |
| /* Set relation for error callback */ |
| apply_error_callback_arg.rel = rel; |
| |
| /* Check if we can do the delete. */ |
| check_relation_updatable(rel); |
| |
| /* |
| * Make sure that any user-supplied code runs as the table owner, unless |
| * the user has opted out of that behavior. |
| */ |
| run_as_owner = MySubscription->runasowner; |
| if (!run_as_owner) |
| SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); |
| |
| /* Initialize the executor state. */ |
| edata = create_edata_for_relation(rel); |
| estate = edata->estate; |
| remoteslot = ExecInitExtraTupleSlot(estate, |
| RelationGetDescr(rel->localrel), |
| &TTSOpsVirtual); |
| |
| /* Build the search tuple. */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| slot_store_data(remoteslot, rel, &oldtup); |
| MemoryContextSwitchTo(oldctx); |
| |
| /* For a partitioned table, apply delete to correct partition. */ |
| if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| apply_handle_tuple_routing(edata, |
| remoteslot, NULL, CMD_DELETE); |
| else |
| apply_handle_delete_internal(edata, edata->targetRelInfo, |
| remoteslot, rel->localindexoid); |
| |
| finish_edata(edata); |
| |
| /* Reset relation for error callback */ |
| apply_error_callback_arg.rel = NULL; |
| |
| if (!run_as_owner) |
| RestoreUserContext(&ucxt); |
| |
| logicalrep_rel_close(rel, NoLock); |
| |
| end_replication_step(); |
| } |
| |
| /* |
| * Workhorse for apply_handle_delete(). |
| * |
| * Looks up the local row matching the search tuple in 'remoteslot' within |
| * the relation of 'relinfo' (possibly a child partition of |
| * edata->targetRelInfo), using 'localindexoid' to choose the lookup method. |
| * It deletes the row if one is found; otherwise it only logs at DEBUG1. |
| * Indexes and the EPQ state live only for the duration of this call. |
| */ |
| static void |
| apply_handle_delete_internal(ApplyExecutionData *edata, |
| ResultRelInfo *relinfo, |
| TupleTableSlot *remoteslot, |
| Oid localindexoid) |
| { |
| Relation targetrel = relinfo->ri_RelationDesc; |
| EPQState epqstate; |
| TupleTableSlot *localslot; |
| |
| EvalPlanQualInit(&epqstate, edata->estate, NULL, NIL, -1, NIL); |
| ExecOpenIndices(relinfo, false); |
| |
| if (!FindReplTupleInLocalRel(edata, targetrel, &edata->targetRel->remoterel, |
| localindexoid, remoteslot, &localslot)) |
| { |
| /* |
| * Nothing to delete locally; just note it. |
| * |
| * XXX should this be promoted to ereport(LOG) perhaps? |
| */ |
| elog(DEBUG1, |
| "logical replication did not find row to be deleted " |
| "in replication target relation \"%s\"", |
| RelationGetRelationName(targetrel)); |
| } |
| else |
| { |
| EvalPlanQualSetSlot(&epqstate, localslot); |
| TargetPrivilegesCheck(targetrel, ACL_DELETE); |
| ExecSimpleRelationDelete(relinfo, edata->estate, &epqstate, localslot); |
| } |
| |
| ExecCloseIndices(relinfo); |
| EvalPlanQualEnd(&epqstate); |
| } |
| |
| /* |
| * Try to find a tuple received from the publication side (in 'remoteslot') in |
| * the corresponding local relation. |
| * |
| * With a valid 'localidxoid', the lookup goes through that index (a replica |
| * identity index, primary key, or another index usable for REPLICA IDENTITY |
| * FULL). Without one, it falls back to a sequential scan, which is only |
| * expected when the remote relation uses REPLICA IDENTITY FULL. A found row |
| * is locked with LockTupleExclusive. |
| * |
| * A freshly created slot is always returned in '*localslot'; it holds the |
| * local tuple when the function returns true. |
| */ |
| static bool |
| FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, |
| LogicalRepRelation *remoterel, |
| Oid localidxoid, |
| TupleTableSlot *remoteslot, |
| TupleTableSlot **localslot) |
| { |
| /* Whatever the caller does next, this lookup is a read: require SELECT. */ |
| TargetPrivilegesCheck(localrel, ACL_SELECT); |
| |
| *localslot = table_slot_create(localrel, &edata->estate->es_tupleTable); |
| |
| Assert(OidIsValid(localidxoid) || |
| (remoterel->replident == REPLICA_IDENTITY_FULL)); |
| |
| if (!OidIsValid(localidxoid)) |
| return RelationFindReplTupleSeq(localrel, LockTupleExclusive, |
| remoteslot, *localslot); |
| |
| #ifdef USE_ASSERT_CHECKING |
| { |
| Relation idxrel = index_open(localidxoid, AccessShareLock); |
| |
| /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */ |
| Assert(GetRelationIdentityOrPK(idxrel) == localidxoid || |
| IsIndexUsableForReplicaIdentityFull(BuildIndexInfo(idxrel), |
| edata->targetRel->attrmap)); |
| index_close(idxrel, AccessShareLock); |
| } |
| #endif |
| |
| return RelationFindReplTupleByIndex(localrel, localidxoid, |
| LockTupleExclusive, |
| remoteslot, *localslot); |
| } |
| |
| /* |
| * This handles insert, update, delete on a partitioned table. |
| */ |
| static void |
| apply_handle_tuple_routing(ApplyExecutionData *edata, |
| TupleTableSlot *remoteslot, |
| LogicalRepTupleData *newtup, |
| CmdType operation) |
| { |
| EState *estate = edata->estate; |
| LogicalRepRelMapEntry *relmapentry = edata->targetRel; |
| ResultRelInfo *relinfo = edata->targetRelInfo; |
| Relation parentrel = relinfo->ri_RelationDesc; |
| ModifyTableState *mtstate; |
| PartitionTupleRouting *proute; |
| ResultRelInfo *partrelinfo; |
| Relation partrel; |
| TupleTableSlot *remoteslot_part; |
| TupleConversionMap *map; |
| MemoryContext oldctx; |
| LogicalRepRelMapEntry *part_entry = NULL; |
| AttrMap *attrmap = NULL; |
| |
| /* ModifyTableState is needed for ExecFindPartition(). */ |
| edata->mtstate = mtstate = makeNode(ModifyTableState); |
| mtstate->ps.plan = NULL; |
| mtstate->ps.state = estate; |
| mtstate->operation = operation; |
| mtstate->resultRelInfo = relinfo; |
| |
| /* ... as is PartitionTupleRouting. */ |
| edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel); |
| |
| /* |
| * Find the partition to which the "search tuple" belongs. |
| */ |
| Assert(remoteslot != NULL); |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| partrelinfo = ExecFindPartition(mtstate, relinfo, proute, |
| remoteslot, estate); |
| Assert(partrelinfo != NULL); |
| partrel = partrelinfo->ri_RelationDesc; |
| |
| /* |
| * Check for supported relkind. We need this since partitions might be of |
| * unsupported relkinds; and the set of partitions can change, so checking |
| * at CREATE/ALTER SUBSCRIPTION would be insufficient. |
| */ |
| CheckSubscriptionRelkind(partrel->rd_rel->relkind, |
| get_namespace_name(RelationGetNamespace(partrel)), |
| RelationGetRelationName(partrel)); |
| |
| /* |
| * To perform any of the operations below, the tuple must match the |
| * partition's rowtype. Convert if needed or just copy, using a dedicated |
| * slot to store the tuple in any case. |
| */ |
| remoteslot_part = partrelinfo->ri_PartitionTupleSlot; |
| if (remoteslot_part == NULL) |
| remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); |
| map = ExecGetRootToChildMap(partrelinfo, estate); |
| if (map != NULL) |
| { |
| attrmap = map->attrMap; |
| remoteslot_part = execute_attr_map_slot(attrmap, remoteslot, |
| remoteslot_part); |
| } |
| else |
| { |
| remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); |
| slot_getallattrs(remoteslot_part); |
| } |
| MemoryContextSwitchTo(oldctx); |
| |
| /* Check if we can do the update or delete on the leaf partition. */ |
| if (operation == CMD_UPDATE || operation == CMD_DELETE) |
| { |
| part_entry = logicalrep_partition_open(relmapentry, partrel, |
| attrmap); |
| check_relation_updatable(part_entry); |
| } |
| |
| switch (operation) |
| { |
| case CMD_INSERT: |
| apply_handle_insert_internal(edata, partrelinfo, |
| remoteslot_part); |
| break; |
| |
| case CMD_DELETE: |
| apply_handle_delete_internal(edata, partrelinfo, |
| remoteslot_part, |
| part_entry->localindexoid); |
| break; |
| |
| case CMD_UPDATE: |
| |
| /* |
| * For UPDATE, depending on whether or not the updated tuple |
| * satisfies the partition's constraint, perform a simple UPDATE |
| * of the partition or move the updated tuple into a different |
| * suitable partition. |
| */ |
| { |
| TupleTableSlot *localslot; |
| ResultRelInfo *partrelinfo_new; |
| Relation partrel_new; |
| bool found; |
| |
| /* Get the matching local tuple from the partition. */ |
| found = FindReplTupleInLocalRel(edata, partrel, |
| &part_entry->remoterel, |
| part_entry->localindexoid, |
| remoteslot_part, &localslot); |
| if (!found) |
| { |
| /* |
| * The tuple to be updated could not be found. Do nothing |
| * except for emitting a log message. |
| * |
| * XXX should this be promoted to ereport(LOG) perhaps? |
| */ |
| elog(DEBUG1, |
| "logical replication did not find row to be updated " |
| "in replication target relation's partition \"%s\"", |
| RelationGetRelationName(partrel)); |
| return; |
| } |
| |
| /* |
| * Apply the update to the local tuple, putting the result in |
| * remoteslot_part. |
| */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| slot_modify_data(remoteslot_part, localslot, part_entry, |
| newtup); |
| MemoryContextSwitchTo(oldctx); |
| |
| /* |
| * Does the updated tuple still satisfy the current |
| * partition's constraint? |
| */ |
| if (!partrel->rd_rel->relispartition || |
| ExecPartitionCheck(partrelinfo, remoteslot_part, estate, |
| false)) |
| { |
| /* |
| * Yes, so simply UPDATE the partition. We don't call |
| * apply_handle_update_internal() here, which would |
| * normally do the following work, to avoid repeating some |
| * work already done above to find the local tuple in the |
| * partition. |
| */ |
| EPQState epqstate; |
| |
| EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); |
| ExecOpenIndices(partrelinfo, false); |
| |
| EvalPlanQualSetSlot(&epqstate, remoteslot_part); |
| TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, |
| ACL_UPDATE); |
| ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, |
| localslot, remoteslot_part); |
| ExecCloseIndices(partrelinfo); |
| EvalPlanQualEnd(&epqstate); |
| } |
| else |
| { |
| /* Move the tuple into the new partition. */ |
| |
| /* |
| * New partition will be found using tuple routing, which |
| * can only occur via the parent table. We might need to |
| * convert the tuple to the parent's rowtype. Note that |
| * this is the tuple found in the partition, not the |
| * original search tuple received by this function. |
| */ |
| if (map) |
| { |
| TupleConversionMap *PartitionToRootMap = |
| convert_tuples_by_name(RelationGetDescr(partrel), |
| RelationGetDescr(parentrel)); |
| |
| remoteslot = |
| execute_attr_map_slot(PartitionToRootMap->attrMap, |
| remoteslot_part, remoteslot); |
| } |
| else |
| { |
| remoteslot = ExecCopySlot(remoteslot, remoteslot_part); |
| slot_getallattrs(remoteslot); |
| } |
| |
| /* Find the new partition. */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| partrelinfo_new = ExecFindPartition(mtstate, relinfo, |
| proute, remoteslot, |
| estate); |
| MemoryContextSwitchTo(oldctx); |
| Assert(partrelinfo_new != partrelinfo); |
| partrel_new = partrelinfo_new->ri_RelationDesc; |
| |
| /* Check that new partition also has supported relkind. */ |
| CheckSubscriptionRelkind(partrel_new->rd_rel->relkind, |
| get_namespace_name(RelationGetNamespace(partrel_new)), |
| RelationGetRelationName(partrel_new)); |
| |
| /* DELETE old tuple found in the old partition. */ |
| apply_handle_delete_internal(edata, partrelinfo, |
| localslot, |
| part_entry->localindexoid); |
| |
| /* INSERT new tuple into the new partition. */ |
| |
| /* |
| * Convert the replacement tuple to match the destination |
| * partition rowtype. |
| */ |
| oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot; |
| if (remoteslot_part == NULL) |
| remoteslot_part = table_slot_create(partrel_new, |
| &estate->es_tupleTable); |
| map = ExecGetRootToChildMap(partrelinfo_new, estate); |
| if (map != NULL) |
| { |
| remoteslot_part = execute_attr_map_slot(map->attrMap, |
| remoteslot, |
| remoteslot_part); |
| } |
| else |
| { |
| remoteslot_part = ExecCopySlot(remoteslot_part, |
| remoteslot); |
| slot_getallattrs(remoteslot); |
| } |
| MemoryContextSwitchTo(oldctx); |
| apply_handle_insert_internal(edata, partrelinfo_new, |
| remoteslot_part); |
| } |
| } |
| break; |
| |
| default: |
| elog(ERROR, "unrecognized CmdType: %d", (int) operation); |
| break; |
| } |
| } |
| |
/*
 * Handle TRUNCATE message.
 *
 * The message carries a list of remote relation ids plus the CASCADE and
 * RESTART IDENTITY flags.  The corresponding local relations are truncated
 * together in a single ExecuteTruncateGuts() call:
 *
 * - Relations for which should_apply_changes_for_rel() is false are skipped
 *   and their lock is released at once.
 * - TRUNCATE privilege is checked (TargetPrivilegesCheck) on every relation
 *   that will be truncated.
 * - A partitioned table is expanded to all of its inheritors, except
 *   temporary tables of other backends.
 *
 * The remote CASCADE flag is read but not honored.  Truncation always uses
 * DROP_RESTRICT.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;	/* LogicalRepRelMapEntry of opened rels */
	List	   *rels = NIL;			/* every Relation to truncate */
	List	   *part_rels = NIL;	/* partitions opened via table_open() */
	List	   *relids = NIL;		/* OIDs parallel to 'rels' */
	List	   *relids_logged = NIL;	/* subset of 'relids' logically logged */
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;	/* lock taken on every rel */

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/* Skip relations that are already in the truncate list. */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends.  See similar code in
				 * ExecuteTruncate().  Closing with 'lockmode' also releases
				 * the lock, since this relation will not be truncated.
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading. This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * TruncateGuts tells it whether we want to switch to the table owner.
	 * Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner,
						NULL);

	/*
	 * Close everything we opened.  NoLock means the locks acquired above are
	 * not released here.
	 */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
| |
| |
| /* |
| * Logical replication protocol message dispatcher. |
| */ |
| void |
| apply_dispatch(StringInfo s) |
| { |
| LogicalRepMsgType action = pq_getmsgbyte(s); |
| LogicalRepMsgType saved_command; |
| |
| /* |
| * Set the current command being applied. Since this function can be |
| * called recursively when applying spooled changes, save the current |
| * command. |
| */ |
| saved_command = apply_error_callback_arg.command; |
| apply_error_callback_arg.command = action; |
| |
| switch (action) |
| { |
| case LOGICAL_REP_MSG_BEGIN: |
| apply_handle_begin(s); |
| break; |
| |
| case LOGICAL_REP_MSG_COMMIT: |
| apply_handle_commit(s); |
| break; |
| |
| case LOGICAL_REP_MSG_INSERT: |
| apply_handle_insert(s); |
| break; |
| |
| case LOGICAL_REP_MSG_UPDATE: |
| apply_handle_update(s); |
| break; |
| |
| case LOGICAL_REP_MSG_DELETE: |
| apply_handle_delete(s); |
| break; |
| |
| case LOGICAL_REP_MSG_TRUNCATE: |
| apply_handle_truncate(s); |
| break; |
| |
| case LOGICAL_REP_MSG_RELATION: |
| apply_handle_relation(s); |
| break; |
| |
| case LOGICAL_REP_MSG_TYPE: |
| apply_handle_type(s); |
| break; |
| |
| case LOGICAL_REP_MSG_ORIGIN: |
| apply_handle_origin(s); |
| break; |
| |
| case LOGICAL_REP_MSG_MESSAGE: |
| |
| /* |
| * Logical replication does not use generic logical messages yet. |
| * Although, it could be used by other applications that use this |
| * output plugin. |
| */ |
| break; |
| |
| case LOGICAL_REP_MSG_STREAM_START: |
| apply_handle_stream_start(s); |
| break; |
| |
| case LOGICAL_REP_MSG_STREAM_STOP: |
| apply_handle_stream_stop(s); |
| break; |
| |
| case LOGICAL_REP_MSG_STREAM_ABORT: |
| apply_handle_stream_abort(s); |
| break; |
| |
| case LOGICAL_REP_MSG_STREAM_COMMIT: |
| apply_handle_stream_commit(s); |
| break; |
| |
| case LOGICAL_REP_MSG_BEGIN_PREPARE: |
| apply_handle_begin_prepare(s); |
| break; |
| |
| case LOGICAL_REP_MSG_PREPARE: |
| apply_handle_prepare(s); |
| break; |
| |
| case LOGICAL_REP_MSG_COMMIT_PREPARED: |
| apply_handle_commit_prepared(s); |
| break; |
| |
| case LOGICAL_REP_MSG_ROLLBACK_PREPARED: |
| apply_handle_rollback_prepared(s); |
| break; |
| |
| case LOGICAL_REP_MSG_STREAM_PREPARE: |
| apply_handle_stream_prepare(s); |
| break; |
| |
| default: |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg("invalid logical replication message type \"??? (%d)\"", action))); |
| } |
| |
| /* Reset the current command */ |
| apply_error_callback_arg.command = saved_command; |
| } |
| |
| /* |
| * Figure out which write/flush positions to report to the walsender process. |
| * |
| * We can't simply report back the last LSN the walsender sent us because the |
| * local transaction might not yet be flushed to disk locally. Instead we |
| * build a list that associates local with remote LSNs for every commit. When |
| * reporting back the flush position to the sender we iterate that list and |
| * check which entries on it are already locally flushed. Those we can report |
| * as having been flushed. |
| * |
| * The have_pending_txes is true if there are outstanding transactions that |
| * need to be flushed. |
| */ |
| static void |
| get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, |
| bool *have_pending_txes) |
| { |
| dlist_mutable_iter iter; |
| XLogRecPtr local_flush = GetFlushRecPtr(NULL); |
| |
| *write = InvalidXLogRecPtr; |
| *flush = InvalidXLogRecPtr; |
| |
| dlist_foreach_modify(iter, &lsn_mapping) |
| { |
| FlushPosition *pos = |
| dlist_container(FlushPosition, node, iter.cur); |
| |
| *write = pos->remote_end; |
| |
| if (pos->local_end <= local_flush) |
| { |
| *flush = pos->remote_end; |
| dlist_delete(iter.cur); |
| pfree(pos); |
| } |
| else |
| { |
| /* |
| * Don't want to uselessly iterate over the rest of the list which |
| * could potentially be long. Instead get the last element and |
| * grab the write position from there. |
| */ |
| pos = dlist_tail_element(FlushPosition, node, |
| &lsn_mapping); |
| *write = pos->remote_end; |
| *have_pending_txes = true; |
| return; |
| } |
| } |
| |
| *have_pending_txes = !dlist_is_empty(&lsn_mapping); |
| } |
| |
| /* |
| * Store current remote/local lsn pair in the tracking list. |
| */ |
| void |
| store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) |
| { |
| FlushPosition *flushpos; |
| |
| /* |
| * Skip for parallel apply workers, because the lsn_mapping is maintained |
| * by the leader apply worker. |
| */ |
| if (am_parallel_apply_worker()) |
| return; |
| |
| /* Need to do this in permanent context */ |
| MemoryContextSwitchTo(ApplyContext); |
| |
| /* Track commit lsn */ |
| flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); |
| flushpos->local_end = local_lsn; |
| flushpos->remote_end = remote_lsn; |
| |
| dlist_push_tail(&lsn_mapping, &flushpos->node); |
| MemoryContextSwitchTo(ApplyMessageContext); |
| } |
| |
| |
| /* Update statistics of the worker. */ |
| static void |
| UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) |
| { |
| MyLogicalRepWorker->last_lsn = last_lsn; |
| MyLogicalRepWorker->last_send_time = send_time; |
| MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp(); |
| if (reply) |
| { |
| MyLogicalRepWorker->reply_lsn = last_lsn; |
| MyLogicalRepWorker->reply_time = send_time; |
| } |
| } |
| |
/*
 * Apply main loop.
 *
 * Receives messages from the publisher over LogRepWorkerWalRcvConn until the
 * stream ends.
 *
 * - 'w' messages advance last_received and are handed to apply_dispatch().
 * - 'k' messages advance last_received and may force an immediate feedback
 *   reply.
 * - Other message types are ignored.
 *
 * After draining the available data, the loop sends feedback.  If no remote
 * or streamed transaction is in progress, it also processes pending
 * invalidations, re-reads the subscription and runs table synchronization.
 * It then waits on the latch or socket.  It raises an error if nothing has
 * been received for wal_receiver_timeout, and requests a reply from the
 * publisher once half of that interval has passed.
 *
 * 'last_received' is the initial value of the highest remote LSN known to
 * have been received.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/*
			 * Loop to process all available data (without blocking).  A zero
			 * length means nothing more is available right now; a negative
			 * length means the stream has ended.
			 */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer as a read-only StringInfo. */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* last_received only ever moves forward. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						/* Forced feedback if the sender asked for a reply. */
						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
| |
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to.  'force' is set if we
 * need to send a response to avoid timeouts.  'requestReply' is sent as the
 * message's replyRequested flag.
 *
 * The write/flush positions come from get_flush_position().  When no local
 * transactions are waiting to be flushed, both are reported as recvpos.
 * Previously reported positions are kept in static variables, so the
 * reported values never move backwards.
 *
 * Unless 'force' is set, no message is sent when:
 * - wal_receiver_status_interval <= 0, or
 * - nothing has advanced since the last send and wal_receiver_status_interval
 *   has not yet elapsed.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	/* State carried across calls. */
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never report positions older than what was already reported. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	/* The reusable message buffer lives in ApplyContext. */
	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/*
	 * NOTE(review): the field labels below follow the message layout.  Note
	 * that recvpos goes in the "write" slot and writepos in the "apply" slot.
	 */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 LSN_FORMAT_ARGS(recvpos),
		 LSN_FORMAT_ARGS(writepos),
		 LSN_FORMAT_ARGS(flushpos));

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember what was reported so later calls never go backwards. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
| |
| /* |
| * Exit routine for apply workers due to subscription parameter changes. |
| */ |
| static void |
| apply_worker_exit(void) |
| { |
| if (am_parallel_apply_worker()) |
| { |
| /* |
| * Don't stop the parallel apply worker as the leader will detect the |
| * subscription parameter change and restart logical replication later |
| * anyway. This also prevents the leader from reporting errors when |
| * trying to communicate with a stopped parallel apply worker, which |
| * would accidentally disable subscriptions if disable_on_error was |
| * set. |
| */ |
| return; |
| } |
| |
| /* |
| * Reset the last-start time for this apply worker so that the launcher |
| * will restart it without waiting for wal_retrieve_retry_interval if the |
| * subscription is still active, and so that we won't leak that hash table |
| * entry if it isn't. |
| */ |
| if (!am_tablesync_worker()) |
| ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid); |
| |
| proc_exit(0); |
| } |
| |
/*
 * Reread subscription info if needed.  Most changes cause the worker to exit.
 *
 * Does nothing while MySubscriptionValid is set.  Otherwise it fetches the
 * subscription again, starting a transaction if none is active, and then:
 *
 * - If the subscription was removed, it logs and calls proc_exit(0).
 * - If the subscription was disabled, or a parameter that affects the remote
 *   connection changed, it logs and calls apply_worker_exit().  That routine
 *   returns without exiting in a parallel apply worker.
 * - If the database changed, it raises an error.
 *
 * Otherwise MySubscription is replaced by the new copy, allocated in
 * ApplyContext, synchronous_commit is re-applied, and MySubscriptionValid is
 * set again.
 */
void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (!am_tablesync_worker() && !am_parallel_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase should not be altered */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Only commit a transaction that we started ourselves. */
	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
| |
| /* |
| * Callback from subscription syscache invalidation. |
| */ |
| static void |
| subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) |
| { |
| MySubscriptionValid = false; |
| } |
| |
| /* |
| * subxact_info_write |
| * Store information about subxacts for a toplevel transaction. |
| * |
| * For each subxact we store offset of it's first change in the main file. |
| * The file is always over-written as a whole. |
| * |
| * XXX We should only store subxacts that were not aborted yet. |
| */ |
| static void |
| subxact_info_write(Oid subid, TransactionId xid) |
| { |
| char path[MAXPGPATH]; |
| Size len; |
| BufFile *fd; |
| workfile_set *work_set; |
| |
| Assert(TransactionIdIsValid(xid)); |
| |
| /* construct the subxact filename */ |
| subxact_filename(path, subid, xid); |
| |
| /* Delete the subxacts file, if exists. */ |
| if (subxact_data.nsubxacts == 0) |
| { |
| cleanup_subxact_info(); |
| BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); |
| |
| return; |
| } |
| |
| /* |
| * Create the subxact file if it not already created, otherwise open the |
| * existing file. |
| */ |
| fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR, |
| true); |
| if (fd == NULL) |
| { |
| work_set = workfile_mgr_create_set("Subxact", path, false /* hold pin */); |
| fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path, work_set); |
| |
| } |
| |
| len = sizeof(SubXactInfo) * subxact_data.nsubxacts; |
| |
| /* Write the subxact count and subxact info */ |
| BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts)); |
| BufFileWrite(fd, subxact_data.subxacts, len); |
| |
| BufFileClose(fd); |
| |
| /* free the memory allocated for subxact info */ |
| cleanup_subxact_info(); |
| } |
| |
| /* |
| * subxact_info_read |
| * Restore information about subxacts of a streamed transaction. |
| * |
| * Read information about subxacts into the structure subxact_data that can be |
| * used later. |
| */ |
| static void |
| subxact_info_read(Oid subid, TransactionId xid) |
| { |
| char path[MAXPGPATH]; |
| Size len; |
| BufFile *fd; |
| MemoryContext oldctx; |
| |
| Assert(!subxact_data.subxacts); |
| Assert(subxact_data.nsubxacts == 0); |
| Assert(subxact_data.nsubxacts_max == 0); |
| |
| /* |
| * If the subxact file doesn't exist that means we don't have any subxact |
| * info. |
| */ |
| subxact_filename(path, subid, xid); |
| fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, |
| true); |
| if (fd == NULL) |
| return; |
| |
| /* read number of subxact items */ |
| BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts)); |
| |
| len = sizeof(SubXactInfo) * subxact_data.nsubxacts; |
| |
| /* we keep the maximum as a power of 2 */ |
| subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts); |
| |
| /* |
| * Allocate subxact information in the logical streaming context. We need |
| * this information during the complete stream so that we can add the sub |
| * transaction info to this. On stream stop we will flush this information |
| * to the subxact file and reset the logical streaming context. |
| */ |
| oldctx = MemoryContextSwitchTo(LogicalStreamingContext); |
| subxact_data.subxacts = palloc(subxact_data.nsubxacts_max * |
| sizeof(SubXactInfo)); |
| MemoryContextSwitchTo(oldctx); |
| |
| if (len > 0) |
| BufFileReadExact(fd, subxact_data.subxacts, len); |
| |
| BufFileClose(fd); |
| } |
| |
| /* |
| * subxact_info_add |
| * Add information about a subxact (offset in the main file). |
| */ |
| static void |
| subxact_info_add(TransactionId xid) |
| { |
| SubXactInfo *subxacts = subxact_data.subxacts; |
| int64 i; |
| |
| /* We must have a valid top level stream xid and a stream fd. */ |
| Assert(TransactionIdIsValid(stream_xid)); |
| Assert(stream_fd != NULL); |
| |
| /* |
| * If the XID matches the toplevel transaction, we don't want to add it. |
| */ |
| if (stream_xid == xid) |
| return; |
| |
| /* |
| * In most cases we're checking the same subxact as we've already seen in |
| * the last call, so make sure to ignore it (this change comes later). |
| */ |
| if (subxact_data.subxact_last == xid) |
| return; |
| |
| /* OK, remember we're processing this XID. */ |
| subxact_data.subxact_last = xid; |
| |
| /* |
| * Check if the transaction is already present in the array of subxact. We |
| * intentionally scan the array from the tail, because we're likely adding |
| * a change for the most recent subtransactions. |
| * |
| * XXX Can we rely on the subxact XIDs arriving in sorted order? That |
| * would allow us to use binary search here. |
| */ |
| for (i = subxact_data.nsubxacts; i > 0; i--) |
| { |
| /* found, so we're done */ |
| if (subxacts[i - 1].xid == xid) |
| return; |
| } |
| |
| /* This is a new subxact, so we need to add it to the array. */ |
| if (subxact_data.nsubxacts == 0) |
| { |
| MemoryContext oldctx; |
| |
| subxact_data.nsubxacts_max = 128; |
| |
| /* |
| * Allocate this memory for subxacts in per-stream context, see |
| * subxact_info_read. |
| */ |
| oldctx = MemoryContextSwitchTo(LogicalStreamingContext); |
| subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo)); |
| MemoryContextSwitchTo(oldctx); |
| } |
| else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max) |
| { |
| subxact_data.nsubxacts_max *= 2; |
| subxacts = repalloc(subxacts, |
| subxact_data.nsubxacts_max * sizeof(SubXactInfo)); |
| } |
| |
| subxacts[subxact_data.nsubxacts].xid = xid; |
| |
| /* |
| * Get the current offset of the stream file and store it as offset of |
| * this subxact. |
| */ |
| BufFileTell(stream_fd, |
| &subxacts[subxact_data.nsubxacts].fileno, |
| &subxacts[subxact_data.nsubxacts].offset); |
| |
| subxact_data.nsubxacts++; |
| subxact_data.subxacts = subxacts; |
| } |
| |
| /* format filename for file containing the info about subxacts */ |
| static inline void |
| subxact_filename(char *path, Oid subid, TransactionId xid) |
| { |
| snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid); |
| } |
| |
| /* format filename for file containing serialized changes */ |
| static inline void |
| changes_filename(char *path, Oid subid, TransactionId xid) |
| { |
| snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); |
| } |
| |
| /* |
| * stream_cleanup_files |
| * Cleanup files for a subscription / toplevel transaction. |
| * |
| * Remove files with serialized changes and subxact info for a particular |
| * toplevel transaction. Each subscription has a separate set of files |
| * for any toplevel transaction. |
| */ |
| void |
| stream_cleanup_files(Oid subid, TransactionId xid) |
| { |
| char path[MAXPGPATH]; |
| |
| /* Delete the changes file. */ |
| changes_filename(path, subid, xid); |
| BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false); |
| |
| /* Delete the subxact file, if it exists. */ |
| subxact_filename(path, subid, xid); |
| BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); |
| } |
| |
| /* |
| * stream_open_file |
| * Open a file that we'll use to serialize changes for a toplevel |
| * transaction. |
| * |
| * Open a file for streamed changes from a toplevel transaction identified |
| * by stream_xid (global variable). If it's the first chunk of streamed |
| * changes for this transaction, create the buffile, otherwise open the |
| * previously created file. |
| */ |
| static void |
| stream_open_file(Oid subid, TransactionId xid, bool first_segment) |
| { |
| char path[MAXPGPATH]; |
| MemoryContext oldcxt; |
| |
| Assert(OidIsValid(subid)); |
| Assert(TransactionIdIsValid(xid)); |
| Assert(stream_fd == NULL); |
| |
| |
| changes_filename(path, subid, xid); |
| elog(DEBUG1, "opening file \"%s\" for streamed changes", path); |
| |
| /* |
| * Create/open the buffiles under the logical streaming context so that we |
| * have those files until stream stop. |
| */ |
| oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); |
| |
| /* |
| * If this is the first streamed segment, create the changes file. |
| * Otherwise, just open the file for writing, in append mode. |
| */ |
| if (first_segment) |
| { |
| workfile_set *work_set; |
| |
| work_set = workfile_mgr_create_set("LogicalStreaming", path, false /* hold pin */); |
| stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, |
| path, work_set); |
| } |
| else |
| { |
| /* |
| * Open the file and seek to the end of the file because we always |
| * append the changes file. |
| */ |
| stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, |
| path, O_RDWR, false); |
| BufFileSeek(stream_fd, 0, 0, SEEK_END); |
| } |
| |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| /* |
| * stream_close_file |
| * Close the currently open file with streamed changes. |
| */ |
| static void |
| stream_close_file(void) |
| { |
| Assert(stream_fd != NULL); |
| |
| BufFileClose(stream_fd); |
| |
| stream_fd = NULL; |
| } |
| |
| /* |
| * stream_write_change |
| * Serialize a change to a file for the current toplevel transaction. |
| * |
| * The change is serialized in a simple format, with length (not including |
| * the length), action code (identifying the message type) and message |
| * contents (without the subxact TransactionId value). |
| */ |
| static void |
| stream_write_change(char action, StringInfo s) |
| { |
| int len; |
| |
| Assert(stream_fd != NULL); |
| |
| /* total on-disk size, including the action type character */ |
| len = (s->len - s->cursor) + sizeof(char); |
| |
| /* first write the size */ |
| BufFileWrite(stream_fd, &len, sizeof(len)); |
| |
| /* then the action */ |
| BufFileWrite(stream_fd, &action, sizeof(action)); |
| |
| /* and finally the remaining part of the buffer (after the XID) */ |
| len = (s->len - s->cursor); |
| |
| BufFileWrite(stream_fd, &s->data[s->cursor], len); |
| } |
| |
| /* |
| * stream_open_and_write_change |
| * Serialize a message to a file for the given transaction. |
| * |
| * This function is similar to stream_write_change except that it will open the |
| * target file if not already before writing the message and close the file at |
| * the end. |
| */ |
| static void |
| stream_open_and_write_change(TransactionId xid, char action, StringInfo s) |
| { |
| Assert(!in_streamed_transaction); |
| |
| if (!stream_fd) |
| stream_start_internal(xid, false); |
| |
| stream_write_change(action, s); |
| stream_stop_internal(xid); |
| } |
| |
| /* |
| * Cleanup the memory for subxacts and reset the related variables. |
| */ |
| static inline void |
| cleanup_subxact_info() |
| { |
| if (subxact_data.subxacts) |
| pfree(subxact_data.subxacts); |
| |
| subxact_data.subxacts = NULL; |
| subxact_data.subxact_last = InvalidTransactionId; |
| subxact_data.nsubxacts = 0; |
| subxact_data.nsubxacts_max = 0; |
| } |
| |
| /* |
| * Form the prepared transaction GID for two_phase transactions. |
| * |
| * Return the GID in the supplied buffer. |
| */ |
| static void |
| TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) |
| { |
| Assert(subid != InvalidRepOriginId); |
| |
| if (!TransactionIdIsValid(xid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROTOCOL_VIOLATION), |
| errmsg_internal("invalid two-phase transaction ID"))); |
| |
| snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); |
| } |
| |
/*
 * Execute the initial sync with error handling. Disable the subscription,
 * if it's required.
 *
 * Must only be called in a tablesync worker.  origin_startpos is passed
 * through to LogicalRepSyncTableStart().  On success, the slot name it
 * returns is copied into ApplyContext and stored in *myslotname, and the
 * original string is freed.
 *
 * If an error is thrown during the initial sync, there are two outcomes.
 * With MySubscription->disableonerr set, the subscription is disabled and
 * the process exits via DisableSubscriptionAndExit().  Otherwise the failure
 * is reported to the subscription statistics and the error is re-thrown.
 *
 * Note that we don't handle FATAL errors which are probably because of
 * system resource error and are not repeatable.
 */
static void
start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
{
	char	   *syncslotname = NULL;

	Assert(am_tablesync_worker());

	PG_TRY();
	{
		/* Call initial sync. */
		syncslotname = LogicalRepSyncTableStart(origin_startpos);
	}
	PG_CATCH();
	{
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();	/* does not return */
		else
		{
			/*
			 * Report the worker failed during table synchronization. Abort
			 * the current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			/* false: the failure is attributed to table sync, not apply */
			pgstat_report_subscription_error(MySubscription->oid, false);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();

	/* allocate slot name in long-lived context */
	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
	pfree(syncslotname);
}
| |
| /* |
| * Reset the origin state. |
| */ |
| static void |
| replorigin_reset(int code, Datum arg) |
| { |
| replorigin_session_origin = InvalidRepOriginId; |
| replorigin_session_origin_lsn = InvalidXLogRecPtr; |
| replorigin_session_origin_timestamp = 0; |
| } |
| |
/*
 * Run the apply loop with error handling. Disable the subscription,
 * if necessary.
 *
 * origin_startpos is passed straight to LogicalRepApplyLoop().  If an error
 * escapes the loop, the session replication-origin state is reset first.
 * After that there are two outcomes.  With MySubscription->disableonerr set,
 * the subscription is disabled and the process exits via
 * DisableSubscriptionAndExit().  Otherwise the failure is reported to the
 * subscription statistics and the error is re-thrown.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
static void
start_apply(XLogRecPtr origin_startpos)
{
	PG_TRY();
	{
		LogicalRepApplyLoop(origin_startpos);
	}
	PG_CATCH();
	{
		/*
		 * Reset the origin state to prevent the advancement of origin
		 * progress if we fail to apply. Otherwise, this will result in
		 * transaction loss as that transaction won't be sent again by the
		 * server.
		 */
		replorigin_reset(0, (Datum) 0);

		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();	/* does not return */
		else
		{
			/*
			 * Report the worker failed while applying changes. Abort the
			 * current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			/* second argument: true unless this is a tablesync worker */
			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());

			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
| |
/*
 * Common initialization for leader apply worker and parallel apply worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * Creates ApplyContext under TopMemoryContext, loads MySubscription into it
 * and marks it valid.  Registers a SUBSCRIPTIONOID syscache callback and
 * logs that the worker has started.
 *
 * The process does not return from here in two cases.  If the subscription
 * was dropped during startup, it calls proc_exit(0).  If the subscription
 * was disabled during startup, it calls apply_worker_exit(), which
 * presumably does not return either (NOTE(review): confirm).
 */
void
InitializeApplyWorker(void)
{
	MemoryContext oldctx;

	/*
	 * In GPDB, we build libpqwalreceiver functions, as well as a copy of
	 * libpq into the backend itself, to support QD-QE communication. See
	 * src/backend/libpq.
	 */
	if (!WalReceiverFunctions)
		libpqwalreceiver_PG_init();

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	/* Catalog lookups below need a transaction; it is committed at the end. */
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: a concurrently dropped subscription yields NULL. */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (!am_tablesync_worker() && !am_parallel_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name,
						get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();
}
| |
/*
 * Logical Replication Apply worker entry point
 *
 * main_arg carries the worker slot number passed to
 * logicalrep_worker_attach().  The worker first sets up signals, stats
 * timestamps and common initialization (InitializeApplyWorker).  After that:
 *
 *	- a tablesync worker runs the initial table synchronization
 *	  (start_table_sync);
 *	- the leader apply worker sets up replication-origin tracking and
 *	  connects to the publisher.
 *
 * Either way, streaming is then started with options derived from the
 * subscription and the publisher's server version.  Finally the apply loop
 * runs (start_apply) and the worker ends with proc_exit(0).
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
	char	   *myslotname = NULL;
	WalRcvStreamOptions options;
	int			server_version;

	InitializingApplyWorker = true;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	InitializeApplyWorker();

	/*
	 * Register a callback to reset the origin state before aborting any
	 * pending transaction during shutdown (see ShutdownPostgres()). This will
	 * avoid origin advancement for an in-complete transaction which could
	 * otherwise lead to its loss as such a transaction won't be sent by the
	 * server again.
	 *
	 * Note that even a LOG or DEBUG statement placed after setting the origin
	 * state may process a shutdown signal before committing the current apply
	 * operation. So, it is important to register such a callback here.
	 */
	before_shmem_exit(replorigin_reset, (Datum) 0);

	InitializingApplyWorker = false;

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		/*
		 * NOTE(review): this branch does not connect by itself; it presumably
		 * relies on start_table_sync() having set LogRepWorkerWalRcvConn,
		 * which is used below -- confirm in LogicalRepSyncTableStart().
		 */
		start_table_sync(&origin_startpos, &myslotname);

		ReplicationOriginNameForLogicalRep(MySubscription->oid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));
		set_apply_error_context_origin(originname);
	}
	else
	{
		/* This is the leader apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;
		bool		must_use_password;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("subscription has no replication slot set")));

		/*
		 * Setup replication origin tracking: look up the origin, creating it
		 * on first use, and resume from its recorded progress.
		 */
		StartTransactionCommand();
		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
										   originname, sizeof(originname));
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
		origin_startpos = replorigin_session_get_progress(false);

		/* Is the use of a password mandatory? */
		must_use_password = MySubscription->passwordrequired &&
			!superuser_arg(MySubscription->owner);

		/* Note that the superuser_arg call can access the DB */
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												must_use_password,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);

		set_apply_error_context_origin(originname);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;

	/* Pick the newest protocol version the publisher's server supports. */
	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
	options.proto.logical.proto_version =
		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
		LOGICALREP_PROTO_VERSION_NUM;

	options.proto.logical.publication_names = MySubscription->publications;
	options.proto.logical.binary = MySubscription->binary;

	/*
	 * Assign the appropriate option value for streaming option according to
	 * the 'streaming' mode and the publisher's ability to support that mode.
	 */
	if (server_version >= 160000 &&
		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
	{
		options.proto.logical.streaming_str = "parallel";
		MyLogicalRepWorker->parallel_apply = true;
	}
	else if (server_version >= 140000 &&
			 MySubscription->stream != LOGICALREP_STREAM_OFF)
	{
		options.proto.logical.streaming_str = "on";
		MyLogicalRepWorker->parallel_apply = false;
	}
	else
	{
		options.proto.logical.streaming_str = NULL;
		MyLogicalRepWorker->parallel_apply = false;
	}

	options.proto.logical.twophase = false;
	options.proto.logical.origin = pstrdup(MySubscription->origin);

	if (!am_tablesync_worker())
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as the tri-state PENDING until all tablesyncs have reached READY
		 * state. Only then, can it become ENABLED.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
			AllTablesyncsReady())
		{
			/* Start streaming with two_phase enabled */
			options.proto.logical.twophase = true;
			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

			/* Persist the PENDING -> ENABLED transition and mirror it locally. */
			StartTransactionCommand();
			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
			CommitTransactionCommand();
		}
		else
		{
			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
		}

		ereport(DEBUG1,
				(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
								 MySubscription->name,
								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
								 "?")));
	}
	else
	{
		/* Start normal logical streaming replication. */
		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
	}

	/* Run the main loop. */
	start_apply(origin_startpos);

	proc_exit(0);
}
| |
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * Called from the PG_CATCH() blocks of start_table_sync() and start_apply()
 * when disableonerr is set.  Steps, in order:
 *	1. emit the pending error report, abort any open transaction and clear
 *	   the error state;
 *	2. report the failure to the subscription statistics;
 *	3. disable the subscription in a fresh transaction;
 *	4. end the process with proc_exit(0).
 * Never returns.
 */
static void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/* Report the worker failed during either table synchronization or apply */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
									 !am_tablesync_worker());

	/* Disable the subscription */
	StartTransactionCommand();
	DisableSubscription(MySubscription->oid);
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (!am_tablesync_worker() && !am_parallel_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	proc_exit(0);
}
| |
| /* |
| * Is current process a logical replication worker? |
| */ |
| bool |
| IsLogicalWorker(void) |
| { |
| return MyLogicalRepWorker != NULL; |
| } |
| |
/*
 * IsLogicalParallelApplyWorker
 *	  True when this process is a logical replication worker acting as a
 *	  parallel apply worker.
 */
bool
IsLogicalParallelApplyWorker(void)
{
	if (!IsLogicalWorker())
		return false;

	return am_parallel_apply_worker();
}
| |
| /* |
| * Start skipping changes of the transaction if the given LSN matches the |
| * LSN specified by subscription's skiplsn. |
| */ |
| static void |
| maybe_start_skipping_changes(XLogRecPtr finish_lsn) |
| { |
| Assert(!is_skipping_changes()); |
| Assert(!in_remote_transaction); |
| Assert(!in_streamed_transaction); |
| |
| /* |
| * Quick return if it's not requested to skip this transaction. This |
| * function is called for every remote transaction and we assume that |
| * skipping the transaction is not used often. |
| */ |
| if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || |
| MySubscription->skiplsn != finish_lsn)) |
| return; |
| |
| /* Start skipping all changes of this transaction */ |
| skip_xact_finish_lsn = finish_lsn; |
| |
| ereport(LOG, |
| errmsg("logical replication starts skipping transaction at LSN %X/%X", |
| LSN_FORMAT_ARGS(skip_xact_finish_lsn))); |
| } |
| |
| /* |
| * Stop skipping changes by resetting skip_xact_finish_lsn if enabled. |
| */ |
| static void |
| stop_skipping_changes(void) |
| { |
| if (!is_skipping_changes()) |
| return; |
| |
| ereport(LOG, |
| (errmsg("logical replication completed skipping transaction at LSN %X/%X", |
| LSN_FORMAT_ARGS(skip_xact_finish_lsn)))); |
| |
| /* Stop skipping changes */ |
| skip_xact_finish_lsn = InvalidXLogRecPtr; |
| } |
| |
| /* |
| * Clear subskiplsn of pg_subscription catalog. |
| * |
| * finish_lsn is the transaction's finish LSN that is used to check if the |
| * subskiplsn matches it. If not matched, we raise a warning when clearing the |
| * subskiplsn in order to inform users for cases e.g., where the user mistakenly |
| * specified the wrong subskiplsn. |
| */ |
| static void |
| clear_subscription_skip_lsn(XLogRecPtr finish_lsn) |
| { |
| Relation rel; |
| Form_pg_subscription subform; |
| HeapTuple tup; |
| XLogRecPtr myskiplsn = MySubscription->skiplsn; |
| bool started_tx = false; |
| |
| if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker()) |
| return; |
| |
| if (!IsTransactionState()) |
| { |
| StartTransactionCommand(); |
| started_tx = true; |
| } |
| |
| /* |
| * Protect subskiplsn of pg_subscription from being concurrently updated |
| * while clearing it. |
| */ |
| LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, |
| AccessShareLock); |
| |
| rel = table_open(SubscriptionRelationId, RowExclusiveLock); |
| |
| /* Fetch the existing tuple. */ |
| tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, |
| ObjectIdGetDatum(MySubscription->oid)); |
| |
| if (!HeapTupleIsValid(tup)) |
| elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); |
| |
| subform = (Form_pg_subscription) GETSTRUCT(tup); |
| |
| /* |
| * Clear the subskiplsn. If the user has already changed subskiplsn before |
| * clearing it we don't update the catalog and the replication origin |
| * state won't get advanced. So in the worst case, if the server crashes |
| * before sending an acknowledgment of the flush position the transaction |
| * will be sent again and the user needs to set subskiplsn again. We can |
| * reduce the possibility by logging a replication origin WAL record to |
| * advance the origin LSN instead but there is no way to advance the |
| * origin timestamp and it doesn't seem to be worth doing anything about |
| * it since it's a very rare case. |
| */ |
| if (subform->subskiplsn == myskiplsn) |
| { |
| bool nulls[Natts_pg_subscription]; |
| bool replaces[Natts_pg_subscription]; |
| Datum values[Natts_pg_subscription]; |
| |
| memset(values, 0, sizeof(values)); |
| memset(nulls, false, sizeof(nulls)); |
| memset(replaces, false, sizeof(replaces)); |
| |
| /* reset subskiplsn */ |
| values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); |
| replaces[Anum_pg_subscription_subskiplsn - 1] = true; |
| |
| tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, |
| replaces); |
| CatalogTupleUpdate(rel, &tup->t_self, tup); |
| |
| if (myskiplsn != finish_lsn) |
| ereport(WARNING, |
| errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name), |
| errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.", |
| LSN_FORMAT_ARGS(finish_lsn), |
| LSN_FORMAT_ARGS(myskiplsn))); |
| } |
| |
| heap_freetuple(tup); |
| table_close(rel, NoLock); |
| |
| if (started_tx) |
| CommitTransactionCommand(); |
| } |
| |
| /* Error callback to give more context info about the change being applied */ |
| void |
| apply_error_callback(void *arg) |
| { |
| ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; |
| |
| if (apply_error_callback_arg.command == 0) |
| return; |
| |
| Assert(errarg->origin_name); |
| |
| if (errarg->rel == NULL) |
| { |
| if (!TransactionIdIsValid(errarg->remote_xid)) |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command)); |
| else if (XLogRecPtrIsInvalid(errarg->finish_lsn)) |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->remote_xid); |
| else |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->remote_xid, |
| LSN_FORMAT_ARGS(errarg->finish_lsn)); |
| } |
| else |
| { |
| if (errarg->remote_attnum < 0) |
| { |
| if (XLogRecPtrIsInvalid(errarg->finish_lsn)) |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->rel->remoterel.nspname, |
| errarg->rel->remoterel.relname, |
| errarg->remote_xid); |
| else |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->rel->remoterel.nspname, |
| errarg->rel->remoterel.relname, |
| errarg->remote_xid, |
| LSN_FORMAT_ARGS(errarg->finish_lsn)); |
| } |
| else |
| { |
| if (XLogRecPtrIsInvalid(errarg->finish_lsn)) |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->rel->remoterel.nspname, |
| errarg->rel->remoterel.relname, |
| errarg->rel->remoterel.attnames[errarg->remote_attnum], |
| errarg->remote_xid); |
| else |
| errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X", |
| errarg->origin_name, |
| logicalrep_message_type(errarg->command), |
| errarg->rel->remoterel.nspname, |
| errarg->rel->remoterel.relname, |
| errarg->rel->remoterel.attnames[errarg->remote_attnum], |
| errarg->remote_xid, |
| LSN_FORMAT_ARGS(errarg->finish_lsn)); |
| } |
| } |
| } |
| |
| /* Set transaction information of apply error callback */ |
| static inline void |
| set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn) |
| { |
| apply_error_callback_arg.remote_xid = xid; |
| apply_error_callback_arg.finish_lsn = lsn; |
| } |
| |
| /* Reset all information of apply error callback */ |
| static inline void |
| reset_apply_error_context_info(void) |
| { |
| apply_error_callback_arg.command = 0; |
| apply_error_callback_arg.rel = NULL; |
| apply_error_callback_arg.remote_attnum = -1; |
| set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); |
| } |
| |
| /* |
| * Request wakeup of the workers for the given subscription OID |
| * at commit of the current transaction. |
| * |
| * This is used to ensure that the workers process assorted changes |
| * as soon as possible. |
| */ |
| void |
| LogicalRepWorkersWakeupAtCommit(Oid subid) |
| { |
| MemoryContext oldcxt; |
| |
| oldcxt = MemoryContextSwitchTo(TopTransactionContext); |
| on_commit_wakeup_workers_subids = |
| list_append_unique_oid(on_commit_wakeup_workers_subids, subid); |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| /* |
| * Wake up the workers of any subscriptions that were changed in this xact. |
| */ |
| void |
| AtEOXact_LogicalRepWorkers(bool isCommit) |
| { |
| if (isCommit && on_commit_wakeup_workers_subids != NIL) |
| { |
| ListCell *lc; |
| |
| LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
| foreach(lc, on_commit_wakeup_workers_subids) |
| { |
| Oid subid = lfirst_oid(lc); |
| List *workers; |
| ListCell *lc2; |
| |
| workers = logicalrep_workers_find(subid, true); |
| foreach(lc2, workers) |
| { |
| LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2); |
| |
| logicalrep_worker_wakeup_ptr(worker); |
| } |
| } |
| LWLockRelease(LogicalRepWorkerLock); |
| } |
| |
| /* The List storage will be reclaimed automatically in xact cleanup. */ |
| on_commit_wakeup_workers_subids = NIL; |
| } |
| |
| /* |
| * Allocate the origin name in long-lived context for error context message. |
| */ |
| void |
| set_apply_error_context_origin(char *originname) |
| { |
| apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, |
| originname); |
| } |
| |
| /* |
| * Return the action to be taken for the given transaction. See |
| * TransApplyAction for information on each of the actions. |
| * |
| * *winfo is assigned to the destination parallel worker info when the leader |
| * apply worker has to pass all the transaction's changes to the parallel |
| * apply worker. |
| */ |
| static TransApplyAction |
| get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) |
| { |
| *winfo = NULL; |
| |
| if (am_parallel_apply_worker()) |
| { |
| return TRANS_PARALLEL_APPLY; |
| } |
| |
| /* |
| * If we are processing this transaction using a parallel apply worker |
| * then either we send the changes to the parallel worker or if the worker |
| * is busy then serialize the changes to the file which will later be |
| * processed by the parallel worker. |
| */ |
| *winfo = pa_find_worker(xid); |
| |
| if (*winfo && (*winfo)->serialize_changes) |
| { |
| return TRANS_LEADER_PARTIAL_SERIALIZE; |
| } |
| else if (*winfo) |
| { |
| return TRANS_LEADER_SEND_TO_PARALLEL; |
| } |
| |
| /* |
| * If there is no parallel worker involved to process this transaction |
| * then we either directly apply the change or serialize it to a file |
| * which will later be applied when the transaction finish message is |
| * processed. |
| */ |
| else if (in_streamed_transaction) |
| { |
| return TRANS_LEADER_SERIALIZE; |
| } |
| else |
| { |
| return TRANS_LEADER_APPLY; |
| } |
| } |