| /*------------------------------------------------------------------------- |
| * |
| * xact.c |
| * top level transaction system support routines |
| * |
| * See src/backend/access/transam/README for more information. |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/access/transam/xact.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include <time.h> |
| #include <unistd.h> |
| |
| #include "access/commit_ts.h" |
| #include "access/heapam.h" |
| #include "access/multixact.h" |
| #include "access/parallel.h" |
| #include "access/subtrans.h" |
| #include "access/transam.h" |
| #include "access/twophase.h" |
| #include "access/xact.h" |
| #include "access/xlog.h" |
| #include "access/xloginsert.h" |
| #include "access/xact_storage_tablespace.h" |
| #include "access/xlogutils.h" |
| #include "catalog/catalog.h" |
| #include "catalog/index.h" |
| #include "catalog/namespace.h" |
| #include "catalog/pg_enum.h" |
| #include "catalog/storage.h" |
| #include "catalog/storage_tablespace.h" |
| #include "catalog/storage_database.h" |
| #include "commands/async.h" |
| #include "commands/matview.h" |
| #include "commands/dbcommands.h" |
| #include "commands/extension.h" |
| #include "commands/resgroupcmds.h" |
| #include "commands/tablecmds.h" |
| #include "commands/trigger.h" |
| #include "executor/spi.h" |
| #include "libpq/be-fsstubs.h" |
| #include "libpq/pqsignal.h" |
| #include "miscadmin.h" |
| #include "pg_trace.h" |
| #include "pgstat.h" |
| #include "replication/logical.h" |
| #include "replication/logicallauncher.h" |
| #include "replication/origin.h" |
| #include "replication/snapbuild.h" |
| #include "replication/syncrep.h" |
| #include "replication/walsender.h" |
| #include "storage/condition_variable.h" |
| #include "storage/fd.h" |
| #include "storage/freespace.h" |
| #include "storage/lmgr.h" |
| #include "storage/md.h" |
| #include "storage/predicate.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "storage/sinvaladt.h" |
| #include "storage/smgr.h" |
| #include "utils/builtins.h" |
| #include "utils/catcache.h" |
| #include "utils/combocid.h" |
| #include "utils/faultinjector.h" |
| #include "utils/guc.h" |
| #include "utils/inval.h" |
| #include "utils/memutils.h" |
| #include "utils/relmapper.h" |
| |
| #include "utils/builtins.h" |
| #include "utils/resource_manager.h" |
| #include "utils/sharedsnapshot.h" |
| #include "utils/snapmgr.h" |
| #include "utils/timeout.h" |
| #include "utils/timestamp.h" |
| #include "pg_trace.h" |
| |
| #include "access/distributedlog.h" |
| #include "catalog/oid_dispatch.h" |
| #include "cdb/cdbdistributedsnapshot.h" |
| #include "cdb/cdbendpoint.h" |
| #include "cdb/cdbgang.h" |
| #include "cdb/cdblocaldistribxact.h" |
| #include "cdb/cdbtm.h" |
| #include "cdb/cdbvars.h" /* Gp_role, Gp_is_writer, interconnect_setup_timeout */ |
| #include "utils/workfile_mgr.h" |
| #include "utils/vmem_tracker.h" |
| #include "cdb/cdbdisp.h" |
| #include "postmaster/autovacuum.h" |
| |
| #include "cdb/cdbdisp_query.h" /* Extend Protocol Data */ |
| |
| /* |
| * User-tweakable parameters |
| */ |
| int DefaultXactIsoLevel = XACT_READ_COMMITTED; |
| int XactIsoLevel; |
| |
| bool DefaultXactReadOnly = false; |
| bool XactReadOnly; |
| |
| bool DefaultXactDeferrable = false; |
| bool XactDeferrable; |
| |
| int synchronous_commit = SYNCHRONOUS_COMMIT_ON; |
| |
| /* |
| * CheckXidAlive is a xid value pointing to a possibly ongoing (sub) |
| * transaction. Currently, it is used in logical decoding. It's possible |
| * that such transactions can get aborted while the decoding is ongoing in |
| * which case we skip decoding that particular transaction. To ensure that we |
| * check whether the CheckXidAlive is aborted after fetching the tuple from |
| * system tables. We also ensure that during logical decoding we never |
| * directly access the tableam or heap APIs because we are checking for the |
| * concurrent aborts only in systable_* APIs. |
| */ |
| TransactionId CheckXidAlive = InvalidTransactionId; |
| bool bsysscan = false; |
| |
| /* |
| * When running as a parallel worker, we place only a single |
| * TransactionStateData on the parallel worker's state stack, and the XID |
| * reflected there will be that of the *innermost* currently-active |
| * subtransaction in the backend that initiated parallelism. However, |
| * GetTopTransactionId() and TransactionIdIsCurrentTransactionId() |
| * need to return the same answers in the parallel worker as they would have |
| * in the user backend, so we need some additional bookkeeping. |
| * |
| * XactTopFullTransactionId stores the XID of our toplevel transaction, which |
| * will be the same as TopTransactionStateData.fullTransactionId in an |
| * ordinary backend; but in a parallel backend, which does not have the entire |
| * transaction state, it will instead be copied from the backend that started |
| * the parallel operation. |
| * |
| * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary |
| * backend, but in a parallel backend, nParallelCurrentXids will contain the |
| * number of XIDs that need to be considered current, and ParallelCurrentXids |
| * will contain the XIDs themselves. This includes all XIDs that were current |
| * or sub-committed in the parent at the time the parallel operation began. |
| * The XIDs are stored sorted in numerical order (not logical order) to make |
| * lookups as fast as possible. |
| */ |
| FullTransactionId XactTopFullTransactionId = {InvalidTransactionId}; |
| int nParallelCurrentXids = 0; |
| TransactionId *ParallelCurrentXids; |
| |
| int32 gp_subtrans_warn_limit = 16777216; /* 16 million */ |
| |
| /* gp-specific |
| * routine for marking when a sequence makes a mark in the xlog. we need |
| * to keep track of this because sequences are the only reason a reader should |
| * ever write to the xlog during commit. As a result, we keep track of such |
| * and will complain loudly if its violated. |
| */ |
| bool seqXlogWrite; |
| |
| /* |
| * Miscellaneous flag bits to record events which occur on the top level |
| * transaction. These flags are only persisted in MyXactFlags and are intended |
| * so we remember to do certain things later on in the transaction. This is |
| * globally accessible, so can be set from anywhere in the code that requires |
| * recording flags. |
| * |
| * Flag XACT_FLAGS_ACCESSEDTEMPREL not used in GPDB, see comments in |
| * PrepareTransaction() |
| */ |
| int MyXactFlags; |
| |
| /* |
| * transaction states - transaction state from server perspective |
| */ |
| typedef enum TransState |
| { |
| TRANS_DEFAULT, /* idle */ |
| TRANS_START, /* transaction starting */ |
| TRANS_INPROGRESS, /* inside a valid transaction */ |
| TRANS_COMMIT, /* commit in progress */ |
| TRANS_ABORT, /* abort in progress */ |
| TRANS_PREPARE /* prepare in progress */ |
| } TransState; |
| |
| /* |
| * transaction block states - transaction state of client queries |
| * |
| * Note: the subtransaction states are used only for non-topmost |
| * transactions; the others appear only in the topmost transaction. |
| */ |
| typedef enum TBlockState |
| { |
| /* not-in-transaction-block states */ |
| TBLOCK_DEFAULT, /* idle */ |
| TBLOCK_STARTED, /* running single-query transaction */ |
| |
| /* transaction block states */ |
| TBLOCK_BEGIN, /* starting transaction block */ |
| TBLOCK_INPROGRESS, /* live transaction */ |
| TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */ |
| TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ |
| TBLOCK_END, /* COMMIT received */ |
| TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ |
| TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ |
| TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */ |
| TBLOCK_PREPARE, /* live xact, PREPARE received */ |
| |
| /* subtransaction states */ |
| TBLOCK_SUBBEGIN, /* starting a subtransaction */ |
| TBLOCK_SUBINPROGRESS, /* live subtransaction */ |
| TBLOCK_SUBRELEASE, /* RELEASE received */ |
| TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS */ |
| TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK */ |
| TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */ |
| TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */ |
| TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */ |
| TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */ |
| } TBlockState; |
| |
| /* |
| * transaction state structure |
| */ |
| typedef struct TransactionStateData |
| { |
| FullTransactionId fullTransactionId; /* my FullTransactionId */ |
| SubTransactionId subTransactionId; /* my subxact ID */ |
| char *name; /* savepoint name, if any */ |
| int savepointLevel; /* savepoint level */ |
| TransState state; /* low-level state */ |
| TBlockState blockState; /* high-level state */ |
| int nestingLevel; /* transaction nesting depth */ |
| int gucNestLevel; /* GUC context nesting depth */ |
| MemoryContext curTransactionContext; /* my xact-lifetime context */ |
| ResourceOwner curTransactionOwner; /* my query resources */ |
| TransactionId *childXids; /* subcommitted child XIDs, in XID order */ |
| int nChildXids; /* # of subcommitted child XIDs */ |
| int maxChildXids; /* allocated size of childXids[] */ |
| Oid prevUser; /* previous CurrentUserId setting */ |
| int prevSecContext; /* previous SecurityRestrictionContext */ |
| bool prevXactReadOnly; /* entry-time xact r/o state */ |
| bool startedInRecovery; /* did we start in recovery? */ |
| bool didLogXid; /* has xid been included in WAL record? */ |
| int parallelModeLevel; /* Enter/ExitParallelMode counter */ |
| bool chain; /* start a new block after this one */ |
| bool assigned; /* assigned to top-level XID */ |
| bool executorSaysXactDoesWrites; /* GP executor says xact does writes */ |
| struct TransactionStateData *parent; /* back link to parent */ |
| struct TransactionStateData *fastLink; /* back link to jump to parent for efficient search */ |
| } TransactionStateData; |
| |
| static bool TopXactexecutorDidWriteXLog; /* QE has wrote xlog */ |
| |
| typedef TransactionStateData *TransactionState; |
| |
| #define NUM_NODES_TO_SKIP_FOR_FAST_SEARCH 100 |
| static int fastNodeCount; |
| static TransactionState previousFastLink; |
| |
| /* |
| * Serialized representation used to transmit transaction state to parallel |
| * workers through shared memory. |
| */ |
| typedef struct SerializedTransactionState |
| { |
| int xactIsoLevel; |
| bool xactDeferrable; |
| FullTransactionId topFullTransactionId; |
| FullTransactionId currentFullTransactionId; |
| CommandId currentCommandId; |
| int nParallelCurrentXids; |
| TransactionId parallelCurrentXids[FLEXIBLE_ARRAY_MEMBER]; |
| } SerializedTransactionState; |
| |
| /* The size of SerializedTransactionState, not including the final array. */ |
| #define SerializedTransactionStateHeaderSize \ |
| offsetof(SerializedTransactionState, parallelCurrentXids) |
| |
| /* |
| * CurrentTransactionState always points to the current transaction state |
| * block. It will point to TopTransactionStateData when not in a |
| * transaction at all, or when in a top-level transaction. |
| */ |
| static TransactionStateData TopTransactionStateData = { |
| .state = TRANS_DEFAULT, |
| .blockState = TBLOCK_DEFAULT, |
| .assigned = false, |
| }; |
| |
| /* |
| * unreportedXids holds XIDs of all subtransactions that have not yet been |
| * reported in an XLOG_XACT_ASSIGNMENT record. |
| */ |
| static int nUnreportedXids; |
| static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS]; |
| |
| static TransactionState CurrentTransactionState = &TopTransactionStateData; |
| |
| /* |
| * The subtransaction ID and command ID assignment counters are global |
| * to a whole transaction, so we do not keep them in the state stack. |
| */ |
| static SubTransactionId currentSubTransactionId; |
| static CommandId currentCommandId; |
| static bool currentCommandIdUsed; |
| |
| /* |
| * xactStartTimestamp is the value of transaction_timestamp(). |
| * stmtStartTimestamp is the value of statement_timestamp(). |
| * xactStopTimestamp is the time at which we log a commit or abort WAL record. |
| * These do not change as we enter and exit subtransactions, so we don't |
| * keep them inside the TransactionState stack. |
| */ |
| static TimestampTz xactStartTimestamp; |
| static TimestampTz stmtStartTimestamp; |
| static TimestampTz xactStopTimestamp; |
| |
| /* |
| * Total number of SAVEPOINT commands executed by this transaction. |
| * |
| */ |
| static int currentSavepointTotal; |
| |
| /* |
| * GID to be used for preparing the current transaction. This is also |
| * global to a whole transaction, so we don't keep it in the state stack. |
| */ |
| static char *prepareGID; |
| |
| /* |
| * Some commands want to force synchronous commit. |
| */ |
| static bool forceSyncCommit = false; |
| |
| /* Flag for logging statements in a transaction. */ |
| bool xact_is_sampled = false; |
| |
| /* |
| * Private context for transaction-abort work --- we reserve space for this |
| * at startup to ensure that AbortTransaction and AbortSubTransaction can work |
| * when we've run out of memory. |
| */ |
| static MemoryContext TransactionAbortContext = NULL; |
| |
| /* |
| * List of add-on start- and end-of-xact callbacks |
| */ |
| typedef struct XactCallbackItem |
| { |
| struct XactCallbackItem *next; |
| XactCallback callback; |
| void *arg; |
| } XactCallbackItem; |
| |
| static XactCallbackItem *Xact_callbacks = NULL; |
| static XactCallbackItem *Xact_callbacks_once = NULL; |
| |
| /* |
| * List of add-on start- and end-of-subxact callbacks |
| */ |
| typedef struct SubXactCallbackItem |
| { |
| struct SubXactCallbackItem *next; |
| SubXactCallback callback; |
| void *arg; |
| } SubXactCallbackItem; |
| |
| static SubXactCallbackItem *SubXact_callbacks = NULL; |
| |
| /* |
| * Subtransaction file used to keep subtransaction Ids that spill over from |
| * shared snapshot. Kept outside of shared snapshot because readers and writer |
| * have their own File pointer. |
| */ |
| File subxip_file = 0; |
| |
| /* local function prototypes */ |
| static void AssignTransactionId(TransactionState s); |
| static void AbortTransaction(void); |
| static void AtAbort_Memory(void); |
| static void AtCleanup_Memory(void); |
| static void AtAbort_ResourceOwner(void); |
| static void AtCCI_LocalCache(void); |
| static void AtCommit_Memory(void); |
| static void AtStart_Cache(void); |
| static void AtStart_Memory(void); |
| static void AtStart_ResourceOwner(void); |
| static void CallXactCallbacks(XactEvent event); |
| static void CallXactCallbacksOnce(XactEvent event); |
| static void CallSubXactCallbacks(SubXactEvent event, |
| SubTransactionId mySubid, |
| SubTransactionId parentSubid); |
| static void CleanupTransaction(void); |
| static void CheckTransactionBlock(bool isTopLevel, bool throwError, |
| const char *stmtType); |
| static void CommitTransaction(void); |
| static TransactionId RecordTransactionAbort(bool isSubXact); |
| static void StartTransaction(void); |
| |
| static void StartSubTransaction(void); |
| static void CommitSubTransaction(void); |
| static void AbortSubTransaction(void); |
| static void CleanupSubTransaction(void); |
| static void PushTransaction(void); |
| static void PopTransaction(void); |
| |
| static void AtSubAbort_Memory(void); |
| static void AtSubCleanup_Memory(void); |
| static void AtSubAbort_ResourceOwner(void); |
| static void AtSubCommit_Memory(void); |
| static void AtSubStart_Memory(void); |
| static void AtSubStart_ResourceOwner(void); |
| |
| static void EndLocalDistribXact(bool isCommit); |
| static void ShowTransactionState(const char *str); |
| static void ShowTransactionStateRec(const char *str, TransactionState state); |
| static const char *BlockStateAsString(TBlockState blockState); |
| static const char *TransStateAsString(TransState state); |
| static void DispatchRollbackToSavepoint(char *name); |
| |
| static bool IsCurrentTransactionIdForReader(TransactionId xid); |
| |
| /* ---------------------------------------------------------------- |
| * transaction state accessors |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * IsTransactionState |
| * |
| * This returns true if we are inside a valid transaction; that is, |
| * it is safe to initiate database access, take heavyweight locks, etc. |
| */ |
| bool |
| IsTransactionState(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * TRANS_DEFAULT and TRANS_ABORT are obviously unsafe states. However, we |
| * also reject the startup/shutdown states TRANS_START, TRANS_COMMIT, |
| * TRANS_PREPARE since it might be too soon or too late within those |
| * transition states to do anything interesting. Hence, the only "valid" |
| * state is TRANS_INPROGRESS. |
| */ |
| return (s->state == TRANS_INPROGRESS); |
| } |
| |
| bool |
| IsAbortInProgress(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| return (s->state == TRANS_ABORT); |
| } |
| |
| bool |
| IsTransactionPreparing(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| return (s->state == TRANS_PREPARE); |
| } |
| /* |
| * IsAbortedTransactionBlockState |
| * |
| * This returns true if we are within an aborted transaction block. |
| */ |
| bool |
| IsAbortedTransactionBlockState(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->blockState == TBLOCK_ABORT || |
| s->blockState == TBLOCK_SUBABORT) |
| return true; |
| |
| return false; |
| } |
| |
| bool |
| TransactionDidWriteXLog(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| return s->didLogXid; |
| } |
| |
| bool |
| TopXactExecutorDidWriteXLog(void) |
| { |
| return TopXactexecutorDidWriteXLog; |
| } |
| |
| void |
| GetAllTransactionXids( |
| DistributedTransactionId *distribXid, |
| TransactionId *localXid, |
| TransactionId *subXid) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| *distribXid = getDistributedTransactionId(); |
| *localXid = XidFromFullTransactionId(s->fullTransactionId); |
| *subXid = s->subTransactionId; |
| } |
| |
| /* |
| * GetTopTransactionId |
| * |
| * This will return the XID of the main transaction, assigning one if |
| * it's not yet set. Be careful to call this only inside a valid xact. |
| */ |
| TransactionId |
| GetTopTransactionId(void) |
| { |
| if (!FullTransactionIdIsValid(XactTopFullTransactionId)) |
| AssignTransactionId(&TopTransactionStateData); |
| return XidFromFullTransactionId(XactTopFullTransactionId); |
| } |
| |
| /* |
| * GetTopTransactionIdIfAny |
| * |
| * This will return the XID of the main transaction, if one is assigned. |
| * It will return InvalidTransactionId if we are not currently inside a |
| * transaction, or inside a transaction that hasn't yet been assigned an XID. |
| */ |
| TransactionId |
| GetTopTransactionIdIfAny(void) |
| { |
| return XidFromFullTransactionId(XactTopFullTransactionId); |
| } |
| |
| /* |
| * GetCurrentTransactionId |
| * |
| * This will return the XID of the current transaction (main or sub |
| * transaction), assigning one if it's not yet set. Be careful to call this |
| * only inside a valid xact. |
| */ |
| TransactionId |
| GetCurrentTransactionId(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (!FullTransactionIdIsValid(s->fullTransactionId)) |
| AssignTransactionId(s); |
| return XidFromFullTransactionId(s->fullTransactionId); |
| } |
| |
| /* |
| * GetCurrentTransactionIdIfAny |
| * |
| * This will return the XID of the current sub xact, if one is assigned. |
| * It will return InvalidTransactionId if we are not currently inside a |
| * transaction, or inside a transaction that hasn't been assigned an XID yet. |
| */ |
| TransactionId |
| GetCurrentTransactionIdIfAny(void) |
| { |
| return XidFromFullTransactionId(CurrentTransactionState->fullTransactionId); |
| } |
| |
| /* |
| * GetTopFullTransactionId |
| * |
| * This will return the FullTransactionId of the main transaction, assigning |
| * one if it's not yet set. Be careful to call this only inside a valid xact. |
| */ |
| FullTransactionId |
| GetTopFullTransactionId(void) |
| { |
| if (!FullTransactionIdIsValid(XactTopFullTransactionId)) |
| AssignTransactionId(&TopTransactionStateData); |
| return XactTopFullTransactionId; |
| } |
| |
| /* |
| * GetTopFullTransactionIdIfAny |
| * |
| * This will return the FullTransactionId of the main transaction, if one is |
| * assigned. It will return InvalidFullTransactionId if we are not currently |
| * inside a transaction, or inside a transaction that hasn't yet been assigned |
| * one. |
| */ |
| FullTransactionId |
| GetTopFullTransactionIdIfAny(void) |
| { |
| return XactTopFullTransactionId; |
| } |
| |
| /* |
| * GetCurrentFullTransactionId |
| * |
| * This will return the FullTransactionId of the current transaction (main or |
| * sub transaction), assigning one if it's not yet set. Be careful to call |
| * this only inside a valid xact. |
| */ |
| FullTransactionId |
| GetCurrentFullTransactionId(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (!FullTransactionIdIsValid(s->fullTransactionId)) |
| AssignTransactionId(s); |
| return s->fullTransactionId; |
| } |
| |
| /* |
| * GetCurrentFullTransactionIdIfAny |
| * |
| * This will return the FullTransactionId of the current sub xact, if one is |
| * assigned. It will return InvalidFullTransactionId if we are not currently |
| * inside a transaction, or inside a transaction that hasn't been assigned one |
| * yet. |
| */ |
| FullTransactionId |
| GetCurrentFullTransactionIdIfAny(void) |
| { |
| return CurrentTransactionState->fullTransactionId; |
| } |
| |
| /* |
| * MarkCurrentTransactionIdLoggedIfAny |
| * |
| * Remember that the current xid - if it is assigned - now has been wal logged. |
| */ |
| void |
| MarkCurrentTransactionIdLoggedIfAny(void) |
| { |
| if (FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) |
| CurrentTransactionState->didLogXid = true; |
| } |
| |
| void |
| MarkTopTransactionWriteXLogOnExecutor(void) |
| { |
| TopXactexecutorDidWriteXLog = true; |
| } |
| |
| /* |
| * GetStableLatestTransactionId |
| * |
| * Get the transaction's XID if it has one, else read the next-to-be-assigned |
| * XID. Once we have a value, return that same value for the remainder of the |
| * current transaction. This is meant to provide the reference point for the |
| * age(xid) function, but might be useful for other maintenance tasks as well. |
| */ |
| TransactionId |
| GetStableLatestTransactionId(void) |
| { |
| static LocalTransactionId lxid = InvalidLocalTransactionId; |
| static TransactionId stablexid = InvalidTransactionId; |
| |
| if (lxid != MyProc->lxid) |
| { |
| lxid = MyProc->lxid; |
| stablexid = GetTopTransactionIdIfAny(); |
| if (!TransactionIdIsValid(stablexid)) |
| stablexid = ReadNextTransactionId(); |
| } |
| |
| Assert(TransactionIdIsValid(stablexid)); |
| |
| return stablexid; |
| } |
| |
| /* |
| * AssignTransactionId |
| * |
| * Assigns a new permanent FullTransactionId to the given TransactionState. |
| * We do not assign XIDs to transactions until/unless this is called. |
| * Also, any parent TransactionStates that don't yet have XIDs are assigned |
| * one; this maintains the invariant that a child transaction has an XID |
| * following its parent's. |
| */ |
| static void |
| AssignTransactionId(TransactionState s) |
| { |
| bool isSubXact = (s->parent != NULL); |
| ResourceOwner currentOwner; |
| bool log_unknown_top = false; |
| |
| /* Assert that caller didn't screw up */ |
| Assert(!FullTransactionIdIsValid(s->fullTransactionId)); |
| Assert(s->state == TRANS_INPROGRESS); |
| |
| if (DistributedTransactionContext == DTX_CONTEXT_QE_READER || |
| DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON) |
| { |
| elog(ERROR, "AssignTransactionId() called by %s process", |
| DtxContextToString(DistributedTransactionContext)); |
| } |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for new XIDs at this point. |
| */ |
| if (IsInParallelMode() || IsParallelWorker()) |
| elog(ERROR, "cannot assign XIDs during a parallel operation"); |
| |
| /* |
| * Ensure parent(s) have XIDs, so that a child always has an XID later |
| * than its parent. Mustn't recurse here, or we might get a stack |
| * overflow if we're at the bottom of a huge stack of subtransactions none |
| * of which have XIDs yet. |
| */ |
| if (isSubXact && !FullTransactionIdIsValid(s->parent->fullTransactionId)) |
| { |
| TransactionState p = s->parent; |
| TransactionState *parents; |
| size_t parentOffset = 0; |
| |
| parents = palloc(sizeof(TransactionState) * s->nestingLevel); |
| while (p != NULL && !FullTransactionIdIsValid(p->fullTransactionId)) |
| { |
| parents[parentOffset++] = p; |
| p = p->parent; |
| } |
| |
| /* |
| * This is technically a recursive call, but the recursion will never |
| * be more than one layer deep. |
| */ |
| while (parentOffset != 0) |
| AssignTransactionId(parents[--parentOffset]); |
| |
| pfree(parents); |
| } |
| |
| /* |
| * When wal_level=logical, guarantee that a subtransaction's xid can only |
| * be seen in the WAL stream if its toplevel xid has been logged before. |
| * If necessary we log an xact_assignment record with fewer than |
| * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set |
| * for a transaction even though it appears in a WAL record, we just might |
| * superfluously log something. That can happen when an xid is included |
| * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in |
| * xl_standby_locks. |
| */ |
| if (isSubXact && XLogLogicalInfoActive() && |
| !TopTransactionStateData.didLogXid) |
| log_unknown_top = true; |
| |
| /* |
| * Generate a new FullTransactionId and record its xid in PG_PROC and |
| * pg_subtrans. |
| * |
| * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in |
| * shared storage other than PG_PROC; because if there's no room for it in |
| * PG_PROC, the subtrans entry is needed to ensure that other backends see |
| * the Xid as "running". See GetNewTransactionId. |
| */ |
| s->fullTransactionId = GetNewTransactionId(isSubXact); |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("AssignTransactionId(): assigned xid " UINT64_FORMAT, |
| U64FromFullTransactionId(s->fullTransactionId)))); |
| |
| if (!isSubXact) |
| XactTopFullTransactionId = s->fullTransactionId; |
| |
| if (isSubXact) |
| { |
| Assert(TransactionIdPrecedes(U64FromFullTransactionId(s->parent->fullTransactionId), |
| U64FromFullTransactionId(s->fullTransactionId))); |
| SubTransSetParent(XidFromFullTransactionId(s->fullTransactionId), |
| XidFromFullTransactionId(s->parent->fullTransactionId)); |
| } |
| |
| /* |
| * If it's a top-level transaction, the predicate locking system needs to |
| * be told about it too. |
| */ |
| if (!isSubXact) |
| RegisterPredicateLockingXid(XidFromFullTransactionId(s->fullTransactionId)); |
| |
| /* |
| * Acquire lock on the transaction XID. (We assume this cannot block.) We |
| * have to ensure that the lock is assigned to the transaction's own |
| * ResourceOwner. |
| */ |
| currentOwner = CurrentResourceOwner; |
| CurrentResourceOwner = s->curTransactionOwner; |
| |
| XactLockTableInsert(XidFromFullTransactionId(s->fullTransactionId)); |
| |
| CurrentResourceOwner = currentOwner; |
| |
| /* |
| * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each |
| * top-level transaction we issue a WAL record for the assignment. We |
| * include the top-level xid and all the subxids that have not yet been |
| * reported using XLOG_XACT_ASSIGNMENT records. |
| * |
| * This is required to limit the amount of shared memory required in a hot |
| * standby server to keep track of in-progress XIDs. See notes for |
| * RecordKnownAssignedTransactionIds(). |
| * |
| * We don't keep track of the immediate parent of each subxid, only the |
| * top-level transaction that each subxact belongs to. This is correct in |
| * recovery only because aborted subtransactions are separately WAL |
| * logged. |
| * |
| * This is correct even for the case where several levels above us didn't |
| * have an xid assigned as we recursed up to them beforehand. |
| */ |
| if (isSubXact && XLogStandbyInfoActive()) |
| { |
| unreportedXids[nUnreportedXids] = XidFromFullTransactionId(s->fullTransactionId); |
| nUnreportedXids++; |
| |
| /* |
| * ensure this test matches similar one in |
| * RecoverPreparedTransactions() |
| */ |
| if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS || |
| log_unknown_top) |
| { |
| xl_xact_assignment xlrec; |
| |
| /* |
| * xtop is always set by now because we recurse up transaction |
| * stack to the highest unassigned xid and then come back down |
| */ |
| xlrec.xtop = GetTopTransactionId(); |
| Assert(TransactionIdIsValid(xlrec.xtop)); |
| xlrec.nsubxacts = nUnreportedXids; |
| |
| XLogBeginInsert(); |
| XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment); |
| XLogRegisterData((char *) unreportedXids, |
| nUnreportedXids * sizeof(TransactionId)); |
| |
| (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT); |
| |
| nUnreportedXids = 0; |
| /* mark top, not current xact as having been logged */ |
| TopTransactionStateData.didLogXid = true; |
| } |
| } |
| } |
| |
| /* |
| * GetCurrentSubTransactionId |
| */ |
| SubTransactionId |
| GetCurrentSubTransactionId(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| return s->subTransactionId; |
| } |
| |
| /* |
| * SubTransactionIsActive |
| * |
| * Test if the specified subxact ID is still active. Note caller is |
| * responsible for checking whether this ID is relevant to the current xact. |
| */ |
| bool |
| SubTransactionIsActive(SubTransactionId subxid) |
| { |
| TransactionState s; |
| |
| for (s = CurrentTransactionState; s != NULL; s = s->parent) |
| { |
| if (s->state == TRANS_ABORT) |
| continue; |
| if (s->subTransactionId == subxid) |
| return true; |
| } |
| return false; |
| } |
| |
| |
| /* |
| * GetCurrentCommandId |
| * |
| * "used" must be true if the caller intends to use the command ID to mark |
| * inserted/updated/deleted tuples. false means the ID is being fetched |
| * for read-only purposes (ie, as a snapshot validity cutoff). See |
| * CommandCounterIncrement() for discussion. |
| */ |
| CommandId |
| GetCurrentCommandId(bool used) |
| { |
| /* this is global to a transaction, not subtransaction-local */ |
| if (used) |
| { |
| /* |
| * Forbid setting currentCommandIdUsed in a parallel worker, because |
| * we have no provision for communicating this back to the leader. We |
| * could relax this restriction when currentCommandIdUsed was already |
| * true at the start of the parallel operation. |
| */ |
| Assert(!IsParallelWorker()); |
| currentCommandIdUsed = true; |
| } |
| return currentCommandId; |
| } |
| |
| /* |
| * SetParallelStartTimestamps |
| * |
| * In a parallel worker, we should inherit the parent transaction's |
| * timestamps rather than setting our own. The parallel worker |
| * infrastructure must call this to provide those values before |
| * calling StartTransaction() or SetCurrentStatementStartTimestamp(). |
| */ |
| void |
| SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts) |
| { |
| Assert(IsParallelWorker()); |
| xactStartTimestamp = xact_ts; |
| stmtStartTimestamp = stmt_ts; |
| } |
| |
| /* |
| * GetCurrentTransactionStartTimestamp |
| */ |
| TimestampTz |
| GetCurrentTransactionStartTimestamp(void) |
| { |
| return xactStartTimestamp; |
| } |
| |
| /* |
| * GetCurrentStatementStartTimestamp |
| */ |
| TimestampTz |
| GetCurrentStatementStartTimestamp(void) |
| { |
| return stmtStartTimestamp; |
| } |
| |
| /* |
| * GetCurrentTransactionStopTimestamp |
| * |
| * We return current time if the transaction stop time hasn't been set |
| * (which can happen if we decide we don't need to log an XLOG record). |
| */ |
| TimestampTz |
| GetCurrentTransactionStopTimestamp(void) |
| { |
| if (xactStopTimestamp != 0) |
| return xactStopTimestamp; |
| return GetCurrentTimestamp(); |
| } |
| |
| /* |
| * SetCurrentStatementStartTimestamp |
| * |
| * In a parallel worker, this should already have been provided by a call |
| * to SetParallelStartTimestamps(). |
| */ |
| void |
| SetCurrentStatementStartTimestamp(void) |
| { |
| if (!IsParallelWorker()) |
| stmtStartTimestamp = GetCurrentTimestamp(); |
| else |
| Assert(stmtStartTimestamp != 0); |
| } |
| |
| /* |
| * SetCurrentTransactionStopTimestamp |
| */ |
| static inline void |
| SetCurrentTransactionStopTimestamp(void) |
| { |
| xactStopTimestamp = GetCurrentTimestamp(); |
| } |
| |
| /* |
| * GetCurrentTransactionNestLevel |
| * |
| * Note: this will return zero when not inside any transaction, one when |
| * inside a top-level transaction, etc. |
| */ |
| int |
| GetCurrentTransactionNestLevel(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| return s->nestingLevel; |
| } |
| |
| /* |
| * We will return true for the Xid of the current subtransaction, any of |
| * its subcommitted children, any of its parents, or any of their |
| * previously subcommitted children. However, a transaction being aborted |
| * is no longer "current", even though it may still have an entry on the |
| * state stack. |
| * |
| * The XID of a child is always greater than that of its parent. |
| * Taking advantage of this fact simple optimizations are added instead of linear traversal to fasten the search |
| * 1] Added fastLink/skipLink pointers to skip nodes in list and scan fast across, instead of visiting all nodes in list |
| * 2] Break-out as soon as XID to search is greater than the current node in (parent / child) list |
| */ |
| static bool |
| TransactionIdIsCurrentTransactionIdInternal(TransactionId xid) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| while (s != NULL) |
| { |
| if (s->state != TRANS_ABORT && |
| FullTransactionIdIsValid(s->fullTransactionId)) |
| { |
| int low, |
| high; |
| |
| if (TransactionIdEquals(xid, XidFromFullTransactionId(s->fullTransactionId))) |
| return true; |
| |
| /* As the childXids array is ordered, we can use binary search */ |
| low = 0; |
| high = s->nChildXids - 1; |
| while (low <= high) |
| { |
| int middle; |
| TransactionId probe; |
| |
| middle = low + (high - low) / 2; |
| probe = s->childXids[middle]; |
| if (TransactionIdEquals(probe, xid)) |
| return true; |
| else if (TransactionIdPrecedes(probe, xid)) |
| low = middle + 1; |
| else |
| high = middle - 1; |
| } |
| |
| /* |
| * If not found in childXID list and greater than s->fullTransactionId |
| * it cannot be on stack below this node, |
| * as stack is in decreasing order of XIDs |
| * So, can safely breakout. |
| */ |
| if (TransactionIdFollows(xid, XidFromFullTransactionId(s->fullTransactionId))) |
| break; |
| } |
| |
| if (s->fastLink) |
| { |
| if (TransactionIdPrecedesOrEquals(xid, XidFromFullTransactionId(s->fastLink->fullTransactionId))) |
| { |
| s = s->fastLink; |
| continue; |
| } |
| } |
| |
| s = s->parent; |
| } |
| |
| return false; |
| } |
| |
| /* |
| * IsCurrentTransactionIdForReader |
| * |
| * We can either be a cursor reader or normal reader. |
| * |
| * The writer_proc will contain all of the subtransaction xids of the current transaction. |
| * - case 1: check writer's top transaction id |
| * - case 2: if not, check writer's subtransactions |
| * - case 3: if overflowed, check topmostxid from pg_subtrans with writer's top transaction id |
| */ |
| static |
| bool IsCurrentTransactionIdForReader(TransactionId xid) |
| { |
| Assert(!Gp_is_writer); |
| |
| Assert(SharedLocalSnapshotSlot); |
| |
| LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_SHARED); |
| |
| PGPROC* writer_proc = SharedLocalSnapshotSlot->writer_proc; |
| |
| if (!writer_proc) |
| { |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| elog(ERROR, "reference to writer proc not found in shared snapshot"); |
| } |
| else if (!writer_proc->pid) |
| { |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| elog(ERROR, "writer proc reference shared with reader is invalid"); |
| } |
| |
| TransactionId writer_xid = writer_proc->xid; |
| bool overflowed = writer_proc->subxidStatus.overflowed; |
| bool isCurrent = false; |
| |
| if (TransactionIdIsValid(writer_xid)) |
| { |
| /* |
| * Case 1: check top transaction id |
| */ |
| if (TransactionIdEquals(xid, writer_xid)) |
| { |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("reader encountered writer's top xid %u", xid))); |
| isCurrent = true; |
| } |
| else |
| { |
| /* |
| * Case 2: check cached subtransaction ids from latest to earliest |
| */ |
| int subx_index = writer_proc->subxidStatus.count- 1; |
| while (!isCurrent && subx_index >= 0) |
| { |
| isCurrent = TransactionIdEquals(writer_proc->subxids.xids[subx_index], xid); |
| subx_index--; |
| } |
| } |
| } |
| |
| /* release the lock before accessing pg_subtrans */ |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| |
| /* |
| * Case 3: if subxids overflowed, check topmostxid of xid from pg_subtrans |
| */ |
| if (!isCurrent && overflowed) |
| { |
| Assert(TransactionIdIsValid(writer_xid)); |
| /* |
| * QE readers don't have access to writer's transaction state. |
| * Therefore, unlike writer, readers have to lookup pg_subtrans, which |
| * is more expensive than searching for an xid in transaction state. If |
| * xid is older than the oldest running transaction we know of, it is |
| * definitely not current and we can skip pg_subtrans. Note that |
| * pg_subtrans is not guaranteed to exist for transactions that are |
| * known to be finished. |
| */ |
| if (TransactionIdFollowsOrEquals(xid, TransactionXmin) && |
| TransactionIdEquals(SubTransGetTopmostTransaction(xid), writer_xid)) |
| { |
| /* |
| * xid is a subtransaction of current transaction. Did it abort? |
| * If this was a writer, TransactionIdIsCurrentTransactionId() |
| * returns false for aborted subtransactions. We must therefore |
| * consult clog. In a writer, this information is available in |
| * CurrentTransactionState. |
| */ |
| isCurrent = TransactionIdDidAbortForReader(xid) ? false : true; |
| } |
| } |
| |
| ereportif(isCurrent && Debug_print_full_dtm, LOG, |
| (errmsg("reader encountered writer's subxact ID %u", xid))); |
| |
| return isCurrent; |
| } |
| |
| /* |
| * TransactionIdIsCurrentTransactionId |
| */ |
| bool |
| TransactionIdIsCurrentTransactionId(TransactionId xid) |
| { |
| bool isCurrentTransactionId = false; |
| |
| /* |
| * We always say that BootstrapTransactionId is "not my transaction ID" |
| * even when it is (ie, during bootstrap). Along with the fact that |
| * transam.c always treats BootstrapTransactionId as already committed, |
| * this causes the heapam_visibility.c routines to see all tuples as |
| * committed, which is what we need during bootstrap. (Bootstrap mode |
| * only inserts tuples, it never updates or deletes them, so all tuples |
| * can be presumed good immediately.) |
| * |
| * Likewise, InvalidTransactionId and FrozenTransactionId are certainly |
| * not my transaction ID, so we can just return "false" immediately for |
| * any non-normal XID. |
| */ |
| if (!TransactionIdIsNormal(xid)) |
| return false; |
| |
| if (TransactionIdEquals(xid, GetTopTransactionIdIfAny())) |
| return true; |
| |
| /* |
| * In parallel workers, the XIDs we must consider as current are stored in |
| * ParallelCurrentXids rather than the transaction-state stack. Note that |
| * the XIDs in this array are sorted numerically rather than according to |
| * transactionIdPrecedes order. |
| */ |
| if (nParallelCurrentXids > 0) |
| { |
| int low, |
| high; |
| |
| low = 0; |
| high = nParallelCurrentXids - 1; |
| while (low <= high) |
| { |
| int middle; |
| TransactionId probe; |
| |
| middle = low + (high - low) / 2; |
| probe = ParallelCurrentXids[middle]; |
| if (probe == xid) |
| return true; |
| else if (probe < xid) |
| low = middle + 1; |
| else |
| high = middle - 1; |
| } |
| return false; |
| } |
| |
| if (DistributedTransactionContext == DTX_CONTEXT_QE_READER || |
| DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON) |
| { |
| isCurrentTransactionId = IsCurrentTransactionIdForReader(xid); |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("qExec Reader xid = %u, is current = %s", |
| xid, (isCurrentTransactionId ? "true" : "false")))); |
| |
| return isCurrentTransactionId; |
| } |
| |
| /* we aren't a reader */ |
| Assert(DistributedTransactionContext != DTX_CONTEXT_QE_ENTRY_DB_SINGLETON); |
| |
| return TransactionIdIsCurrentTransactionIdInternal(xid); |
| } |
| |
| /* |
| * TransactionStartedDuringRecovery |
| * |
| * Returns true if the current transaction started while recovery was still |
| * in progress. Recovery might have ended since so RecoveryInProgress() might |
| * return false already. |
| */ |
| bool |
| TransactionStartedDuringRecovery(void) |
| { |
| return CurrentTransactionState->startedInRecovery; |
| } |
| |
| /* |
| * EnterParallelMode |
| */ |
| void |
| EnterParallelMode(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(s->parallelModeLevel >= 0); |
| |
| ++s->parallelModeLevel; |
| } |
| |
| /* |
| * ExitParallelMode |
| */ |
| void |
| ExitParallelMode(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(s->parallelModeLevel > 0); |
| Assert(s->parallelModeLevel > 1 || !ParallelContextActive()); |
| |
| --s->parallelModeLevel; |
| } |
| |
| /* |
| * IsInParallelMode |
| * |
| * Are we in a parallel operation, as either the leader or a worker? Check |
| * this to prohibit operations that change backend-local state expected to |
| * match across all workers. Mere caches usually don't require such a |
| * restriction. State modified in a strict push/pop fashion, such as the |
| * active snapshot stack, is often fine. |
| */ |
| bool |
| IsInParallelMode(void) |
| { |
| return CurrentTransactionState->parallelModeLevel != 0; |
| } |
| |
| /* |
| * CommandCounterIncrement |
| */ |
| void |
| CommandCounterIncrement(void) |
| { |
| /* |
| * If the current value of the command counter hasn't been "used" to mark |
| * tuples, we need not increment it, since there's no need to distinguish |
| * a read-only command from others. This helps postpone command counter |
| * overflow, and keeps no-op CommandCounterIncrement operations cheap. |
| */ |
| if (currentCommandIdUsed) |
| { |
| /* |
| * Workers synchronize transaction state at the beginning of each |
| * parallel operation, so we can't account for new commands after that |
| * point. |
| */ |
| if (IsInParallelMode() || IsParallelWorker()) |
| elog(ERROR, "cannot start commands during a parallel operation"); |
| |
| currentCommandId += 1; |
| if (currentCommandId == InvalidCommandId) |
| { |
| currentCommandId -= 1; |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("cannot have more than 2^32-2 commands in a transaction"))); |
| } |
| currentCommandIdUsed = false; |
| |
| /* Propagate new command ID into static snapshots */ |
| SnapshotSetCommandId(currentCommandId); |
| |
| /* |
| * Make any catalog changes done by the just-completed command visible |
| * in the local syscache. We obviously don't need to do this after a |
| * read-only command. (But see hacks in inval.c to make real sure we |
| * don't think a command that queued inval messages was read-only.) |
| */ |
| AtCCI_LocalCache(); |
| } |
| } |
| |
| /* |
| * ForceSyncCommit |
| * |
| * Interface routine to allow commands to force a synchronous commit of the |
| * current top-level transaction. Currently, two-phase commit does not |
| * persist and restore this variable. So long as all callers use |
| * PreventInTransactionBlock(), that omission has no consequences. |
| */ |
| void |
| ForceSyncCommit(void) |
| { |
| forceSyncCommit = true; |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * StartTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * AtStart_Cache |
| */ |
| static void |
| AtStart_Cache(void) |
| { |
| AcceptInvalidationMessages(); |
| } |
| |
| /* |
| * AtStart_Memory |
| */ |
| static void |
| AtStart_Memory(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * If this is the first time through, create a private context for |
| * AbortTransaction to work in. By reserving some space now, we can |
| * insulate AbortTransaction from out-of-memory scenarios. Like |
| * ErrorContext, we set it up with slow growth rate and a nonzero minimum |
| * size, so that space will be reserved immediately. |
| */ |
| if (TransactionAbortContext == NULL) |
| TransactionAbortContext = |
| AllocSetContextCreate(TopMemoryContext, |
| "TransactionAbortContext", |
| 32 * 1024, |
| 32 * 1024, |
| 32 * 1024); |
| |
| /* |
| * We shouldn't have a transaction context already. |
| */ |
| Assert(TopTransactionContext == NULL); |
| |
| /* |
| * Create a toplevel context for the transaction. |
| */ |
| TopTransactionContext = |
| AllocSetContextCreate(TopMemoryContext, |
| "TopTransactionContext", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* |
| * In a top-level transaction, CurTransactionContext is the same as |
| * TopTransactionContext. |
| */ |
| CurTransactionContext = TopTransactionContext; |
| s->curTransactionContext = CurTransactionContext; |
| |
| /* Make the CurTransactionContext active. */ |
| MemoryContextSwitchTo(CurTransactionContext); |
| } |
| |
| /* |
| * AtStart_ResourceOwner |
| */ |
| static void |
| AtStart_ResourceOwner(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * We shouldn't have a transaction resource owner already. |
| */ |
| Assert(TopTransactionResourceOwner == NULL); |
| |
| /* |
| * Create a toplevel resource owner for the transaction. |
| */ |
| s->curTransactionOwner = ResourceOwnerCreate(NULL, "TopTransaction"); |
| |
| TopTransactionResourceOwner = s->curTransactionOwner; |
| CurTransactionResourceOwner = s->curTransactionOwner; |
| CurrentResourceOwner = s->curTransactionOwner; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * StartSubTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * AtSubStart_Memory |
| */ |
| static void |
| AtSubStart_Memory(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(CurTransactionContext != NULL); |
| |
| /* |
| * Create a CurTransactionContext, which will be used to hold data that |
| * survives subtransaction commit but disappears on subtransaction abort. |
| * We make it a child of the immediate parent's CurTransactionContext. |
| */ |
| CurTransactionContext = AllocSetContextCreate(CurTransactionContext, |
| "CurTransactionContext", |
| ALLOCSET_DEFAULT_SIZES); |
| s->curTransactionContext = CurTransactionContext; |
| |
| /* Make the CurTransactionContext active. */ |
| MemoryContextSwitchTo(CurTransactionContext); |
| } |
| |
| /* |
| * AtSubStart_ResourceOwner |
| */ |
| static void |
| AtSubStart_ResourceOwner(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(s->parent != NULL); |
| |
| /* |
| * Create a resource owner for the subtransaction. We make it a child of |
| * the immediate parent's resource owner. |
| */ |
| s->curTransactionOwner = |
| ResourceOwnerCreate(s->parent->curTransactionOwner, |
| "SubTransaction"); |
| |
| CurTransactionResourceOwner = s->curTransactionOwner; |
| CurrentResourceOwner = s->curTransactionOwner; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * CommitTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * RecordTransactionCommit |
| * |
| * Returns latest XID among xact and its children, or InvalidTransactionId |
| * if the xact has no XID. (We compute that here just because it's easier.) |
| * |
| * If you change this function, see RecordTransactionCommitPrepared also. |
| */ |
| static TransactionId |
| RecordTransactionCommit(void) |
| { |
| TransactionId xid; |
| bool markXidCommitted; |
| TransactionId latestXid = InvalidTransactionId; |
| int nrels; |
| RelFileNodePendingDelete *rels; |
| DbDirNode *deldbs; |
| int ndeldbs; |
| int nchildren; |
| TransactionId *children; |
| int nmsgs = 0; |
| SharedInvalidationMessage *invalMessages = NULL; |
| bool RelcacheInitFileInval = false; |
| bool wrote_xlog; |
| bool isDtxPrepared = 0; |
| bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit); |
| |
| /* Like in CommitTransaction(), treat a QE reader as if there was no XID */ |
| if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON || |
| DistributedTransactionContext == DTX_CONTEXT_QE_READER) |
| { |
| xid = InvalidTransactionId; |
| } |
| else |
| xid = GetTopTransactionIdIfAny(); |
| markXidCommitted = TransactionIdIsValid(xid); |
| |
| /* |
| * Log pending invalidations for logical decoding of in-progress |
| * transactions. Normally for DDLs, we log this at each command end, |
| * however, for certain cases where we directly update the system table |
| * without a transaction block, the invalidations are not logged till this |
| * time. |
| */ |
| if (XLogLogicalInfoActive()) |
| LogLogicalInvalidations(); |
| |
| /* Get data needed for commit record */ |
| nrels = smgrGetPendingDeletes(true, &rels); |
| ndeldbs = GetPendingDbDeletes(true, &deldbs); |
| nchildren = xactGetCommittedChildren(&children); |
| if (XLogStandbyInfoActive()) |
| nmsgs = xactGetCommittedInvalidationMessages(&invalMessages, |
| &RelcacheInitFileInval); |
| wrote_xlog = (XactLastRecEnd != 0); |
| |
| isDtxPrepared = isPreparedDtxTransaction(); |
| |
| /* |
| * If we haven't been assigned an XID yet, we neither can, nor do we want |
| * to write a COMMIT record. |
| */ |
| if (!markXidCommitted) |
| { |
| /* |
| * We expect that every RelationDropStorage is followed by a catalog |
| * update, and hence XID assignment, so we shouldn't get here with any |
| * pending deletes. Use a real test not just an Assert to check this, |
| * since it's a bit fragile. |
| */ |
| if (nrels != 0) |
| elog(ERROR, "cannot commit a transaction that deleted files but has no xid"); |
| |
| /* Can't have child XIDs either; AssignTransactionId enforces this */ |
| Assert(nchildren == 0); |
| |
| /* |
| * Transactions without an assigned xid can contain invalidation |
| * messages (e.g. explicit relcache invalidations or catcache |
| * invalidations for inplace updates); standbys need to process those. |
| * We can't emit a commit record without an xid, and we don't want to |
| * force assigning an xid, because that'd be problematic for e.g. |
| * vacuum. Hence we emit a bespoke record for the invalidations. We |
| * don't want to use that in case a commit record is emitted, so they |
| * happen synchronously with commits (besides not wanting to emit more |
| * WAL records). |
| */ |
| if (nmsgs != 0) |
| { |
| LogStandbyInvalidations(nmsgs, invalMessages, |
| RelcacheInitFileInval); |
| wrote_xlog = true; /* not strictly necessary */ |
| } |
| |
| /* |
| * If we didn't create XLOG entries, we're done here; otherwise we |
| * should trigger flushing those entries the same as a commit record |
| * would. This will primarily happen for HOT pruning and the like; we |
| * want these to be flushed to disk in due time. |
| */ |
| if (!isDtxPrepared && !wrote_xlog) |
| goto cleanup; |
| } |
| |
| /* |
| * A QD may write distributed commit record even when it didn't have a |
| * valid local XID if the distributed transaction changed data only on |
| * segments (e.g. DML statement). |
| */ |
| if (markXidCommitted || isDtxPrepared) |
| { |
| bool replorigin; |
| |
| /* |
| * Are we using the replication origins feature? Or, in other words, |
| * are we replaying remote actions? |
| */ |
| replorigin = (replorigin_session_origin != InvalidRepOriginId && |
| replorigin_session_origin != DoNotReplicateId); |
| |
| /* |
| * Begin commit critical section and insert the commit XLOG record. |
| */ |
| /* Tell bufmgr and smgr to prepare for commit */ |
| if (markXidCommitted) |
| BufmgrCommit(); |
| |
| if (isDtxPrepared) |
| SIMPLE_FAULT_INJECTOR("before_xlog_xact_distributed_commit"); |
| |
| /* |
| * Mark ourselves as within our "commit critical section". This |
| * forces any concurrent checkpoint to wait until we've updated |
| * pg_xact. Without this, it is possible for the checkpoint to set |
| * REDO after the XLOG record but fail to flush the pg_xact update to |
| * disk, leading to loss of the transaction commit if the system |
| * crashes a little later. |
| * |
| * Note: we could, but don't bother to, set this flag in |
| * RecordTransactionAbort. That's because loss of a transaction abort |
| * is noncritical; the presumption would be that it aborted, anyway. |
| * |
| * It's safe to change the delayChkpt flag of our own backend without |
| * holding the ProcArrayLock, since we're the only one modifying it. |
| * This makes checkpoint's determination of which xacts are delayChkpt |
| * a bit fuzzy, but it doesn't matter. |
| * |
| * In GPDB, if this is a distributed transaction, checkpoint process |
| * should hold off obtaining the REDO pointer while a backend is |
| * writing distributed commit xlog record and changing state of the |
| * distributed transaction. Otherwise, it is possible that a commit |
| * record is written by a transaction and the checkpointer determines |
| * REDO pointer to be after this commit record. But the transaction is |
| * yet to change its state to INSERTED_DISRIBUTED_COMMITTED and the |
| * checkpoint process fails to record this transaction in the |
| * checkpoint. Crash recovery will never see the commit record for |
| * this transaction and the second phase of 2PC will never happen. The |
| * delayChkpt flag avoids this situation by blocking checkpointer until a |
| * backend has finished updating the state. |
| */ |
| Assert(!MyProc->delayChkpt); |
| START_CRIT_SECTION(); |
| MyProc->delayChkpt = true; |
| |
| SetCurrentTransactionStopTimestamp(); |
| |
| SIMPLE_FAULT_INJECTOR("onephase_transaction_commit"); |
| |
| XactLogCommitRecord(xactStopTimestamp, |
| GetPendingTablespaceForDeletionForCommit(), |
| nchildren, children, nrels, rels, |
| nmsgs, invalMessages, |
| ndeldbs, deldbs, |
| RelcacheInitFileInval, |
| MyXactFlags, |
| InvalidTransactionId, NULL /* plain commit */ ); |
| |
| if (replorigin) |
| /* Move LSNs forward for this replication origin */ |
| replorigin_session_advance(replorigin_session_origin_lsn, |
| XactLastRecEnd); |
| |
| /* |
| * Record commit timestamp. The value comes from plain commit |
| * timestamp if there's no replication origin; otherwise, the |
| * timestamp was already set in replorigin_session_origin_timestamp by |
| * replication. |
| * |
| * We don't need to WAL-log anything here, as the commit record |
| * written above already contains the data. |
| */ |
| |
| if (!replorigin || replorigin_session_origin_timestamp == 0) |
| replorigin_session_origin_timestamp = xactStopTimestamp; |
| |
| TransactionTreeSetCommitTsData(xid, nchildren, children, |
| replorigin_session_origin_timestamp, |
| replorigin_session_origin); |
| } |
| |
| #ifdef IMPLEMENT_ASYNC_COMMIT |
| /* |
| * Check if we want to commit asynchronously. We can allow the XLOG flush |
| * to happen asynchronously if synchronous_commit=off, or if the current |
| * transaction has not performed any WAL-logged operation or didn't assign |
| * an xid. The transaction can end up not writing any WAL, even if it has |
| * an xid, if it only wrote to temporary and/or unlogged tables. It can |
| * end up having written WAL without an xid if it did HOT pruning. In |
| * case of a crash, the loss of such a transaction will be irrelevant; |
| * temp tables will be lost anyway, unlogged tables will be truncated and |
| * HOT pruning will be done again later. (Given the foregoing, you might |
| * think that it would be unnecessary to emit the XLOG record at all in |
| * this case, but we don't currently try to do that. It would certainly |
| * cause problems at least in Hot Standby mode, where the |
| * KnownAssignedXids machinery requires tracking every XID assignment. It |
| * might be OK to skip it only when wal_level < replica, but for now we |
| * don't.) |
| * |
| * In GPDB, however, all user transactions need to be committed synchronously, |
| * because we use two-phase commit across the nodes. In order to make GPDB support |
| * async-commit, we also need to implement the temp table detection. |
| */ |
| if ((wrote_xlog && markXidCommitted && |
| synchronous_commit > SYNCHRONOUS_COMMIT_OFF) || |
| forceSyncCommit || nrels > 0) |
| #endif |
| { |
| XLogFlush(XactLastRecEnd); |
| |
| #ifdef FAULT_INJECTOR |
| if (isDtxPrepared == 0 && |
| CurrentTransactionState->blockState == TBLOCK_END) |
| { |
| FaultInjector_InjectFaultIfSet("local_tm_record_transaction_commit", |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| } |
| #endif |
| |
| /* |
| * Now we may update the CLOG, if we wrote a COMMIT record above |
| */ |
| if (markXidCommitted) |
| { |
| /* |
| * Mark the distributed transaction committed. Note that this |
| * is done *before* updating the clog. As soon as an XID is |
| * marked as comitted in the clog, other backends might try |
| * to look it up in the DistributedLog. |
| */ |
| /* UNDONE: What are the locking issues here? */ |
| if (isDtxPrepared || isOnePhaseQE) |
| DistributedLog_SetCommittedTree(xid, nchildren, children, |
| getDistributedTransactionId(), |
| /* isRedo */ false); |
| |
| TransactionIdCommitTree(xid, nchildren, children); |
| } |
| } |
| #ifdef IMPLEMENT_ASYNC_COMMIT |
| else |
| { |
| /* |
| * Asynchronous commit case: |
| * |
| * This enables possible committed transaction loss in the case of a |
| * postmaster crash because WAL buffers are left unwritten. Ideally we |
| * could issue the WAL write without the fsync, but some |
| * wal_sync_methods do not allow separate write/fsync. |
| * |
| * Report the latest async commit LSN, so that the WAL writer knows to |
| * flush this commit. |
| */ |
| XLogSetAsyncXactLSN(XactLastRecEnd); |
| |
| /* |
| * We must not immediately update the CLOG, since we didn't flush the |
| * XLOG. Instead, we store the LSN up to which the XLOG must be |
| * flushed before the CLOG may be updated. |
| */ |
| if (markXidCommitted) |
| TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd); |
| } |
| #endif |
| |
| #ifdef FAULT_INJECTOR |
| if (isDtxPrepared) |
| { |
| FaultInjector_InjectFaultIfSet("dtm_xlog_distributed_commit", |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| } |
| #endif |
| |
| /* |
| * If we entered a commit critical section, leave it now, and let |
| * checkpoints proceed. |
| */ |
| if (markXidCommitted || isDtxPrepared) |
| { |
| MyProc->delayChkpt = false; |
| END_CRIT_SECTION(); |
| SIMPLE_FAULT_INJECTOR("after_xlog_xact_distributed_commit"); |
| } |
| |
| /* Compute latestXid while we have the child XIDs handy */ |
| latestXid = TransactionIdLatest(xid, nchildren, children); |
| |
| /* |
| * Wait for synchronous replication, if required. Similar to the decision |
| * above about using committing asynchronously we only want to wait if |
| * this backend assigned an xid and wrote WAL. No need to wait if an xid |
| * was assigned due to temporary/unlogged tables or due to HOT pruning. |
| * |
| * Note that at this stage we have marked clog, but still show as running |
| * in the procarray and continue to hold locks. |
| */ |
| if ((wrote_xlog && markXidCommitted) || isDtxPrepared) |
| SyncRepWaitForLSN(XactLastRecEnd, true); |
| |
| /* remember end of last commit record */ |
| XactLastCommitEnd = XactLastRecEnd; |
| |
| /* Reset XactLastRecEnd until the next transaction writes something */ |
| XactLastRecEnd = 0; |
| cleanup: |
| /* And clean up local data */ |
| |
| return latestXid; |
| } |
| |
| /* |
| * RecordDistributedForgetCommitted |
| */ |
| XLogRecPtr |
| RecordDistributedForgetCommitted(DistributedTransactionId gxid) |
| { |
| xl_xact_distributed_forget xlrec; |
| XLogRecPtr recptr; |
| |
| xlrec.gxid = gxid; |
| |
| XLogBeginInsert(); |
| XLogRegisterData((char *) &xlrec, sizeof(xl_xact_distributed_forget)); |
| |
| recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_DISTRIBUTED_FORGET); |
| /* only flush immediately if we want to wait for remote_apply */ |
| if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) |
| XLogFlush(recptr); |
| |
| return recptr; |
| } |
| |
| /* |
| * AtCCI_LocalCache |
| */ |
| static void |
| AtCCI_LocalCache(void) |
| { |
| /* |
| * Make any pending relation map changes visible. We must do this before |
| * processing local sinval messages, so that the map changes will get |
| * reflected into the relcache when relcache invals are processed. |
| */ |
| AtCCI_RelationMap(); |
| |
| /* |
| * Make catalog changes visible to me for the next command. |
| */ |
| CommandEndInvalidationMessages(); |
| } |
| |
| /* |
| * AtCommit_Memory |
| */ |
| static void |
| AtCommit_Memory(void) |
| { |
| /* |
| * Now that we're "out" of a transaction, have the system allocate things |
| * in the top memory context instead of per-transaction contexts. |
| */ |
| MemoryContextSwitchTo(TopMemoryContext); |
| |
| /* |
| * Release all transaction-local memory. |
| */ |
| Assert(TopTransactionContext != NULL); |
| MemoryContextDelete(TopTransactionContext); |
| TopTransactionContext = NULL; |
| CurTransactionContext = NULL; |
| CurrentTransactionState->curTransactionContext = NULL; |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * CommitSubTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * AtSubCommit_Memory |
| */ |
| static void |
| AtSubCommit_Memory(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(s->parent != NULL); |
| |
| /* Return to parent transaction level's memory context. */ |
| CurTransactionContext = s->parent->curTransactionContext; |
| MemoryContextSwitchTo(CurTransactionContext); |
| |
| /* |
| * Ordinarily we cannot throw away the child's CurTransactionContext, |
| * since the data it contains will be needed at upper commit. However, if |
| * there isn't actually anything in it, we can throw it away. This avoids |
| * a small memory leak in the common case of "trivial" subxacts. |
| */ |
| if (MemoryContextIsEmpty(s->curTransactionContext)) |
| { |
| MemoryContextDelete(s->curTransactionContext); |
| s->curTransactionContext = NULL; |
| } |
| } |
| |
| /* |
| * AtSubCommit_childXids |
| * |
| * Pass my own XID and my child XIDs up to my parent as committed children. |
| */ |
| static void |
| AtSubCommit_childXids(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| int new_nChildXids; |
| |
| Assert(s->parent != NULL); |
| |
| /* |
| * The parent childXids array will need to hold my XID and all my |
| * childXids, in addition to the XIDs already there. |
| */ |
| new_nChildXids = s->parent->nChildXids + s->nChildXids + 1; |
| |
| /* Allocate or enlarge the parent array if necessary */ |
| if (s->parent->maxChildXids < new_nChildXids) |
| { |
| int new_maxChildXids; |
| TransactionId *new_childXids; |
| |
| /* |
| * Make it 2x what's needed right now, to avoid having to enlarge it |
| * repeatedly. But we can't go above MaxAllocSize. (The latter limit |
| * is what ensures that we don't need to worry about integer overflow |
| * here or in the calculation of new_nChildXids.) |
| */ |
| new_maxChildXids = Min(new_nChildXids * 2, |
| (int) (MaxAllocSize / sizeof(TransactionId))); |
| |
| if (new_maxChildXids < new_nChildXids) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("maximum number of committed subtransactions (%d) exceeded", |
| (int) (MaxAllocSize / sizeof(TransactionId))))); |
| |
| /* |
| * We keep the child-XID arrays in TopTransactionContext; this avoids |
| * setting up child-transaction contexts for what might be just a few |
| * bytes of grandchild XIDs. |
| */ |
| if (s->parent->childXids == NULL) |
| new_childXids = |
| MemoryContextAlloc(TopTransactionContext, |
| new_maxChildXids * sizeof(TransactionId)); |
| else |
| new_childXids = repalloc(s->parent->childXids, |
| new_maxChildXids * sizeof(TransactionId)); |
| |
| s->parent->childXids = new_childXids; |
| s->parent->maxChildXids = new_maxChildXids; |
| } |
| |
| /* |
| * Copy all my XIDs to parent's array. |
| * |
| * Note: We rely on the fact that the XID of a child always follows that |
| * of its parent. By copying the XID of this subtransaction before the |
| * XIDs of its children, we ensure that the array stays ordered. Likewise, |
| * all XIDs already in the array belong to subtransactions started and |
| * subcommitted before us, so their XIDs must precede ours. |
| */ |
| s->parent->childXids[s->parent->nChildXids] = XidFromFullTransactionId(s->fullTransactionId); |
| |
| if (s->nChildXids > 0) |
| memcpy(&s->parent->childXids[s->parent->nChildXids + 1], |
| s->childXids, |
| s->nChildXids * sizeof(TransactionId)); |
| |
| s->parent->nChildXids = new_nChildXids; |
| |
| /* Release child's array to avoid leakage */ |
| if (s->childXids != NULL) |
| pfree(s->childXids); |
| /* We must reset these to avoid double-free if fail later in commit */ |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * AbortTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * RecordTransactionAbort |
| * |
| * Returns latest XID among xact and its children, or InvalidTransactionId |
| * if the xact has no XID. (We compute that here just because it's easier.) |
| */ |
| static TransactionId |
| RecordTransactionAbort(bool isSubXact) |
| { |
| TransactionId xid; |
| TransactionId latestXid; |
| int nrels; |
| RelFileNodePendingDelete *rels; |
| int nchildren; |
| TransactionId *children; |
| TimestampTz xact_time; |
| DbDirNode *deldbs; |
| int ndeldbs; |
| bool isQEReader; |
| |
| /* Like in CommitTransaction(), treat a QE reader as if there was no XID */ |
| isQEReader = (DistributedTransactionContext == DTX_CONTEXT_QE_READER || |
| DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON); |
| /* |
| * Also, if an error occurred during distributed COMMIT processing, and |
| * we had already decided that we are going to commit this transaction and |
| * wrote a commit record for it, there's no turning back. The Distributed |
| * Transaction Manager will take care of completing the transaction for us. |
| * |
| * If the distributed transaction has started rolling back, it means we already |
| * wrote the abort record, skip it. |
| */ |
| if (isQEReader || |
| getCurrentDtxState() == DTX_STATE_NOTIFYING_COMMIT_PREPARED || |
| CurrentDtxIsRollingback() || |
| MyProc->localDistribXactData.state == LOCALDISTRIBXACT_STATE_ABORTED) |
| xid = InvalidTransactionId; |
| else |
| xid = GetCurrentTransactionIdIfAny(); |
| |
| /* |
| * If we haven't been assigned an XID, nobody will care whether we aborted |
| * or not. Hence, we're done in that case. It does not matter if we have |
| * rels to delete (note that this routine is not responsible for actually |
| * deleting 'em). We cannot have any child XIDs, either. |
| */ |
| SetCurrentTransactionStopTimestamp(); |
| if (!TransactionIdIsValid(xid)) |
| { |
| /* Reset XactLastRecEnd until the next transaction writes something */ |
| if (!isSubXact) |
| XactLastRecEnd = 0; |
| return InvalidTransactionId; |
| } |
| |
| /* |
| * We have a valid XID, so we should write an ABORT record for it. |
| * |
| * We do not flush XLOG to disk here, since the default assumption after a |
| * crash would be that we aborted, anyway. For the same reason, we don't |
| * need to worry about interlocking against checkpoint start. |
| */ |
| |
| /* |
| * Check that we haven't aborted halfway through RecordTransactionCommit. |
| */ |
| if (TransactionIdDidCommit(xid)) |
| elog(PANIC, "cannot abort transaction %u, it was already committed", |
| xid); |
| |
| /* Fetch the data we need for the abort record */ |
| nrels = smgrGetPendingDeletes(false, &rels); |
| ndeldbs = GetPendingDbDeletes(false, &deldbs); |
| nchildren = xactGetCommittedChildren(&children); |
| |
| /* XXX do we really need a critical section here? */ |
| START_CRIT_SECTION(); |
| |
| /* Write the ABORT record */ |
| if (isSubXact) |
| xact_time = GetCurrentTimestamp(); |
| else |
| { |
| SetCurrentTransactionStopTimestamp(); |
| xact_time = xactStopTimestamp; |
| } |
| |
| XactLogAbortRecord(xact_time, |
| GetPendingTablespaceForDeletionForAbort(), |
| nchildren, children, |
| nrels, rels, |
| ndeldbs, deldbs, |
| MyXactFlags, InvalidTransactionId, |
| NULL); |
| |
| /* |
| * Report the latest async abort LSN, so that the WAL writer knows to |
| * flush this abort. There's nothing to be gained by delaying this, since |
| * WALWriter may as well do this when it can. This is important with |
| * streaming replication because if we don't flush WAL regularly we will |
| * find that large aborts leave us with a long backlog for when commits |
| * occur after the abort, increasing our window of data loss should |
| * problems occur at that point. |
| */ |
| if (!isSubXact) |
| XLogSetAsyncXactLSN(XactLastRecEnd); |
| |
| /* |
| * Mark the transaction aborted in clog. This is not absolutely necessary |
| * but we may as well do it while we are here; also, in the subxact case |
| * it is helpful because XactLockTableWait makes use of it to avoid |
| * waiting for already-aborted subtransactions. It is OK to do it without |
| * having flushed the ABORT record to disk, because in event of a crash |
| * we'd be assumed to have aborted anyway. |
| */ |
| TransactionIdAbortTree(xid, nchildren, children); |
| |
| END_CRIT_SECTION(); |
| |
| /* Compute latestXid while we have the child XIDs handy */ |
| latestXid = TransactionIdLatest(xid, nchildren, children); |
| |
| /* |
| * If we're aborting a subtransaction, we can immediately remove failed |
| * XIDs from PGPROC's cache of running child XIDs. We do that here for |
| * subxacts, because we already have the child XID array at hand. For |
| * main xacts, the equivalent happens just after this function returns. |
| */ |
| if (isSubXact) |
| XidCacheRemoveRunningXids(xid, nchildren, children, latestXid); |
| |
| /* Reset XactLastRecEnd until the next transaction writes something */ |
| if (!isSubXact) |
| XactLastRecEnd = 0; |
| |
| if (max_wal_senders > 0) |
| WalSndWakeup(); |
| |
| return latestXid; |
| } |
| |
| /* |
| * AtAbort_Memory |
| */ |
| static void |
| AtAbort_Memory(void) |
| { |
| /* |
| * Switch into TransactionAbortContext, which should have some free space |
| * even if nothing else does. We'll work in this context until we've |
| * finished cleaning up. |
| * |
| * It is barely possible to get here when we've not been able to create |
| * TransactionAbortContext yet; if so use TopMemoryContext. |
| */ |
| if (TransactionAbortContext != NULL) |
| MemoryContextSwitchTo(TransactionAbortContext); |
| else |
| MemoryContextSwitchTo(TopMemoryContext); |
| } |
| |
| /* |
| * AtSubAbort_Memory |
| */ |
| static void |
| AtSubAbort_Memory(void) |
| { |
| Assert(TransactionAbortContext != NULL); |
| |
| MemoryContextSwitchTo(TransactionAbortContext); |
| } |
| |
| |
| /* |
| * AtAbort_ResourceOwner |
| */ |
| static void |
| AtAbort_ResourceOwner(void) |
| { |
| /* |
| * Make sure we have a valid ResourceOwner, if possible (else it will be |
| * NULL, which is OK) |
| */ |
| CurrentResourceOwner = TopTransactionResourceOwner; |
| } |
| |
| /* |
| * AtSubAbort_ResourceOwner |
| */ |
| static void |
| AtSubAbort_ResourceOwner(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* Make sure we have a valid ResourceOwner */ |
| CurrentResourceOwner = s->curTransactionOwner; |
| } |
| |
| |
| /* |
| * AtSubAbort_childXids |
| */ |
| static void |
| AtSubAbort_childXids(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * We keep the child-XID arrays in TopTransactionContext (see |
| * AtSubCommit_childXids). This means we'd better free the array |
| * explicitly at abort to avoid leakage. |
| */ |
| if (s->childXids != NULL) |
| pfree(s->childXids); |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| |
| /* |
| * We could prune the unreportedXids array here. But we don't bother. That |
| * would potentially reduce number of XLOG_XACT_ASSIGNMENT records but it |
| * would likely introduce more CPU time into the more common paths, so we |
| * choose not to do that. |
| */ |
| } |
| |
| /* ---------------------------------------------------------------- |
| * CleanupTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * AtCleanup_Memory |
| */ |
| static void |
| AtCleanup_Memory(void) |
| { |
| Assert(CurrentTransactionState->parent == NULL); |
| |
| /* |
| * Now that we're "out" of a transaction, have the system allocate things |
| * in the top memory context instead of per-transaction contexts. |
| */ |
| MemoryContextSwitchTo(TopMemoryContext); |
| |
| /* |
| * Clear the special abort context for next time. |
| */ |
| if (TransactionAbortContext != NULL) |
| MemoryContextResetAndDeleteChildren(TransactionAbortContext); |
| |
| /* |
| * Release all transaction-local memory. |
| */ |
| if (TopTransactionContext != NULL) |
| MemoryContextDelete(TopTransactionContext); |
| TopTransactionContext = NULL; |
| CurTransactionContext = NULL; |
| CurrentTransactionState->curTransactionContext = NULL; |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * CleanupSubTransaction stuff |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * AtSubCleanup_Memory |
| */ |
| static void |
| AtSubCleanup_Memory(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| Assert(s->parent != NULL); |
| |
| /* Make sure we're not in an about-to-be-deleted context */ |
| MemoryContextSwitchTo(s->parent->curTransactionContext); |
| CurTransactionContext = s->parent->curTransactionContext; |
| |
| /* |
| * Clear the special abort context for next time. |
| */ |
| if (TransactionAbortContext != NULL) |
| MemoryContextResetAndDeleteChildren(TransactionAbortContext); |
| |
| /* |
| * Delete the subxact local memory contexts. Its CurTransactionContext can |
| * go too (note this also kills CurTransactionContexts from any children |
| * of the subxact). |
| */ |
| if (s->curTransactionContext) |
| MemoryContextDelete(s->curTransactionContext); |
| s->curTransactionContext = NULL; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * interface routines |
| * ---------------------------------------------------------------- |
| */ |
| /* MPP routine for setting the transaction id. this is needed for the shared |
| * snapshot for segmates. |
| * |
| * TODO: this sucks to have to allow this since its potentially very dangerous. |
| * maybe we can re-factor the shared snapshot stuff differently to fix this. |
| * but unfortunately the XID and snapshot info is kept in two entirely different |
| * places so it doesn't look good. |
| * |
| * DOH: this totally ignores subtransactions for now! |
| */ |
| void |
| SetSharedTransactionId_writer(DtxContext distributedTransactionContext) |
| { |
| Assert(SharedLocalSnapshotSlot != NULL); |
| Assert(LWLockHeldByMe(SharedLocalSnapshotSlot->slotLock)); |
| |
| Assert(distributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE || |
| distributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER || |
| distributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER || |
| distributedTransactionContext == DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT); |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("%s setting shared xid " UINT64_FORMAT " -> " UINT64_FORMAT, |
| DtxContextToString(distributedTransactionContext), |
| U64FromFullTransactionId(SharedLocalSnapshotSlot->fullXid), |
| U64FromFullTransactionId(TopTransactionStateData.fullTransactionId)))); |
| SharedLocalSnapshotSlot->fullXid = TopTransactionStateData.fullTransactionId; |
| } |
| |
| void |
| SetSharedTransactionId_reader(FullTransactionId xid, CommandId cid, DtxContext distributedTransactionContext) |
| { |
| Assert(distributedTransactionContext == DTX_CONTEXT_QE_READER || |
| distributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON); |
| |
| /* |
| * For DTX_CONTEXT_QE_READER or DTX_CONTEXT_QE_ENTRY_DB_SINGLETON, during |
| * StartTransaction(), currently we temporarily set the |
| * TopTransactionStateData.transactionId to what we find that time in |
| * SharedLocalSnapshot slot. Since, then QE writer could have moved-on and |
| * hence we reset the same to update to correct value here. |
| */ |
| TopTransactionStateData.fullTransactionId = xid; |
| currentCommandId = cid; |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("qExec READER setting local xid= " UINT64_FORMAT ", cid=%u " |
| "(distributedXid "UINT64_FORMAT"/%u)", |
| U64FromFullTransactionId(TopTransactionStateData.fullTransactionId), currentCommandId, |
| QEDtxContextInfo.distributedXid, |
| QEDtxContextInfo.segmateSync))); |
| } |
| |
| /* |
| * StartTransaction |
| */ |
| static void |
| StartTransaction(void) |
| { |
| TransactionState s; |
| VirtualTransactionId vxid; |
| if (Gp_role == GP_ROLE_DISPATCH) |
| setupRegularDtxContext(); |
| |
| if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON) |
| { |
| SIMPLE_FAULT_INJECTOR("transaction_start_under_entry_db_singleton"); |
| } |
| |
| /* |
| * Let's just make sure the state stack is empty |
| */ |
| s = &TopTransactionStateData; |
| CurrentTransactionState = s; |
| |
| Assert(!FullTransactionIdIsValid(XactTopFullTransactionId)); |
| |
| /* check the current transaction state */ |
| Assert(s->state == TRANS_DEFAULT); |
| |
| /* |
| * Set the current transaction state information appropriately during |
| * start processing. Note that once the transaction status is switched |
| * this process cannot fail until the user ID and the security context |
| * flags are fetched below. |
| */ |
| s->state = TRANS_START; |
| s->fullTransactionId = InvalidFullTransactionId; /* until assigned */ |
| |
| /* Determine if statements are logged in this transaction */ |
| xact_is_sampled = log_xact_sample_rate != 0 && |
| (log_xact_sample_rate == 1 || |
| random() <= log_xact_sample_rate * MAX_RANDOM_VALUE); |
| |
| /* |
| * initialize current transaction state fields |
| * |
| * note: prevXactReadOnly is not used at the outermost level |
| */ |
| s->nestingLevel = 1; |
| s->gucNestLevel = 1; |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| |
| /* |
| * Once the current user ID and the security context flags are fetched, |
| * both will be properly reset even if transaction startup fails. |
| */ |
| GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); |
| |
| /* SecurityRestrictionContext should never be set outside a transaction */ |
| Assert(s->prevSecContext == 0); |
| |
| /* |
| * Make sure we've reset xact state variables |
| * |
| * If recovery is still in progress, mark this transaction as read-only. |
| * We have lower level defences in XLogInsert and elsewhere to stop us |
| * from modifying data during recovery, but this gives the normal |
| * indication to the user that the transaction is read-only. |
| */ |
| if (RecoveryInProgress()) |
| { |
| s->startedInRecovery = true; |
| XactReadOnly = true; |
| } |
| else |
| { |
| s->startedInRecovery = false; |
| XactReadOnly = DefaultXactReadOnly; |
| } |
| XactDeferrable = DefaultXactDeferrable; |
| XactIsoLevel = DefaultXactIsoLevel; |
| forceSyncCommit = false; |
| seqXlogWrite = false; |
| MyXactFlags = 0; |
| |
| /* |
| * reinitialize within-transaction counters |
| */ |
| s->subTransactionId = TopSubTransactionId; |
| currentSubTransactionId = TopSubTransactionId; |
| currentCommandId = FirstCommandId; |
| currentCommandIdUsed = false; |
| currentSavepointTotal = 0; |
| |
| fastNodeCount = 0; |
| previousFastLink = NULL; |
| |
| /* |
| * initialize reported xid accounting |
| */ |
| nUnreportedXids = 0; |
| s->didLogXid = false; |
| TopXactexecutorDidWriteXLog = false; |
| |
| /* |
| * must initialize resource-management stuff first |
| */ |
| AtStart_Memory(); |
| AtStart_ResourceOwner(); |
| |
| /* |
| * Transactions may be started while recovery is in progress, if |
| * hot standby is enabled. |
| */ |
| AssertImply(DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY, |
| EnableHotStandby || !s->startedInRecovery); |
| /* |
| * MPP Modification |
| * |
| * If we're an executor and don't have a valid QDSentXID, then we're starting |
| * a purely-local transaction. |
| */ |
| switch (DistributedTransactionContext) |
| { |
| case DTX_CONTEXT_LOCAL_ONLY: |
| case DTX_CONTEXT_QD_RETRY_PHASE_2: |
| case DTX_CONTEXT_QE_FINISH_PREPARED: |
| { |
| if (DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY && |
| Gp_role == GP_ROLE_UTILITY) |
| { |
| LocalDistribXactData *ele = &MyProc->localDistribXactData; |
| ele->state = LOCALDISTRIBXACT_STATE_ACTIVE; |
| } |
| /* |
| * MPP: we're in utility-mode or a QE starting a pure-local |
| * transaction without any synchronization to segmates! |
| * (e.g. CatchupInterruptHandler) |
| */ |
| } |
| break; |
| |
| case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: |
| { |
| if (SharedLocalSnapshotSlot != NULL) |
| { |
| LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE); |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("setting shared snapshot startTimestamp = " |
| INT64_FORMAT "[old=" INT64_FORMAT "])", |
| stmtStartTimestamp, |
| SharedLocalSnapshotSlot->startTimestamp))); |
| SharedLocalSnapshotSlot->startTimestamp = stmtStartTimestamp; |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| } |
| LocalDistribXactData *ele = &MyProc->localDistribXactData; |
| ele->state = LOCALDISTRIBXACT_STATE_ACTIVE; |
| } |
| break; |
| |
| case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: |
| case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: |
| /* |
| * Sanity check for the global xid. |
| * |
| * Note for hot standby dispatch: the standby QEs are still |
| * writers, just like primary QEs for SELECT queries. But |
| * hot standby dispatch never has a valid gxid, so we skip |
| * the gxid checks for the standby QEs. |
| */ |
| if (!IS_HOT_STANDBY_QE()) |
| { |
| if (QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId) |
| elog(ERROR, |
| "distributed transaction id is invalid in context %s", |
| DtxContextToString(DistributedTransactionContext)); |
| |
| /* |
| * Update distributed XID info, this is only used for |
| * debugging. |
| */ |
| LocalDistribXactData *ele = &MyProc->localDistribXactData; |
| ele->distribXid = QEDtxContextInfo.distributedXid; |
| ele->state = LOCALDISTRIBXACT_STATE_ACTIVE; |
| } |
| else |
| Assert(QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId); |
| |
| /* fall through */ |
| case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT: |
| { |
| /* If we're running in test-mode insert a delay in writer. */ |
| if (gp_enable_slow_writer_testmode) |
| pg_usleep(500000); |
| |
| /* |
| * Snapshot must not be created before setting transaction |
| * isolation level. |
| */ |
| Assert(!FirstSnapshotSet); |
| |
| /* Assume transaction characteristics as sent by QD */ |
| XactIsoLevel = mppTxOptions_IsoLevel( |
| QEDtxContextInfo.distributedTxnOptions); |
| XactReadOnly = isMppTxOptions_ReadOnly( |
| QEDtxContextInfo.distributedTxnOptions); |
| |
| /* a hot standby transaction must be read-only */ |
| AssertImply(IS_HOT_STANDBY_QE(), XactReadOnly); |
| |
| /* |
| * MPP: we're a QE Writer. |
| */ |
| MyTmGxact->gxid = QEDtxContextInfo.distributedXid; |
| |
| if (SharedLocalSnapshotSlot != NULL) |
| { |
| LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE); |
| |
| SharedLocalSnapshotSlot->ready = false; |
| SharedLocalSnapshotSlot->fullXid = s->fullTransactionId; |
| SharedLocalSnapshotSlot->startTimestamp = stmtStartTimestamp; |
| SharedLocalSnapshotSlot->distributedXid = QEDtxContextInfo.distributedXid; |
| SharedLocalSnapshotSlot->writer_proc = MyProc; |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg( |
| "qExec writer setting distributedXid: "UINT64_FORMAT |
| " sharedQDxid (shared xid " UINT64_FORMAT " -> " UINT64_FORMAT ") ready %s" |
| " (shared timeStamp = " INT64_FORMAT " -> " |
| INT64_FORMAT ")", |
| SharedLocalSnapshotSlot->distributedXid, |
| U64FromFullTransactionId(SharedLocalSnapshotSlot->fullXid), |
| U64FromFullTransactionId(s->fullTransactionId), |
| SharedLocalSnapshotSlot->ready ? "true" : "false", |
| SharedLocalSnapshotSlot->startTimestamp, |
| xactStartTimestamp))); |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| } |
| } |
| break; |
| |
| case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: |
| case DTX_CONTEXT_QE_READER: |
| { |
| /* |
| * MPP: we're a QE Reader. |
| */ |
| Assert (SharedLocalSnapshotSlot != NULL); |
| MyTmGxact->gxid = QEDtxContextInfo.distributedXid; |
| |
| /* |
| * Snapshot must not be created before setting transaction |
| * isolation level. |
| */ |
| Assert(!FirstSnapshotSet); |
| |
| /* Assume transaction characteristics as sent by QD */ |
| XactIsoLevel = mppTxOptions_IsoLevel( |
| QEDtxContextInfo.distributedTxnOptions); |
| XactReadOnly = isMppTxOptions_ReadOnly( |
| QEDtxContextInfo.distributedTxnOptions); |
| |
| if (unlikely(Debug_print_full_dtm)) |
| { |
| LWLockAcquire(SharedSnapshotLock, LW_SHARED); /* For SharedSnapshotDump() */ |
| ereport(LOG, (errmsg("qExec reader: distributedXid "UINT64_FORMAT" currcid %d " |
| "gxid = "UINT64_FORMAT" DtxContext '%s' sharedsnapshots: %s", |
| QEDtxContextInfo.distributedXid, |
| QEDtxContextInfo.curcid, |
| getDistributedTransactionId(), |
| DtxContextToString(DistributedTransactionContext), |
| SharedSnapshotDump()))); |
| LWLockRelease(SharedSnapshotLock); |
| } |
| } |
| break; |
| |
| case DTX_CONTEXT_QE_PREPARED: |
| elog(FATAL, "Unexpected segment distribute transaction context: '%s'", |
| DtxContextToString(DistributedTransactionContext)); |
| break; |
| |
| default: |
| elog(PANIC, "Unrecognized DTX transaction context: %d", |
| (int) DistributedTransactionContext); |
| break; |
| } |
| |
| ereportif(Debug_print_snapshot_dtm, LOG, |
| (errmsg("[Distributed Snapshot #%u] *StartTransaction* " |
| "(gxid = "UINT64_FORMAT", xid = " UINT64_FORMAT ", '%s')", |
| (!FirstSnapshotSet ? 0 : |
| GetTransactionSnapshot()-> |
| distribSnapshotWithLocalMapping.ds.distribSnapshotId), |
| getDistributedTransactionId(), |
| U64FromFullTransactionId(s->fullTransactionId), |
| DtxContextToString(DistributedTransactionContext)))); |
| |
| /* |
| * Assign a new LocalTransactionId, and combine it with the backendId to |
| * form a virtual transaction id. |
| */ |
| vxid.backendId = MyBackendId; |
| vxid.localTransactionId = GetNextLocalTransactionId(); |
| |
| /* |
| * Lock the virtual transaction id before we announce it in the proc array |
| */ |
| VirtualXactLockTableInsert(vxid); |
| |
| /* |
| * Advertise it in the proc array. We assume assignment of |
| * localTransactionId is atomic, and the backendId should be set already. |
| */ |
| Assert(MyProc->backendId == vxid.backendId); |
| MyProc->lxid = vxid.localTransactionId; |
| |
| TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId); |
| |
| /* |
| * set transaction_timestamp() (a/k/a now()). Normally, we want this to |
| * be the same as the first command's statement_timestamp(), so don't do a |
| * fresh GetCurrentTimestamp() call (which'd be expensive anyway). But |
| * for transactions started inside procedures (i.e., nonatomic SPI |
| * contexts), we do need to advance the timestamp. Also, in a parallel |
| * worker, the timestamp should already have been provided by a call to |
| * SetParallelStartTimestamps(). |
| */ |
| if (!IsParallelWorker()) |
| { |
| if (!SPI_inside_nonatomic_context()) |
| xactStartTimestamp = stmtStartTimestamp; |
| else |
| xactStartTimestamp = GetCurrentTimestamp(); |
| } |
| else |
| Assert(xactStartTimestamp != 0); |
| pgstat_report_xact_timestamp(xactStartTimestamp); |
| /* Mark xactStopTimestamp as unset. */ |
| xactStopTimestamp = 0; |
| |
| /* |
| * initialize other subsystems for new transaction |
| */ |
| AtStart_GUC(); |
| AtStart_Cache(); |
| AfterTriggerBeginXact(); |
| |
| /* |
| * done with start processing, set current transaction state to "in |
| * progress" |
| */ |
| s->state = TRANS_INPROGRESS; |
| |
| /* |
| * Update the snapshot of gp_segment_configuration, it's not changed |
| * until the end of transaction, do this update inside a transaction |
| * because it does a catalog lookup. |
| * |
| * Sometimes, a new transaction is started before first access to db, |
| * however, reading a catalog like gp_segment_configuration need a |
| * database be selected. In such case, we disallow updating the snapshot |
| * of segments configuration. |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH && OidIsValid(MyDatabaseId)) |
| cdbcomponent_updateCdbComponents(); |
| |
| /* |
| * Acquire a resource group slot. |
| * |
| * Slot is successfully acquired when AssignResGroupOnMaster() is returned. |
| * This slot will be released when the transaction is committed or aborted. |
| * |
| * Note that AssignResGroupOnMaster() can throw a PG exception. Since we |
| * have set the transaction state to TRANS_INPROGRESS by this point, any |
| * exceptions thrown will trigger AbortTransaction() and free the slot. |
| * |
| * It's important that we acquire the resource group *after* starting the |
| * transaction (i.e. setting up the per-transaction memory context). |
| * As part of determining the resource group that the transaction should be |
| * assigned to, AssignResGroupOnMaster() accesses pg_authid, and a |
| * transaction should be in progress when it does so. |
| */ |
| if (ShouldAssignResGroupOnMaster()) |
| AssignResGroupOnMaster(); |
| |
| initialize_wal_bytes_written(); |
| ShowTransactionState("StartTransaction"); |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("StartTransaction in DTX Context = '%s', " |
| "isolation level %s, read-only = %d, %s", |
| DtxContextToString(DistributedTransactionContext), |
| IsoLevelAsUpperString(XactIsoLevel), XactReadOnly, |
| LocalDistribXact_DisplayString(MyProc->pgprocno)))); |
| } |
| |
| /* |
| * CommitTransaction |
| * |
| * NB: if you change this routine, better look at PrepareTransaction too! |
| */ |
| static void |
| CommitTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| TransactionId latestXid; |
| bool is_parallel_worker; |
| |
| is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); |
| |
| /* Enforce parallel mode restrictions during parallel worker commit. */ |
| if (is_parallel_worker) |
| EnterParallelMode(); |
| |
| ShowTransactionState("CommitTransaction"); |
| |
| /* |
| * check the current transaction state |
| */ |
| if (s->state != TRANS_INPROGRESS) |
| elog(WARNING, "CommitTransaction while in %s state", |
| TransStateAsString(s->state)); |
| Assert(s->parent == NULL); |
| |
| if (Gp_role == GP_ROLE_EXECUTE && !Gp_is_writer) |
| elog(DEBUG1,"CommitTransaction: called as segment Reader"); |
| |
| system_relation_modified = false; |
| |
| /* |
| * Do pre-commit processing that involves calling user-defined code, such |
| * as triggers. SECURITY_RESTRICTED_OPERATION contexts must not queue an |
| * action that would run here, because that would bypass the sandbox. |
| * Since closing cursors could queue trigger actions, triggers could open |
| * cursors, etc, we have to keep looping until there's nothing left to do. |
| */ |
| for (;;) |
| { |
| /* |
| * Fire all currently pending deferred triggers. |
| */ |
| AfterTriggerFireDeferred(); |
| |
| /* |
| * Close open portals (converting holdable ones into static portals). |
| * If there weren't any, we are done ... otherwise loop back to check |
| * if they queued deferred triggers. Lather, rinse, repeat. |
| */ |
| if (!PreCommit_Portals(false)) |
| break; |
| } |
| |
| /* |
| * The remaining actions cannot call any user-defined code, so it's safe |
| * to start shutting down within-transaction services. But note that most |
| * of this stuff could still throw an error, which would switch us into |
| * the transaction-abort path. |
| */ |
| |
| CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT |
| : XACT_EVENT_PRE_COMMIT); |
| |
| AtEOXact_ExtendProtocolData(); |
| |
| /* If we might have parallel workers, clean them up now. */ |
| if (IsInParallelMode()) |
| AtEOXact_Parallel(true); |
| |
| /* Clean up CBDB style parallel workers which we might have. */ |
| AtEOXact_CBDB_Parallel(); |
| |
| /* Shut down the deferred-trigger manager */ |
| AfterTriggerEndXact(true); |
| |
| AtEOXact_SharedSnapshot(); |
| |
| /* Perform any Resource Scheduler commit procesing. */ |
| if ((Gp_role == GP_ROLE_DISPATCH || IS_SINGLENODE()) && IsResQueueEnabled()) |
| AtCommit_ResScheduler(); |
| |
| AtEOXact_IVM(true); |
| /* |
| * Let ON COMMIT management do its thing (must happen after closing |
| * cursors, to avoid dangling-reference problems) |
| */ |
| PreCommit_on_commit_actions(); |
| |
| /* This can still fail */ |
| AtEOXact_DispatchOids(true); |
| |
| /* |
| * Synchronize files that are created and not WAL-logged during this |
| * transaction. This must happen before AtEOXact_RelationMap(), so that we |
| * don't see committed-but-broken files after a crash. |
| */ |
| smgrDoPendingSyncs(true, is_parallel_worker); |
| |
| /* close large objects before lower-level cleanup */ |
| AtEOXact_LargeObject(true); |
| |
| /* |
| * Insert notifications sent by NOTIFY commands into the queue. This |
| * should be late in the pre-commit sequence to minimize time spent |
| * holding the notify-insertion lock. However, this could result in |
| * creating a snapshot, so we must do it before serializable cleanup. |
| */ |
| PreCommit_Notify(); |
| |
| /* |
| * Mark serializable transaction as complete for predicate locking |
| * purposes. This should be done as late as we can put it and still allow |
| * errors to be raised for failure patterns found at commit. This is not |
| * appropriate in a parallel worker however, because we aren't committing |
| * the leader's transaction and its serializable state will live on. |
| */ |
| if (!is_parallel_worker) |
| PreCommit_CheckForSerializationFailure(); |
| |
| /* |
| * Prepare all QE. |
| */ |
| prepareDtxTransaction(); |
| |
| #ifdef FAULT_INJECTOR |
| if (isPreparedDtxTransaction()) |
| { |
| FaultInjector_InjectFaultIfSet( |
| "transaction_abort_after_distributed_prepared", |
| DDLNotSpecified, |
| "", // databaseName |
| ""); // tableName |
| } |
| #endif |
| |
| if (Debug_abort_after_distributed_prepared && |
| isPreparedDtxTransaction()) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FAULT_INJECT), |
| errmsg("Raise an error as directed by Debug_abort_after_distributed_prepared"))); |
| } |
| |
| /* Prevent cancel/die interrupt while cleaning up */ |
| HOLD_INTERRUPTS(); |
| |
| /* Commit updates to the relation map --- do this as late as possible */ |
| AtEOXact_RelationMap(true, is_parallel_worker); |
| |
| /* |
| * set the current transaction state information appropriately during |
| * commit processing |
| */ |
| s->state = TRANS_COMMIT; |
| s->parallelModeLevel = 0; |
| |
| if (!is_parallel_worker) |
| { |
| /* |
| * We need to mark our XIDs as committed in pg_xact. This is where we |
| * durably commit. |
| */ |
| latestXid = RecordTransactionCommit(); |
| } |
| else |
| { |
| /* |
| * We must not mark our XID committed; the parallel leader is |
| * responsible for that. |
| */ |
| latestXid = InvalidTransactionId; |
| |
| /* |
| * Make sure the leader will know about any WAL we wrote before it |
| * commits. |
| */ |
| ParallelWorkerReportLastRecEnd(XactLastRecEnd); |
| } |
| |
| TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); |
| |
| /* |
| * Do 2nd phase of commit to all QE. NOTE: we can't process |
| * signals (which may attempt to abort our now partially-completed |
| * transaction) until we've notified the QEs. |
| * |
| * And, that we have not master released locks, yet, too. |
| * |
| * Note: do this BEFORE clearing the resource owner, as the dispatch |
| * routines might want to use them. Plus, we want AtCommit_Memory to |
| * happen after using the dispatcher. |
| */ |
| if (notifyCommittedDtxTransactionIsNeeded()) |
| notifyCommittedDtxTransaction(); |
| |
| /* |
| * Let others know about no transaction in progress by me. Note that this |
| * must be done _before_ releasing locks we hold and _after_ |
| * RecordTransactionCommit. |
| */ |
| ProcArrayEndTransaction(MyProc, latestXid); |
| |
| EndLocalDistribXact(true); |
| |
| /* |
| * This is all post-commit cleanup. Note that if an error is raised here, |
| * it's too late to abort the transaction. This should be just |
| * noncritical resource releasing. |
| * |
| * The ordering of operations is not entirely random. The idea is: |
| * release resources visible to other backends (eg, files, buffer pins); |
| * then release locks; then release backend-local resources. We want to |
| * release locks at the point where any backend waiting for us will see |
| * our transaction as being fully cleaned up. |
| * |
| * Resources that can be associated with individual queries are handled by |
| * the ResourceOwner mechanism. The other calls here are for backend-wide |
| * state. |
| */ |
| |
| CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_COMMIT |
| : XACT_EVENT_COMMIT); |
| CallXactCallbacksOnce(XACT_EVENT_COMMIT); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_BEFORE_LOCKS, |
| true, true); |
| |
| /* detach combocid dsm */ |
| AtEOXact_ComboCid_Dsm_Detach(); |
| |
| /* Check we've released all buffer pins */ |
| AtEOXact_Buffers(true); |
| |
| /* Clean up the relation cache */ |
| AtEOXact_RelationCache(true); |
| |
| /* |
| * Make catalog changes visible to all backends. This has to happen after |
| * relcache references are dropped (see comments for |
| * AtEOXact_RelationCache), but before locks are released (if anyone is |
| * waiting for lock on a relation we've modified, we want them to know |
| * about the catalog change before they start using the relation). |
| */ |
| AtEOXact_Inval(true); |
| |
| AtEOXact_MultiXact(); |
| |
| AtCommit_TablespaceStorage(); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_LOCKS, |
| true, true); |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_AFTER_LOCKS, |
| true, true); |
| |
| /* |
| * Likewise, dropping of files deleted during the transaction is best done |
| * after releasing relcache and buffer pins. (This is not strictly |
| * necessary during commit, since such pins should have been released |
| * already, but this ordering is definitely critical during abort.) Since |
| * this may take many seconds, also delay until after releasing locks. |
| * Other backends will observe the attendant catalog changes and not |
| * attempt to access affected files. |
| * |
| * Same considerations for tablespace deletion |
| */ |
| smgrDoPendingDeletes(true); |
| DoPendingDbDeletes(true); |
| |
| /* |
| * Release the session level lock held for a movedb operation. This is to |
| * prevent another transaction from moving database objects into the source |
| * database oid directory while it is being deleted. We don't worry about |
| * aborts as we release session level locks automatically during an abort |
| * as opposed to a commit. We must also release in utility mode (e.g. |
| * standalone backends used in TAP tests). |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY || |
| IS_SINGLENODE()) |
| MoveDbSessionLockRelease(); |
| |
| /* |
| * Send out notification signals to other backends (and do other |
| * post-commit NOTIFY cleanup). This must not happen until after our |
| * transaction is fully done from the viewpoint of other backends. |
| */ |
| AtCommit_Notify(); |
| |
| /* |
| * Everything after this should be purely internal-to-this-backend |
| * cleanup. |
| */ |
| AtEOXact_GUC(true, 1); |
| AtEOXact_SPI(true); |
| AtEOXact_Enum(); |
| AtEOXact_on_commit_actions(true); |
| AtEOXact_Namespace(true, is_parallel_worker); |
| AtEOXact_SMgr(); |
| AtEOXact_Files(true); |
| AtEOXact_ComboCid(); |
| AtEOXact_HashTables(true); |
| AtEOXact_PgStat(true, is_parallel_worker); |
| AtEOXact_Snapshot(true, false); |
| AtEOXact_ApplyLauncher(true); |
| AtEOXact_WorkFile(); |
| pgstat_report_xact_timestamp(0); |
| |
| CurrentResourceOwner = NULL; |
| ResourceOwnerDelete(TopTransactionResourceOwner); |
| s->curTransactionOwner = NULL; |
| CurTransactionResourceOwner = NULL; |
| TopTransactionResourceOwner = NULL; |
| |
| AtCommit_Memory(); |
| |
| finishDistributedTransactionContext("CommitTransaction", false); |
| |
| if (gp_local_distributed_cache_stats) |
| { |
| LocalDistribXactCache_ShowStats("CommitTransaction"); |
| } |
| |
| s->fullTransactionId = InvalidFullTransactionId; |
| s->subTransactionId = InvalidSubTransactionId; |
| s->nestingLevel = 0; |
| s->gucNestLevel = 0; |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| s->executorSaysXactDoesWrites = false; |
| |
| XactTopFullTransactionId = InvalidFullTransactionId; |
| nParallelCurrentXids = 0; |
| |
| /* |
| * done with commit processing, set current transaction state back to |
| * default |
| */ |
| s->state = TRANS_DEFAULT; |
| |
| /* we're now in a consistent state to handle an interrupt. */ |
| RESUME_INTERRUPTS(); |
| |
| /* Release resource group slot at the end of a transaction */ |
| if (ShouldUnassignResGroup()) |
| UnassignResGroup(); |
| } |
| |
| /* |
| * PrepareTransaction |
| * |
| * NB: if you change this routine, better look at CommitTransaction too! |
| */ |
| static void |
| PrepareTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| TransactionId xid = GetCurrentTransactionId(); |
| GlobalTransaction gxact; |
| TimestampTz prepared_at; |
| |
| Assert(!IsInParallelMode()); |
| |
| ShowTransactionState("PrepareTransaction"); |
| |
| /* |
| * check the current transaction state |
| */ |
| if (s->state != TRANS_INPROGRESS) |
| elog(WARNING, "PrepareTransaction while in %s state", |
| TransStateAsString(s->state)); |
| Assert(s->parent == NULL); |
| |
| /* |
| * Do pre-commit processing that involves calling user-defined code, such |
| * as triggers. Since closing cursors could queue trigger actions, |
| * triggers could open cursors, etc, we have to keep looping until there's |
| * nothing left to do. |
| */ |
| for (;;) |
| { |
| /* |
| * Fire all currently pending deferred triggers. |
| */ |
| AfterTriggerFireDeferred(); |
| |
| /* |
| * Close open portals (converting holdable ones into static portals). |
| * If there weren't any, we are done ... otherwise loop back to check |
| * if they queued deferred triggers. Lather, rinse, repeat. |
| */ |
| if (!PreCommit_Portals(true)) |
| break; |
| } |
| |
| CallXactCallbacks(XACT_EVENT_PRE_PREPARE); |
| |
| /* |
| * The remaining actions cannot call any user-defined code, so it's safe |
| * to start shutting down within-transaction services. But note that most |
| * of this stuff could still throw an error, which would switch us into |
| * the transaction-abort path. |
| */ |
| |
| /* Shut down the deferred-trigger manager */ |
| AfterTriggerEndXact(true); |
| /* Just after clean up triggers */ |
| AtEOXact_IVM(true); |
| |
| /* |
| * Let ON COMMIT management do its thing (must happen after closing |
| * cursors, to avoid dangling-reference problems) |
| */ |
| PreCommit_on_commit_actions(); |
| |
| AtEOXact_DispatchOids(true); |
| |
| /* |
| * Synchronize files that are created and not WAL-logged during this |
| * transaction. This must happen before EndPrepare(), so that we don't see |
| * committed-but-broken files after a crash and COMMIT PREPARED. |
| */ |
| smgrDoPendingSyncs(true, false); |
| |
| /* close large objects before lower-level cleanup */ |
| AtEOXact_LargeObject(true); |
| |
| /* NOTIFY requires no work at this point */ |
| |
| /* |
| * Mark serializable transaction as complete for predicate locking |
| * purposes. This should be done as late as we can put it and still allow |
| * errors to be raised for failure patterns found at commit. |
| */ |
| PreCommit_CheckForSerializationFailure(); |
| |
| /* |
| * In Postgres, XACT_FLAGS_ACCESSEDTEMPNAMESPACE is used to error out if |
| * PREPARE TRANSACTION operated on temp table. |
| * |
| * In GPDB, XACT_FLAGS_ACCESSEDTEMPNAMESPACE is removed. |
| * |
| * GPDB treat temporary table like a regular table, e.g. stored in shared buffer |
| * instead of keep it in local buffer. The temporary table just have a shorter life |
| * cycle either tie to the session or tie to the transaction if ON COMMIT clause is |
| * used. |
| * |
| * Every transaction in GPDB is 2PC, so PREPARE TRANSACTION is used even for temp table |
| * creation. GPDB cannot error out, otherwise, it won't be able to handle temp table |
| * at all. |
| */ |
| #if 0 /* Upstream code not applicable to GPDB */ |
| /* |
| * Don't allow PREPARE TRANSACTION if we've accessed a temporary table in |
| * this transaction. Having the prepared xact hold locks on another |
| * backend's temp table seems a bad idea --- for instance it would prevent |
| * the backend from exiting. There are other problems too, such as how to |
| * clean up the source backend's local buffers and ON COMMIT state if the |
| * prepared xact includes a DROP of a temp table. |
| * |
| * Other objects types, like functions, operators or extensions, share the |
| * same restriction as they should not be created, locked or dropped as |
| * this can mess up with this session or even a follow-up session trying |
| * to use the same temporary namespace. |
| * |
| * We must check this after executing any ON COMMIT actions, because they |
| * might still access a temp relation. |
| * |
| * XXX In principle this could be relaxed to allow some useful special |
| * cases, such as a temp table created and dropped all within the |
| * transaction. That seems to require much more bookkeeping though. |
| */ |
| if ((MyXactFlags & XACT_FLAGS_ACCESSEDTEMPNAMESPACE)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot PREPARE a transaction that has operated on temporary objects"))); |
| #endif |
| SIMPLE_FAULT_INJECTOR("start_prepare"); |
| |
| /* |
| * Likewise, don't allow PREPARE after pg_export_snapshot. This could be |
| * supported if we added cleanup logic to twophase.c, but for now it |
| * doesn't seem worth the trouble. |
| */ |
| if (XactHasExportedSnapshots()) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot PREPARE a transaction that has exported snapshots"))); |
| |
| /* Prevent cancel/die interrupt while cleaning up */ |
| HOLD_INTERRUPTS(); |
| |
| /* Do some preparaty work on locks, before we change the transaction state. */ |
| PrePrepare_Locks(); |
| |
| /* |
| * set the current transaction state information appropriately during |
| * prepare processing |
| */ |
| s->state = TRANS_PREPARE; |
| |
| prepared_at = GetCurrentTimestamp(); |
| |
| /* Tell bufmgr and smgr to prepare for commit */ |
| BufmgrCommit(); |
| |
| /* |
| * We cannot prepare if the xid is already aborted for some reason. |
| * If we proceed with this unexpected state, we'll be unrecoverable. |
| */ |
| if (TransactionIdDidAbort(xid)) |
| elog(ERROR, "xid %u is already aborted", xid); |
| |
| /* |
| * Reserve the GID for this transaction. This could fail if the requested |
| * GID is invalid or already in use. |
| */ |
| gxact = MarkAsPreparing(xid, &MyProc->localDistribXactData, |
| prepareGID, prepared_at, |
| GetUserId(), MyDatabaseId); |
| prepareGID = NULL; |
| |
| /* |
| * Collect data for the 2PC state file. Note that in general, no actual |
| * state change should happen in the called modules during this step, |
| * since it's still possible to fail before commit, and in that case we |
| * want transaction abort to be able to clean up. (In particular, the |
| * AtPrepare routines may error out if they find cases they cannot |
| * handle.) State cleanup should happen in the PostPrepare routines |
| * below. However, some modules can go ahead and clear state here because |
| * they wouldn't do anything with it during abort anyway. |
| * |
| * Note: because the 2PC state file records will be replayed in the same |
| * order they are made, the order of these calls has to match the order in |
| * which we want things to happen during COMMIT PREPARED or ROLLBACK |
| * PREPARED; in particular, pay attention to whether things should happen |
| * before or after releasing the transaction's locks. |
| */ |
| StartPrepare(gxact); |
| |
| AtPrepare_Notify(); |
| AtPrepare_Locks(); |
| AtPrepare_PredicateLocks(); |
| AtPrepare_PgStat(); |
| AtPrepare_MultiXact(); |
| AtPrepare_RelationMap(); |
| |
| /* |
| * Here is where we really truly prepare. |
| * |
| * We have to record transaction prepares even if we didn't make any |
| * updates, because the transaction manager might get confused if we lose |
| * a global transaction. |
| */ |
| EndPrepare(gxact); |
| |
| /* |
| * Now we clean up backend-internal state and release internal resources. |
| */ |
| |
| /* Reset XactLastRecEnd until the next transaction writes something */ |
| XactLastRecEnd = 0; |
| |
| /* |
| * Transfer our locks to a dummy PGPROC. This has to be done before |
| * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would |
| * conclude "xact already committed or aborted" for our locks. |
| */ |
| PostPrepare_Locks(xid); |
| |
| /* |
| * Let others know about no transaction in progress by me. This has to be |
| * done *after* the prepared transaction has been marked valid, else |
| * someone may think it is unlocked and recyclable. |
| */ |
| ProcArrayClearTransaction(MyProc); |
| |
| /* |
| * In normal commit-processing, this is all non-critical post-transaction |
| * cleanup. When the transaction is prepared, however, it's important |
| * that the locks and other per-backend resources are transferred to the |
| * prepared transaction's PGPROC entry. Note that if an error is raised |
| * here, it's too late to abort the transaction. XXX: This probably should |
| * be in a critical section, to force a PANIC if any of this fails, but |
| * that cure could be worse than the disease. |
| */ |
| |
| CallXactCallbacks(XACT_EVENT_PREPARE); |
| CallXactCallbacksOnce(XACT_EVENT_PREPARE); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_BEFORE_LOCKS, |
| true, true); |
| |
| /* detach combocid dsm */ |
| AtEOXact_ComboCid_Dsm_Detach(); |
| /* Check we've released all buffer pins */ |
| AtEOXact_Buffers(true); |
| |
| /* Clean up the relation cache */ |
| AtEOXact_RelationCache(true); |
| |
| /* notify doesn't need a postprepare call */ |
| |
| PostPrepare_PgStat(); |
| |
| PostPrepare_Inval(); |
| |
| PostPrepare_smgr(); |
| |
| PostPrepare_DatabaseStorage(); |
| |
| PostPrepare_MultiXact(xid); |
| |
| PostPrepare_PredicateLocks(xid); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_LOCKS, |
| true, true); |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_AFTER_LOCKS, |
| true, true); |
| |
| /* |
| * Allow another backend to finish the transaction. After |
| * PostPrepare_Twophase(), the transaction is completely detached from our |
| * backend. The rest is just non-critical cleanup of backend-local state. |
| */ |
| PostPrepare_Twophase(); |
| |
| /* PREPARE acts the same as COMMIT as far as GUC is concerned */ |
| AtEOXact_GUC(true, 1); |
| AtEOXact_SPI(true); |
| AtEOXact_Enum(); |
| AtEOXact_on_commit_actions(true); |
| AtEOXact_Namespace(true, false); |
| AtEOXact_SMgr(); |
| AtEOXact_Files(true); |
| AtEOXact_ComboCid(); |
| AtEOXact_HashTables(true); |
| /* don't call AtEOXact_PgStat here; we fixed pgstat state above */ |
| AtEOXact_Snapshot(true, true); |
| AtEOXact_WorkFile(); |
| pgstat_report_xact_timestamp(0); |
| |
| CurrentResourceOwner = NULL; |
| ResourceOwnerDelete(TopTransactionResourceOwner); |
| s->curTransactionOwner = NULL; |
| CurTransactionResourceOwner = NULL; |
| TopTransactionResourceOwner = NULL; |
| |
| AtCommit_Memory(); |
| |
| if (gp_local_distributed_cache_stats) |
| { |
| LocalDistribXactCache_ShowStats("PrepareTransaction"); |
| } |
| |
| s->fullTransactionId = InvalidFullTransactionId; |
| s->subTransactionId = InvalidSubTransactionId; |
| s->nestingLevel = 0; |
| s->gucNestLevel = 0; |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| s->executorSaysXactDoesWrites = false; |
| |
| XactTopFullTransactionId = InvalidFullTransactionId; |
| nParallelCurrentXids = 0; |
| |
| /* |
| * done with 1st phase commit processing, set current transaction state |
| * back to default |
| */ |
| s->state = TRANS_DEFAULT; |
| |
| RESUME_INTERRUPTS(); |
| |
| /* Release resource group slot at the end of prepare transaction on segment */ |
| if (ShouldUnassignResGroup()) |
| UnassignResGroup(); |
| } |
| |
| |
| /* |
| * AbortTransaction |
| */ |
| static void |
| AbortTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| TransactionId latestXid; |
| bool is_parallel_worker; |
| |
| SIMPLE_FAULT_INJECTOR("transaction_abort_failure"); |
| |
| /* Prevent cancel/die interrupt while cleaning up */ |
| HOLD_INTERRUPTS(); |
| |
| /* Make sure we have a valid memory context and resource owner */ |
| AtAbort_Memory(); |
| |
| if (Gp_role == GP_ROLE_EXECUTE) |
| ResetExtensionCreatingGlobalVarsOnQE(); |
| |
| AtAbort_ResourceOwner(); |
| |
| /* |
| * Release any LW locks we might be holding as quickly as possible. |
| * (Regular locks, however, must be held till we finish aborting.) |
| * Releasing LW locks is critical since we might try to grab them again |
| * while cleaning up! |
| */ |
| LWLockReleaseAll(); |
| |
| /* Clear wait information and command progress indicator */ |
| pgstat_report_wait_end(); |
| pgstat_progress_end_command(); |
| |
| /* Clean up buffer I/O and buffer context locks, too */ |
| AbortBufferIO(); |
| UnlockBuffers(); |
| |
| /* Reset WAL record construction state */ |
| XLogResetInsertion(); |
| |
| /* Cancel condition variable sleep */ |
| ConditionVariableCancelSleep(); |
| |
| /* |
| * Also clean up any open wait for lock, since the lock manager will choke |
| * if we try to wait for another lock before doing this. |
| */ |
| LockErrorCleanup(); |
| |
| /* |
| * If any timeout events are still active, make sure the timeout interrupt |
| * is scheduled. This covers possible loss of a timeout interrupt due to |
| * longjmp'ing out of the SIGINT handler (see notes in handle_sig_alarm). |
| * We delay this till after LockErrorCleanup so that we don't uselessly |
| * reschedule lock or deadlock check timeouts. |
| */ |
| reschedule_timeouts(); |
| |
| /* |
| * Re-enable signals, in case we got here by longjmp'ing out of a signal |
| * handler. We do this fairly early in the sequence so that the timeout |
| * infrastructure will be functional if needed while aborting. |
| */ |
| PG_SETMASK(&UnBlockSig); |
| |
| /* |
| * check the current transaction state |
| */ |
| is_parallel_worker = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); |
| if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) |
| elog(DEBUG1, "WARNING: AbortTransaction while in %s state", |
| TransStateAsString(s->state)); |
| Assert(s->parent == NULL); |
| |
| /* |
| * set the current transaction state information appropriately during the |
| * abort processing |
| */ |
| s->state = TRANS_ABORT; |
| |
| system_relation_modified = false; |
| |
| /* |
| * Reset user ID which might have been changed transiently. We need this |
| * to clean up in case control escaped out of a SECURITY DEFINER function |
| * or other local change of CurrentUserId; therefore, the prior value of |
| * SecurityRestrictionContext also needs to be restored. |
| * |
| * (Note: it is not necessary to restore session authorization or role |
| * settings here because those can only be changed via GUC, and GUC will |
| * take care of rolling them back if need be.) |
| */ |
| SetUserIdAndSecContext(s->prevUser, s->prevSecContext); |
| |
| /* Forget about any active REINDEX. */ |
| ResetReindexState(s->nestingLevel); |
| |
| /* Reset logical streaming state. */ |
| ResetLogicalStreamingState(); |
| |
| /* Reset snapshot export state. */ |
| SnapBuildResetExportedSnapshotState(); |
| |
| /* If in parallel mode, clean up workers and exit parallel mode. */ |
| if (IsInParallelMode()) |
| { |
| AtEOXact_Parallel(false); |
| s->parallelModeLevel = 0; |
| } |
| |
| /* Clean up CBDB style parallel workers which we might have. */ |
| AtEOXact_CBDB_Parallel(); |
| |
| /* |
| * do abort processing |
| */ |
| AfterTriggerEndXact(false); /* 'false' means it's abort */ |
| AtAbort_EndpointExecState(); |
| AtAbort_Portals(); |
| AtAbort_DispatcherState(); |
| AtEOXact_SharedSnapshot(); |
| |
| /* Perform any Resource Scheduler abort procesing. */ |
| if ((Gp_role == GP_ROLE_DISPATCH || IS_SINGLENODE()) && IsResQueueEnabled()) |
| AtAbort_ResScheduler(); |
| |
| AtEOXact_DispatchOids(false); |
| |
| smgrDoPendingSyncs(false, is_parallel_worker); |
| AtEOXact_LargeObject(false); |
| AtAbort_Notify(); |
| AtEOXact_RelationMap(false, is_parallel_worker); |
| AtAbort_Twophase(); |
| AtAbort_IVM(); |
| AtAort_ExtendProtocolData(); |
| |
| /* |
| * Advertise the fact that we aborted in pg_xact (assuming that we got as |
| * far as assigning an XID to advertise). But if we're inside a parallel |
| * worker, skip this; the user backend must be the one to write the abort |
| * record. |
| */ |
| if (!is_parallel_worker) |
| latestXid = RecordTransactionAbort(false); |
| else |
| { |
| latestXid = InvalidTransactionId; |
| |
| /* |
| * Since the parallel leader won't get our value of XactLastRecEnd in |
| * this case, we nudge WAL-writer ourselves in this case. See related |
| * comments in RecordTransactionAbort for why this matters. |
| */ |
| XLogSetAsyncXactLSN(XactLastRecEnd); |
| } |
| |
| TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); |
| |
| /* |
| * Do abort to all QE. NOTE: we don't process |
| * signals to prevent recursion until we've notified the QEs. |
| */ |
| rollbackDtxTransaction(); |
| |
| /* |
| * Let others know about no transaction in progress by me. Note that this |
| * must be done _before_ releasing locks we hold and _after_ |
| * RecordTransactionAbort. |
| */ |
| ProcArrayEndTransaction(MyProc, latestXid); |
| |
| EndLocalDistribXact(false); |
| |
| SIMPLE_FAULT_INJECTOR("abort_after_procarray_end"); |
| /* |
| * Post-abort cleanup. See notes in CommitTransaction() concerning |
| * ordering. We can skip all of it if the transaction failed before |
| * creating a resource owner. |
| */ |
| if (TopTransactionResourceOwner != NULL) |
| { |
| if (is_parallel_worker) |
| CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT); |
| else |
| CallXactCallbacks(XACT_EVENT_ABORT); |
| CallXactCallbacksOnce(XACT_EVENT_ABORT); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_BEFORE_LOCKS, |
| false, true); |
| AtEOXact_ComboCid_Dsm_Detach(); |
| AtEOXact_Buffers(false); |
| AtEOXact_RelationCache(false); |
| /* |
| * Greenplum specific behavior: |
| * We pass is_commit to true even we are here Aborting Transaction. |
| * Greenplum has writer gang and reader gangs, only writer gang can |
| * modify database (like catalog ...), and gang can be reused in |
| * Greenplum in the same session. Thus when we abort a transaction, |
| * we still have to tell other reader gangs to abort those catcaches. |
| * EntryDB is reader gang in coordinator, we also want to tell them |
| * to invalidate catcache when QD abort. |
| * Discussion: https://groups.google.com/a/greenplum.org/g/gpdb-dev/c/u3-D7isdvmM |
| */ |
| bool need_inval_even_for_abort = ((Gp_role == GP_ROLE_EXECUTE && Gp_is_writer) || |
| Gp_role == GP_ROLE_DISPATCH); /* test QD to invalidate entryDB's catcache */ |
| AtEOXact_Inval(need_inval_even_for_abort); |
| AtEOXact_MultiXact(); |
| |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_LOCKS, |
| false, true); |
| ResourceOwnerRelease(TopTransactionResourceOwner, |
| RESOURCE_RELEASE_AFTER_LOCKS, |
| false, true); |
| smgrDoPendingDeletes(false); |
| |
| DoPendingDbDeletes(false); |
| DatabaseStorageResetSessionLock(); |
| |
| AtAbort_TablespaceStorage(); |
| gp_guc_need_restore = true; |
| AtEOXact_GUC(false, 1); |
| gp_guc_need_restore = false; |
| AtEOXact_SPI(false); |
| AtEOXact_Enum(); |
| AtEOXact_on_commit_actions(false); |
| AtEOXact_Namespace(false, is_parallel_worker); |
| AtEOXact_SMgr(); |
| AtEOXact_Files(false); |
| AtEOXact_ComboCid(); |
| AtEOXact_HashTables(false); |
| AtEOXact_PgStat(false, is_parallel_worker); |
| AtEOXact_ApplyLauncher(false); |
| AtEOXact_WorkFile(); |
| pgstat_report_xact_timestamp(0); |
| } |
| |
| /* |
| * Exported snapshots must be cleared before transaction ID is reset. In |
| * GPDB, transaction ID is reset below. In PostgreSQL, because 2PC is not |
| * needed, exported snapshots are cleared and transaction ID is reset |
| * later in CleanupTransaction(). We must perform both the actions here. |
| */ |
| AtEOXact_Snapshot(false, true); /* and release the transaction's snapshots */ |
| |
| /* |
| * If something goes wrong after this, we might recurse back to |
| * AbortTransaction(). To avoid creating another Abort WAL record |
| * and failing assertion in ProcArrayEndTransaction because MyProc->xid |
| * has already been cleared, clear out transactionId now. The rest |
| * of the fields in TransactionState will be cleared later, in |
| * CleanupTransaction(). |
| */ |
| TopTransactionStateData.fullTransactionId = InvalidFullTransactionId; |
| MyProc->localDistribXactData.state = LOCALDISTRIBXACT_STATE_NONE; |
| |
| /* |
| * State remains TRANS_ABORT until CleanupTransaction(). |
| */ |
| RESUME_INTERRUPTS(); |
| |
| /* If a query was cancelled, then cleanup reader gangs. */ |
| if (QueryCancelCleanup) |
| { |
| QueryCancelCleanup = false; |
| cdbcomponent_cleanupIdleQEs(false); |
| } |
| |
| /* |
| * If memprot decides to kill process, make sure we destroy all processes |
| * so that all mem/resource will be freed |
| */ |
| if (elog_geterrcode() == ERRCODE_GP_MEMPROT_KILL) |
| DisconnectAndDestroyAllGangs(true); |
| |
| /* Release resource group slot at the end of a transaction */ |
| if (ShouldUnassignResGroup()) |
| UnassignResGroup(); |
| } |
| |
| /* |
| * CleanupTransaction |
| */ |
| static void |
| CleanupTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * State should still be TRANS_ABORT from AbortTransaction(). |
| */ |
| if (s->state != TRANS_ABORT) |
| elog(FATAL, "CleanupTransaction: unexpected state %s", |
| TransStateAsString(s->state)); |
| |
| /* |
| * do abort cleanup processing |
| */ |
| AtCleanup_Portals(); /* now safe to release portal memory */ |
| AtEOXact_Snapshot(false, true); /* and release the transaction's snapshots */ |
| |
| CurrentResourceOwner = NULL; /* and resource owner */ |
| if (TopTransactionResourceOwner) |
| ResourceOwnerDelete(TopTransactionResourceOwner); |
| s->curTransactionOwner = NULL; |
| CurTransactionResourceOwner = NULL; |
| TopTransactionResourceOwner = NULL; |
| |
| AtCleanup_Memory(); /* and transaction memory */ |
| |
| s->fullTransactionId = InvalidFullTransactionId; |
| s->subTransactionId = InvalidSubTransactionId; |
| s->nestingLevel = 0; |
| s->gucNestLevel = 0; |
| s->childXids = NULL; |
| s->nChildXids = 0; |
| s->maxChildXids = 0; |
| s->parallelModeLevel = 0; |
| s->executorSaysXactDoesWrites = false; |
| |
| XactTopFullTransactionId = InvalidFullTransactionId; |
| nParallelCurrentXids = 0; |
| |
| /* |
| * done with abort processing, set current transaction state back to |
| * default |
| */ |
| s->state = TRANS_DEFAULT; |
| |
| finishDistributedTransactionContext("CleanupTransaction", true); |
| |
| /* Release resource group slot at the end of a transaction */ |
| if (ShouldUnassignResGroup()) |
| UnassignResGroup(); |
| } |
| |
| /* |
| * StartTransactionCommand |
| */ |
| void |
| StartTransactionCommand(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| switch (s->blockState) |
| { |
| /* |
| * if we aren't in a transaction block, we just do our usual start |
| * transaction. |
| */ |
| case TBLOCK_DEFAULT: |
| StartTransaction(); |
| |
| if (DistributedTransactionContext == DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER) |
| { |
| /* |
| * Pretend we executed an explicit BEGIN. |
| */ |
| s->blockState = TBLOCK_INPROGRESS; |
| } |
| else |
| { |
| /* |
| * Normal case. |
| */ |
| s->blockState = TBLOCK_STARTED; |
| } |
| break; |
| |
| /* |
| * We are somewhere in a transaction block or subtransaction and |
| * about to start a new command. For now we do nothing, but |
| * someday we may do command-local resource initialization. (Note |
| * that any needed CommandCounterIncrement was done by the |
| * previous CommitTransactionCommand.) |
| */ |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_SUBINPROGRESS: |
| /* |
| * There may be reader gangs waiting for us to update the |
| * QDSentXID -- make sure the state of the sharedsnapshot |
| * slot properly tracks the qd-xid |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE && Gp_is_writer && SharedLocalSnapshotSlot != NULL) |
| { |
| LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE); |
| |
| FullTransactionId oldFullXid = SharedLocalSnapshotSlot->fullXid; |
| TimestampTz oldStartTimestamp = SharedLocalSnapshotSlot->startTimestamp; |
| |
| /* |
| * MPP-3228: For a subtransaction, the transactionId |
| * may not have been assigned, we can't change the |
| * shared copy to InvalidTransactionId (the unassigned |
| * value) since the reader may *need* it). |
| */ |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| { |
| SharedLocalSnapshotSlot->fullXid = s->fullTransactionId; |
| } |
| |
| SharedLocalSnapshotSlot->startTimestamp = xactStartTimestamp; |
| SharedLocalSnapshotSlot->distributedXid = QEDtxContextInfo.distributedXid; |
| |
| LWLockRelease(SharedLocalSnapshotSlot->slotLock); |
| |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("qExec WRITER updating shared xid: " UINT64_FORMAT " -> " UINT64_FORMAT " " |
| "(StartTransactionCommand) timestamp: " |
| INT64_FORMAT " -> " INT64_FORMAT ")", |
| U64FromFullTransactionId(oldFullXid), |
| U64FromFullTransactionId(s->fullTransactionId), |
| oldStartTimestamp, xactStartTimestamp))); |
| } |
| break; |
| |
| /* |
| * Here we are in a failed transaction block (one of the commands |
| * caused an abort) so we do nothing but remain in the abort |
| * state. Eventually we will get a ROLLBACK command which will |
| * get us out of this state. (It is up to other code to ensure |
| * that no commands other than ROLLBACK will be processed in these |
| * states.) |
| */ |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(ERROR, "StartTransactionCommand: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| /* |
| * We must switch to CurTransactionContext before returning. This is |
| * already done if we called StartTransaction, otherwise not. |
| */ |
| Assert(CurTransactionContext != NULL); |
| MemoryContextSwitchTo(CurTransactionContext); |
| } |
| |
| |
| /* |
| * Simple system for saving and restoring transaction characteristics |
| * (isolation level, read only, deferrable). We need this for transaction |
| * chaining, so that we can set the characteristics of the new transaction to |
| * be the same as the previous one. (We need something like this because the |
| * GUC system resets the characteristics at transaction end, so for example |
| * just skipping the reset in StartTransaction() won't work.) |
| */ |
| static int save_XactIsoLevel; |
| static bool save_XactReadOnly; |
| static bool save_XactDeferrable; |
| |
| void |
| SaveTransactionCharacteristics(void) |
| { |
| save_XactIsoLevel = XactIsoLevel; |
| save_XactReadOnly = XactReadOnly; |
| save_XactDeferrable = XactDeferrable; |
| } |
| |
| void |
| RestoreTransactionCharacteristics(void) |
| { |
| XactIsoLevel = save_XactIsoLevel; |
| XactReadOnly = save_XactReadOnly; |
| XactDeferrable = save_XactDeferrable; |
| } |
| |
| |
| /* |
| * CommitTransactionCommand |
| */ |
| void |
| CommitTransactionCommand(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (Gp_role == GP_ROLE_EXECUTE && !Gp_is_writer) |
| elog(DEBUG1,"CommitTransactionCommand: called as segment Reader in state %s", |
| BlockStateAsString(s->blockState)); |
| |
| if (s->chain) |
| SaveTransactionCharacteristics(); |
| |
| switch (s->blockState) |
| { |
| /* |
| * These shouldn't happen. TBLOCK_DEFAULT means the previous |
| * StartTransactionCommand didn't set the STARTED state |
| * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended |
| * by EndParallelWorkerTransaction(), not this function. |
| */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| elog(FATAL, "CommitTransactionCommand: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| |
| /* |
| * If we aren't in a transaction block, just do our usual |
| * transaction commit, and return to the idle state. |
| */ |
| case TBLOCK_STARTED: |
| CommitTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| break; |
| |
| /* |
| * We are completing a "BEGIN TRANSACTION" command, so we change |
| * to the "transaction block in progress" state and return. (We |
| * assume the BEGIN did nothing to the database, so we need no |
| * CommandCounterIncrement.) |
| */ |
| case TBLOCK_BEGIN: |
| s->blockState = TBLOCK_INPROGRESS; |
| break; |
| |
| /* |
| * This is the case when we have finished executing a command |
| * someplace within a transaction block. We increment the command |
| * counter and return. |
| */ |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_SUBINPROGRESS: |
| CommandCounterIncrement(); |
| break; |
| |
| /* |
| * We are completing a "COMMIT" command. Do it and return to the |
| * idle state. |
| */ |
| case TBLOCK_END: |
| CommitTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| if (s->chain) |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| setupRegularDtxContext(); |
| |
| StartTransaction(); |
| s->blockState = TBLOCK_INPROGRESS; |
| s->chain = false; |
| RestoreTransactionCharacteristics(); |
| } |
| break; |
| |
| /* |
| * Here we are in the middle of a transaction block but one of the |
| * commands caused an abort so we do nothing but remain in the |
| * abort state. Eventually we will get a ROLLBACK command. |
| */ |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| break; |
| |
| /* |
| * Here we were in an aborted transaction block and we just got |
| * the ROLLBACK command from the user, so clean up the |
| * already-aborted transaction and return to the idle state. |
| */ |
| case TBLOCK_ABORT_END: |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| if (s->chain) |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| setupRegularDtxContext(); |
| |
| StartTransaction(); |
| s->blockState = TBLOCK_INPROGRESS; |
| s->chain = false; |
| RestoreTransactionCharacteristics(); |
| } |
| break; |
| |
| /* |
| * Here we were in a perfectly good transaction block but the user |
| * told us to ROLLBACK anyway. We have to abort the transaction |
| * and then clean up. |
| */ |
| case TBLOCK_ABORT_PENDING: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| if (s->chain) |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| setupRegularDtxContext(); |
| |
| StartTransaction(); |
| s->blockState = TBLOCK_INPROGRESS; |
| s->chain = false; |
| RestoreTransactionCharacteristics(); |
| } |
| break; |
| |
| /* |
| * We are completing a "PREPARE TRANSACTION" command. Do it and |
| * return to the idle state. |
| */ |
| case TBLOCK_PREPARE: |
| PrepareTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| break; |
| |
| /* |
| * We were just issued a SAVEPOINT inside a transaction block. |
| * Start a subtransaction. (DefineSavepoint already did |
| * PushTransaction, so as to have someplace to put the SUBBEGIN |
| * state.) |
| */ |
| case TBLOCK_SUBBEGIN: |
| StartSubTransaction(); |
| s->blockState = TBLOCK_SUBINPROGRESS; |
| break; |
| |
| /* |
| * We were issued a RELEASE command, so we end the current |
| * subtransaction and return to the parent transaction. The parent |
| * might be ended too, so repeat till we find an INPROGRESS |
| * transaction or subtransaction. |
| */ |
| case TBLOCK_SUBRELEASE: |
| do |
| { |
| CommitSubTransaction(); |
| s = CurrentTransactionState; /* changed by pop */ |
| } while (s->blockState == TBLOCK_SUBRELEASE); |
| |
| Assert(s->blockState == TBLOCK_INPROGRESS || |
| s->blockState == TBLOCK_SUBINPROGRESS); |
| break; |
| |
| /* |
| * We were issued a COMMIT, so we end the current subtransaction |
| * hierarchy and perform final commit. We do this by rolling up |
| * any subtransactions into their parent, which leads to O(N^2) |
| * operations with respect to resource owners - this isn't that |
| * bad until we approach a thousands of savepoints but is |
| * necessary for correctness should after triggers create new |
| * resource owners. |
| */ |
| case TBLOCK_SUBCOMMIT: |
| do |
| { |
| CommitSubTransaction(); |
| s = CurrentTransactionState; /* changed by pop */ |
| } while (s->blockState == TBLOCK_SUBCOMMIT); |
| /* If we had a COMMIT command, finish off the main xact too */ |
| if (s->blockState == TBLOCK_END) |
| { |
| Assert(s->parent == NULL); |
| CommitTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| if (s->chain) |
| { |
| if (Gp_role == GP_ROLE_DISPATCH) |
| setupRegularDtxContext(); |
| |
| StartTransaction(); |
| s->blockState = TBLOCK_INPROGRESS; |
| s->chain = false; |
| RestoreTransactionCharacteristics(); |
| } |
| } |
| else if (s->blockState == TBLOCK_PREPARE) |
| { |
| Assert(s->parent == NULL); |
| PrepareTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| } |
| else |
| elog(ERROR, "CommitTransactionCommand: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| |
| /* |
| * The current already-failed subtransaction is ending due to a |
| * ROLLBACK or ROLLBACK TO command, so pop it and recursively |
| * examine the parent (which could be in any of several states). |
| */ |
| case TBLOCK_SUBABORT_END: |
| CleanupSubTransaction(); |
| CommitTransactionCommand(); |
| break; |
| |
| /* |
| * As above, but it's not dead yet, so abort first. |
| */ |
| case TBLOCK_SUBABORT_PENDING: |
| AbortSubTransaction(); |
| CleanupSubTransaction(); |
| CommitTransactionCommand(); |
| break; |
| |
| /* |
| * The current subtransaction is the target of a ROLLBACK TO |
| * command. Abort and pop it, then start a new subtransaction |
| * with the same name. |
| */ |
| case TBLOCK_SUBRESTART: |
| { |
| char *name; |
| int savepointLevel; |
| |
| /* save name and keep Cleanup from freeing it */ |
| name = s->name; |
| s->name = NULL; |
| savepointLevel = s->savepointLevel; |
| |
| AbortSubTransaction(); |
| CleanupSubTransaction(); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| DispatchRollbackToSavepoint(name); |
| } |
| |
| DefineSavepoint(name); |
| s = CurrentTransactionState; /* changed by push */ |
| if (name) |
| { |
| pfree(name); |
| } |
| s->savepointLevel = savepointLevel; |
| |
| /* This is the same as TBLOCK_SUBBEGIN case */ |
| AssertState(s->blockState == TBLOCK_SUBBEGIN); |
| StartSubTransaction(); |
| s->blockState = TBLOCK_SUBINPROGRESS; |
| } |
| break; |
| |
| /* |
| * Same as above, but the subtransaction had already failed, so we |
| * don't need AbortSubTransaction. |
| */ |
| case TBLOCK_SUBABORT_RESTART: |
| { |
| char *name; |
| int savepointLevel; |
| |
| /* save name and keep Cleanup from freeing it */ |
| name = s->name; |
| s->name = NULL; |
| savepointLevel = s->savepointLevel; |
| |
| CleanupSubTransaction(); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| DispatchRollbackToSavepoint(name); |
| } |
| |
| DefineSavepoint(name); |
| s = CurrentTransactionState; /* changed by push */ |
| s->name = name; |
| s->savepointLevel = savepointLevel; |
| |
| /* This is the same as TBLOCK_SUBBEGIN case */ |
| AssertState(s->blockState == TBLOCK_SUBBEGIN); |
| StartSubTransaction(); |
| s->blockState = TBLOCK_SUBINPROGRESS; |
| } |
| break; |
| } |
| } |
| |
| /* |
| * AbortCurrentTransaction |
| */ |
| void |
| AbortCurrentTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| elog(DEBUG5, "AbortCurrentTransaction for " UINT64_FORMAT " in state: %d", |
| U64FromFullTransactionId(s->fullTransactionId), |
| s->blockState); |
| |
| switch (s->blockState) |
| { |
| case TBLOCK_DEFAULT: |
| if (s->state == TRANS_DEFAULT) |
| { |
| /* we are idle, so nothing to do */ |
| } |
| else |
| { |
| /* |
| * We can get here after an error during transaction start |
| * (state will be TRANS_START). Need to clean up the |
| * incompletely started transaction. First, adjust the |
| * low-level state to suppress warning message from |
| * AbortTransaction. |
| */ |
| if (s->state == TRANS_START) |
| s->state = TRANS_INPROGRESS; |
| AbortTransaction(); |
| CleanupTransaction(); |
| } |
| break; |
| |
| /* |
| * If we aren't in a transaction block, we just do the basic abort |
| * & cleanup transaction. For this purpose, we treat an implicit |
| * transaction block as if it were a simple statement. |
| */ |
| case TBLOCK_STARTED: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| break; |
| |
| /* |
| * If we are in TBLOCK_BEGIN it means something screwed up right |
| * after reading "BEGIN TRANSACTION". We assume that the user |
| * will interpret the error as meaning the BEGIN failed to get him |
| * into a transaction block, so we should abort and return to idle |
| * state. |
| */ |
| case TBLOCK_BEGIN: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| break; |
| |
| /* |
| * We are somewhere in a transaction block and we've gotten a |
| * failure, so we abort the transaction and set up the persistent |
| * ABORT state. We will stay in ABORT until we get a ROLLBACK. |
| */ |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| AbortTransaction(); |
| s->blockState = TBLOCK_ABORT; |
| /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ |
| break; |
| |
| /* |
| * Here, we failed while trying to COMMIT. Clean up the |
| * transaction and return to idle state (we do not want to stay in |
| * the transaction). |
| */ |
| case TBLOCK_END: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| break; |
| |
| /* |
| * Here, we are already in an aborted transaction state and are |
| * waiting for a ROLLBACK, but for some reason we failed again! So |
| * we just remain in the abort state. |
| */ |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| break; |
| |
| /* |
| * We are in a failed transaction and we got the ROLLBACK command. |
| * We have already aborted, we just need to cleanup and go to idle |
| * state. |
| */ |
| case TBLOCK_ABORT_END: |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| |
| Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); |
| break; |
| |
| /* |
| * We are in a live transaction and we got a ROLLBACK command. |
| * Abort, cleanup, go to idle state. |
| */ |
| case TBLOCK_ABORT_PENDING: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| |
| Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); |
| break; |
| |
| /* |
| * Here, we failed while trying to PREPARE. Clean up the |
| * transaction and return to idle state (we do not want to stay in |
| * the transaction). |
| */ |
| case TBLOCK_PREPARE: |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| |
| Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); |
| break; |
| |
| /* |
| * We got an error inside a subtransaction. Abort just the |
| * subtransaction, and go to the persistent SUBABORT state until |
| * we get ROLLBACK. |
| */ |
| case TBLOCK_SUBINPROGRESS: |
| AbortSubTransaction(); |
| s->blockState = TBLOCK_SUBABORT; |
| break; |
| |
| /* |
| * If we failed while trying to create a subtransaction, clean up |
| * the broken subtransaction and abort the parent. The same |
| * applies if we get a failure while ending a subtransaction. |
| */ |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| AbortSubTransaction(); |
| CleanupSubTransaction(); |
| AbortCurrentTransaction(); |
| break; |
| |
| /* |
| * Same as above, except the Abort() was already done. |
| */ |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_SUBABORT_RESTART: |
| CleanupSubTransaction(); |
| AbortCurrentTransaction(); |
| break; |
| } |
| } |
| |
| /* |
| * PreventInTransactionBlock |
| * |
| * This routine is to be called by statements that must not run inside |
| * a transaction block, typically because they have non-rollback-able |
| * side effects or do internal commits. |
| * |
| * If we have already started a transaction block, issue an error; also issue |
| * an error if we appear to be running inside a user-defined function (which |
| * could issue more commands and possibly cause a failure after the statement |
| * completes). Subtransactions are verboten too. |
| * |
| * We must also set XACT_FLAGS_NEEDIMMEDIATECOMMIT in MyXactFlags, to ensure |
| * that postgres.c follows through by committing after the statement is done. |
| * |
| * isTopLevel: passed down from ProcessUtility to determine whether we are |
| * inside a function. (We will always fail if this is false, but it's |
| * convenient to centralize the check here instead of making callers do it.) |
| * stmtType: statement type name, for error messages. |
| */ |
| void |
| PreventInTransactionBlock(bool isTopLevel, const char *stmtType) |
| { |
| /* |
| * xact block already started? |
| */ |
| if (IsTransactionBlock()) |
| ereport(ERROR, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s cannot run inside a transaction block", |
| stmtType))); |
| |
| /* |
| * subtransaction? |
| */ |
| if (IsSubTransaction()) |
| ereport(ERROR, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s cannot run inside a subtransaction", |
| stmtType))); |
| |
| /* |
| * inside a function call? |
| */ |
| if (!isTopLevel) |
| ereport(ERROR, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s cannot be executed from a function", stmtType))); |
| |
| /* If we got past IsTransactionBlock test, should be in default state */ |
| if (CurrentTransactionState->blockState != TBLOCK_DEFAULT && |
| CurrentTransactionState->blockState != TBLOCK_STARTED) |
| elog(FATAL, "cannot prevent transaction chain"); |
| |
| /* All okay. Set the flag to make sure the right thing happens later. */ |
| MyXactFlags |= XACT_FLAGS_NEEDIMMEDIATECOMMIT; |
| } |
| |
| /* |
| * WarnNoTransactionBlock |
| * RequireTransactionBlock |
| * |
| * These two functions allow for warnings or errors if a command is executed |
| * outside of a transaction block. This is useful for commands that have no |
| * effects that persist past transaction end (and so calling them outside a |
| * transaction block is presumably an error). DECLARE CURSOR is an example. |
| * While top-level transaction control commands (BEGIN/COMMIT/ABORT) and SET |
| * that have no effect issue warnings, all other no-effect commands generate |
| * errors. |
| * |
| * If we appear to be running inside a user-defined function, we do not |
| * issue anything, since the function could issue more commands that make |
| * use of the current statement's results. Likewise subtransactions. |
| * Thus these are inverses for PreventInTransactionBlock. |
| * |
| * isTopLevel: passed down from ProcessUtility to determine whether we are |
| * inside a function. |
| * stmtType: statement type name, for warning or error messages. |
| */ |
| void |
| WarnNoTransactionBlock(bool isTopLevel, const char *stmtType) |
| { |
| CheckTransactionBlock(isTopLevel, false, stmtType); |
| } |
| |
| void |
| RequireTransactionBlock(bool isTopLevel, const char *stmtType) |
| { |
| CheckTransactionBlock(isTopLevel, true, stmtType); |
| } |
| |
| /* |
| * This is the implementation of the above two. |
| */ |
| static void |
| CheckTransactionBlock(bool isTopLevel, bool throwError, const char *stmtType) |
| { |
| /* |
| * xact block already started? |
| */ |
| if (IsTransactionBlock()) |
| return; |
| |
| /* |
| * subtransaction? |
| */ |
| if (IsSubTransaction()) |
| return; |
| |
| /* |
| * inside a function call? |
| */ |
| if (!isTopLevel) |
| return; |
| |
| ereport(throwError ? ERROR : WARNING, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| stmtType))); |
| } |
| |
| /* |
| * IsInTransactionBlock |
| * |
| * This routine is for statements that need to behave differently inside |
| * a transaction block than when running as single commands. ANALYZE is |
| * currently the only example. |
| * |
| * isTopLevel: passed down from ProcessUtility to determine whether we are |
| * inside a function. |
| */ |
| bool |
| IsInTransactionBlock(bool isTopLevel) |
| { |
| /* |
| * Return true on same conditions that would make |
| * PreventInTransactionBlock error out |
| */ |
| if (IsTransactionBlock()) |
| return true; |
| |
| if (IsSubTransaction()) |
| return true; |
| |
| if (!isTopLevel) |
| return true; |
| |
| if (CurrentTransactionState->blockState != TBLOCK_DEFAULT && |
| CurrentTransactionState->blockState != TBLOCK_STARTED) |
| return true; |
| |
| /* |
| * If we tell the caller we're not in a transaction block, then inform |
| * postgres.c that it had better commit when the statement is done. |
| * Otherwise our report could be a lie. |
| */ |
| MyXactFlags |= XACT_FLAGS_NEEDIMMEDIATECOMMIT; |
| |
| return false; |
| } |
| |
| |
| /* |
| * Register or deregister callback functions for start- and end-of-xact |
| * operations. |
| * |
| * These functions are intended for use by dynamically loaded modules. |
| * For built-in modules we generally just hardwire the appropriate calls |
| * (mainly because it's easier to control the order that way, where needed). |
| * |
| * At transaction end, the callback occurs post-commit or post-abort, so the |
| * callback functions can only do noncritical cleanup. |
| */ |
| void |
| RegisterXactCallback(XactCallback callback, void *arg) |
| { |
| XactCallbackItem *item; |
| |
| item = (XactCallbackItem *) |
| MemoryContextAlloc(TopMemoryContext, sizeof(XactCallbackItem)); |
| item->callback = callback; |
| item->arg = arg; |
| item->next = Xact_callbacks; |
| Xact_callbacks = item; |
| } |
| |
| void |
| UnregisterXactCallback(XactCallback callback, void *arg) |
| { |
| XactCallbackItem *item; |
| XactCallbackItem *prev; |
| |
| prev = NULL; |
| for (item = Xact_callbacks; item; prev = item, item = item->next) |
| { |
| if (item->callback == callback && item->arg == arg) |
| { |
| if (prev) |
| prev->next = item->next; |
| else |
| Xact_callbacks = item->next; |
| pfree(item); |
| break; |
| } |
| } |
| } |
| |
| static void |
| CallXactCallbacks(XactEvent event) |
| { |
| XactCallbackItem *item; |
| |
| for (item = Xact_callbacks; item; item = item->next) |
| item->callback(event, item->arg); |
| |
| } |
| |
| /* Register or deregister callback functions for start/end Xact. Call only once. */ |
| void |
| RegisterXactCallbackOnce(XactCallback callback, void *arg) |
| { |
| XactCallbackItem *item; |
| |
| item = (XactCallbackItem *) |
| MemoryContextAlloc(TopMemoryContext, sizeof(XactCallbackItem)); |
| item->callback = callback; |
| item->arg = arg; |
| item->next = Xact_callbacks_once; |
| Xact_callbacks_once = item; |
| } |
| |
| void |
| UnregisterXactCallbackOnce(XactCallback callback, void *arg) |
| { |
| XactCallbackItem *item; |
| XactCallbackItem *prev; |
| |
| prev = NULL; |
| for (item = Xact_callbacks_once; item; prev = item, item = item->next) |
| { |
| if (item->callback == callback && item->arg == arg) |
| { |
| if (prev) |
| prev->next = item->next; |
| else |
| Xact_callbacks_once = item->next; |
| pfree(item); |
| break; |
| } |
| } |
| } |
| |
| static void |
| CallXactCallbacksOnce(XactEvent event) |
| { |
| /* currently callback once should ignore prepare. */ |
| if (event == XACT_EVENT_PREPARE) |
| return; |
| |
| while(Xact_callbacks_once) |
| { |
| XactCallbackItem *next = Xact_callbacks_once->next; |
| XactCallback callback=Xact_callbacks_once->callback; |
| void*arg=Xact_callbacks_once->arg; |
| pfree(Xact_callbacks_once); |
| Xact_callbacks_once = next; |
| callback(event,arg); |
| } |
| } |
| |
| /* |
| * Register or deregister callback functions for start- and end-of-subxact |
| * operations. |
| * |
| * Pretty much same as above, but for subtransaction events. |
| * |
| * At subtransaction end, the callback occurs post-subcommit or post-subabort, |
| * so the callback functions can only do noncritical cleanup. At |
| * subtransaction start, the callback is called when the subtransaction has |
| * finished initializing. |
| */ |
| void |
| RegisterSubXactCallback(SubXactCallback callback, void *arg) |
| { |
| SubXactCallbackItem *item; |
| |
| item = (SubXactCallbackItem *) |
| MemoryContextAlloc(TopMemoryContext, sizeof(SubXactCallbackItem)); |
| item->callback = callback; |
| item->arg = arg; |
| item->next = SubXact_callbacks; |
| SubXact_callbacks = item; |
| } |
| |
| void |
| UnregisterSubXactCallback(SubXactCallback callback, void *arg) |
| { |
| SubXactCallbackItem *item; |
| SubXactCallbackItem *prev; |
| |
| prev = NULL; |
| for (item = SubXact_callbacks; item; prev = item, item = item->next) |
| { |
| if (item->callback == callback && item->arg == arg) |
| { |
| if (prev) |
| prev->next = item->next; |
| else |
| SubXact_callbacks = item->next; |
| pfree(item); |
| break; |
| } |
| } |
| } |
| |
| static void |
| CallSubXactCallbacks(SubXactEvent event, |
| SubTransactionId mySubid, |
| SubTransactionId parentSubid) |
| { |
| SubXactCallbackItem *item; |
| |
| for (item = SubXact_callbacks; item; item = item->next) |
| item->callback(event, mySubid, parentSubid, item->arg); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * transaction block support |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* |
| * BeginTransactionBlock |
| * This executes a BEGIN command. |
| */ |
| void |
| BeginTransactionBlock(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| switch (s->blockState) |
| { |
| /* |
| * We are not inside a transaction block, so allow one to begin. |
| */ |
| case TBLOCK_STARTED: |
| s->blockState = TBLOCK_BEGIN; |
| break; |
| |
| /* |
| * BEGIN converts an implicit transaction block to a regular one. |
| * (Note that we allow this even if we've already done some |
| * commands, which is a bit odd but matches historical practice.) |
| */ |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| s->blockState = TBLOCK_BEGIN; |
| break; |
| |
| /* |
| * Already a transaction block in progress. |
| */ |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| ereport(Gp_role == GP_ROLE_EXECUTE ? DEBUG1 : WARNING, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| errmsg("there is already a transaction in progress"))); |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_BEGIN: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "BeginTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| } |
| |
| /* |
| * PrepareTransactionBlock |
| * This executes a PREPARE command. |
| * |
| * Since PREPARE may actually do a ROLLBACK, the result indicates what |
| * happened: true for PREPARE, false for ROLLBACK. |
| * |
| * Note that we don't actually do anything here except change blockState. |
| * The real work will be done in the upcoming PrepareTransaction(). |
| * We do it this way because it's not convenient to change memory context, |
| * resource owner, etc while executing inside a Portal. |
| */ |
| bool |
| PrepareTransactionBlock(const char *gid) |
| { |
| TransactionState s; |
| bool result; |
| |
| /* Set up to commit the current transaction */ |
| result = EndTransactionBlock(false); |
| |
| /* If successful, change outer tblock state to PREPARE */ |
| if (result) |
| { |
| s = CurrentTransactionState; |
| |
| while (s->parent != NULL) |
| s = s->parent; |
| |
| if (s->blockState == TBLOCK_END) |
| { |
| /* Save GID where PrepareTransaction can find it again */ |
| prepareGID = MemoryContextStrdup(TopTransactionContext, gid); |
| |
| s->blockState = TBLOCK_PREPARE; |
| } |
| else |
| { |
| /* |
| * ignore case where we are not in a transaction; |
| * EndTransactionBlock already issued a warning. |
| */ |
| Assert(s->blockState == TBLOCK_STARTED || |
| s->blockState == TBLOCK_IMPLICIT_INPROGRESS); |
| /* Don't send back a PREPARE result tag... */ |
| result = false; |
| } |
| } |
| |
| return result; |
| } |
| |
| /* |
| * EndTransactionBlock |
| * This executes a COMMIT command. |
| * |
| * Since COMMIT may actually do a ROLLBACK, the result indicates what |
| * happened: true for COMMIT, false for ROLLBACK. |
| * |
| * Note that we don't actually do anything here except change blockState. |
| * The real work will be done in the upcoming CommitTransactionCommand(). |
| * We do it this way because it's not convenient to change memory context, |
| * resource owner, etc while executing inside a Portal. |
| */ |
| bool |
| EndTransactionBlock(bool chain) |
| { |
| TransactionState s = CurrentTransactionState; |
| bool result = false; |
| |
| switch (s->blockState) |
| { |
| /* |
| * We are in a transaction block, so tell CommitTransactionCommand |
| * to COMMIT. |
| */ |
| case TBLOCK_INPROGRESS: |
| s->blockState = TBLOCK_END; |
| result = true; |
| break; |
| |
| /* |
| * We are in an implicit transaction block. If AND CHAIN was |
| * specified, error. Otherwise commit, but issue a warning |
| * because there was no explicit BEGIN before this. |
| */ |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| if (chain) |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "COMMIT AND CHAIN"))); |
| else |
| ereport(WARNING, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| errmsg("there is no transaction in progress"))); |
| s->blockState = TBLOCK_END; |
| result = true; |
| break; |
| |
| /* |
| * We are in a failed transaction block. Tell |
| * CommitTransactionCommand it's time to exit the block. |
| */ |
| case TBLOCK_ABORT: |
| s->blockState = TBLOCK_ABORT_END; |
| break; |
| |
| /* |
| * We are in a live subtransaction block. Set up to subcommit all |
| * open subtransactions and then commit the main transaction. |
| */ |
| case TBLOCK_SUBINPROGRESS: |
| while (s->parent != NULL) |
| { |
| if (s->blockState == TBLOCK_SUBINPROGRESS) |
| s->blockState = TBLOCK_SUBCOMMIT; |
| else |
| elog(FATAL, "EndTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| s = s->parent; |
| } |
| if (s->blockState == TBLOCK_INPROGRESS) |
| s->blockState = TBLOCK_END; |
| else |
| elog(FATAL, "EndTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| result = true; |
| break; |
| |
| /* |
| * Here we are inside an aborted subtransaction. Treat the COMMIT |
| * as ROLLBACK: set up to abort everything and exit the main |
| * transaction. |
| */ |
| case TBLOCK_SUBABORT: |
| while (s->parent != NULL) |
| { |
| if (s->blockState == TBLOCK_SUBINPROGRESS) |
| s->blockState = TBLOCK_SUBABORT_PENDING; |
| else if (s->blockState == TBLOCK_SUBABORT) |
| s->blockState = TBLOCK_SUBABORT_END; |
| else |
| elog(FATAL, "EndTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| s = s->parent; |
| } |
| if (s->blockState == TBLOCK_INPROGRESS) |
| s->blockState = TBLOCK_ABORT_PENDING; |
| else if (s->blockState == TBLOCK_ABORT) |
| s->blockState = TBLOCK_ABORT_END; |
| else |
| elog(FATAL, "EndTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| |
| /* |
| * The user issued COMMIT when not inside a transaction. For |
| * COMMIT without CHAIN, issue a WARNING, staying in |
| * TBLOCK_STARTED state. The upcoming call to |
| * CommitTransactionCommand() will then close the transaction and |
| * put us back into the default state. For COMMIT AND CHAIN, |
| * error. |
| */ |
| case TBLOCK_STARTED: |
| if (chain) |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "COMMIT AND CHAIN"))); |
| else |
| ereport((Gp_role == GP_ROLE_EXECUTE) ? DEBUG2 : WARNING, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| errmsg("there is no transaction in progress"))); |
| result = true; |
| break; |
| |
| /* |
| * The user issued a COMMIT that somehow ran inside a parallel |
| * worker. We can't cope with that. |
| */ |
| case TBLOCK_PARALLEL_INPROGRESS: |
| ereport(FATAL, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot commit during a parallel operation"))); |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_BEGIN: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "EndTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| Assert(s->blockState == TBLOCK_STARTED || |
| s->blockState == TBLOCK_END || |
| s->blockState == TBLOCK_ABORT_END || |
| s->blockState == TBLOCK_ABORT_PENDING); |
| |
| s->chain = chain; |
| |
| return result; |
| } |
| |
| /* |
| * UserAbortTransactionBlock |
| * This executes a ROLLBACK command. |
| * |
| * As above, we don't actually do anything here except change blockState. |
| */ |
| void |
| UserAbortTransactionBlock(bool chain) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| switch (s->blockState) |
| { |
| /* |
| * We are inside a transaction block and we got a ROLLBACK command |
| * from the user, so tell CommitTransactionCommand to abort and |
| * exit the transaction block. |
| */ |
| case TBLOCK_INPROGRESS: |
| s->blockState = TBLOCK_ABORT_PENDING; |
| break; |
| |
| /* |
| * We are inside a failed transaction block and we got a ROLLBACK |
| * command from the user. Abort processing is already done, so |
| * CommitTransactionCommand just has to cleanup and go back to |
| * idle state. |
| */ |
| case TBLOCK_ABORT: |
| s->blockState = TBLOCK_ABORT_END; |
| break; |
| |
| /* |
| * We are inside a subtransaction. Mark everything up to top |
| * level as exitable. |
| */ |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_SUBABORT: |
| while (s->parent != NULL) |
| { |
| if (s->blockState == TBLOCK_SUBINPROGRESS) |
| s->blockState = TBLOCK_SUBABORT_PENDING; |
| else if (s->blockState == TBLOCK_SUBABORT) |
| s->blockState = TBLOCK_SUBABORT_END; |
| else |
| elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| s = s->parent; |
| } |
| if (s->blockState == TBLOCK_INPROGRESS) |
| s->blockState = TBLOCK_ABORT_PENDING; |
| else if (s->blockState == TBLOCK_ABORT) |
| s->blockState = TBLOCK_ABORT_END; |
| else |
| elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| |
| /* |
| * The user issued ABORT when not inside a transaction. For |
| * ROLLBACK without CHAIN, issue a WARNING and go to abort state. |
| * The upcoming call to CommitTransactionCommand() will then put |
| * us back into the default state. For ROLLBACK AND CHAIN, error. |
| * |
| * We do the same thing with ABORT inside an implicit transaction, |
| * although in this case we might be rolling back actual database |
| * state changes. (It's debatable whether we should issue a |
| * WARNING in this case, but we have done so historically.) |
| */ |
| case TBLOCK_STARTED: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| if (chain) |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "ROLLBACK AND CHAIN"))); |
| else |
| ereport((Gp_role == GP_ROLE_EXECUTE) ? DEBUG2 : WARNING, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| errmsg("there is no transaction in progress"))); |
| s->blockState = TBLOCK_ABORT_PENDING; |
| break; |
| |
| /* |
| * The user issued an ABORT that somehow ran inside a parallel |
| * worker. We can't cope with that. |
| */ |
| case TBLOCK_PARALLEL_INPROGRESS: |
| ereport(FATAL, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot abort during a parallel operation"))); |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_BEGIN: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| Assert(s->blockState == TBLOCK_ABORT_END || |
| s->blockState == TBLOCK_ABORT_PENDING); |
| |
| s->chain = chain; |
| } |
| |
| /* |
| * BeginImplicitTransactionBlock |
| * Start an implicit transaction block if we're not already in one. |
| * |
| * Unlike BeginTransactionBlock, this is called directly from the main loop |
| * in postgres.c, not within a Portal. So we can just change blockState |
| * without a lot of ceremony. We do not expect caller to do |
| * CommitTransactionCommand/StartTransactionCommand. |
| */ |
| void |
| BeginImplicitTransactionBlock(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * If we are in STARTED state (that is, no transaction block is open), |
| * switch to IMPLICIT_INPROGRESS state, creating an implicit transaction |
| * block. |
| * |
| * For caller convenience, we consider all other transaction states as |
| * legal here; otherwise the caller would need its own state check, which |
| * seems rather pointless. |
| */ |
| if (s->blockState == TBLOCK_STARTED) |
| s->blockState = TBLOCK_IMPLICIT_INPROGRESS; |
| } |
| |
| /* |
| * EndImplicitTransactionBlock |
| * End an implicit transaction block, if we're in one. |
| * |
| * Like EndTransactionBlock, we just make any needed blockState change here. |
| * The real work will be done in the upcoming CommitTransactionCommand(). |
| */ |
| void |
| EndImplicitTransactionBlock(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * If we are in IMPLICIT_INPROGRESS state, switch back to STARTED state, |
| * allowing CommitTransactionCommand to commit whatever happened during |
| * the implicit transaction block as though it were a single statement. |
| * |
| * For caller convenience, we consider all other transaction states as |
| * legal here; otherwise the caller would need its own state check, which |
| * seems rather pointless. |
| */ |
| if (s->blockState == TBLOCK_IMPLICIT_INPROGRESS) |
| s->blockState = TBLOCK_STARTED; |
| } |
| |
| void |
| DefineDispatchSavepoint(char *name) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if ((s->blockState != TBLOCK_INPROGRESS) && |
| (s->blockState != TBLOCK_SUBINPROGRESS)) |
| { |
| if (s->blockState == TBLOCK_IMPLICIT_INPROGRESS) |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "SAVEPOINT"))); |
| else |
| elog(FATAL, "DefineSavepoint: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| } |
| |
| /* First we attempt to create on the QEs */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| char *cmd; |
| |
| cmd = psprintf("SAVEPOINT %s", quote_identifier(name)); |
| |
| /* |
| * dispatch a DTX command, in the event of an error, this call |
| * will either exit via elog()/ereport() or return false |
| */ |
| if (!dispatchDtxCommand(cmd)) |
| elog(ERROR, "Could not create a new savepoint (%s)", cmd); |
| |
| pfree(cmd); |
| } |
| |
| DefineSavepoint(name); |
| } |
| |
| /* |
| * DefineSavepoint |
| * This executes a SAVEPOINT command. |
| */ |
| void |
| DefineSavepoint(const char *name) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for new subtransactions after that |
| * point. (Note that this check will certainly error out if s->blockState |
| * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case |
| * below.) |
| */ |
| if (IsInParallelMode()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot define savepoints during a parallel operation"))); |
| |
| switch (s->blockState) |
| { |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_SUBINPROGRESS: |
| /* Normal subtransaction start */ |
| PushTransaction(); |
| s = CurrentTransactionState; /* changed by push */ |
| |
| /* |
| * Savepoint names, like the TransactionState block itself, live |
| * in TopTransactionContext. |
| */ |
| if (name) |
| s->name = MemoryContextStrdup(TopTransactionContext, name); |
| break; |
| |
| /* |
| * We disallow savepoint commands in implicit transaction blocks. |
| * There would be no great difficulty in allowing them so far as |
| * this module is concerned, but a savepoint seems inconsistent |
| * with exec_simple_query's behavior of abandoning the whole query |
| * string upon error. Also, the point of an implicit transaction |
| * block (as opposed to a regular one) is to automatically close |
| * after an error, so it's hard to see how a savepoint would fit |
| * into that. |
| * |
| * The error messages for this are phrased as if there were no |
| * active transaction block at all, which is historical but |
| * perhaps could be improved. |
| */ |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "SAVEPOINT"))); |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "DefineSavepoint: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| } |
| |
| /* |
| * ReleaseSavepoint |
| * This executes a RELEASE command. |
| * |
| * As above, we don't actually do anything here except change blockState. |
| */ |
| void |
| ReleaseSavepoint(const char *name) |
| { |
| TransactionState s = CurrentTransactionState; |
| TransactionState target, |
| xact; |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for transaction state change after that |
| * point. (Note that this check will certainly error out if s->blockState |
| * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case |
| * below.) |
| */ |
| if (IsInParallelMode()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot release savepoints during a parallel operation"))); |
| |
| switch (s->blockState) |
| { |
| /* |
| * We can't release a savepoint if there is no savepoint defined. |
| */ |
| case TBLOCK_INPROGRESS: |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist", name))); |
| break; |
| |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| /* See comment about implicit transactions in DefineSavepoint */ |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "RELEASE SAVEPOINT"))); |
| break; |
| |
| /* |
| * We are in a non-aborted subtransaction. This is the only valid |
| * case. |
| */ |
| case TBLOCK_SUBINPROGRESS: |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "ReleaseSavepoint: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| char *cmd; |
| |
| cmd = psprintf("RELEASE SAVEPOINT %s", quote_identifier(name)); |
| |
| /* |
| * dispatch a DTX command, in the event of an error, this call will |
| * either exit via elog()/ereport() or return false |
| */ |
| if (!dispatchDtxCommand(cmd)) |
| elog(ERROR, "Could not release savepoint (%s)", cmd); |
| |
| pfree(cmd); |
| } |
| |
| for (target = s; PointerIsValid(target); target = target->parent) |
| { |
| if (PointerIsValid(target->name) && strcmp(target->name, name) == 0) |
| break; |
| } |
| |
| if (!PointerIsValid(target)) |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist", name))); |
| |
| /* disallow crossing savepoint level boundaries */ |
| if (target->savepointLevel != s->savepointLevel) |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist within current savepoint level", name))); |
| |
| /* |
| * Mark "commit pending" all subtransactions up to the target |
| * subtransaction. The actual commits will happen when control gets to |
| * CommitTransactionCommand. |
| */ |
| xact = CurrentTransactionState; |
| for (;;) |
| { |
| Assert(xact->blockState == TBLOCK_SUBINPROGRESS); |
| xact->blockState = TBLOCK_SUBRELEASE; |
| if (xact == target) |
| break; |
| xact = xact->parent; |
| Assert(PointerIsValid(xact)); |
| } |
| } |
| |
| /* |
| * RollbackToSavepoint |
| * This executes a ROLLBACK TO <savepoint> command. |
| * |
| * As above, we don't actually do anything here except change blockState. |
| */ |
| void |
| RollbackToSavepoint(const char *name) |
| { |
| TransactionState s = CurrentTransactionState; |
| TransactionState target, |
| xact; |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for transaction state change after that |
| * point. (Note that this check will certainly error out if s->blockState |
| * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case |
| * below.) |
| */ |
| if (IsInParallelMode()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot rollback to savepoints during a parallel operation"))); |
| |
| switch (s->blockState) |
| { |
| /* |
| * We can't rollback to a savepoint if there is no savepoint |
| * defined. |
| */ |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_ABORT: |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist", name))); |
| break; |
| |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| /* See comment about implicit transactions in DefineSavepoint */ |
| ereport(ERROR, |
| (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), |
| /* translator: %s represents an SQL statement name */ |
| errmsg("%s can only be used in transaction blocks", |
| "ROLLBACK TO SAVEPOINT"))); |
| break; |
| |
| /* |
| * There is at least one savepoint, so proceed. |
| */ |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_SUBABORT: |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "RollbackToSavepoint: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| for (target = s; PointerIsValid(target); target = target->parent) |
| { |
| if (PointerIsValid(target->name) && strcmp(target->name, name) == 0) |
| break; |
| } |
| |
| if (!PointerIsValid(target)) |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist", name))); |
| |
| /* disallow crossing savepoint level boundaries */ |
| if (target->savepointLevel != s->savepointLevel) |
| ereport(ERROR, |
| (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), |
| errmsg("savepoint \"%s\" does not exist within current savepoint level", name))); |
| |
| /* |
| * Mark "abort pending" all subtransactions up to the target |
| * subtransaction. The actual aborts will happen when control gets to |
| * CommitTransactionCommand. |
| */ |
| xact = CurrentTransactionState; |
| for (;;) |
| { |
| if (xact == target) |
| break; |
| if (xact->blockState == TBLOCK_SUBINPROGRESS) |
| xact->blockState = TBLOCK_SUBABORT_PENDING; |
| else if (xact->blockState == TBLOCK_SUBABORT) |
| xact->blockState = TBLOCK_SUBABORT_END; |
| else |
| elog(FATAL, "RollbackToSavepoint: unexpected state %s", |
| BlockStateAsString(xact->blockState)); |
| xact = xact->parent; |
| Assert(PointerIsValid(xact)); |
| } |
| |
| /* And mark the target as "restart pending" */ |
| if (xact->blockState == TBLOCK_SUBINPROGRESS) |
| xact->blockState = TBLOCK_SUBRESTART; |
| else if (xact->blockState == TBLOCK_SUBABORT) |
| xact->blockState = TBLOCK_SUBABORT_RESTART; |
| else |
| elog(FATAL, "RollbackToSavepoint: unexpected state %s", |
| BlockStateAsString(xact->blockState)); |
| } |
| |
| static void |
| DispatchRollbackToSavepoint(char *name) |
| { |
| char *cmd; |
| |
| if (!name) |
| elog(ERROR, "could not find savepoint name for ROLLBACK TO SAVEPOINT"); |
| |
| cmd = psprintf("ROLLBACK TO SAVEPOINT %s", quote_identifier(name)); |
| |
| /* |
| * dispatch a DTX command, in the event of an error, this call will |
| * either exit via elog()/ereport() or return false |
| */ |
| if (!dispatchDtxCommand(cmd)) |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), |
| errmsg("Could not rollback to savepoint (%s)", cmd))); |
| |
| pfree(cmd); |
| } |
| |
| /* |
| * BeginInternalSubTransaction |
| * This is the same as DefineSavepoint except it allows TBLOCK_STARTED, |
| * TBLOCK_IMPLICIT_INPROGRESS, TBLOCK_END, and TBLOCK_PREPARE states, |
| * and therefore it can safely be used in functions that might be called |
| * when not inside a BEGIN block or when running deferred triggers at |
| * COMMIT/PREPARE time. Also, it automatically does |
| * CommitTransactionCommand/StartTransactionCommand instead of expecting |
| * the caller to do it. |
| */ |
| void |
| BeginInternalSubTransaction(const char *name) |
| { |
| TransactionState s = CurrentTransactionState; |
| SIMPLE_FAULT_INJECTOR("begin_internal_sub_transaction"); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| if (!doDispatchSubtransactionInternalCmd( |
| DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL)) |
| { |
| elog(ERROR, |
| "Could not BeginInternalSubTransaction dispatch failed"); |
| } |
| } |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for new subtransactions after that |
| * point. We might be able to make an exception for the type of |
| * subtransaction established by this function, which is typically used in |
| * contexts where we're going to release or roll back the subtransaction |
| * before proceeding further, so that no enduring change to the |
| * transaction state occurs. For now, however, we prohibit this case along |
| * with all the others. |
| */ |
| if (IsInParallelMode()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot start subtransactions during a parallel operation"))); |
| |
| switch (s->blockState) |
| { |
| case TBLOCK_STARTED: |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_END: |
| case TBLOCK_PREPARE: |
| case TBLOCK_SUBINPROGRESS: |
| /* Normal subtransaction start */ |
| PushTransaction(); |
| s = CurrentTransactionState; /* changed by push */ |
| |
| /* |
| * Savepoint names, like the TransactionState block itself, live |
| * in TopTransactionContext. |
| */ |
| if (name) |
| s->name = MemoryContextStrdup(TopTransactionContext, name); |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_BEGIN: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| elog(FATAL, "BeginInternalSubTransaction: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| CommitTransactionCommand(); |
| StartTransactionCommand(); |
| } |
| |
| /* |
| * ReleaseCurrentSubTransaction |
| * |
| * RELEASE (ie, commit) the innermost subtransaction, regardless of its |
| * savepoint name (if any). |
| * NB: do NOT use CommitTransactionCommand/StartTransactionCommand with this. |
| */ |
| void |
| ReleaseCurrentSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * Workers synchronize transaction state at the beginning of each parallel |
| * operation, so we can't account for commit of subtransactions after that |
| * point. This should not happen anyway. Code calling this would |
| * typically have called BeginInternalSubTransaction() first, failing |
| * there. |
| */ |
| if (IsInParallelMode()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot commit subtransactions during a parallel operation"))); |
| |
| if (s->blockState != TBLOCK_SUBINPROGRESS) |
| elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| Assert(s->state == TRANS_INPROGRESS); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| if (!doDispatchSubtransactionInternalCmd( |
| DTX_PROTOCOL_COMMAND_SUBTRANSACTION_RELEASE_INTERNAL)) |
| { |
| elog(ERROR, |
| "Could not ReleaseCurrentSubTransaction dispatch failed"); |
| } |
| } |
| |
| MemoryContextSwitchTo(CurTransactionContext); |
| CommitSubTransaction(); |
| s = CurrentTransactionState; /* changed by pop */ |
| Assert(s->state == TRANS_INPROGRESS); |
| } |
| |
| /* |
| * RollbackAndReleaseCurrentSubTransaction |
| * |
| * ROLLBACK and RELEASE (ie, abort) the innermost subtransaction, regardless |
| * of its savepoint name (if any). |
| * NB: do NOT use CommitTransactionCommand/StartTransactionCommand with this. |
| */ |
| void |
| RollbackAndReleaseCurrentSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* |
| * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted |
| * during parallel operations. That's because we may be in the leader, |
| * recovering from an error thrown while we were in parallel mode. We |
| * won't reach here in a worker, because BeginInternalSubTransaction() |
| * will have failed. |
| */ |
| |
| switch (s->blockState) |
| { |
| /* Must be in a subtransaction */ |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_SUBABORT: |
| break; |
| |
| /* These cases are invalid. */ |
| case TBLOCK_DEFAULT: |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_ABORT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| case TBLOCK_PREPARE: |
| elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s", |
| BlockStateAsString(s->blockState)); |
| break; |
| } |
| |
| /* |
| * Abort the current subtransaction, if needed. |
| */ |
| if (s->blockState == TBLOCK_SUBINPROGRESS) |
| AbortSubTransaction(); |
| |
| /* And clean it up, too */ |
| CleanupSubTransaction(); |
| |
| s = CurrentTransactionState; /* changed by pop */ |
| AssertState(s->blockState == TBLOCK_SUBINPROGRESS || |
| s->blockState == TBLOCK_INPROGRESS || |
| s->blockState == TBLOCK_IMPLICIT_INPROGRESS || |
| s->blockState == TBLOCK_STARTED); |
| |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| if (!doDispatchSubtransactionInternalCmd( |
| DTX_PROTOCOL_COMMAND_SUBTRANSACTION_ROLLBACK_INTERNAL)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), |
| errmsg("DTX RollbackAndReleaseCurrentSubTransaction dispatch failed"))); |
| } |
| } |
| } |
| |
| /* |
| * AbortOutOfAnyTransaction |
| * |
| * This routine is provided for error recovery purposes. It aborts any |
| * active transaction or transaction block, leaving the system in a known |
| * idle state. |
| */ |
| void |
| AbortOutOfAnyTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* Ensure we're not running in a doomed memory context */ |
| AtAbort_Memory(); |
| |
| /* |
| * Cloudberry specific behavior: |
| * Some QEs might already be in Abort State, they still need |
| * to reset Extension related global vars, so we invoke them |
| * here (not AbortTransction). |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE) |
| ResetExtensionCreatingGlobalVarsOnQE(); |
| |
| /* |
| * Get out of any transaction or nested transaction |
| */ |
| do |
| { |
| switch (s->blockState) |
| { |
| case TBLOCK_DEFAULT: |
| if (s->state == TRANS_DEFAULT) |
| { |
| /* Not in a transaction, do nothing */ |
| } |
| else |
| { |
| /* |
| * We can get here after an error during transaction start |
| * (state will be TRANS_START). Need to clean up the |
| * incompletely started transaction. First, adjust the |
| * low-level state to suppress warning message from |
| * AbortTransaction. |
| */ |
| if (s->state == TRANS_START) |
| s->state = TRANS_INPROGRESS; |
| AbortTransaction(); |
| CleanupTransaction(); |
| } |
| break; |
| case TBLOCK_STARTED: |
| case TBLOCK_BEGIN: |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_PREPARE: |
| /* In a transaction, so clean up */ |
| AbortTransaction(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| |
| Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); |
| break; |
| case TBLOCK_ABORT: |
| case TBLOCK_ABORT_END: |
| |
| /* |
| * AbortTransaction is already done, still need Cleanup. |
| * However, if we failed partway through running ROLLBACK, |
| * there will be an active portal running that command, which |
| * we need to shut down before doing CleanupTransaction. |
| */ |
| AtAbort_Portals(); |
| CleanupTransaction(); |
| s->blockState = TBLOCK_DEFAULT; |
| |
| Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); |
| break; |
| |
| /* |
| * In a subtransaction, so clean it up and abort parent too |
| */ |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| AbortSubTransaction(); |
| CleanupSubTransaction(); |
| s = CurrentTransactionState; /* changed by pop */ |
| break; |
| |
| case TBLOCK_SUBABORT: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_SUBABORT_RESTART: |
| /* As above, but AbortSubTransaction already done */ |
| if (s->curTransactionOwner) |
| { |
| /* As in TBLOCK_ABORT, might have a live portal to zap */ |
| AtSubAbort_Portals(s->subTransactionId, |
| s->parent->subTransactionId, |
| s->curTransactionOwner, |
| s->parent->curTransactionOwner); |
| } |
| CleanupSubTransaction(); |
| s = CurrentTransactionState; /* changed by pop */ |
| break; |
| } |
| } while (s->blockState != TBLOCK_DEFAULT); |
| |
| /* Should be out of all subxacts now */ |
| Assert(s->parent == NULL); |
| |
| /* If we didn't actually have anything to do, revert to TopMemoryContext */ |
| AtCleanup_Memory(); |
| } |
| |
| /* |
| * IsTransactionBlock --- are we within a transaction block? |
| */ |
| bool |
| IsTransactionBlock(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->blockState == TBLOCK_DEFAULT || s->blockState == TBLOCK_STARTED) |
| return false; |
| |
| return true; |
| } |
| |
| /* |
| * IsTransactionOrTransactionBlock --- are we within either a transaction |
| * or a transaction block? (The backend is only really "idle" when this |
| * returns false.) |
| * |
| * This should match up with IsTransactionBlock and IsTransactionState. |
| */ |
| bool |
| IsTransactionOrTransactionBlock(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->blockState == TBLOCK_DEFAULT) |
| return false; |
| |
| return true; |
| } |
| |
| void |
| ExecutorMarkTransactionUsesSequences(void) |
| { |
| seqXlogWrite = true; |
| |
| ForceSyncCommit(); |
| } |
| |
| void |
| ExecutorMarkTransactionDoesWrites(void) |
| { |
| // UNDONE: Verify we are in transaction... |
| if (!TopTransactionStateData.executorSaysXactDoesWrites) |
| { |
| ereportif(Debug_print_full_dtm, LOG, |
| (errmsg("ExecutorMarkTransactionDoesWrites called"))); |
| TopTransactionStateData.executorSaysXactDoesWrites = true; |
| } |
| } |
| |
| bool |
| ExecutorSaysTransactionDoesWrites(void) |
| { |
| return TopTransactionStateData.executorSaysXactDoesWrites; |
| } |
| |
| /* |
| * TransactionBlockStatusCode - return status code to send in ReadyForQuery |
| */ |
| char |
| TransactionBlockStatusCode(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| switch (s->blockState) |
| { |
| case TBLOCK_DEFAULT: |
| case TBLOCK_STARTED: |
| return 'I'; /* idle --- not in transaction */ |
| case TBLOCK_BEGIN: |
| case TBLOCK_SUBBEGIN: |
| case TBLOCK_INPROGRESS: |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| case TBLOCK_PARALLEL_INPROGRESS: |
| case TBLOCK_SUBINPROGRESS: |
| case TBLOCK_END: |
| case TBLOCK_SUBRELEASE: |
| case TBLOCK_SUBCOMMIT: |
| case TBLOCK_PREPARE: |
| return 'T'; /* in transaction */ |
| case TBLOCK_ABORT: |
| case TBLOCK_SUBABORT: |
| case TBLOCK_ABORT_END: |
| case TBLOCK_SUBABORT_END: |
| case TBLOCK_ABORT_PENDING: |
| case TBLOCK_SUBABORT_PENDING: |
| case TBLOCK_SUBRESTART: |
| case TBLOCK_SUBABORT_RESTART: |
| return 'E'; /* in failed transaction */ |
| } |
| |
| /* should never get here */ |
| elog(FATAL, "invalid transaction block state: %s", |
| BlockStateAsString(s->blockState)); |
| return 0; /* keep compiler quiet */ |
| } |
| |
| /* |
| * IsSubTransaction |
| */ |
| bool |
| IsSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->nestingLevel >= 2) |
| return true; |
| |
| return false; |
| } |
| |
| /* |
| * StartSubTransaction |
| * |
| * If you're wondering why this is separate from PushTransaction: it's because |
| * we can't conveniently do this stuff right inside DefineSavepoint. The |
| * SAVEPOINT utility command will be executed inside a Portal, and if we |
| * muck with CurrentMemoryContext or CurrentResourceOwner then exit from |
| * the Portal will undo those settings. So we make DefineSavepoint just |
| * push a dummy transaction block, and when control returns to the main |
| * idle loop, CommitTransactionCommand will be called, and we'll come here |
| * to finish starting the subtransaction. |
| */ |
| static void |
| StartSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->state != TRANS_DEFAULT) |
| elog(WARNING, "StartSubTransaction while in %s state", |
| TransStateAsString(s->state)); |
| |
| s->state = TRANS_START; |
| |
| /* |
| * Initialize subsystems for new subtransaction |
| * |
| * must initialize resource-management stuff first |
| */ |
| AtSubStart_Memory(); |
| AtSubStart_ResourceOwner(); |
| AfterTriggerBeginSubXact(); |
| |
| s->state = TRANS_INPROGRESS; |
| |
| /* |
| * Call start-of-subxact callbacks |
| */ |
| CallSubXactCallbacks(SUBXACT_EVENT_START_SUB, s->subTransactionId, |
| s->parent->subTransactionId); |
| |
| ShowTransactionState("StartSubTransaction"); |
| } |
| |
| /* |
| * CommitSubTransaction |
| * |
| * The caller has to make sure to always reassign CurrentTransactionState |
| * if it has a local pointer to it after calling this function. |
| */ |
| static void |
| CommitSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| ShowTransactionState("CommitSubTransaction"); |
| |
| if (s->state != TRANS_INPROGRESS) |
| elog(WARNING, "CommitSubTransaction while in %s state", |
| TransStateAsString(s->state)); |
| |
| /* Pre-commit processing goes here */ |
| |
| CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, |
| s->parent->subTransactionId); |
| |
| /* If in parallel mode, clean up workers and exit parallel mode. */ |
| if (IsInParallelMode()) |
| { |
| AtEOSubXact_Parallel(true, s->subTransactionId); |
| s->parallelModeLevel = 0; |
| } |
| |
| /* Do the actual "commit", such as it is */ |
| s->state = TRANS_COMMIT; |
| |
| /* Must CCI to ensure commands of subtransaction are seen as done */ |
| CommandCounterIncrement(); |
| |
| /* |
| * Prior to 8.4 we marked subcommit in clog at this point. We now only |
| * perform that step, if required, as part of the atomic update of the |
| * whole transaction tree at top level commit or abort. |
| */ |
| |
| /* Post-commit cleanup */ |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| AtSubCommit_childXids(); |
| AfterTriggerEndSubXact(true); |
| AtSubCommit_Portals(s->subTransactionId, |
| s->parent->subTransactionId, |
| s->parent->nestingLevel, |
| s->parent->curTransactionOwner); |
| AtEOSubXact_LargeObject(true, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtSubCommit_Notify(); |
| |
| CallSubXactCallbacks(SUBXACT_EVENT_COMMIT_SUB, s->subTransactionId, |
| s->parent->subTransactionId); |
| |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_BEFORE_LOCKS, |
| true, false); |
| AtEOSubXact_RelationCache(true, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Inval(true); |
| AtSubCommit_smgr(); |
| |
| /* |
| * The only lock we actually release here is the subtransaction XID lock. |
| */ |
| CurrentResourceOwner = s->curTransactionOwner; |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| XactLockTableDelete(XidFromFullTransactionId(s->fullTransactionId)); |
| |
| /* |
| * Other locks should get transferred to their parent resource owner. |
| */ |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_LOCKS, |
| true, false); |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_AFTER_LOCKS, |
| true, false); |
| |
| AtEOXact_GUC(true, s->gucNestLevel); |
| AtEOSubXact_SPI(true, s->subTransactionId); |
| AtEOSubXact_on_commit_actions(true, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Namespace(true, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Files(true, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_HashTables(true, s->nestingLevel); |
| AtEOSubXact_PgStat(true, s->nestingLevel); |
| AtSubCommit_Snapshot(s->nestingLevel); |
| |
| /* |
| * We need to restore the upper transaction's read-only state, in case the |
| * upper is read-write while the child is read-only; GUC will incorrectly |
| * think it should leave the child state in place. |
| */ |
| XactReadOnly = s->prevXactReadOnly; |
| |
| CurrentResourceOwner = s->parent->curTransactionOwner; |
| CurTransactionResourceOwner = s->parent->curTransactionOwner; |
| ResourceOwnerDelete(s->curTransactionOwner); |
| s->curTransactionOwner = NULL; |
| |
| AtSubCommit_Memory(); |
| |
| s->state = TRANS_DEFAULT; |
| |
| PopTransaction(); |
| } |
| |
| /* |
| * AbortSubTransaction |
| */ |
| static void |
| AbortSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| /* Prevent cancel/die interrupt while cleaning up */ |
| HOLD_INTERRUPTS(); |
| |
| /* Make sure we have a valid memory context and resource owner */ |
| AtSubAbort_Memory(); |
| AtSubAbort_ResourceOwner(); |
| |
| /* |
| * Release any LW locks we might be holding as quickly as possible. |
| * (Regular locks, however, must be held till we finish aborting.) |
| * Releasing LW locks is critical since we might try to grab them again |
| * while cleaning up! |
| * |
| * FIXME This may be incorrect --- Are there some locks we should keep? |
| * Buffer locks, for example? I don't think so but I'm not sure. |
| */ |
| LWLockReleaseAll(); |
| |
| pgstat_report_wait_end(); |
| pgstat_progress_end_command(); |
| AbortBufferIO(); |
| UnlockBuffers(); |
| |
| /* Clean up hash entries for incremental view maintenance */ |
| AtAbort_IVM(); |
| |
| /* Reset WAL record construction state */ |
| XLogResetInsertion(); |
| |
| /* Cancel condition variable sleep */ |
| ConditionVariableCancelSleep(); |
| |
| /* |
| * Also clean up any open wait for lock, since the lock manager will choke |
| * if we try to wait for another lock before doing this. |
| */ |
| LockErrorCleanup(); |
| |
| /* |
| * If any timeout events are still active, make sure the timeout interrupt |
| * is scheduled. This covers possible loss of a timeout interrupt due to |
| * longjmp'ing out of the SIGINT handler (see notes in handle_sig_alarm). |
| * We delay this till after LockErrorCleanup so that we don't uselessly |
| * reschedule lock or deadlock check timeouts. |
| */ |
| reschedule_timeouts(); |
| |
| /* |
| * Re-enable signals, in case we got here by longjmp'ing out of a signal |
| * handler. We do this fairly early in the sequence so that the timeout |
| * infrastructure will be functional if needed while aborting. |
| */ |
| PG_SETMASK(&UnBlockSig); |
| |
| /* |
| * check the current transaction state |
| */ |
| ShowTransactionState("AbortSubTransaction"); |
| |
| if (s->state != TRANS_INPROGRESS) |
| elog(WARNING, "AbortSubTransaction while in %s state", |
| TransStateAsString(s->state)); |
| |
| s->state = TRANS_ABORT; |
| |
| /* |
| * Reset user ID which might have been changed transiently. (See notes in |
| * AbortTransaction.) |
| */ |
| SetUserIdAndSecContext(s->prevUser, s->prevSecContext); |
| |
| /* Forget about any active REINDEX. */ |
| ResetReindexState(s->nestingLevel); |
| |
| /* Reset logical streaming state. */ |
| ResetLogicalStreamingState(); |
| |
| /* |
| * No need for SnapBuildResetExportedSnapshotState() here, snapshot |
| * exports are not supported in subtransactions. |
| */ |
| |
| /* Exit from parallel mode, if necessary. */ |
| if (IsInParallelMode()) |
| { |
| AtEOSubXact_Parallel(false, s->subTransactionId); |
| s->parallelModeLevel = 0; |
| } |
| |
| /* |
| * We can skip all this stuff if the subxact failed before creating a |
| * ResourceOwner... |
| */ |
| if (s->curTransactionOwner) |
| { |
| AfterTriggerEndSubXact(false); |
| AtSubAbort_Portals(s->subTransactionId, |
| s->parent->subTransactionId, |
| s->curTransactionOwner, |
| s->parent->curTransactionOwner); |
| AtSubAbort_DispatcherState(); |
| AtEOXact_DispatchOids(false); |
| AtEOSubXact_LargeObject(false, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtSubAbort_Notify(); |
| |
| /* Advertise the fact that we aborted in pg_xact. */ |
| (void) RecordTransactionAbort(true); |
| |
| /* Post-abort cleanup */ |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| AtSubAbort_childXids(); |
| |
| CallSubXactCallbacks(SUBXACT_EVENT_ABORT_SUB, s->subTransactionId, |
| s->parent->subTransactionId); |
| |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_BEFORE_LOCKS, |
| false, false); |
| AtEOSubXact_RelationCache(false, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Inval(false); |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_LOCKS, |
| false, false); |
| ResourceOwnerRelease(s->curTransactionOwner, |
| RESOURCE_RELEASE_AFTER_LOCKS, |
| false, false); |
| AtSubAbort_smgr(); |
| |
| AtEOXact_GUC(false, s->gucNestLevel); |
| AtEOSubXact_SPI(false, s->subTransactionId); |
| AtEOSubXact_on_commit_actions(false, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Namespace(false, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_Files(false, s->subTransactionId, |
| s->parent->subTransactionId); |
| AtEOSubXact_HashTables(false, s->nestingLevel); |
| AtEOSubXact_PgStat(false, s->nestingLevel); |
| AtSubAbort_Snapshot(s->nestingLevel); |
| } |
| |
| /* |
| * Restore the upper transaction's read-only state, too. This should be |
| * redundant with GUC's cleanup but we may as well do it for consistency |
| * with the commit case. |
| */ |
| XactReadOnly = s->prevXactReadOnly; |
| |
| RESUME_INTERRUPTS(); |
| } |
| |
| /* |
| * CleanupSubTransaction |
| * |
| * The caller has to make sure to always reassign CurrentTransactionState |
| * if it has a local pointer to it after calling this function. |
| */ |
| static void |
| CleanupSubTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| ShowTransactionState("CleanupSubTransaction"); |
| |
| if (s->state != TRANS_ABORT) |
| elog(WARNING, "CleanupSubTransaction while in %s state", |
| TransStateAsString(s->state)); |
| |
| AtSubCleanup_Portals(s->subTransactionId); |
| |
| CurrentResourceOwner = s->parent->curTransactionOwner; |
| CurTransactionResourceOwner = s->parent->curTransactionOwner; |
| if (s->curTransactionOwner) |
| ResourceOwnerDelete(s->curTransactionOwner); |
| s->curTransactionOwner = NULL; |
| |
| AtSubCleanup_Memory(); |
| |
| s->state = TRANS_DEFAULT; |
| |
| PopTransaction(); |
| } |
| |
| /* |
| * PushTransaction |
| * Create transaction state stack entry for a subtransaction |
| * |
| * The caller has to make sure to always reassign CurrentTransactionState |
| * if it has a local pointer to it after calling this function. |
| */ |
| static void |
| PushTransaction(void) |
| { |
| TransactionState p = CurrentTransactionState; |
| TransactionState s; |
| |
| currentSavepointTotal++; |
| |
| if ((currentSavepointTotal >= gp_subtrans_warn_limit) && |
| (currentSavepointTotal % gp_subtrans_warn_limit == 0)) |
| { |
| ereport(WARNING, |
| (errmsg("Using too many subtransactions in one transaction."), |
| errhint("Close open transactions soon to avoid wraparound " |
| "problems."))); |
| } |
| |
| /* |
| * We keep subtransaction state nodes in TopTransactionContext. |
| */ |
| s = (TransactionState) |
| MemoryContextAllocZero(TopTransactionContext, |
| sizeof(TransactionStateData)); |
| |
| /* |
| * Assign a subtransaction ID, watching out for counter wraparound. |
| */ |
| currentSubTransactionId += 1; |
| if (currentSubTransactionId == InvalidSubTransactionId) |
| { |
| currentSubTransactionId -= 1; |
| pfree(s); |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("cannot have more than 2^32-1 subtransactions in a transaction"))); |
| } |
| |
| /* |
| * We can now stack a minimally valid subtransaction without fear of |
| * failure. |
| */ |
| s->fullTransactionId = InvalidFullTransactionId; /* until assigned */ |
| s->subTransactionId = currentSubTransactionId; |
| s->parent = p; |
| s->nestingLevel = p->nestingLevel + 1; |
| s->gucNestLevel = NewGUCNestLevel(); |
| s->savepointLevel = p->savepointLevel; |
| s->state = TRANS_DEFAULT; |
| s->blockState = TBLOCK_SUBBEGIN; |
| GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); |
| s->prevXactReadOnly = XactReadOnly; |
| s->parallelModeLevel = 0; |
| s->assigned = false; |
| |
| fastNodeCount++; |
| if (fastNodeCount == NUM_NODES_TO_SKIP_FOR_FAST_SEARCH) |
| { |
| fastNodeCount = 0; |
| s->fastLink = previousFastLink; |
| previousFastLink = s; |
| } |
| |
| CurrentTransactionState = s; |
| |
| /* |
| * AbortSubTransaction and CleanupSubTransaction have to be able to cope |
| * with the subtransaction from here on out; in particular they should not |
| * assume that it necessarily has a transaction context, resource owner, |
| * or XID. |
| */ |
| } |
| |
| /* |
| * PopTransaction |
| * Pop back to parent transaction state |
| * |
| * The caller has to make sure to always reassign CurrentTransactionState |
| * if it has a local pointer to it after calling this function. |
| */ |
| static void |
| PopTransaction(void) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->state != TRANS_DEFAULT) |
| elog(WARNING, "PopTransaction while in %s state", |
| TransStateAsString(s->state)); |
| |
| if (s->parent == NULL) |
| elog(FATAL, "PopTransaction with no parent"); |
| |
| CurrentTransactionState = s->parent; |
| |
| /* Let's just make sure CurTransactionContext is good */ |
| CurTransactionContext = s->parent->curTransactionContext; |
| MemoryContextSwitchTo(CurTransactionContext); |
| |
| /* Ditto for ResourceOwner links */ |
| CurTransactionResourceOwner = s->parent->curTransactionOwner; |
| CurrentResourceOwner = s->parent->curTransactionOwner; |
| |
| if (fastNodeCount) |
| { |
| fastNodeCount--; |
| } |
| |
| /* |
| * Deleting node where last fastLink is stored |
| * hence retrive the fastLink to update in node to be added next |
| */ |
| if (previousFastLink == s) |
| { |
| fastNodeCount = NUM_NODES_TO_SKIP_FOR_FAST_SEARCH - 1; |
| previousFastLink = s->fastLink; |
| } |
| |
| /* Free the old child structure */ |
| if (s->name) |
| pfree(s->name); |
| pfree(s); |
| } |
| |
| /* |
| * EstimateTransactionStateSpace |
| * Estimate the amount of space that will be needed by |
| * SerializeTransactionState. It would be OK to overestimate slightly, |
| * but it's simple for us to work out the precise value, so we do. |
| */ |
| Size |
| EstimateTransactionStateSpace(void) |
| { |
| TransactionState s; |
| Size nxids = 0; |
| Size size = SerializedTransactionStateHeaderSize; |
| |
| for (s = CurrentTransactionState; s != NULL; s = s->parent) |
| { |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| nxids = add_size(nxids, 1); |
| nxids = add_size(nxids, s->nChildXids); |
| } |
| |
| return add_size(size, mul_size(sizeof(TransactionId), nxids)); |
| } |
| |
| /* |
| * SerializeTransactionState |
| * Write out relevant details of our transaction state that will be |
| * needed by a parallel worker. |
| * |
| * We need to save and restore XactDeferrable, XactIsoLevel, and the XIDs |
| * associated with this transaction. These are serialized into a |
| * caller-supplied buffer big enough to hold the number of bytes reported by |
| * EstimateTransactionStateSpace(). We emit the XIDs in sorted order for the |
| * convenience of the receiving process. |
| */ |
| void |
| SerializeTransactionState(Size maxsize, char *start_address) |
| { |
| TransactionState s; |
| Size nxids = 0; |
| Size i = 0; |
| TransactionId *workspace; |
| SerializedTransactionState *result; |
| |
| result = (SerializedTransactionState *) start_address; |
| |
| result->xactIsoLevel = XactIsoLevel; |
| result->xactDeferrable = XactDeferrable; |
| result->topFullTransactionId = XactTopFullTransactionId; |
| result->currentFullTransactionId = |
| CurrentTransactionState->fullTransactionId; |
| result->currentCommandId = currentCommandId; |
| |
| /* |
| * If we're running in a parallel worker and launching a parallel worker |
| * of our own, we can just pass along the information that was passed to |
| * us. |
| */ |
| if (nParallelCurrentXids > 0) |
| { |
| result->nParallelCurrentXids = nParallelCurrentXids; |
| memcpy(&result->parallelCurrentXids[0], ParallelCurrentXids, |
| nParallelCurrentXids * sizeof(TransactionId)); |
| return; |
| } |
| |
| /* |
| * OK, we need to generate a sorted list of XIDs that our workers should |
| * view as current. First, figure out how many there are. |
| */ |
| for (s = CurrentTransactionState; s != NULL; s = s->parent) |
| { |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| nxids = add_size(nxids, 1); |
| nxids = add_size(nxids, s->nChildXids); |
| } |
| Assert(SerializedTransactionStateHeaderSize + nxids * sizeof(TransactionId) |
| <= maxsize); |
| |
| /* Copy them to our scratch space. */ |
| workspace = palloc(nxids * sizeof(TransactionId)); |
| for (s = CurrentTransactionState; s != NULL; s = s->parent) |
| { |
| if (FullTransactionIdIsValid(s->fullTransactionId)) |
| workspace[i++] = XidFromFullTransactionId(s->fullTransactionId); |
| if (s->nChildXids > 0) |
| memcpy(&workspace[i], s->childXids, |
| s->nChildXids * sizeof(TransactionId)); |
| i += s->nChildXids; |
| } |
| Assert(i == nxids); |
| |
| /* Sort them. */ |
| qsort(workspace, nxids, sizeof(TransactionId), xidComparator); |
| |
| /* Copy data into output area. */ |
| result->nParallelCurrentXids = nxids; |
| memcpy(&result->parallelCurrentXids[0], workspace, |
| nxids * sizeof(TransactionId)); |
| } |
| |
| /* |
| * StartParallelWorkerTransaction |
| * Start a parallel worker transaction, restoring the relevant |
| * transaction state serialized by SerializeTransactionState. |
| */ |
| void |
| StartParallelWorkerTransaction(char *tstatespace) |
| { |
| SerializedTransactionState *tstate; |
| |
| Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT); |
| StartTransaction(); |
| |
| tstate = (SerializedTransactionState *) tstatespace; |
| XactIsoLevel = tstate->xactIsoLevel; |
| XactDeferrable = tstate->xactDeferrable; |
| XactTopFullTransactionId = tstate->topFullTransactionId; |
| CurrentTransactionState->fullTransactionId = |
| tstate->currentFullTransactionId; |
| currentCommandId = tstate->currentCommandId; |
| nParallelCurrentXids = tstate->nParallelCurrentXids; |
| ParallelCurrentXids = &tstate->parallelCurrentXids[0]; |
| |
| CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; |
| } |
| |
| /* |
| * EndParallelWorkerTransaction |
| * End a parallel worker transaction. |
| */ |
| void |
| EndParallelWorkerTransaction(void) |
| { |
| Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); |
| CommitTransaction(); |
| CurrentTransactionState->blockState = TBLOCK_DEFAULT; |
| } |
| |
| /* |
| * ShowTransactionState |
| * Debug support |
| */ |
| static void |
| ShowTransactionState(const char *str) |
| { |
| /* skip work if message will definitely not be printed */ |
| if (message_level_is_interesting(DEBUG5)) |
| ShowTransactionStateRec(str, CurrentTransactionState); |
| } |
| |
| /* |
| * ShowTransactionStateRec |
| * Recursive subroutine for ShowTransactionState |
| */ |
| static void |
| ShowTransactionStateRec(const char *str, TransactionState s) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| |
| if (s->nChildXids > 0) |
| { |
| int i; |
| |
| appendStringInfo(&buf, ", children: %u", s->childXids[0]); |
| for (i = 1; i < s->nChildXids; i++) |
| appendStringInfo(&buf, " %u", s->childXids[i]); |
| } |
| |
| if (s->parent) |
| ShowTransactionStateRec(str, s->parent); |
| |
| ereport(DEBUG5, |
| (errmsg_internal("%s(%d) name: %s; blockState: %s; state: %s, xid/subid/cid: %u/%u/%u%s%s", |
| str, s->nestingLevel, |
| PointerIsValid(s->name) ? s->name : "unnamed", |
| BlockStateAsString(s->blockState), |
| TransStateAsString(s->state), |
| (unsigned int) XidFromFullTransactionId(s->fullTransactionId), |
| (unsigned int) s->subTransactionId, |
| (unsigned int) currentCommandId, |
| currentCommandIdUsed ? " (used)" : "", |
| buf.data))); |
| |
| pfree(buf.data); |
| } |
| |
| /* |
| * BlockStateAsString |
| * Debug support |
| */ |
| static const char * |
| BlockStateAsString(TBlockState blockState) |
| { |
| switch (blockState) |
| { |
| case TBLOCK_DEFAULT: |
| return "DEFAULT"; |
| case TBLOCK_STARTED: |
| return "STARTED"; |
| case TBLOCK_BEGIN: |
| return "BEGIN"; |
| case TBLOCK_INPROGRESS: |
| return "INPROGRESS"; |
| case TBLOCK_IMPLICIT_INPROGRESS: |
| return "IMPLICIT_INPROGRESS"; |
| case TBLOCK_PARALLEL_INPROGRESS: |
| return "PARALLEL_INPROGRESS"; |
| case TBLOCK_END: |
| return "END"; |
| case TBLOCK_ABORT: |
| return "ABORT"; |
| case TBLOCK_ABORT_END: |
| return "ABORT_END"; |
| case TBLOCK_ABORT_PENDING: |
| return "ABORT_PENDING"; |
| case TBLOCK_PREPARE: |
| return "PREPARE"; |
| case TBLOCK_SUBBEGIN: |
| return "SUBBEGIN"; |
| case TBLOCK_SUBINPROGRESS: |
| return "SUBINPROGRESS"; |
| case TBLOCK_SUBRELEASE: |
| return "SUBRELEASE"; |
| case TBLOCK_SUBCOMMIT: |
| return "SUBCOMMIT"; |
| case TBLOCK_SUBABORT: |
| return "SUBABORT"; |
| case TBLOCK_SUBABORT_END: |
| return "SUBABORT_END"; |
| case TBLOCK_SUBABORT_PENDING: |
| return "SUBABORT_PENDING"; |
| case TBLOCK_SUBRESTART: |
| return "SUBRESTART"; |
| case TBLOCK_SUBABORT_RESTART: |
| return "SUBABORT_RESTART"; |
| } |
| return "UNRECOGNIZED"; |
| } |
| |
| /* |
| * TransStateAsString |
| * Debug support |
| */ |
| static const char * |
| TransStateAsString(TransState state) |
| { |
| switch (state) |
| { |
| case TRANS_DEFAULT: |
| return "DEFAULT"; |
| case TRANS_START: |
| return "START"; |
| case TRANS_INPROGRESS: |
| return "INPROGRESS"; |
| case TRANS_COMMIT: |
| return "COMMIT"; |
| case TRANS_ABORT: |
| return "ABORT"; |
| case TRANS_PREPARE: |
| return "PREPARE"; |
| } |
| return "UNRECOGNIZED"; |
| } |
| |
| /* |
| * EndLocalDistribXact |
| */ |
| static void |
| EndLocalDistribXact(bool isCommit) |
| { |
| if (MyProc->localDistribXactData.state == LOCALDISTRIBXACT_STATE_NONE) |
| return; |
| |
| /* |
| * MyProc->localDistribXactData is access by backend itself only hence okay |
| * to modify without holding the lock. |
| */ |
| switch (DistributedTransactionContext) |
| { |
| case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: |
| case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: |
| case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT: |
| case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: |
| case DTX_CONTEXT_QD_RETRY_PHASE_2: |
| case DTX_CONTEXT_LOCAL_ONLY: |
| AssertImply(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY, |
| Gp_role == GP_ROLE_UTILITY || IsAutoVacuumWorkerProcess()); |
| LocalDistribXact_ChangeState(MyProc->pgprocno, |
| isCommit ? |
| LOCALDISTRIBXACT_STATE_COMMITTED : |
| LOCALDISTRIBXACT_STATE_ABORTED); |
| break; |
| |
| case DTX_CONTEXT_QE_READER: |
| case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: |
| // QD or QE Writer will handle it. |
| break; |
| |
| case DTX_CONTEXT_QE_PREPARED: |
| case DTX_CONTEXT_QE_FINISH_PREPARED: |
| elog(PANIC, "Unexpected distribute transaction context: '%s'", |
| DtxContextToString(DistributedTransactionContext)); |
| break; |
| |
| default: |
| elog(PANIC, "Unrecognized DTX transaction context: %d", |
| (int) DistributedTransactionContext); |
| break; |
| } |
| } |
| |
| /* |
| * IsoLevelAsUpperString |
| * Formatting helper. |
| */ |
| const char * |
| IsoLevelAsUpperString(int IsoLevel) |
| { |
| switch (IsoLevel) |
| { |
| case XACT_READ_UNCOMMITTED: |
| return "READ UNCOMMITTED"; |
| case XACT_READ_COMMITTED: |
| return "READ COMMITTED"; |
| case XACT_REPEATABLE_READ: |
| return "REPEATABLE READ"; |
| case XACT_SERIALIZABLE: |
| return "SERIALIZABLE"; |
| default: |
| return "UNKNOWN"; |
| } |
| } |
| |
| |
| /* |
| * xactGetCommittedChildren |
| * |
| * Gets the list of committed children of the current transaction. The return |
| * value is the number of child transactions. *ptr is set to point to an |
| * array of TransactionIds. The array is allocated in TopTransactionContext; |
| * the caller should *not* pfree() it (this is a change from pre-8.4 code!). |
| * If there are no subxacts, *ptr is set to NULL. |
| */ |
| int |
| xactGetCommittedChildren(TransactionId **ptr) |
| { |
| TransactionState s = CurrentTransactionState; |
| |
| if (s->nChildXids == 0) |
| *ptr = NULL; |
| else |
| *ptr = s->childXids; |
| |
| return s->nChildXids; |
| } |
| |
| |
| /* |
| * Log the commit record for a plain or twophase transaction commit. |
| * |
| * A 2pc commit will be emitted when twophase_xid is valid, a plain one |
| * otherwise. |
| */ |
| XLogRecPtr |
| XactLogCommitRecord(TimestampTz commit_time, |
| Oid tablespace_oid_to_delete_on_commit, |
| int nsubxacts, TransactionId *subxacts, |
| int nrels, RelFileNodePendingDelete *rels, |
| int nmsgs, SharedInvalidationMessage *msgs, |
| int ndeldbs, DbDirNode *deldbs, |
| bool relcacheInval, |
| int xactflags, TransactionId twophase_xid, |
| const char *twophase_gid) |
| { |
| xl_xact_commit xlrec; |
| xl_xact_xinfo xl_xinfo; |
| xl_xact_dbinfo xl_dbinfo; |
| xl_xact_subxacts xl_subxacts; |
| xl_xact_relfilenodes xl_relfilenodes; |
| xl_xact_invals xl_invals; |
| xl_xact_twophase xl_twophase; |
| xl_xact_origin xl_origin; |
| xl_xact_distrib xl_distrib; |
| xl_xact_deldbs xl_deldbs; |
| XLogRecPtr recptr; |
| bool isDtxPrepared = isPreparedDtxTransaction(); |
| DistributedTransactionId distrib_xid = getDistributedTransactionId(); |
| |
| uint8 info; |
| |
| Assert(CritSectionCount > 0); |
| |
| xl_xinfo.xinfo = 0; |
| |
| /* decide between a plain and 2pc commit */ |
| if (isDtxPrepared) |
| info = XLOG_XACT_DISTRIBUTED_COMMIT; |
| else if (!TransactionIdIsValid(twophase_xid)) |
| info = XLOG_XACT_COMMIT; |
| else |
| info = XLOG_XACT_COMMIT_PREPARED; |
| |
| /* First figure out and collect all the information needed */ |
| |
| xlrec.xact_time = commit_time; |
| xlrec.tablespace_oid_to_delete_on_commit = tablespace_oid_to_delete_on_commit; |
| |
| if (relcacheInval) |
| xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; |
| if (forceSyncCommit) |
| xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; |
| if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK)) |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS; |
| |
| /* |
| * Check if the caller would like to ask standbys for immediate feedback |
| * once this commit is applied. |
| */ |
| if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY) |
| xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK; |
| |
| /* |
| * Relcache invalidations requires information about the current database |
| * and so does logical decoding. |
| */ |
| if (nmsgs > 0 || XLogLogicalInfoActive()) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; |
| xl_dbinfo.dbId = MyDatabaseId; |
| xl_dbinfo.tsId = MyDatabaseTableSpace; |
| } |
| |
| if (nsubxacts > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; |
| xl_subxacts.nsubxacts = nsubxacts; |
| } |
| |
| if (nrels > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; |
| xl_relfilenodes.nrels = nrels; |
| info |= XLR_SPECIAL_REL_UPDATE; |
| } |
| |
| if (nmsgs > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS; |
| xl_invals.nmsgs = nmsgs; |
| } |
| |
| if (ndeldbs > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_DELDBS; |
| xl_deldbs.ndeldbs = ndeldbs; |
| } |
| |
| if (TransactionIdIsValid(twophase_xid)) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; |
| xl_twophase.xid = twophase_xid; |
| Assert(twophase_gid != NULL); |
| |
| if (XLogLogicalInfoActive()) |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; |
| } |
| |
| /* dump transaction origin information */ |
| if (replorigin_session_origin != InvalidRepOriginId) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; |
| |
| xl_origin.origin_lsn = replorigin_session_origin_lsn; |
| xl_origin.origin_timestamp = replorigin_session_origin_timestamp; |
| } |
| |
| /* include distributed xid if there's one */ |
| if (distrib_xid != InvalidDistributedTransactionId) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_DISTRIB; |
| xl_distrib.distrib_xid = distrib_xid; |
| } |
| |
| #if 0 |
| if (xl_xinfo.xinfo != 0) |
| info |= XLOG_XACT_HAS_INFO; |
| #endif |
| /* Then include all the collected data into the commit record. */ |
| |
| XLogBeginInsert(); |
| |
| XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit)); |
| |
| XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo)); |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) |
| XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) |
| { |
| XLogRegisterData((char *) (&xl_subxacts), |
| MinSizeOfXactSubxacts); |
| XLogRegisterData((char *) subxacts, |
| nsubxacts * sizeof(TransactionId)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) |
| { |
| XLogRegisterData((char *) (&xl_relfilenodes), |
| MinSizeOfXactRelfilenodes); |
| XLogRegisterData((char *) rels, |
| nrels * sizeof(RelFileNodePendingDelete)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS) |
| { |
| XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals); |
| XLogRegisterData((char *) msgs, |
| nmsgs * sizeof(SharedInvalidationMessage)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_DELDBS) |
| { |
| XLogRegisterData((char *) (&xl_deldbs), MinSizeOfXactDelDbs); |
| XLogRegisterData((char *) deldbs, |
| ndeldbs * sizeof(DbDirNode)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) |
| { |
| XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) |
| XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) |
| XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_DISTRIB) |
| XLogRegisterData((char *) (&xl_distrib), sizeof(xl_xact_distrib)); |
| |
| /* we allow filtering by xacts */ |
| XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); |
| |
| if (isDtxPrepared) |
| insertingDistributedCommitted(); |
| |
| recptr = XLogInsert(RM_XACT_ID, info); |
| |
| if (isDtxPrepared) |
| insertedDistributedCommitted(); |
| |
| return recptr; |
| } |
| |
| /* |
| * Log the commit record for a plain or twophase transaction abort. |
| * |
| * A 2pc abort will be emitted when twophase_xid is valid, a plain one |
| * otherwise. |
| */ |
| XLogRecPtr |
| XactLogAbortRecord(TimestampTz abort_time, |
| Oid tablespace_oid_to_delete_on_abort, |
| int nsubxacts, TransactionId *subxacts, |
| int nrels, RelFileNodePendingDelete *rels, |
| int ndeldbs, DbDirNode *deldbs, |
| int xactflags, TransactionId twophase_xid, |
| const char *twophase_gid) |
| { |
| xl_xact_abort xlrec; |
| xl_xact_xinfo xl_xinfo; |
| xl_xact_subxacts xl_subxacts; |
| xl_xact_relfilenodes xl_relfilenodes; |
| xl_xact_deldbs xl_deldbs; |
| xl_xact_twophase xl_twophase; |
| xl_xact_dbinfo xl_dbinfo; |
| xl_xact_origin xl_origin; |
| |
| uint8 info; |
| |
| Assert(CritSectionCount > 0); |
| |
| xl_xinfo.xinfo = 0; |
| |
| /* decide between a plain and 2pc abort */ |
| if (!TransactionIdIsValid(twophase_xid)) |
| info = XLOG_XACT_ABORT; |
| else |
| info = XLOG_XACT_ABORT_PREPARED; |
| |
| |
| /* First figure out and collect all the information needed */ |
| |
| xlrec.xact_time = abort_time; |
| xlrec.tablespace_oid_to_delete_on_abort = tablespace_oid_to_delete_on_abort; |
| |
| if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK)) |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS; |
| |
| if (nsubxacts > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; |
| xl_subxacts.nsubxacts = nsubxacts; |
| } |
| |
| if (nrels > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; |
| xl_relfilenodes.nrels = nrels; |
| info |= XLR_SPECIAL_REL_UPDATE; |
| } |
| |
| if (ndeldbs > 0) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_DELDBS; |
| xl_deldbs.ndeldbs = ndeldbs; |
| } |
| |
| if (TransactionIdIsValid(twophase_xid)) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; |
| xl_twophase.xid = twophase_xid; |
| Assert(twophase_gid != NULL); |
| |
| if (XLogLogicalInfoActive()) |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; |
| } |
| |
| if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; |
| xl_dbinfo.dbId = MyDatabaseId; |
| xl_dbinfo.tsId = MyDatabaseTableSpace; |
| } |
| |
| /* |
| * Dump transaction origin information only for abort prepared. We need |
| * this during recovery to update the replication origin progress. |
| */ |
| if ((replorigin_session_origin != InvalidRepOriginId) && |
| TransactionIdIsValid(twophase_xid)) |
| { |
| xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; |
| |
| xl_origin.origin_lsn = replorigin_session_origin_lsn; |
| xl_origin.origin_timestamp = replorigin_session_origin_timestamp; |
| } |
| #if 0 |
| if (xl_xinfo.xinfo != 0) |
| info |= XLOG_XACT_HAS_INFO; |
| #endif |
| /* Then include all the collected data into the abort record. */ |
| |
| XLogBeginInsert(); |
| |
| XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort); |
| |
| XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) |
| XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) |
| { |
| XLogRegisterData((char *) (&xl_subxacts), |
| MinSizeOfXactSubxacts); |
| XLogRegisterData((char *) subxacts, |
| nsubxacts * sizeof(TransactionId)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) |
| { |
| XLogRegisterData((char *) (&xl_relfilenodes), |
| MinSizeOfXactRelfilenodes); |
| XLogRegisterData((char *) rels, |
| nrels * sizeof(RelFileNodePendingDelete)); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_DELDBS) |
| { |
| XLogRegisterData((char *) (&xl_deldbs), |
| MinSizeOfXactDelDbs); |
| XLogRegisterData((char *) deldbs, |
| ndeldbs * sizeof(DbDirNode)); |
| } |
| |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) |
| { |
| XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) |
| XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1); |
| } |
| |
| if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) |
| XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); |
| |
| if (TransactionIdIsValid(twophase_xid)) |
| XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); |
| |
| return XLogInsert(RM_XACT_ID, info); |
| } |
| |
| /* |
| * Before 9.0 this was a fairly short function, but now it performs many |
| * actions for which the order of execution is critical. |
| */ |
| static void |
| xact_redo_commit(xl_xact_parsed_commit *parsed, |
| TransactionId xid, |
| XLogRecPtr lsn, |
| RepOriginId origin_id) |
| { |
| TransactionId max_xid; |
| TimestampTz commit_time; |
| Oid tablespace_oid_to_delete = parsed->tablespace_oid_to_delete_on_commit; |
| |
| Assert(TransactionIdIsValid(xid)); |
| |
| max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); |
| |
| ereportif(OidIsValid(tablespace_oid_to_delete), DEBUG5, |
| (errmsg("in xact_redo_commit_internal with tablespace oid to delete: %u", |
| tablespace_oid_to_delete))); |
| |
| /* Make sure nextXid is beyond any XID mentioned in the record. */ |
| AdvanceNextFullTransactionIdPastXid(max_xid); |
| |
| /* also update distributed commit log */ |
| if (parsed->distribXid != 0) |
| { |
| DistributedLog_SetCommittedTree(xid, parsed->nsubxacts, parsed->subxacts, |
| parsed->distribXid, |
| /* isRedo */ true); |
| } |
| |
| Assert(((parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == 0) == |
| (origin_id == InvalidRepOriginId)); |
| |
| if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) |
| commit_time = parsed->origin_timestamp; |
| else |
| commit_time = parsed->xact_time; |
| |
| /* Set the transaction commit timestamp and metadata */ |
| TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, |
| commit_time, origin_id); |
| |
| if (standbyState == STANDBY_DISABLED) |
| { |
| /* |
| * Mark the transaction committed in pg_xact. |
| */ |
| TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts); |
| } |
| else |
| { |
| /* |
| * If a transaction completion record arrives that has as-yet |
| * unobserved subtransactions then this will not have been fully |
| * handled by the call to RecordKnownAssignedTransactionIds() in the |
| * main recovery loop in xlog.c. So we need to do bookkeeping again to |
| * cover that case. This is confusing and it is easy to think this |
| * call is irrelevant, which has happened three times in development |
| * already. Leave it in. |
| */ |
| RecordKnownAssignedTransactionIds(max_xid); |
| |
| /* |
| * Mark the transaction committed in pg_xact. We use async commit |
| * protocol during recovery to provide information on database |
| * consistency for when users try to set hint bits. It is important |
| * that we do not set hint bits until the minRecoveryPoint is past |
| * this commit record. This ensures that if we crash we don't see hint |
| * bits set on changes made by transactions that haven't yet |
| * recovered. It's unlikely but it's good to be safe. |
| */ |
| TransactionIdAsyncCommitTree(xid, parsed->nsubxacts, parsed->subxacts, lsn); |
| |
| /* |
| * We must mark clog before we update the ProcArray. |
| */ |
| ExpireTreeKnownAssignedTransactionIds(xid, parsed->nsubxacts, parsed->subxacts, max_xid); |
| |
| /* |
| * Send any cache invalidations attached to the commit. We must |
| * maintain the same order of invalidation then release locks as |
| * occurs in CommitTransaction(). |
| */ |
| ProcessCommittedInvalidationMessages(parsed->msgs, parsed->nmsgs, |
| XactCompletionRelcacheInitFileInval(parsed->xinfo), |
| parsed->dbId, parsed->tsId); |
| |
| /* |
| * Release locks, if any. We do this for both two phase and normal one |
| * phase transactions. In effect we are ignoring the prepare phase and |
| * just going straight to lock release. |
| */ |
| if (parsed->xinfo & XACT_XINFO_HAS_AE_LOCKS) |
| StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts); |
| } |
| |
| if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) |
| { |
| /* recover apply progress */ |
| replorigin_advance(origin_id, parsed->origin_lsn, lsn, |
| false /* backward */ , false /* WAL */ ); |
| } |
| |
| /* Make sure files supposed to be dropped are dropped */ |
| if (parsed->nrels > 0) |
| { |
| /* |
| * First update minimum recovery point to cover this WAL record. Once |
| * a relation is deleted, there's no going back. The buffer manager |
| * enforces the WAL-first rule for normal updates to relation files, |
| * so that the minimum recovery point is always updated before the |
| * corresponding change in the data file is flushed to disk, but we |
| * have to do the same here since we're bypassing the buffer manager. |
| * |
| * Doing this before deleting the files means that if a deletion fails |
| * for some reason, you cannot start up the system even after restart, |
| * until you fix the underlying situation so that the deletion will |
| * succeed. Alternatively, we could update the minimum recovery point |
| * after deletion, but that would leave a small window where the |
| * WAL-first rule would be violated. |
| */ |
| XLogFlush(lsn); |
| |
| /* Make sure files supposed to be dropped are dropped */ |
| DropRelationFiles(parsed->xnodes, parsed->nrels, true); |
| } |
| |
| if (parsed->ndeldbs > 0) |
| { |
| XLogFlush(lsn); |
| DropDatabaseDirectories(parsed->deldbs, parsed->ndeldbs, true); |
| } |
| |
| DoTablespaceDeletionForRedoXlog(tablespace_oid_to_delete); |
| |
| /* |
| * We issue an XLogFlush() for the same reason we emit ForceSyncCommit() |
| * in normal operation. For example, in CREATE DATABASE, we copy all files |
| * from the template database, and then commit the transaction. If we |
| * crash after all the files have been copied but before the commit, you |
| * have files in the data directory without an entry in pg_database. To |
| * minimize the window for that, we use ForceSyncCommit() to rush the |
| * commit record to disk as quick as possible. We have the same window |
| * during recovery, and forcing an XLogFlush() (which updates |
| * minRecoveryPoint during recovery) helps to reduce that problem window, |
| * for any user that requested ForceSyncCommit(). |
| */ |
| if (XactCompletionForceSyncCommit(parsed->xinfo)) |
| XLogFlush(lsn); |
| |
| /* |
| * If asked by the primary (because someone is waiting for a synchronous |
| * commit = remote_apply), we will need to ask walreceiver to send a reply |
| * immediately. |
| */ |
| if (XactCompletionApplyFeedback(parsed->xinfo)) |
| XLogRequestWalReceiverReply(); |
| } |
| |
| /* |
| * If the xid is valid, call xact_redo_commit with the same args. The xid can |
| * be invalid if the distributed transaction was read-only for the Query |
| * Dispatcher (e.g. a DML operation like INSERT which only writes data on the |
| * Query Executors). Regardless, we end with recording the gxid into |
| * shmCommittedGxidArray which will later be removed by replaying the |
| * corresponding distributed forget record or resolved during promotion during |
| * dtx recovery. |
| */ |
| static void |
| xact_redo_distributed_commit(xl_xact_parsed_commit *parsed, |
| TransactionId xid, |
| XLogRecPtr lsn, |
| RepOriginId origin_id) |
| { |
| if (TransactionIdIsValid(xid)) |
| xact_redo_commit(parsed, xid, lsn, origin_id); |
| |
| redoDistributedCommitRecord(parsed->distribXid); |
| } |
| |
| static void |
| xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, |
| XLogRecPtr lsn, RepOriginId origin_id) |
| { |
| TransactionId max_xid; |
| |
| Assert(TransactionIdIsValid(xid)); |
| |
| /* Make sure nextXid is beyond any XID mentioned in the record. */ |
| max_xid = TransactionIdLatest(xid, |
| parsed->nsubxacts, |
| parsed->subxacts); |
| AdvanceNextFullTransactionIdPastXid(max_xid); |
| |
| if (standbyState == STANDBY_DISABLED) |
| { |
| /* Mark the transaction aborted in pg_xact, no need for async stuff */ |
| TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); |
| } |
| else |
| { |
| /* |
| * If a transaction completion record arrives that has as-yet |
| * unobserved subtransactions then this will not have been fully |
| * handled by the call to RecordKnownAssignedTransactionIds() in the |
| * main recovery loop in xlog.c. So we need to do bookkeeping again to |
| * cover that case. This is confusing and it is easy to think this |
| * call is irrelevant, which has happened three times in development |
| * already. Leave it in. |
| */ |
| RecordKnownAssignedTransactionIds(max_xid); |
| |
| /* Mark the transaction aborted in pg_xact, no need for async stuff */ |
| TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); |
| |
| /* |
| * We must update the ProcArray after we have marked clog. |
| */ |
| ExpireTreeKnownAssignedTransactionIds(xid, parsed->nsubxacts, parsed->subxacts, max_xid); |
| |
| /* |
| * There are no invalidation messages to send or undo. |
| */ |
| |
| /* |
| * Release locks, if any. There are no invalidations to send. |
| */ |
| if (parsed->xinfo & XACT_XINFO_HAS_AE_LOCKS) |
| StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts); |
| } |
| |
| if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) |
| { |
| /* recover apply progress */ |
| replorigin_advance(origin_id, parsed->origin_lsn, lsn, |
| false /* backward */ , false /* WAL */ ); |
| } |
| |
| /* Make sure files supposed to be dropped are dropped */ |
| if (parsed->nrels > 0) |
| { |
| /* |
| * See comments about update of minimum recovery point on truncation, |
| * in xact_redo_commit(). |
| */ |
| XLogFlush(lsn); |
| |
| DropRelationFiles(parsed->xnodes, parsed->nrels, true); |
| } |
| |
| /* |
| * GPDB_14_MERGE_FIXME: Do we need to call XLogFlush before dropping database |
| * and tablespace |
| */ |
| DropDatabaseDirectories(parsed->deldbs, parsed->ndeldbs, true); |
| DoTablespaceDeletionForRedoXlog(parsed->tablespace_oid_to_delete_on_abort); |
| } |
| |
| static void |
| xact_redo_distributed_forget(xl_xact_distributed_forget *xlrec, TransactionId xid pg_attribute_unused() ) |
| { |
| redoDistributedForgetCommitRecord(xlrec->gxid); |
| } |
| |
| |
| void |
| xact_redo(XLogReaderState *record) |
| { |
| uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; |
| |
| /* Backup blocks are not used in xact records */ |
| Assert(!XLogRecHasAnyBlockRefs(record)); |
| |
| if (info == XLOG_XACT_COMMIT) |
| { |
| xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); |
| xl_xact_parsed_commit parsed; |
| |
| ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); |
| xact_redo_commit(&parsed, XLogRecGetXid(record), |
| record->EndRecPtr, XLogRecGetOrigin(record)); |
| } |
| else if (info == XLOG_XACT_COMMIT_PREPARED) |
| { |
| xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); |
| xl_xact_parsed_commit parsed; |
| |
| ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); |
| xact_redo_commit(&parsed, parsed.twophase_xid, |
| record->EndRecPtr, XLogRecGetOrigin(record)); |
| |
| /* Delete TwoPhaseState gxact entry and/or 2PC file. */ |
| LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); |
| PrepareRedoRemove(parsed.twophase_xid, false); |
| LWLockRelease(TwoPhaseStateLock); |
| } |
| else if (info == XLOG_XACT_ABORT) |
| { |
| xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); |
| xl_xact_parsed_abort parsed; |
| |
| ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); |
| xact_redo_abort(&parsed, XLogRecGetXid(record), |
| record->EndRecPtr, XLogRecGetOrigin(record)); |
| } |
| else if (info == XLOG_XACT_ABORT_PREPARED) |
| { |
| xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); |
| xl_xact_parsed_abort parsed; |
| |
| ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); |
| xact_redo_abort(&parsed, parsed.twophase_xid, |
| record->EndRecPtr, XLogRecGetOrigin(record)); |
| |
| /* Delete TwoPhaseState gxact entry and/or 2PC file. */ |
| LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); |
| PrepareRedoRemove(parsed.twophase_xid, false); |
| LWLockRelease(TwoPhaseStateLock); |
| } |
| else if (info == XLOG_XACT_PREPARE) |
| { |
| /* |
| * Store xid and start/end pointers of the WAL record in TwoPhaseState |
| * gxact entry. |
| */ |
| LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); |
| PrepareRedoAdd(XLogRecGetData(record), |
| record->ReadRecPtr, |
| record->EndRecPtr, |
| XLogRecGetOrigin(record)); |
| LWLockRelease(TwoPhaseStateLock); |
| } |
| else if (info == XLOG_XACT_DISTRIBUTED_COMMIT) |
| { |
| xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); |
| xl_xact_parsed_commit parsed; |
| |
| ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); |
| Assert(parsed.twophase_xid == InvalidTransactionId); |
| xact_redo_distributed_commit(&parsed, XLogRecGetXid(record), |
| record->EndRecPtr, XLogRecGetOrigin(record)); |
| } |
| else if (info == XLOG_XACT_DISTRIBUTED_FORGET) |
| { |
| xl_xact_distributed_forget *xlrec = (xl_xact_distributed_forget *) XLogRecGetData(record); |
| |
| xact_redo_distributed_forget(xlrec, XLogRecGetXid(record)); |
| } |
| else if (info == XLOG_XACT_ASSIGNMENT) |
| { |
| xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); |
| |
| if (standbyState >= STANDBY_INITIALIZED) |
| ProcArrayApplyXidAssignment(xlrec->xtop, |
| xlrec->nsubxacts, xlrec->xsub); |
| } |
| else if (info == XLOG_XACT_INVALIDATIONS) |
| { |
| /* |
| * XXX we do ignore this for now, what matters are invalidations |
| * written into the commit record. |
| */ |
| } |
| else |
| elog(PANIC, "xact_redo: unknown op code %u", info); |
| } |
| |
| /* |
| * IsSubTransactionAssignmentPending |
| * |
| * This is used to decide whether we need to WAL log the top-level XID for |
| * operation in a subtransaction. We require that for logical decoding, see |
| * LogicalDecodingProcessRecord. |
| * |
| * This returns true if wal_level >= logical and we are inside a valid |
| * subtransaction, for which the assignment was not yet written to any WAL |
| * record. |
| */ |
| bool |
| IsSubTransactionAssignmentPending(void) |
| { |
| /* wal_level has to be logical */ |
| if (!XLogLogicalInfoActive()) |
| return false; |
| |
| /* we need to be in a transaction state */ |
| if (!IsTransactionState()) |
| return false; |
| |
| /* it has to be a subtransaction */ |
| if (!IsSubTransaction()) |
| return false; |
| |
| /* the subtransaction has to have a XID assigned */ |
| if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) |
| return false; |
| |
| /* and it should not be already 'assigned' */ |
| return !CurrentTransactionState->assigned; |
| } |
| |
| /* |
| * MarkSubTransactionAssigned |
| * |
| * Mark the subtransaction assignment as completed. |
| */ |
| void |
| MarkSubTransactionAssigned(void) |
| { |
| Assert(IsSubTransactionAssignmentPending()); |
| |
| CurrentTransactionState->assigned = true; |
| } |