| /* ------------------------------------------------------------------------- |
| * |
| * decode.c |
| * This module decodes WAL records read using xlogreader.h's APIs for the |
| * purpose of logical decoding by passing information to the |
| * reorderbuffer module (containing the actual changes) and to the |
| * snapbuild module to build a fitting catalog snapshot (to be able to |
| * properly decode the changes in the reorderbuffer). |
| * |
| * NOTE: |
| * This basically tries to handle all low level xlog stuff for |
| * reorderbuffer.c and snapbuild.c. There's some minor leakage where a |
| * specific record's struct is used to pass data along, but those just |
| * happen to contain the right amount of data in a convenient |
| * format. There isn't and shouldn't be much intelligence about the |
| * contents of records in here except turning them into a more usable |
| * format. |
| * |
| * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * src/backend/replication/logical/decode.c |
| * |
| * ------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/heapam_xlog.h" |
| #include "access/transam.h" |
| #include "access/xact.h" |
| #include "access/xlog_internal.h" |
| #include "access/xlogreader.h" |
| #include "access/xlogrecord.h" |
| #include "access/xlogutils.h" |
| #include "catalog/pg_control.h" |
| #include "replication/decode.h" |
| #include "replication/logical.h" |
| #include "replication/message.h" |
| #include "replication/origin.h" |
| #include "replication/reorderbuffer.h" |
| #include "replication/snapbuild.h" |
| #include "storage/standby.h" |
| |
| typedef struct XLogRecordBuffer |
| { |
| XLogRecPtr origptr; |
| XLogRecPtr endptr; |
| XLogReaderState *record; |
| } XLogRecordBuffer; |
| |
| /* RMGR Handlers */ |
| static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| |
| /* individual record(group)'s handlers */ |
| static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); |
| |
| static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
| xl_xact_parsed_commit *parsed, TransactionId xid); |
| static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
| xl_xact_parsed_abort *parsed, TransactionId xid); |
| |
| /* common function to decode tuples */ |
| static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); |
| |
| /* |
| * Take every XLogReadRecord()ed record and perform the actions required to |
| * decode it using the output plugin already setup in the logical decoding |
| * context. |
| * |
| * NB: Note that every record's xid needs to be processed by reorderbuffer |
| * (xids contained in the content of records are not relevant for this rule). |
| * That means that for records which'd otherwise not go through the |
| * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to |
| * call ReorderBufferProcessXid for each record type by default, because |
| * e.g. empty xacts can be handled more efficiently if there's no previous |
| * state for them. |
| * |
| * We also support the ability to fast forward thru records, skipping some |
| * record types completely - see individual record types for details. |
| */ |
| void |
| LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) |
| { |
| XLogRecordBuffer buf; |
| |
| buf.origptr = ctx->reader->ReadRecPtr; |
| buf.endptr = ctx->reader->EndRecPtr; |
| buf.record = record; |
| |
| /* cast so we get a warning when new rmgrs are added */ |
| switch ((RmgrId) XLogRecGetRmid(record)) |
| { |
| /* |
| * Rmgrs we care about for logical decoding. Add new rmgrs in |
| * rmgrlist.h's order. |
| */ |
| case RM_XLOG_ID: |
| DecodeXLogOp(ctx, &buf); |
| break; |
| |
| case RM_XACT_ID: |
| DecodeXactOp(ctx, &buf); |
| break; |
| |
| case RM_STANDBY_ID: |
| DecodeStandbyOp(ctx, &buf); |
| break; |
| |
| case RM_HEAP2_ID: |
| DecodeHeap2Op(ctx, &buf); |
| break; |
| |
| case RM_HEAP_ID: |
| DecodeHeapOp(ctx, &buf); |
| break; |
| |
| case RM_LOGICALMSG_ID: |
| DecodeLogicalMsgOp(ctx, &buf); |
| break; |
| |
| /* |
| * Rmgrs irrelevant for logical decoding; they describe stuff not |
| * represented in logical decoding. Add new rmgrs in rmgrlist.h's |
| * order. |
| */ |
| case RM_SMGR_ID: |
| case RM_CLOG_ID: |
| case RM_DBASE_ID: |
| case RM_TBLSPC_ID: |
| case RM_MULTIXACT_ID: |
| case RM_RELMAP_ID: |
| case RM_BTREE_ID: |
| case RM_HASH_ID: |
| case RM_GIN_ID: |
| case RM_GIST_ID: |
| case RM_SEQ_ID: |
| case RM_SPGIST_ID: |
| case RM_BRIN_ID: |
| case RM_COMMIT_TS_ID: |
| case RM_REPLORIGIN_ID: |
| case RM_GENERIC_ID: |
| /* just deal with xid, and done */ |
| ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), |
| buf.origptr); |
| break; |
| case RM_NEXT_ID: |
| elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); |
| } |
| } |
| |
| /* |
| * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| SnapBuild *builder = ctx->snapshot_builder; |
| uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; |
| |
| ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), |
| buf->origptr); |
| |
| switch (info) |
| { |
| /* this is also used in END_OF_RECOVERY checkpoints */ |
| case XLOG_CHECKPOINT_SHUTDOWN: |
| case XLOG_END_OF_RECOVERY: |
| SnapBuildSerializationPoint(builder, buf->origptr); |
| |
| break; |
| case XLOG_CHECKPOINT_ONLINE: |
| |
| /* |
| * a RUNNING_XACTS record will have been logged near to this, we |
| * can restart from there. |
| */ |
| break; |
| case XLOG_NOOP: |
| case XLOG_NEXTOID: |
| case XLOG_SWITCH: |
| case XLOG_BACKUP_END: |
| case XLOG_PARAMETER_CHANGE: |
| case XLOG_RESTORE_POINT: |
| case XLOG_FPW_CHANGE: |
| case XLOG_FPI_FOR_HINT: |
| case XLOG_FPI: |
| case XLOG_OVERWRITE_CONTRECORD: |
| break; |
| default: |
| elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info); |
| } |
| } |
| |
| /* |
| * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| SnapBuild *builder = ctx->snapshot_builder; |
| ReorderBuffer *reorder = ctx->reorder; |
| XLogReaderState *r = buf->record; |
| uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; |
| |
| /* |
| * If the snapshot isn't yet fully built, we cannot decode anything, so |
| * bail out. |
| * |
| * However, it's critical to process XLOG_XACT_ASSIGNMENT records even |
| * when the snapshot is being built: it is possible to get later records |
| * that require subxids to be properly assigned. |
| */ |
| if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && |
| info != XLOG_XACT_ASSIGNMENT) |
| return; |
| |
| switch (info) |
| { |
| case XLOG_XACT_COMMIT: |
| case XLOG_XACT_COMMIT_PREPARED: |
| { |
| xl_xact_commit *xlrec; |
| xl_xact_parsed_commit parsed; |
| TransactionId xid; |
| |
| xlrec = (xl_xact_commit *) XLogRecGetData(r); |
| ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); |
| |
| if (!TransactionIdIsValid(parsed.twophase_xid)) |
| xid = XLogRecGetXid(r); |
| else |
| xid = parsed.twophase_xid; |
| |
| DecodeCommit(ctx, buf, &parsed, xid); |
| break; |
| } |
| case XLOG_XACT_ABORT: |
| case XLOG_XACT_ABORT_PREPARED: |
| { |
| xl_xact_abort *xlrec; |
| xl_xact_parsed_abort parsed; |
| TransactionId xid; |
| |
| xlrec = (xl_xact_abort *) XLogRecGetData(r); |
| ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); |
| |
| if (!TransactionIdIsValid(parsed.twophase_xid)) |
| xid = XLogRecGetXid(r); |
| else |
| xid = parsed.twophase_xid; |
| |
| DecodeAbort(ctx, buf, &parsed, xid); |
| break; |
| } |
| case XLOG_XACT_ASSIGNMENT: |
| { |
| xl_xact_assignment *xlrec; |
| int i; |
| TransactionId *sub_xid; |
| |
| xlrec = (xl_xact_assignment *) XLogRecGetData(r); |
| |
| sub_xid = &xlrec->xsub[0]; |
| |
| for (i = 0; i < xlrec->nsubxacts; i++) |
| { |
| ReorderBufferAssignChild(reorder, xlrec->xtop, |
| *(sub_xid++), buf->origptr); |
| } |
| break; |
| } |
| case XLOG_XACT_PREPARE: |
| |
| /* |
| * Currently decoding ignores PREPARE TRANSACTION and will just |
| * decode the transaction when the COMMIT PREPARED is sent or |
| * throw away the transaction's contents when a ROLLBACK PREPARED |
| * is received. In the future we could add code to expose prepared |
| * transactions in the changestream allowing for a kind of |
| * distributed 2PC. |
| */ |
| ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); |
| break; |
| default: |
| elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); |
| } |
| } |
| |
| /* |
| * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| SnapBuild *builder = ctx->snapshot_builder; |
| XLogReaderState *r = buf->record; |
| uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; |
| |
| ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); |
| |
| switch (info) |
| { |
| case XLOG_RUNNING_XACTS: |
| { |
| xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r); |
| |
| SnapBuildProcessRunningXacts(builder, buf->origptr, running); |
| |
| /* |
| * Abort all transactions that we keep track of, that are |
| * older than the record's oldestRunningXid. This is the most |
| * convenient spot for doing so since, in contrast to shutdown |
| * or end-of-recovery checkpoints, we have information about |
| * all running transactions which includes prepared ones, |
| * while shutdown checkpoints just know that no non-prepared |
| * transactions are in progress. |
| */ |
| ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid); |
| } |
| break; |
| case XLOG_STANDBY_LOCK: |
| break; |
| case XLOG_INVALIDATIONS: |
| { |
| xl_invalidations *invalidations = |
| (xl_invalidations *) XLogRecGetData(r); |
| |
| if (!ctx->fast_forward) |
| ReorderBufferImmediateInvalidation(ctx->reorder, |
| invalidations->nmsgs, |
| invalidations->msgs); |
| } |
| break; |
| default: |
| elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); |
| } |
| } |
| |
| /* |
| * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; |
| TransactionId xid = XLogRecGetXid(buf->record); |
| SnapBuild *builder = ctx->snapshot_builder; |
| |
| ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); |
| |
| /* |
| * If we don't have snapshot or we are just fast-forwarding, there is no |
| * point in decoding changes. |
| */ |
| if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
| ctx->fast_forward) |
| return; |
| |
| switch (info) |
| { |
| case XLOG_HEAP2_MULTI_INSERT: |
| if (!ctx->fast_forward && |
| SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeMultiInsert(ctx, buf); |
| break; |
| case XLOG_HEAP2_NEW_CID: |
| { |
| xl_heap_new_cid *xlrec; |
| |
| xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record); |
| SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); |
| |
| break; |
| } |
| case XLOG_HEAP2_REWRITE: |
| |
| /* |
| * Although these records only exist to serve the needs of logical |
| * decoding, all the work happens as part of crash or archive |
| * recovery, so we don't need to do anything here. |
| */ |
| break; |
| |
| /* |
| * Everything else here is just low level physical stuff we're not |
| * interested in. |
| */ |
| case XLOG_HEAP2_FREEZE_PAGE: |
| case XLOG_HEAP2_CLEAN: |
| case XLOG_HEAP2_CLEANUP_INFO: |
| case XLOG_HEAP2_VISIBLE: |
| case XLOG_HEAP2_LOCK_UPDATED: |
| break; |
| default: |
| elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info); |
| } |
| } |
| |
| /* |
| * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; |
| TransactionId xid = XLogRecGetXid(buf->record); |
| SnapBuild *builder = ctx->snapshot_builder; |
| |
| ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); |
| |
| /* |
| * If we don't have snapshot or we are just fast-forwarding, there is no |
| * point in decoding data changes. |
| */ |
| if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
| ctx->fast_forward) |
| return; |
| |
| switch (info) |
| { |
| case XLOG_HEAP_INSERT: |
| if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeInsert(ctx, buf); |
| break; |
| |
| /* |
| * Treat HOT update as normal updates. There is no useful |
| * information in the fact that we could make it a HOT update |
| * locally and the WAL layout is compatible. |
| */ |
| case XLOG_HEAP_HOT_UPDATE: |
| case XLOG_HEAP_UPDATE: |
| if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeUpdate(ctx, buf); |
| break; |
| |
| case XLOG_HEAP_DELETE: |
| if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeDelete(ctx, buf); |
| break; |
| |
| case XLOG_HEAP_TRUNCATE: |
| if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeTruncate(ctx, buf); |
| break; |
| |
| case XLOG_HEAP_INPLACE: |
| |
| /* |
| * Inplace updates are only ever performed on catalog tuples and |
| * can, per definition, not change tuple visibility. Since we |
| * don't decode catalog tuples, we're not interested in the |
| * record's contents. |
| * |
| * In-place updates can be used either by XID-bearing transactions |
| * (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less |
| * transactions (e.g. VACUUM). In the former case, the commit |
| * record will include cache invalidations, so we mark the |
| * transaction as catalog modifying here. Currently that's |
| * redundant because the commit will do that as well, but once we |
| * support decoding in-progress relations, this will be important. |
| */ |
| if (!TransactionIdIsValid(xid)) |
| break; |
| |
| SnapBuildProcessChange(builder, xid, buf->origptr); |
| ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); |
| break; |
| |
| case XLOG_HEAP_CONFIRM: |
| if (SnapBuildProcessChange(builder, xid, buf->origptr)) |
| DecodeSpecConfirm(ctx, buf); |
| break; |
| |
| case XLOG_HEAP_LOCK: |
| /* we don't care about row level locks for now */ |
| break; |
| |
| default: |
| elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info); |
| break; |
| } |
| } |
| |
| static inline bool |
| FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) |
| { |
| if (ctx->callbacks.filter_by_origin_cb == NULL) |
| return false; |
| |
| return filter_by_origin_cb_wrapper(ctx, origin_id); |
| } |
| |
| /* |
| * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). |
| */ |
| static void |
| DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| SnapBuild *builder = ctx->snapshot_builder; |
| XLogReaderState *r = buf->record; |
| TransactionId xid = XLogRecGetXid(r); |
| uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; |
| RepOriginId origin_id = XLogRecGetOrigin(r); |
| Snapshot snapshot; |
| xl_logical_message *message; |
| |
| if (info != XLOG_LOGICAL_MESSAGE) |
| elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); |
| |
| ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); |
| |
| /* |
| * If we don't have snapshot or we are just fast-forwarding, there is no |
| * point in decoding messages. |
| */ |
| if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || |
| ctx->fast_forward) |
| return; |
| |
| message = (xl_logical_message *) XLogRecGetData(r); |
| |
| if (message->dbId != ctx->slot->data.database || |
| FilterByOrigin(ctx, origin_id)) |
| return; |
| |
| if (message->transactional && |
| !SnapBuildProcessChange(builder, xid, buf->origptr)) |
| return; |
| else if (!message->transactional && |
| (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || |
| SnapBuildXactNeedsSkip(builder, buf->origptr))) |
| return; |
| |
| snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); |
| ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, |
| message->transactional, |
| message->message, /* first part of message is |
| * prefix */ |
| message->message_size, |
| message->message + message->prefix_size); |
| } |
| |
| /* |
| * Consolidated commit record handling between the different form of commit |
| * records. |
| */ |
| static void |
| DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
| xl_xact_parsed_commit *parsed, TransactionId xid) |
| { |
| XLogRecPtr origin_lsn = InvalidXLogRecPtr; |
| TimestampTz commit_time = parsed->xact_time; |
| RepOriginId origin_id = XLogRecGetOrigin(buf->record); |
| int i; |
| |
| if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) |
| { |
| origin_lsn = parsed->origin_lsn; |
| commit_time = parsed->origin_timestamp; |
| } |
| |
| /* |
| * Process invalidation messages, even if we're not interested in the |
| * transaction's contents, since the various caches need to always be |
| * consistent. |
| */ |
| if (parsed->nmsgs > 0) |
| { |
| if (!ctx->fast_forward) |
| ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, |
| parsed->nmsgs, parsed->msgs); |
| /* |
| * If the COMMIT record has invalidation messages, it could have catalog |
| * changes. It is possible that we didn't mark this transaction and |
| * its subtransactions as containing catalog changes when the decoding |
| * starts from a commit record without decoding the transaction's other |
| * changes. Therefore, we ensure to mark such transactions as containing |
| * catalog change. |
| * |
| * This must be done before SnapBuildCommitTxn() so that we can include |
| * these transactions in the historic snapshot. |
| */ |
| SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, |
| parsed->nsubxacts, parsed->subxacts, |
| buf->origptr); |
| } |
| |
| SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, |
| parsed->nsubxacts, parsed->subxacts); |
| |
| /* ---- |
| * Check whether we are interested in this specific transaction, and tell |
| * the reorderbuffer to forget the content of the (sub-)transactions |
| * if not. |
| * |
| * There can be several reasons we might not be interested in this |
| * transaction: |
| * 1) We might not be interested in decoding transactions up to this |
| * LSN. This can happen because we previously decoded it and now just |
| * are restarting or if we haven't assembled a consistent snapshot yet. |
| * 2) The transaction happened in another database. |
| * 3) The output plugin is not interested in the origin. |
| * 4) We are doing fast-forwarding |
| * |
| * We can't just use ReorderBufferAbort() here, because we need to execute |
| * the transaction's invalidations. This currently won't be needed if |
| * we're just skipping over the transaction because currently we only do |
| * so during startup, to get to the first transaction the client needs. As |
| * we have reset the catalog caches before starting to read WAL, and we |
| * haven't yet touched any catalogs, there can't be anything to invalidate. |
| * But if we're "forgetting" this commit because it's it happened in |
| * another database, the invalidations might be important, because they |
| * could be for shared catalogs and we might have loaded data into the |
| * relevant syscaches. |
| * --- |
| */ |
| if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || |
| (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || |
| ctx->fast_forward || FilterByOrigin(ctx, origin_id)) |
| { |
| for (i = 0; i < parsed->nsubxacts; i++) |
| { |
| ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); |
| } |
| ReorderBufferForget(ctx->reorder, xid, buf->origptr); |
| |
| return; |
| } |
| |
| /* tell the reorderbuffer about the surviving subtransactions */ |
| for (i = 0; i < parsed->nsubxacts; i++) |
| { |
| ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], |
| buf->origptr, buf->endptr); |
| } |
| |
| /* replay actions of all transaction + subtransactions in order */ |
| ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, |
| commit_time, origin_id, origin_lsn); |
| } |
| |
| /* |
| * Get the data from the various forms of abort records and pass it on to |
| * snapbuild.c and reorderbuffer.c |
| */ |
| static void |
| DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, |
| xl_xact_parsed_abort *parsed, TransactionId xid) |
| { |
| int i; |
| |
| for (i = 0; i < parsed->nsubxacts; i++) |
| { |
| ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], |
| buf->record->EndRecPtr); |
| } |
| |
| ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); |
| } |
| |
| /* |
| * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs. |
| * |
| * Deletes can contain the new tuple. |
| */ |
| static void |
| DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| Size datalen; |
| char *tupledata; |
| Size tuplelen; |
| XLogReaderState *r = buf->record; |
| xl_heap_insert *xlrec; |
| ReorderBufferChange *change; |
| RelFileNode target_node; |
| |
| xlrec = (xl_heap_insert *) XLogRecGetData(r); |
| |
| /* |
| * Ignore insert records without new tuples (this does happen when |
| * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL). |
| */ |
| if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) |
| return; |
| |
| /* only interested in our database */ |
| XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
| if (target_node.dbNode != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE)) |
| change->action = REORDER_BUFFER_CHANGE_INSERT; |
| else |
| change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; |
| change->origin_id = XLogRecGetOrigin(r); |
| |
| memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
| |
| tupledata = XLogRecGetBlockData(r, 0, &datalen); |
| tuplelen = datalen - SizeOfHeapHeader; |
| |
| change->data.tp.newtuple = |
| ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
| |
| DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); |
| |
| change->data.tp.clear_toast_afterwards = true; |
| |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
| } |
| |
| /* |
| * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout |
| * in the record, from wal into proper tuplebufs. |
| * |
| * Updates can possibly contain a new tuple and the old primary key. |
| */ |
| static void |
| DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| XLogReaderState *r = buf->record; |
| xl_heap_update *xlrec; |
| ReorderBufferChange *change; |
| char *data; |
| RelFileNode target_node; |
| |
| xlrec = (xl_heap_update *) XLogRecGetData(r); |
| |
| /* only interested in our database */ |
| XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
| if (target_node.dbNode != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| change->action = REORDER_BUFFER_CHANGE_UPDATE; |
| change->origin_id = XLogRecGetOrigin(r); |
| memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
| |
| if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) |
| { |
| Size datalen; |
| Size tuplelen; |
| |
| data = XLogRecGetBlockData(r, 0, &datalen); |
| |
| tuplelen = datalen - SizeOfHeapHeader; |
| |
| change->data.tp.newtuple = |
| ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
| |
| DecodeXLogTuple(data, datalen, change->data.tp.newtuple); |
| } |
| |
| if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) |
| { |
| Size datalen; |
| Size tuplelen; |
| |
| /* caution, remaining data in record is not aligned */ |
| data = XLogRecGetData(r) + SizeOfHeapUpdate; |
| datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; |
| tuplelen = datalen - SizeOfHeapHeader; |
| |
| change->data.tp.oldtuple = |
| ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
| |
| DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); |
| } |
| |
| change->data.tp.clear_toast_afterwards = true; |
| |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
| } |
| |
| /* |
| * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs. |
| * |
| * Deletes can possibly contain the old primary key. |
| */ |
| static void |
| DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| XLogReaderState *r = buf->record; |
| xl_heap_delete *xlrec; |
| ReorderBufferChange *change; |
| RelFileNode target_node; |
| |
| xlrec = (xl_heap_delete *) XLogRecGetData(r); |
| |
| /* only interested in our database */ |
| XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
| if (target_node.dbNode != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| |
| if (xlrec->flags & XLH_DELETE_IS_SUPER) |
| change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; |
| else |
| change->action = REORDER_BUFFER_CHANGE_DELETE; |
| |
| change->origin_id = XLogRecGetOrigin(r); |
| |
| memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
| |
| /* old primary key stored */ |
| if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) |
| { |
| Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; |
| Size tuplelen = datalen - SizeOfHeapHeader; |
| |
| Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); |
| |
| change->data.tp.oldtuple = |
| ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); |
| |
| DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, |
| datalen, change->data.tp.oldtuple); |
| } |
| |
| change->data.tp.clear_toast_afterwards = true; |
| |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
| } |
| |
| /* |
| * Parse XLOG_HEAP_TRUNCATE from wal |
| */ |
| static void |
| DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| XLogReaderState *r = buf->record; |
| xl_heap_truncate *xlrec; |
| ReorderBufferChange *change; |
| |
| xlrec = (xl_heap_truncate *) XLogRecGetData(r); |
| |
| /* only interested in our database */ |
| if (xlrec->dbId != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| change->action = REORDER_BUFFER_CHANGE_TRUNCATE; |
| change->origin_id = XLogRecGetOrigin(r); |
| if (xlrec->flags & XLH_TRUNCATE_CASCADE) |
| change->data.truncate.cascade = true; |
| if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) |
| change->data.truncate.restart_seqs = true; |
| change->data.truncate.nrelids = xlrec->nrelids; |
| change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder, |
| xlrec->nrelids); |
| memcpy(change->data.truncate.relids, xlrec->relids, |
| xlrec->nrelids * sizeof(Oid)); |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), |
| buf->origptr, change); |
| } |
| |
| /* |
| * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs. |
| * |
| * Currently MULTI_INSERT will always contain the full tuples. |
| */ |
| static void |
| DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| XLogReaderState *r = buf->record; |
| xl_heap_multi_insert *xlrec; |
| int i; |
| char *data; |
| char *tupledata; |
| Size tuplelen; |
| RelFileNode rnode; |
| |
| xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); |
| |
| /* |
| * Ignore insert records without new tuples. This happens when a |
| * multi_insert is done on a catalog or on a non-persistent relation. |
| */ |
| if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)) |
| return; |
| |
| /* only interested in our database */ |
| XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL); |
| if (rnode.dbNode != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| /* |
| * We know that this multi_insert isn't for a catalog, so the block should |
| * always have data even if a full-page write of it is taken. |
| */ |
| tupledata = XLogRecGetBlockData(r, 0, &tuplelen); |
| Assert(tupledata != NULL); |
| |
| data = tupledata; |
| for (i = 0; i < xlrec->ntuples; i++) |
| { |
| ReorderBufferChange *change; |
| xl_multi_insert_tuple *xlhdr; |
| int datalen; |
| ReorderBufferTupleBuf *tuple; |
| HeapTupleHeader header; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| change->action = REORDER_BUFFER_CHANGE_INSERT; |
| change->origin_id = XLogRecGetOrigin(r); |
| |
| memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); |
| |
| xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); |
| data = ((char *) xlhdr) + SizeOfMultiInsertTuple; |
| datalen = xlhdr->datalen; |
| |
| change->data.tp.newtuple = |
| ReorderBufferGetTupleBuf(ctx->reorder, datalen); |
| |
| tuple = change->data.tp.newtuple; |
| header = tuple->tuple.t_data; |
| |
| /* not a disk based tuple */ |
| ItemPointerSetInvalid(&tuple->tuple.t_self); |
| |
| /* |
| * We can only figure this out after reassembling the transactions. |
| */ |
| tuple->tuple.t_tableOid = InvalidOid; |
| |
| tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; |
| |
| memset(header, 0, SizeofHeapTupleHeader); |
| |
| memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader, |
| (char *) data, |
| datalen); |
| header->t_infomask = xlhdr->t_infomask; |
| header->t_infomask2 = xlhdr->t_infomask2; |
| header->t_hoff = xlhdr->t_hoff; |
| |
| /* |
| * Reset toast reassembly state only after the last row in the last |
| * xl_multi_insert_tuple record emitted by one heap_multi_insert() |
| * call. |
| */ |
| if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI && |
| (i + 1) == xlrec->ntuples) |
| change->data.tp.clear_toast_afterwards = true; |
| else |
| change->data.tp.clear_toast_afterwards = false; |
| |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), |
| buf->origptr, change); |
| |
| /* move to the next xl_multi_insert_tuple entry */ |
| data += datalen; |
| } |
| Assert(data == tupledata + tuplelen); |
| } |
| |
| /* |
| * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change. |
| * |
| * This is pretty trivial, all the state essentially already setup by the |
| * speculative insertion. |
| */ |
| static void |
| DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) |
| { |
| XLogReaderState *r = buf->record; |
| ReorderBufferChange *change; |
| RelFileNode target_node; |
| |
| /* only interested in our database */ |
| XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); |
| if (target_node.dbNode != ctx->slot->data.database) |
| return; |
| |
| /* output plugin doesn't look for this origin, no need to queue */ |
| if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) |
| return; |
| |
| change = ReorderBufferGetChange(ctx->reorder); |
| change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM; |
| change->origin_id = XLogRecGetOrigin(r); |
| |
| memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); |
| |
| change->data.tp.clear_toast_afterwards = true; |
| |
| ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); |
| } |
| |
| |
| /* |
| * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete |
| * (but not by heap_multi_insert) into a tuplebuf. |
| * |
| * The size 'len' and the pointer 'data' in the record need to be |
| * computed outside as they are record specific. |
| */ |
| static void |
| DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) |
| { |
| xl_heap_header xlhdr; |
| int datalen = len - SizeOfHeapHeader; |
| HeapTupleHeader header; |
| |
| Assert(datalen >= 0); |
| |
| tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; |
| header = tuple->tuple.t_data; |
| |
| /* not a disk based tuple */ |
| ItemPointerSetInvalid(&tuple->tuple.t_self); |
| |
| /* we can only figure this out after reassembling the transactions */ |
| tuple->tuple.t_tableOid = InvalidOid; |
| |
| /* data is not stored aligned, copy to aligned storage */ |
| memcpy((char *) &xlhdr, |
| data, |
| SizeOfHeapHeader); |
| |
| memset(header, 0, SizeofHeapTupleHeader); |
| |
| memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, |
| data + SizeOfHeapHeader, |
| datalen); |
| |
| header->t_infomask = xlhdr.t_infomask; |
| header->t_infomask2 = xlhdr.t_infomask2; |
| header->t_hoff = xlhdr.t_hoff; |
| } |