| /*------------------------------------------------------------------------- |
| * |
| * snapmgr.c |
| * PostgreSQL snapshot manager |
| * |
| * We keep track of snapshots in two ways: those "registered" by resowner.c, |
| * and the "active snapshot" stack. All snapshots in either of them live in |
| * persistent memory. When a snapshot is no longer in any of these lists |
| * (tracked by separate refcounts on each snapshot), its memory can be freed. |
| * |
| * The FirstXactSnapshot, if any, is treated a bit specially: we increment its |
| * regd_count and list it in RegisteredSnapshots, but this reference is not |
| * tracked by a resource owner. We used to use the TopTransactionResourceOwner |
| * to track this snapshot reference, but that introduces logical circularity |
| * and thus makes it impossible to clean up in a sane fashion. It's better to |
| * handle this reference as an internally-tracked registration, so that this |
| * module is entirely lower-level than ResourceOwners. |
| * |
| * Likewise, any snapshots that have been exported by pg_export_snapshot |
| * have regd_count = 1 and are listed in RegisteredSnapshots, but are not |
| * tracked by any resource owner. |
| * |
| * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it |
| * is valid, but is not tracked by any resource owner. |
| * |
 * The same is true for historic snapshots used during logical decoding:
 * their lifetime is managed separately, because they live longer than one
 * xact.c transaction.
| * |
| * These arrangements let us reset MyProc->xmin when there are no snapshots |
| * referenced by this transaction, and advance it when the one with oldest |
| * Xmin is no longer referenced. For simplicity however, only registered |
| * snapshots not active snapshots participate in tracking which one is oldest; |
| * we don't try to change MyProc->xmin except when the active-snapshot |
| * stack is empty. |
| * |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * IDENTIFICATION |
| * src/backend/utils/time/snapmgr.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <sys/stat.h> |
| #include <unistd.h> |
| |
| #include "access/subtrans.h" |
| #include "access/transam.h" |
| #include "access/xact.h" |
| #include "access/xlog.h" |
| #include "catalog/catalog.h" |
| #include "datatype/timestamp.h" |
| #include "lib/pairingheap.h" |
| #include "miscadmin.h" |
| #include "storage/predicate.h" |
| #include "storage/proc.h" |
| #include "storage/procarray.h" |
| #include "storage/sinval.h" |
| #include "storage/sinvaladt.h" |
| #include "storage/spin.h" |
| #include "utils/builtins.h" |
| #include "utils/guc.h" |
| #include "utils/memutils.h" |
| #include "utils/old_snapshot.h" |
| #include "utils/rel.h" |
| #include "utils/resowner_private.h" |
| #include "utils/sharedsnapshot.h" |
| #include "utils/snapmgr.h" |
| #include "utils/syscache.h" |
| #include "utils/timestamp.h" |
| |
| #include "cdb/cdbdisp_query.h" |
| #include "cdb/cdbdispatchresult.h" |
| #include "cdb/cdbtm.h" |
| #include "cdb/cdbvars.h" |
| #include "utils/guc.h" |
| |
| |
| /* |
| * GUC parameters |
| */ |
| int old_snapshot_threshold; /* number of minutes, -1 disables */ |
| |
| volatile OldSnapshotControlData *oldSnapshotControl; |
| |
| |
| /* |
| * CurrentSnapshot points to the only snapshot taken in transaction-snapshot |
| * mode, and to the latest one taken in a read-committed transaction. |
| * SecondarySnapshot is a snapshot that's always up-to-date as of the current |
| * instant, even in transaction-snapshot mode. It should only be used for |
| * special-purpose code (say, RI checking.) CatalogSnapshot points to an |
| * MVCC snapshot intended to be used for catalog scans; we must invalidate it |
| * whenever a system catalog change occurs. |
| * |
| * These SnapshotData structs are static to simplify memory allocation |
| * (see the hack in GetSnapshotData to avoid repeated malloc/free). |
| */ |
| static SnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC}; |
| static SnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC}; |
| SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC}; |
| SnapshotData SnapshotSelfData = {SNAPSHOT_SELF}; |
| SnapshotData SnapshotAnyData = {SNAPSHOT_ANY}; |
| |
| /* Pointers to valid snapshots */ |
| static Snapshot CurrentSnapshot = NULL; |
| static Snapshot SecondarySnapshot = NULL; |
| static Snapshot CatalogSnapshot = NULL; |
| static Snapshot HistoricSnapshot = NULL; |
| |
| /* |
| * These are updated by GetSnapshotData. We initialize them this way |
| * for the convenience of TransactionIdIsInProgress: even in bootstrap |
| * mode, we don't want it to say that BootstrapTransactionId is in progress. |
| */ |
| TransactionId TransactionXmin = FirstNormalTransactionId; |
| TransactionId RecentXmin = FirstNormalTransactionId; |
| |
| /* (table, ctid) => (cmin, cmax) mapping during timetravel */ |
| static HTAB *tuplecid_data = NULL; |
| |
| /* |
| * Elements of the active snapshot stack. |
| * |
| * Each element here accounts for exactly one active_count on SnapshotData. |
| * |
| * NB: the code assumes that elements in this list are in non-increasing |
| * order of as_level; also, the list must be NULL-terminated. |
| */ |
| typedef struct ActiveSnapshotElt |
| { |
| Snapshot as_snap; |
| int as_level; |
| struct ActiveSnapshotElt *as_next; |
| } ActiveSnapshotElt; |
| |
| /* Top of the stack of active snapshots */ |
| static ActiveSnapshotElt *ActiveSnapshot = NULL; |
| |
| /* Bottom of the stack of active snapshots */ |
| static ActiveSnapshotElt *OldestActiveSnapshot = NULL; |
| |
| /* |
| * Currently registered Snapshots. Ordered in a heap by xmin, so that we can |
| * quickly find the one with lowest xmin, to advance our MyProc->xmin. |
| */ |
| static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, |
| void *arg); |
| |
| static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL}; |
| |
| /* first GetTransactionSnapshot call in a transaction? */ |
| bool FirstSnapshotSet = false; |
| |
| /* |
| * Remember the serializable transaction snapshot, if any. We cannot trust |
| * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because |
| * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot. |
| */ |
| static Snapshot FirstXactSnapshot = NULL; |
| |
| /* Define pathname of exported-snapshot files */ |
| #define SNAPSHOT_EXPORT_DIR "pg_snapshots" |
| |
| /* Structure holding info about exported snapshot. */ |
| typedef struct ExportedSnapshot |
| { |
| char *snapfile; |
| Snapshot snapshot; |
| } ExportedSnapshot; |
| |
| /* Current xact's exported snapshots (a list of ExportedSnapshot structs) */ |
| static List *exportedSnapshots = NIL; |
| |
| /* Prototypes for local functions */ |
| static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts); |
| static Snapshot CopySnapshot(Snapshot snapshot); |
| static void FreeSnapshot(Snapshot snapshot); |
| static void SnapshotResetXmin(void); |
| |
| /* |
| * Snapshot fields to be serialized. |
| * |
| * Only these fields need to be sent to the cooperating backend; the |
| * remaining ones can (and must) be set by the receiver upon restore. |
| */ |
| typedef struct SerializedSnapshotData |
| { |
| TransactionId xmin; |
| TransactionId xmax; |
| uint32 xcnt; |
| int32 subxcnt; |
| bool suboverflowed; |
| bool takenDuringRecovery; |
| CommandId curcid; |
| TimestampTz whenTaken; |
| XLogRecPtr lsn; |
| |
| bool haveDistribSnapshot; |
| |
| /* for cdb distribute snapshot */ |
| TransactionId minCachedLocalXid; |
| TransactionId maxCachedLocalXid; |
| int32 currentLocalXidsCount; |
| |
| DistributedTransactionId ds_xminAllDistributedSnapshots; |
| DistributedSnapshotId ds_distribSnapshotId; |
| DistributedTransactionId ds_xmin; |
| DistributedTransactionId ds_xmax; |
| int32 ds_count; |
| } SerializedSnapshotData; |
| |
| Size |
| SnapMgrShmemSize(void) |
| { |
| Size size; |
| |
| size = offsetof(OldSnapshotControlData, xid_by_minute); |
| if (old_snapshot_threshold > 0) |
| size = add_size(size, mul_size(sizeof(TransactionId), |
| OLD_SNAPSHOT_TIME_MAP_ENTRIES)); |
| |
| return size; |
| } |
| |
| /* |
| * Initialize for managing old snapshot detection. |
| */ |
| void |
| SnapMgrInit(void) |
| { |
| bool found; |
| |
| /* |
| * Create or attach to the OldSnapshotControlData structure. |
| */ |
| oldSnapshotControl = (volatile OldSnapshotControlData *) |
| ShmemInitStruct("OldSnapshotControlData", |
| SnapMgrShmemSize(), &found); |
| |
| if (!found) |
| { |
| SpinLockInit(&oldSnapshotControl->mutex_current); |
| oldSnapshotControl->current_timestamp = 0; |
| SpinLockInit(&oldSnapshotControl->mutex_latest_xmin); |
| oldSnapshotControl->latest_xmin = InvalidTransactionId; |
| oldSnapshotControl->next_map_update = 0; |
| SpinLockInit(&oldSnapshotControl->mutex_threshold); |
| oldSnapshotControl->threshold_timestamp = 0; |
| oldSnapshotControl->threshold_xid = InvalidTransactionId; |
| oldSnapshotControl->head_offset = 0; |
| oldSnapshotControl->head_timestamp = 0; |
| oldSnapshotControl->count_used = 0; |
| } |
| } |
| |
| /* |
| * GetTransactionSnapshot |
| * Get the appropriate snapshot for a new query in a transaction. |
| * |
| * Note that the return value may point at static storage that will be modified |
| * by future calls and by CommandCounterIncrement(). Callers should call |
| * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be |
| * used very long. |
| */ |
| Snapshot |
| GetTransactionSnapshot(void) |
| { |
| /* |
| * Return historic snapshot if doing logical decoding. We'll never need a |
| * non-historic transaction snapshot in this (sub-)transaction, so there's |
| * no need to be careful to set one up for later calls to |
| * GetTransactionSnapshot(). |
| */ |
| if (HistoricSnapshotActive()) |
| { |
| Assert(!FirstSnapshotSet); |
| return HistoricSnapshot; |
| } |
| |
| /* First call in transaction? */ |
| if (!FirstSnapshotSet) |
| { |
| /* |
| * Don't allow catalog snapshot to be older than xact snapshot. Must |
| * do this first to allow the empty-heap Assert to succeed. |
| */ |
| InvalidateCatalogSnapshot(); |
| |
| Assert(pairingheap_is_empty(&RegisteredSnapshots)); |
| Assert(FirstXactSnapshot == NULL); |
| |
| if (IsInParallelMode()) |
| elog(ERROR, |
| "cannot take query snapshot during a parallel operation"); |
| |
| /* |
| * In transaction-snapshot mode, the first snapshot must live until |
| * end of xact regardless of what the caller does with it, so we must |
| * make a copy of it rather than returning CurrentSnapshotData |
| * directly. Furthermore, if we're running in serializable mode, |
| * predicate.c needs to wrap the snapshot fetch in its own processing. |
| */ |
| if (IsolationUsesXactSnapshot()) |
| { |
| /* First, create the snapshot in CurrentSnapshotData */ |
| if (IsolationIsSerializable()) |
| CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData); |
| else |
| CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DistributedTransactionContext); |
| /* Make a saved copy */ |
| CurrentSnapshot = CopySnapshot(CurrentSnapshot); |
| FirstXactSnapshot = CurrentSnapshot; |
| /* Mark it as "registered" in FirstXactSnapshot */ |
| FirstXactSnapshot->regd_count++; |
| pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node); |
| } |
| else |
| CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DistributedTransactionContext); |
| |
| FirstSnapshotSet = true; |
| return CurrentSnapshot; |
| } |
| |
| if (IsolationUsesXactSnapshot()) |
| { |
| elog((Debug_print_snapshot_dtm ? LOG : DEBUG5), |
| "[Distributed Snapshot #%u] *Serializable* (gxid = "UINT64_FORMAT", '%s')", |
| CurrentSnapshot->distribSnapshotWithLocalMapping.ds.distribSnapshotId, |
| getDistributedTransactionId(), |
| DtxContextToString(DistributedTransactionContext)); |
| |
| UpdateCommandIdInSnapshot(CurrentSnapshot->curcid); |
| |
| return CurrentSnapshot; |
| } |
| |
| /* Don't allow catalog snapshot to be older than xact snapshot. */ |
| InvalidateCatalogSnapshot(); |
| |
| CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DistributedTransactionContext); |
| |
| elog((Debug_print_snapshot_dtm ? LOG : DEBUG5), |
| "[Distributed Snapshot #%u] (gxid = "UINT64_FORMAT", '%s')", |
| CurrentSnapshot->distribSnapshotWithLocalMapping.ds.distribSnapshotId, |
| getDistributedTransactionId(), |
| DtxContextToString(DistributedTransactionContext)); |
| |
| return CurrentSnapshot; |
| } |
| |
| /* |
| * GetLatestSnapshot |
| * Get a snapshot that is up-to-date as of the current instant, |
| * even if we are executing in transaction-snapshot mode. |
| */ |
| Snapshot |
| GetLatestSnapshot(void) |
| { |
| DtxContext dtxctx; |
| /* |
| * We might be able to relax this, but nothing that could otherwise work |
| * needs it. |
| */ |
| if (IsInParallelMode()) |
| elog(ERROR, |
| "cannot update SecondarySnapshot during a parallel operation"); |
| |
| /* |
| * So far there are no cases requiring support for GetLatestSnapshot() |
| * during logical decoding, but it wouldn't be hard to add if required. |
| */ |
| Assert(!HistoricSnapshotActive()); |
| |
| /* If first call in transaction, go ahead and set the xact snapshot */ |
| if (!FirstSnapshotSet) |
| return GetTransactionSnapshot(); |
| |
| /* |
| * Cloudberry specific behavior |
| * On QEs, we cannot create a latest global snapshot. However, this function |
| * is called mainly in executor, for example some alter table statements that |
| * need to rewrite a new heap will invoke this and scan the old heap via the |
| * latest snapshot. But distributed snapshot can only be created in QD, and |
| * QEs can only set the distributed snapshot from QD through Dispatch. So |
| * we always return the latest local snapshot in this function when in QD. |
| * Sometimes in QD we have to get latest snapshot with distributed snapshot |
| * and then dispatch it to QEs, a typical example is ATExecExpandTableCTAS. |
| * ATExecExpandTableCTAS and ATExecSetDistributedBy functions are implemented |
| * as:nn |
| * 1. build a query CTAS that scan the old table into a new table in QD |
| * 2. ExecutorStart, ExecutorRun, ExecutorEnd the above query |
| * So they have to use the latest snapshot to scan the old table no matter |
| * what is the isolation level of the transaction. |
| * |
| * See github issue: https://github.com/greenplum-db/gpdb/issues/10216 |
| */ |
| dtxctx = Gp_role == GP_ROLE_DISPATCH ? DistributedTransactionContext : DTX_CONTEXT_LOCAL_ONLY; |
| SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData, dtxctx); |
| |
| return SecondarySnapshot; |
| } |
| |
| /* |
| * GetOldestSnapshot |
| * |
| * Get the transaction's oldest known snapshot, as judged by the LSN. |
| * Will return NULL if there are no active or registered snapshots. |
| */ |
| Snapshot |
| GetOldestSnapshot(void) |
| { |
| Snapshot OldestRegisteredSnapshot = NULL; |
| XLogRecPtr RegisteredLSN = InvalidXLogRecPtr; |
| |
| if (!pairingheap_is_empty(&RegisteredSnapshots)) |
| { |
| OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node, |
| pairingheap_first(&RegisteredSnapshots)); |
| RegisteredLSN = OldestRegisteredSnapshot->lsn; |
| } |
| |
| if (OldestActiveSnapshot != NULL) |
| { |
| XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn; |
| |
| if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN) |
| return OldestActiveSnapshot->as_snap; |
| } |
| |
| return OldestRegisteredSnapshot; |
| } |
| |
| /* |
| * GetCatalogSnapshot |
| * Get a snapshot that is sufficiently up-to-date for scan of the |
| * system catalog with the specified OID. |
| */ |
| Snapshot |
| GetCatalogSnapshot(Oid relid) |
| { |
| /* |
| * Return historic snapshot while we're doing logical decoding, so we can |
| * see the appropriate state of the catalog. |
| * |
| * This is the primary reason for needing to reset the system caches after |
| * finishing decoding. |
| */ |
| if (HistoricSnapshotActive()) |
| return HistoricSnapshot; |
| |
| return GetNonHistoricCatalogSnapshot(relid, DTX_CONTEXT_LOCAL_ONLY); |
| } |
| |
| /* |
| * GetNonHistoricCatalogSnapshot |
| * Get a snapshot that is sufficiently up-to-date for scan of the system |
| * catalog with the specified OID, even while historic snapshots are set |
| * up. |
| */ |
| Snapshot |
| GetNonHistoricCatalogSnapshot(Oid relid, DtxContext distributedTransactionContext) |
| { |
| /* |
| * If the caller is trying to scan a relation that has no syscache, no |
| * catcache invalidations will be sent when it is updated. For a few key |
| * relations, snapshot invalidations are sent instead. If we're trying to |
| * scan a relation for which neither catcache nor snapshot invalidations |
| * are sent, we must refresh the snapshot every time. |
| */ |
| if (CatalogSnapshot && |
| !RelationInvalidatesSnapshotsOnly(relid) && |
| !RelationHasSysCache(relid)) |
| InvalidateCatalogSnapshot(); |
| |
| if (CatalogSnapshot == NULL) |
| { |
| /* Get new snapshot. */ |
| CatalogSnapshot = GetSnapshotData( |
| &CatalogSnapshotData, |
| distributedTransactionContext); |
| |
| /* |
| * Make sure the catalog snapshot will be accounted for in decisions |
| * about advancing PGPROC->xmin. We could apply RegisterSnapshot, but |
| * that would result in making a physical copy, which is overkill; and |
| * it would also create a dependency on some resource owner, which we |
| * do not want for reasons explained at the head of this file. Instead |
| * just shove the CatalogSnapshot into the pairing heap manually. This |
| * has to be reversed in InvalidateCatalogSnapshot, of course. |
| * |
| * NB: it had better be impossible for this to throw error, since the |
| * CatalogSnapshot pointer is already valid. |
| */ |
| pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node); |
| } |
| |
| return CatalogSnapshot; |
| } |
| |
| /* |
| * InvalidateCatalogSnapshot |
| * Mark the current catalog snapshot, if any, as invalid |
| * |
| * We could change this API to allow the caller to provide more fine-grained |
| * invalidation details, so that a change to relation A wouldn't prevent us |
| * from using our cached snapshot to scan relation B, but so far there's no |
| * evidence that the CPU cycles we spent tracking such fine details would be |
| * well-spent. |
| */ |
| void |
| InvalidateCatalogSnapshot(void) |
| { |
| if (CatalogSnapshot) |
| { |
| pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node); |
| CatalogSnapshot = NULL; |
| SnapshotResetXmin(); |
| } |
| } |
| |
| /* |
| * InvalidateCatalogSnapshotConditionally |
| * Drop catalog snapshot if it's the only one we have |
| * |
| * This is called when we are about to wait for client input, so we don't |
| * want to continue holding the catalog snapshot if it might mean that the |
| * global xmin horizon can't advance. However, if there are other snapshots |
| * still active or registered, the catalog snapshot isn't likely to be the |
| * oldest one, so we might as well keep it. |
| */ |
| void |
| InvalidateCatalogSnapshotConditionally(void) |
| { |
| if (CatalogSnapshot && |
| ActiveSnapshot == NULL && |
| pairingheap_is_singular(&RegisteredSnapshots)) |
| InvalidateCatalogSnapshot(); |
| } |
| |
| /* |
| * SnapshotSetCommandId |
| * Propagate CommandCounterIncrement into the static snapshots, if set |
| */ |
| void |
| SnapshotSetCommandId(CommandId curcid) |
| { |
| if (!FirstSnapshotSet) |
| return; |
| |
| if (CurrentSnapshot) |
| CurrentSnapshot->curcid = curcid; |
| if (SecondarySnapshot) |
| SecondarySnapshot->curcid = curcid; |
| /* Should we do the same with CatalogSnapshot? */ |
| } |
| |
| /* |
| * SetTransactionSnapshot |
| * Set the transaction's snapshot from an imported MVCC snapshot. |
| * |
| * Note that this is very closely tied to GetTransactionSnapshot --- it |
| * must take care of all the same considerations as the first-snapshot case |
| * in GetTransactionSnapshot. |
| */ |
| static void |
| SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, |
| int sourcepid, PGPROC *sourceproc) |
| { |
| /* Caller should have checked this already */ |
| Assert(!FirstSnapshotSet); |
| |
| /* Better do this to ensure following Assert succeeds. */ |
| InvalidateCatalogSnapshot(); |
| |
| Assert(pairingheap_is_empty(&RegisteredSnapshots)); |
| Assert(FirstXactSnapshot == NULL); |
| Assert(!HistoricSnapshotActive()); |
| |
| /* |
| * Even though we are not going to use the snapshot it computes, we must |
| * call GetSnapshotData, for two reasons: (1) to be sure that |
| * CurrentSnapshotData's XID arrays have been allocated, and (2) to update |
| * the state for GlobalVis*. |
| */ |
| |
| /* |
| * GPDB: If the source snapshot already has a distributed snapshot, pass in |
| * DTX_CONTEXT_LOCAL_ONLY to GetSnapshotData(). This prevents a new |
| * distributed snapshot from being created in GetSnapshotData() and ensures |
| * that we can use the distributed snapshot from the source snapshot below. |
| */ |
| if (sourcesnap->haveDistribSnapshot) |
| CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DTX_CONTEXT_LOCAL_ONLY); |
| else |
| CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DistributedTransactionContext); |
| /* |
| * Now copy appropriate fields from the source snapshot. |
| */ |
| CurrentSnapshot->xmin = sourcesnap->xmin; |
| CurrentSnapshot->xmax = sourcesnap->xmax; |
| CurrentSnapshot->xcnt = sourcesnap->xcnt; |
| Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount()); |
| if (sourcesnap->xcnt > 0) |
| memcpy(CurrentSnapshot->xip, sourcesnap->xip, |
| sourcesnap->xcnt * sizeof(TransactionId)); |
| CurrentSnapshot->subxcnt = sourcesnap->subxcnt; |
| Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount()); |
| if (sourcesnap->subxcnt > 0) |
| memcpy(CurrentSnapshot->subxip, sourcesnap->subxip, |
| sourcesnap->subxcnt * sizeof(TransactionId)); |
| CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed; |
| CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery; |
| |
| /* |
| * GPDB: Copy over distributed snapshot if present. |
| */ |
| if (sourcesnap->haveDistribSnapshot) |
| { |
| CurrentSnapshot->haveDistribSnapshot = true; |
| DistributedSnapshot_Copy(&CurrentSnapshot->distribSnapshotWithLocalMapping.ds, |
| &sourcesnap->distribSnapshotWithLocalMapping.ds); |
| } |
| /* NB: curcid should NOT be copied, it's a local matter */ |
| |
| CurrentSnapshot->snapXactCompletionCount = 0; |
| |
| /* |
| * Now we have to fix what GetSnapshotData did with MyProc->xmin and |
| * TransactionXmin. There is a race condition: to make sure we are not |
| * causing the global xmin to go backwards, we have to test that the |
| * source transaction is still running, and that has to be done |
| * atomically. So let procarray.c do it. |
| * |
| * Note: in serializable mode, predicate.c will do this a second time. It |
| * doesn't seem worth contorting the logic here to avoid two calls, |
| * especially since it's not clear that predicate.c *must* do this. |
| */ |
| if (sourceproc != NULL) |
| { |
| if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("could not import the requested snapshot"), |
| errdetail("The source transaction is not running anymore."))); |
| } |
| else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("could not import the requested snapshot"), |
| errdetail("The source process with PID %d is not running anymore.", |
| sourcepid))); |
| |
| /* |
| * In transaction-snapshot mode, the first snapshot must live until end of |
| * xact, so we must make a copy of it. Furthermore, if we're running in |
| * serializable mode, predicate.c needs to do its own processing. |
| */ |
| if (IsolationUsesXactSnapshot()) |
| { |
| if (IsolationIsSerializable()) |
| SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid, |
| sourcepid); |
| /* Make a saved copy */ |
| CurrentSnapshot = CopySnapshot(CurrentSnapshot); |
| FirstXactSnapshot = CurrentSnapshot; |
| /* Mark it as "registered" in FirstXactSnapshot */ |
| FirstXactSnapshot->regd_count++; |
| pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node); |
| } |
| |
| FirstSnapshotSet = true; |
| } |
| |
| /* |
| * CopySnapshot |
| * Copy the given snapshot. |
| * |
| * The copy is palloc'd in TopTransactionContext and has initial refcounts set |
| * to 0. The returned snapshot has the copied flag set. |
| */ |
| static Snapshot |
| CopySnapshot(Snapshot snapshot) |
| { |
| Snapshot newsnap; |
| Size subxipoff; |
| Size dsoff = 0; |
| Size dslmoff = 0; |
| Size size; |
| |
| Assert(snapshot != InvalidSnapshot); |
| |
| if (!IsMVCCSnapshot(snapshot)) |
| return snapshot; |
| |
| /* We allocate any XID arrays needed in the same palloc block. */ |
| size = subxipoff = sizeof(SnapshotData) + |
| snapshot->xcnt * sizeof(TransactionId); |
| if (snapshot->subxcnt > 0) |
| size += snapshot->subxcnt * sizeof(TransactionId); |
| dslmoff = dsoff = size; |
| |
| if (snapshot->haveDistribSnapshot && |
| snapshot->distribSnapshotWithLocalMapping.ds.count > 0) |
| { |
| size += snapshot->distribSnapshotWithLocalMapping.ds.count * |
| sizeof(DistributedTransactionId); |
| dslmoff = size; |
| size += snapshot->distribSnapshotWithLocalMapping.ds.count * |
| sizeof(TransactionId); |
| } |
| |
| newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); |
| memcpy(newsnap, snapshot, sizeof(SnapshotData)); |
| |
| newsnap->regd_count = 0; |
| newsnap->active_count = 0; |
| newsnap->copied = true; |
| newsnap->snapXactCompletionCount = 0; |
| |
| /* setup XID array */ |
| if (snapshot->xcnt > 0) |
| { |
| newsnap->xip = (TransactionId *) (newsnap + 1); |
| memcpy(newsnap->xip, snapshot->xip, |
| snapshot->xcnt * sizeof(TransactionId)); |
| } |
| else |
| newsnap->xip = NULL; |
| |
| /* |
| * Setup subXID array. Don't bother to copy it if it had overflowed, |
| * though, because it's not used anywhere in that case. Except if it's a |
| * snapshot taken during recovery; all the top-level XIDs are in subxip as |
| * well in that case, so we mustn't lose them. |
| */ |
| if (snapshot->subxcnt > 0 && |
| (!snapshot->suboverflowed || snapshot->takenDuringRecovery)) |
| { |
| newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff); |
| memcpy(newsnap->subxip, snapshot->subxip, |
| snapshot->subxcnt * sizeof(TransactionId)); |
| } |
| else |
| newsnap->subxip = NULL; |
| |
| newsnap->distribSnapshotWithLocalMapping.ds.inProgressXidArray = NULL; |
| newsnap->distribSnapshotWithLocalMapping.inProgressMappedLocalXids = NULL; |
| if (snapshot->haveDistribSnapshot && |
| snapshot->distribSnapshotWithLocalMapping.ds.count > 0) |
| { |
| newsnap->distribSnapshotWithLocalMapping.ds.inProgressXidArray = |
| (DistributedTransactionId*) ((char *) newsnap + dsoff); |
| newsnap->distribSnapshotWithLocalMapping.inProgressMappedLocalXids = |
| (TransactionId*) ((char *) newsnap + dslmoff); |
| |
| memcpy(newsnap->distribSnapshotWithLocalMapping.ds.inProgressXidArray, |
| snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray, |
| snapshot->distribSnapshotWithLocalMapping.ds.count * |
| sizeof(DistributedTransactionId)); |
| |
| if (snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount > 0) |
| { |
| Assert (!IS_QUERY_DISPATCHER()); |
| Assert(snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount <= |
| snapshot->distribSnapshotWithLocalMapping.ds.count); |
| memcpy(newsnap->distribSnapshotWithLocalMapping.inProgressMappedLocalXids, |
| snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids, |
| snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount * |
| sizeof(TransactionId)); |
| } |
| } |
| |
| return newsnap; |
| } |
| |
| /* |
| * FreeSnapshot |
| * Free the memory associated with a snapshot. |
| */ |
| static void |
| FreeSnapshot(Snapshot snapshot) |
| { |
| if (!IsMVCCSnapshot(snapshot)) |
| return; |
| |
| Assert(snapshot->regd_count == 0); |
| Assert(snapshot->active_count == 0); |
| Assert(snapshot->copied); |
| |
| pfree(snapshot); |
| } |
| |
| /* |
| * PushActiveSnapshot |
| * Set the given snapshot as the current active snapshot |
| * |
| * If the passed snapshot is a statically-allocated one, or it is possibly |
| * subject to a future command counter update, create a new long-lived copy |
| * with active refcount=1. Otherwise, only increment the refcount. |
| */ |
| void |
| PushActiveSnapshot(Snapshot snap) |
| { |
| PushActiveSnapshotWithLevel(snap, GetCurrentTransactionNestLevel()); |
| } |
| |
| /* |
| * PushActiveSnapshotWithLevel |
| * Set the given snapshot as the current active snapshot |
| * |
| * Same as PushActiveSnapshot except that caller can specify the |
| * transaction nesting level that "owns" the snapshot. This level |
| * must not be deeper than the current top of the snapshot stack. |
| */ |
| void |
| PushActiveSnapshotWithLevel(Snapshot snap, int snap_level) |
| { |
| ActiveSnapshotElt *newactive; |
| |
| Assert(snap != InvalidSnapshot); |
| Assert(ActiveSnapshot == NULL || snap_level >= ActiveSnapshot->as_level); |
| |
| newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt)); |
| |
| /* |
| * Checking SecondarySnapshot is probably useless here, but it seems |
| * better to be sure. |
| */ |
| if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied) |
| newactive->as_snap = CopySnapshot(snap); |
| else |
| newactive->as_snap = snap; |
| |
| newactive->as_next = ActiveSnapshot; |
| newactive->as_level = snap_level; |
| |
| newactive->as_snap->active_count++; |
| |
| ActiveSnapshot = newactive; |
| if (OldestActiveSnapshot == NULL) |
| OldestActiveSnapshot = ActiveSnapshot; |
| } |
| |
| /* |
| * PushCopiedSnapshot |
| * As above, except forcibly copy the presented snapshot. |
| * |
| * This should be used when the ActiveSnapshot has to be modifiable, for |
| * example if the caller intends to call UpdateActiveSnapshotCommandId. |
| * The new snapshot will be released when popped from the stack. |
| */ |
| void |
| PushCopiedSnapshot(Snapshot snapshot) |
| { |
| PushActiveSnapshot(CopySnapshot(snapshot)); |
| } |
| |
| /* |
| * UpdateActiveSnapshotCommandId |
| * |
| * Update the current CID of the active snapshot. This can only be applied |
| * to a snapshot that is not referenced elsewhere. |
| */ |
| void |
| UpdateActiveSnapshotCommandId(void) |
| { |
| CommandId save_curcid, |
| curcid; |
| |
| Assert(ActiveSnapshot != NULL); |
| Assert(ActiveSnapshot->as_snap->active_count == 1); |
| Assert(ActiveSnapshot->as_snap->regd_count == 0); |
| |
| /* |
| * Don't allow modification of the active snapshot during parallel |
| * operation. We share the snapshot to worker backends at the beginning |
| * of parallel operation, so any change to the snapshot can lead to |
| * inconsistencies. We have other defenses against |
| * CommandCounterIncrement, but there are a few places that call this |
| * directly, so we put an additional guard here. |
| */ |
| save_curcid = ActiveSnapshot->as_snap->curcid; |
| curcid = GetCurrentCommandId(false); |
| if (IsInParallelMode() && save_curcid != curcid) |
| elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation"); |
| ActiveSnapshot->as_snap->curcid = curcid; |
| } |
| |
| /* |
| * PopActiveSnapshot |
| * |
| * Remove the topmost snapshot from the active snapshot stack, decrementing the |
| * reference count, and free it if this was the last reference. |
| */ |
| void |
| PopActiveSnapshot(void) |
| { |
| ActiveSnapshotElt *newstack; |
| |
| newstack = ActiveSnapshot->as_next; |
| |
| Assert(ActiveSnapshot->as_snap->active_count > 0); |
| |
| ActiveSnapshot->as_snap->active_count--; |
| |
| if (ActiveSnapshot->as_snap->active_count == 0 && |
| ActiveSnapshot->as_snap->regd_count == 0) |
| FreeSnapshot(ActiveSnapshot->as_snap); |
| |
| pfree(ActiveSnapshot); |
| ActiveSnapshot = newstack; |
| if (ActiveSnapshot == NULL) |
| OldestActiveSnapshot = NULL; |
| |
| SnapshotResetXmin(); |
| } |
| |
| /* |
| * GetActiveSnapshot |
| * Return the topmost snapshot in the Active stack. |
| */ |
| Snapshot |
| GetActiveSnapshot(void) |
| { |
| Assert(ActiveSnapshot != NULL); |
| |
| return ActiveSnapshot->as_snap; |
| } |
| |
| /* |
| * ActiveSnapshotSet |
| * Return whether there is at least one snapshot in the Active stack |
| */ |
| bool |
| ActiveSnapshotSet(void) |
| { |
| return ActiveSnapshot != NULL; |
| } |
| |
| /* |
| * RegisterSnapshot |
| * Register a snapshot as being in use by the current resource owner |
| * |
| * If InvalidSnapshot is passed, it is not registered. |
| */ |
| Snapshot |
| RegisterSnapshot(Snapshot snapshot) |
| { |
| if (snapshot == InvalidSnapshot) |
| return InvalidSnapshot; |
| |
| return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner); |
| } |
| |
| /* |
| * RegisterSnapshotOnOwner |
| * As above, but use the specified resource owner |
| */ |
| Snapshot |
| RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner) |
| { |
| Snapshot snap; |
| |
| if (snapshot == InvalidSnapshot) |
| return InvalidSnapshot; |
| |
| /* Static snapshot? Create a persistent copy */ |
| snap = snapshot->copied ? snapshot : CopySnapshot(snapshot); |
| |
| /* and tell resowner.c about it */ |
| ResourceOwnerEnlargeSnapshots(owner); |
| snap->regd_count++; |
| ResourceOwnerRememberSnapshot(owner, snap); |
| |
| if (snap->regd_count == 1) |
| pairingheap_add(&RegisteredSnapshots, &snap->ph_node); |
| |
| return snap; |
| } |
| |
| /* |
| * UnregisterSnapshot |
| * |
| * Decrement the reference count of a snapshot, remove the corresponding |
| * reference from CurrentResourceOwner, and free the snapshot if no more |
| * references remain. |
| */ |
| void |
| UnregisterSnapshot(Snapshot snapshot) |
| { |
| if (snapshot == NULL) |
| return; |
| |
| UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner); |
| } |
| |
| /* |
| * UnregisterSnapshotFromOwner |
| * As above, but use the specified resource owner |
| */ |
| void |
| UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner) |
| { |
| if (snapshot == NULL) |
| return; |
| |
| Assert(snapshot->regd_count > 0); |
| Assert(!pairingheap_is_empty(&RegisteredSnapshots)); |
| |
| ResourceOwnerForgetSnapshot(owner, snapshot); |
| |
| snapshot->regd_count--; |
| if (snapshot->regd_count == 0) |
| pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node); |
| |
| if (snapshot->regd_count == 0 && snapshot->active_count == 0) |
| { |
| FreeSnapshot(snapshot); |
| SnapshotResetXmin(); |
| } |
| } |
| |
| /* |
| * Comparison function for RegisteredSnapshots heap. Snapshots are ordered |
| * by xmin, so that the snapshot with smallest xmin is at the top. |
| */ |
| static int |
| xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) |
| { |
| const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a); |
| const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b); |
| |
| if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin)) |
| return 1; |
| else if (TransactionIdFollows(asnap->xmin, bsnap->xmin)) |
| return -1; |
| else |
| return 0; |
| } |
| |
| /* |
| * SnapshotResetXmin |
| * |
| * If there are no more snapshots, we can reset our PGPROC->xmin to InvalidXid. |
| * Note we can do this without locking because we assume that storing an Xid |
| * is atomic. |
| * |
| * Even if there are some remaining snapshots, we may be able to advance our |
| * PGPROC->xmin to some degree. This typically happens when a portal is |
| * dropped. For efficiency, we only consider recomputing PGPROC->xmin when |
| * the active snapshot stack is empty; this allows us not to need to track |
| * which active snapshot is oldest. |
| * |
| * Note: it's tempting to use GetOldestSnapshot() here so that we can include |
| * active snapshots in the calculation. However, that compares by LSN not |
| * xmin so it's not entirely clear that it's the same thing. Also, we'd be |
| * critically dependent on the assumption that the bottommost active snapshot |
| * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are |
| * not actually critical, but this would be.) |
| */ |
| static void |
| SnapshotResetXmin(void) |
| { |
| Snapshot minSnapshot; |
| |
| if (ActiveSnapshot != NULL) |
| return; |
| |
| if (pairingheap_is_empty(&RegisteredSnapshots)) |
| { |
| MyProc->xmin = InvalidTransactionId; |
| return; |
| } |
| |
| minSnapshot = pairingheap_container(SnapshotData, ph_node, |
| pairingheap_first(&RegisteredSnapshots)); |
| |
| if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin)) |
| MyProc->xmin = minSnapshot->xmin; |
| } |
| |
| /* |
| * AtSubCommit_Snapshot |
| */ |
| void |
| AtSubCommit_Snapshot(int level) |
| { |
| ActiveSnapshotElt *active; |
| |
| /* |
| * Relabel the active snapshots set in this subtransaction as though they |
| * are owned by the parent subxact. |
| */ |
| for (active = ActiveSnapshot; active != NULL; active = active->as_next) |
| { |
| if (active->as_level < level) |
| break; |
| active->as_level = level - 1; |
| } |
| } |
| |
| /* |
| * AtSubAbort_Snapshot |
| * Clean up snapshots after a subtransaction abort |
| */ |
| void |
| AtSubAbort_Snapshot(int level) |
| { |
| /* Forget the active snapshots set by this subtransaction */ |
| while (ActiveSnapshot && ActiveSnapshot->as_level >= level) |
| { |
| ActiveSnapshotElt *next; |
| |
| next = ActiveSnapshot->as_next; |
| |
| /* |
| * Decrement the snapshot's active count. If it's still registered or |
| * marked as active by an outer subtransaction, we can't free it yet. |
| */ |
| Assert(ActiveSnapshot->as_snap->active_count >= 1); |
| ActiveSnapshot->as_snap->active_count -= 1; |
| |
| if (ActiveSnapshot->as_snap->active_count == 0 && |
| ActiveSnapshot->as_snap->regd_count == 0) |
| FreeSnapshot(ActiveSnapshot->as_snap); |
| |
| /* and free the stack element */ |
| pfree(ActiveSnapshot); |
| |
| ActiveSnapshot = next; |
| if (ActiveSnapshot == NULL) |
| OldestActiveSnapshot = NULL; |
| } |
| |
| SnapshotResetXmin(); |
| } |
| |
| /* |
| * AtEOXact_Snapshot |
| * Snapshot manager's cleanup function for end of transaction |
| */ |
| void |
| AtEOXact_Snapshot(bool isCommit, bool resetXmin) |
| { |
| /* |
| * In transaction-snapshot mode we must release our privately-managed |
| * reference to the transaction snapshot. We must remove it from |
| * RegisteredSnapshots to keep the check below happy. But we don't bother |
| * to do FreeSnapshot, for two reasons: the memory will go away with |
| * TopTransactionContext anyway, and if someone has left the snapshot |
| * stacked as active, we don't want the code below to be chasing through a |
| * dangling pointer. |
| */ |
| if (FirstXactSnapshot != NULL) |
| { |
| Assert(FirstXactSnapshot->regd_count > 0); |
| Assert(!pairingheap_is_empty(&RegisteredSnapshots)); |
| pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node); |
| } |
| FirstXactSnapshot = NULL; |
| |
| /* |
| * If we exported any snapshots, clean them up. |
| */ |
| if (exportedSnapshots != NIL) |
| { |
| ListCell *lc; |
| |
| /* |
| * Get rid of the files. Unlink failure is only a WARNING because (1) |
| * it's too late to abort the transaction, and (2) leaving a leaked |
| * file around has little real consequence anyway. |
| * |
| * We also need to remove the snapshots from RegisteredSnapshots to |
| * prevent a warning below. |
| * |
| * As with the FirstXactSnapshot, we don't need to free resources of |
| * the snapshot itself as it will go away with the memory context. |
| */ |
| foreach(lc, exportedSnapshots) |
| { |
| ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc); |
| |
| if (unlink(esnap->snapfile)) |
| elog(WARNING, "could not unlink file \"%s\": %m", |
| esnap->snapfile); |
| |
| pairingheap_remove(&RegisteredSnapshots, |
| &esnap->snapshot->ph_node); |
| } |
| |
| exportedSnapshots = NIL; |
| } |
| |
| /* Drop catalog snapshot if any */ |
| InvalidateCatalogSnapshot(); |
| |
| /* On commit, complain about leftover snapshots */ |
| if (isCommit) |
| { |
| ActiveSnapshotElt *active; |
| |
| if (!pairingheap_is_empty(&RegisteredSnapshots)) |
| elog(WARNING, "registered snapshots seem to remain after cleanup"); |
| |
| /* complain about unpopped active snapshots */ |
| for (active = ActiveSnapshot; active != NULL; active = active->as_next) |
| elog(WARNING, "snapshot %p still active", active); |
| } |
| |
| /* |
| * And reset our state. We don't need to free the memory explicitly -- |
| * it'll go away with TopTransactionContext. |
| */ |
| ActiveSnapshot = NULL; |
| OldestActiveSnapshot = NULL; |
| pairingheap_reset(&RegisteredSnapshots); |
| |
| CurrentSnapshot = NULL; |
| SecondarySnapshot = NULL; |
| |
| FirstSnapshotSet = false; |
| |
| /* |
| * During normal commit processing, we call ProcArrayEndTransaction() to |
| * reset the MyProc->xmin. That call happens prior to the call to |
| * AtEOXact_Snapshot(), so we need not touch xmin here at all. |
| */ |
| if (resetXmin) |
| SnapshotResetXmin(); |
| |
| Assert(resetXmin || MyProc->xmin == 0); |
| } |
| |
| |
| /* |
| * ExportSnapshot |
| * Export the snapshot to a file so that other backends can import it. |
| * Returns the token (the file name) that can be used to import this |
| * snapshot. |
| */ |
| char * |
| ExportSnapshot(Snapshot snapshot) |
| { |
| TransactionId topXid; |
| TransactionId *children; |
| ExportedSnapshot *esnap; |
| int nchildren; |
| int addTopXid; |
| StringInfoData buf; |
| FILE *f; |
| int i; |
| MemoryContext oldcxt; |
| char path[MAXPGPATH]; |
| char pathtmp[MAXPGPATH]; |
| |
| DistributedSnapshot *distributed_snapshot; |
| /* |
| * It's tempting to call RequireTransactionBlock here, since it's not very |
| * useful to export a snapshot that will disappear immediately afterwards. |
| * However, we haven't got enough information to do that, since we don't |
| * know if we're at top level or not. For example, we could be inside a |
| * plpgsql function that is going to fire off other transactions via |
| * dblink. Rather than disallow perfectly legitimate usages, don't make a |
| * check. |
| * |
| * Also note that we don't make any restriction on the transaction's |
| * isolation level; however, importers must check the level if they are |
| * serializable. |
| */ |
| |
| /* |
| * Get our transaction ID if there is one, to include in the snapshot. |
| */ |
| topXid = GetTopTransactionIdIfAny(); |
| |
| /* |
| * We cannot export a snapshot from a subtransaction because there's no |
| * easy way for importers to verify that the same subtransaction is still |
| * running. |
| */ |
| if (IsSubTransaction()) |
| ereport(ERROR, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| errmsg("cannot export a snapshot from a subtransaction"))); |
| |
| /* |
| * We do however allow previous committed subtransactions to exist. |
| * Importers of the snapshot must see them as still running, so get their |
| * XIDs to add them to the snapshot. |
| */ |
| nchildren = xactGetCommittedChildren(&children); |
| |
| /* |
| * Generate file path for the snapshot. We start numbering of snapshots |
| * inside the transaction from 1. |
| */ |
| snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d", |
| MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1); |
| |
| /* |
| * Copy the snapshot into TopTransactionContext, add it to the |
| * exportedSnapshots list, and mark it pseudo-registered. We do this to |
| * ensure that the snapshot's xmin is honored for the rest of the |
| * transaction. |
| */ |
| snapshot = CopySnapshot(snapshot); |
| |
| oldcxt = MemoryContextSwitchTo(TopTransactionContext); |
| esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot)); |
| esnap->snapfile = pstrdup(path); |
| esnap->snapshot = snapshot; |
| exportedSnapshots = lappend(exportedSnapshots, esnap); |
| MemoryContextSwitchTo(oldcxt); |
| |
| snapshot->regd_count++; |
| pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node); |
| |
| /* |
| * Fill buf with a text serialization of the snapshot, plus identification |
| * data about this transaction. The format expected by ImportSnapshot is |
| * pretty rigid: each line must be fieldname:value. |
| */ |
| initStringInfo(&buf); |
| |
| appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid); |
| appendStringInfo(&buf, "pid:%d\n", MyProcPid); |
| appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); |
| appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); |
| appendStringInfo(&buf, "ro:%d\n", XactReadOnly); |
| |
| appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin); |
| appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax); |
| |
| /* |
| * We must include our own top transaction ID in the top-xid data, since |
| * by definition we will still be running when the importing transaction |
| * adopts the snapshot, but GetSnapshotData never includes our own XID in |
| * the snapshot. (There must, therefore, be enough room to add it.) |
| * |
| * However, it could be that our topXid is after the xmax, in which case |
| * we shouldn't include it because xip[] members are expected to be before |
| * xmax. (We need not make the same check for subxip[] members, see |
| * snapshot.h.) |
| */ |
| addTopXid = (TransactionIdIsValid(topXid) && |
| TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0; |
| appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid); |
| for (i = 0; i < snapshot->xcnt; i++) |
| appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]); |
| if (addTopXid) |
| appendStringInfo(&buf, "xip:%u\n", topXid); |
| |
| /* |
| * Similarly, we add our subcommitted child XIDs to the subxid data. Here, |
| * we have to cope with possible overflow. |
| */ |
| if (snapshot->suboverflowed || |
| snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount()) |
| appendStringInfoString(&buf, "sof:1\n"); |
| else |
| { |
| appendStringInfoString(&buf, "sof:0\n"); |
| appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren); |
| for (i = 0; i < snapshot->subxcnt; i++) |
| appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]); |
| for (i = 0; i < nchildren; i++) |
| appendStringInfo(&buf, "sxp:%u\n", children[i]); |
| } |
| appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery); |
| |
| /* |
| * GPDB: Serialize distributed snapshot if present. |
| */ |
| if (snapshot->haveDistribSnapshot) |
| { |
| distributed_snapshot = &snapshot->distribSnapshotWithLocalMapping.ds; |
| appendStringInfo(&buf, "dsxminall:%lu\n", distributed_snapshot->xminAllDistributedSnapshots); |
| appendStringInfo(&buf, "dsid:%d\n", distributed_snapshot->distribSnapshotId); |
| appendStringInfo(&buf, "dsxmin:%lu\n", distributed_snapshot->xmin); |
| appendStringInfo(&buf, "dsxmax:%lu\n", distributed_snapshot->xmax); |
| appendStringInfo(&buf, "dsxcnt:%d\n", distributed_snapshot->count); |
| for (i = 0; i < distributed_snapshot->count; i++) |
| appendStringInfo(&buf, "dsxip:%lu\n", distributed_snapshot->inProgressXidArray[i]); |
| } |
| |
| /* |
| * Now write the text representation into a file. We first write to a |
| * ".tmp" filename, and rename to final filename if no error. This |
| * ensures that no other backend can read an incomplete file |
| * (ImportSnapshot won't allow it because of its valid-characters check). |
| */ |
| snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path); |
| if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not create file \"%s\": %m", pathtmp))); |
| |
| if (fwrite(buf.data, buf.len, 1, f) != 1) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not write to file \"%s\": %m", pathtmp))); |
| |
| /* no fsync() since file need not survive a system crash */ |
| |
| if (FreeFile(f)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not write to file \"%s\": %m", pathtmp))); |
| |
| /* |
| * Now that we have written everything into a .tmp file, rename the file |
| * to remove the .tmp suffix. |
| */ |
| if (rename(pathtmp, path) < 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not rename file \"%s\" to \"%s\": %m", |
| pathtmp, path))); |
| |
| /* |
| * The basename of the file is what we return from pg_export_snapshot(). |
| * It's already in path in a textual format and we know that the path |
| * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash |
| * and pstrdup it so as not to return the address of a local variable. |
| */ |
| return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1); |
| } |
| |
| /* |
| * pg_export_snapshot |
| * SQL-callable wrapper for ExportSnapshot. |
| */ |
| Datum |
| pg_export_snapshot(PG_FUNCTION_ARGS) |
| { |
| char *snapshotName; |
| |
| snapshotName = ExportSnapshot(GetActiveSnapshot()); |
| PG_RETURN_TEXT_P(cstring_to_text(snapshotName)); |
| } |
| |
| |
| /* |
| * Parsing subroutines for ImportSnapshot: parse a line with the given |
| * prefix followed by a value, and advance *s to the next line. The |
| * filename is provided for use in error messages. |
| */ |
| static int |
| parseIntFromText(const char *prefix, char **s, const char *filename) |
| { |
| char *ptr = *s; |
| int prefixlen = strlen(prefix); |
| int val; |
| |
| if (strncmp(ptr, prefix, prefixlen) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr += prefixlen; |
| if (sscanf(ptr, "%d", &val) != 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr = strchr(ptr, '\n'); |
| if (!ptr) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| *s = ptr + 1; |
| return val; |
| } |
| |
| static TransactionId |
| parseXidFromText(const char *prefix, char **s, const char *filename) |
| { |
| char *ptr = *s; |
| int prefixlen = strlen(prefix); |
| TransactionId val; |
| |
| if (strncmp(ptr, prefix, prefixlen) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr += prefixlen; |
| if (sscanf(ptr, "%u", &val) != 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr = strchr(ptr, '\n'); |
| if (!ptr) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| *s = ptr + 1; |
| return val; |
| } |
| |
| static DistributedTransactionId |
| parseDistributedXidFromText(const char *prefix, char **s, const char *filename) |
| { |
| char *ptr = *s; |
| int prefixlen = strlen(prefix); |
| DistributedTransactionId val; |
| |
| if (strncmp(ptr, prefix, prefixlen) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr += prefixlen; |
| if (sscanf(ptr, "%lu", &val) != 1) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr = strchr(ptr, '\n'); |
| if (!ptr) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| *s = ptr + 1; |
| return val; |
| } |
| |
| static void |
| parseVxidFromText(const char *prefix, char **s, const char *filename, |
| VirtualTransactionId *vxid) |
| { |
| char *ptr = *s; |
| int prefixlen = strlen(prefix); |
| |
| if (strncmp(ptr, prefix, prefixlen) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr += prefixlen; |
| if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| ptr = strchr(ptr, '\n'); |
| if (!ptr) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", filename))); |
| *s = ptr + 1; |
| } |
| |
| /* |
| * ImportSnapshot |
| * Import a previously exported snapshot. The argument should be a |
| * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file. |
| * This is called by "SET TRANSACTION SNAPSHOT 'foo'". |
| */ |
| void |
| ImportSnapshot(const char *idstr) |
| { |
| char path[MAXPGPATH]; |
| FILE *f; |
| struct stat stat_buf; |
| char *filebuf; |
| int xcnt; |
| int dxcnt; |
| int i; |
| VirtualTransactionId src_vxid; |
| int src_pid; |
| Oid src_dbid; |
| int src_isolevel; |
| bool src_readonly; |
| SnapshotData snapshot; |
| DistributedSnapshot *distributed_snapshot; |
| |
| /* |
| * Must be at top level of a fresh transaction. Note in particular that |
| * we check we haven't acquired an XID --- if we have, it's conceivable |
| * that the snapshot would show it as not running, making for very screwy |
| * behavior. |
| */ |
| if (FirstSnapshotSet || |
| GetTopTransactionIdIfAny() != InvalidTransactionId || |
| IsSubTransaction()) |
| ereport(ERROR, |
| (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), |
| errmsg("SET TRANSACTION SNAPSHOT must be called before any query"))); |
| |
| /* |
| * If we are in read committed mode then the next query would execute with |
| * a new snapshot thus making this function call quite useless. |
| */ |
| if (!IsolationUsesXactSnapshot()) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ"))); |
| |
| /* |
| * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do |
| * this mainly to prevent reading arbitrary files. |
| */ |
| if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("invalid snapshot identifier: \"%s\"", idstr))); |
| |
| /* OK, read the file */ |
| snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr); |
| |
| f = AllocateFile(path, PG_BINARY_R); |
| if (!f) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("invalid snapshot identifier: \"%s\"", idstr))); |
| |
| /* get the size of the file so that we know how much memory we need */ |
| if (fstat(fileno(f), &stat_buf)) |
| elog(ERROR, "could not stat file \"%s\": %m", path); |
| |
| /* and read the file into a palloc'd string */ |
| filebuf = (char *) palloc(stat_buf.st_size + 1); |
| if (fread(filebuf, stat_buf.st_size, 1, f) != 1) |
| elog(ERROR, "could not read file \"%s\": %m", path); |
| |
| filebuf[stat_buf.st_size] = '\0'; |
| |
| FreeFile(f); |
| |
| /* |
| * Construct a snapshot struct by parsing the file content. |
| */ |
| memset(&snapshot, 0, sizeof(snapshot)); |
| |
| parseVxidFromText("vxid:", &filebuf, path, &src_vxid); |
| src_pid = parseIntFromText("pid:", &filebuf, path); |
| /* we abuse parseXidFromText a bit here ... */ |
| src_dbid = parseXidFromText("dbid:", &filebuf, path); |
| src_isolevel = parseIntFromText("iso:", &filebuf, path); |
| src_readonly = parseIntFromText("ro:", &filebuf, path); |
| |
| snapshot.snapshot_type = SNAPSHOT_MVCC; |
| |
| snapshot.xmin = parseXidFromText("xmin:", &filebuf, path); |
| snapshot.xmax = parseXidFromText("xmax:", &filebuf, path); |
| |
| snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path); |
| |
| /* sanity-check the xid count before palloc */ |
| if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", path))); |
| |
| snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId)); |
| for (i = 0; i < xcnt; i++) |
| snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path); |
| |
| snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path); |
| |
| if (!snapshot.suboverflowed) |
| { |
| snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path); |
| |
| /* sanity-check the xid count before palloc */ |
| if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", path))); |
| |
| snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId)); |
| for (i = 0; i < xcnt; i++) |
| snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path); |
| } |
| else |
| { |
| snapshot.subxcnt = 0; |
| snapshot.subxip = NULL; |
| } |
| |
| snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path); |
| |
| /* |
| * GPDB: Extract distributed snapshot |
| * Importing a distributed snapshot in utility mode is not allowed because |
| * functionality to dispatch pg_export_snapshot to all segments and create |
| * a snapshot with ds fields in each segments datadir is not implemented. |
| * Since there is no reliable way to export a utility mode distributed snapshot, |
| * we have no way to judge its provenance. |
| */ |
| if(*filebuf != '\0') |
| { |
| if (Gp_role == GP_ROLE_UTILITY) { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("cannot import distributed snapshot in utility mode"), |
| errhint("export the snapshot in utility mode"))); |
| } |
| distributed_snapshot = &snapshot.distribSnapshotWithLocalMapping.ds; |
| distributed_snapshot->xminAllDistributedSnapshots = parseDistributedXidFromText("dsxminall:", &filebuf, path); |
| distributed_snapshot->distribSnapshotId = parseIntFromText("dsid:", &filebuf, path); |
| distributed_snapshot->xmin = parseDistributedXidFromText("dsxmin:", &filebuf, path); |
| distributed_snapshot->xmax = parseDistributedXidFromText("dsxmax:", &filebuf, path); |
| distributed_snapshot->count = dxcnt = parseIntFromText("dsxcnt:", &filebuf, path); |
| |
| /* sanity-check dsxmin and the xid count before palloc */ |
| if (distributed_snapshot->xmin < distributed_snapshot->xminAllDistributedSnapshots || |
| (dxcnt < 0 || dxcnt > GetMaxSnapshotDistributedXidCount()) |
| ) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", path))); |
| |
| distributed_snapshot->inProgressXidArray = (DistributedTransactionId *) palloc(dxcnt * sizeof(DistributedTransactionId)); |
| for (i = 0; i < dxcnt; i++) |
| distributed_snapshot->inProgressXidArray[i] = parseDistributedXidFromText("dsxip:", &filebuf, path); |
| snapshot.haveDistribSnapshot = true; |
| } |
| |
| /* |
| * Do some additional sanity checking, just to protect ourselves. We |
| * don't trouble to check the array elements, just the most critical |
| * fields. |
| */ |
| if (!VirtualTransactionIdIsValid(src_vxid) || |
| !OidIsValid(src_dbid) || |
| !TransactionIdIsNormal(snapshot.xmin) || |
| !TransactionIdIsNormal(snapshot.xmax)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), |
| errmsg("invalid snapshot data in file \"%s\"", path))); |
| |
| /* |
| * If we're serializable, the source transaction must be too, otherwise |
| * predicate.c has problems (SxactGlobalXmin could go backwards). Also, a |
| * non-read-only transaction can't adopt a snapshot from a read-only |
| * transaction, as predicate.c handles the cases very differently. |
| */ |
| if (IsolationIsSerializable()) |
| { |
| if (src_isolevel != XACT_SERIALIZABLE) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction"))); |
| if (src_readonly && !XactReadOnly) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction"))); |
| } |
| |
| /* |
| * We cannot import a snapshot that was taken in a different database, |
| * because vacuum calculates OldestXmin on a per-database basis; so the |
| * source transaction's xmin doesn't protect us from data loss. This |
| * restriction could be removed if the source transaction were to mark its |
| * xmin as being globally applicable. But that would require some |
| * additional syntax, since that has to be known when the snapshot is |
| * initially taken. (See pgsql-hackers discussion of 2011-10-21.) |
| */ |
| if (src_dbid != MyDatabaseId) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot import a snapshot from a different database"))); |
| |
| /* OK, install the snapshot */ |
| SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL); |
| } |
| |
| /* |
| * XactHasExportedSnapshots |
| * Test whether current transaction has exported any snapshots. |
| */ |
| bool |
| XactHasExportedSnapshots(void) |
| { |
| return (exportedSnapshots != NIL); |
| } |
| |
| /* |
| * DeleteAllExportedSnapshotFiles |
| * Clean up any files that have been left behind by a crashed backend |
| * that had exported snapshots before it died. |
| * |
| * This should be called during database startup or crash recovery. |
| */ |
| void |
| DeleteAllExportedSnapshotFiles(void) |
| { |
| char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)]; |
| DIR *s_dir; |
| struct dirent *s_de; |
| |
| /* |
| * Problems in reading the directory, or unlinking files, are reported at |
| * LOG level. Since we're running in the startup process, ERROR level |
| * would prevent database start, and it's not important enough for that. |
| */ |
| s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR); |
| |
| while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL) |
| { |
| if (strcmp(s_de->d_name, ".") == 0 || |
| strcmp(s_de->d_name, "..") == 0) |
| continue; |
| |
| snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name); |
| |
| if (unlink(buf) != 0) |
| ereport(LOG, |
| (errcode_for_file_access(), |
| errmsg("could not remove file \"%s\": %m", buf))); |
| } |
| |
| FreeDir(s_dir); |
| } |
| |
| /* |
| * ThereAreNoPriorRegisteredSnapshots |
| * Is the registered snapshot count less than or equal to one? |
| * |
| * Don't use this to settle important decisions. While zero registrations and |
| * no ActiveSnapshot would confirm a certain idleness, the system makes no |
| * guarantees about the significance of one registered snapshot. |
| */ |
| bool |
| ThereAreNoPriorRegisteredSnapshots(void) |
| { |
| if (pairingheap_is_empty(&RegisteredSnapshots) || |
| pairingheap_is_singular(&RegisteredSnapshots)) |
| return true; |
| |
| return false; |
| } |
| |
| |
| /* |
| * Return a timestamp that is exactly on a minute boundary. |
| * |
| * If the argument is already aligned, return that value, otherwise move to |
| * the next minute boundary following the given time. |
| */ |
| static TimestampTz |
| AlignTimestampToMinuteBoundary(TimestampTz ts) |
| { |
| TimestampTz retval = ts + (USECS_PER_MINUTE - 1); |
| |
| return retval - (retval % USECS_PER_MINUTE); |
| } |
| |
| /* |
| * Get current timestamp for snapshots |
| * |
| * This is basically GetCurrentTimestamp(), but with a guarantee that |
| * the result never moves backward. |
| */ |
| TimestampTz |
| GetSnapshotCurrentTimestamp(void) |
| { |
| TimestampTz now = GetCurrentTimestamp(); |
| |
| /* |
| * Don't let time move backward; if it hasn't advanced, use the old value. |
| */ |
| SpinLockAcquire(&oldSnapshotControl->mutex_current); |
| if (now <= oldSnapshotControl->current_timestamp) |
| now = oldSnapshotControl->current_timestamp; |
| else |
| oldSnapshotControl->current_timestamp = now; |
| SpinLockRelease(&oldSnapshotControl->mutex_current); |
| |
| return now; |
| } |
| |
| /* |
| * Get timestamp through which vacuum may have processed based on last stored |
| * value for threshold_timestamp. |
| * |
| * XXX: So far, we never trust that a 64-bit value can be read atomically; if |
| * that ever changes, we could get rid of the spinlock here. |
| */ |
| TimestampTz |
| GetOldSnapshotThresholdTimestamp(void) |
| { |
| TimestampTz threshold_timestamp; |
| |
| SpinLockAcquire(&oldSnapshotControl->mutex_threshold); |
| threshold_timestamp = oldSnapshotControl->threshold_timestamp; |
| SpinLockRelease(&oldSnapshotControl->mutex_threshold); |
| |
| return threshold_timestamp; |
| } |
| |
| void |
| SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit) |
| { |
| SpinLockAcquire(&oldSnapshotControl->mutex_threshold); |
| Assert(oldSnapshotControl->threshold_timestamp <= ts); |
| Assert(TransactionIdPrecedesOrEquals(oldSnapshotControl->threshold_xid, xlimit)); |
| oldSnapshotControl->threshold_timestamp = ts; |
| oldSnapshotControl->threshold_xid = xlimit; |
| SpinLockRelease(&oldSnapshotControl->mutex_threshold); |
| } |
| |
| /* |
| * XXX: Magic to keep old_snapshot_threshold tests appear "working". They |
| * currently are broken, and discussion of what to do about them is |
| * ongoing. See |
| * https://www.postgresql.org/message-id/20200403001235.e6jfdll3gh2ygbuc%40alap3.anarazel.de |
| */ |
| void |
| SnapshotTooOldMagicForTest(void) |
| { |
| TimestampTz ts = GetSnapshotCurrentTimestamp(); |
| |
| Assert(old_snapshot_threshold == 0); |
| |
| ts -= 5 * USECS_PER_SEC; |
| |
| SpinLockAcquire(&oldSnapshotControl->mutex_threshold); |
| oldSnapshotControl->threshold_timestamp = ts; |
| SpinLockRelease(&oldSnapshotControl->mutex_threshold); |
| } |
| |
| /* |
| * If there is a valid mapping for the timestamp, set *xlimitp to |
| * that. Returns whether there is such a mapping. |
| */ |
| static bool |
| GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp) |
| { |
| bool in_mapping = false; |
| |
| Assert(ts == AlignTimestampToMinuteBoundary(ts)); |
| |
| LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED); |
| |
| if (oldSnapshotControl->count_used > 0 |
| && ts >= oldSnapshotControl->head_timestamp) |
| { |
| int offset; |
| |
| offset = ((ts - oldSnapshotControl->head_timestamp) |
| / USECS_PER_MINUTE); |
| if (offset > oldSnapshotControl->count_used - 1) |
| offset = oldSnapshotControl->count_used - 1; |
| offset = (oldSnapshotControl->head_offset + offset) |
| % OLD_SNAPSHOT_TIME_MAP_ENTRIES; |
| |
| *xlimitp = oldSnapshotControl->xid_by_minute[offset]; |
| |
| in_mapping = true; |
| } |
| |
| LWLockRelease(OldSnapshotTimeMapLock); |
| |
| return in_mapping; |
| } |
| |
| /* |
| * TransactionIdLimitedForOldSnapshots |
| * |
| * Apply old snapshot limit. This is intended to be called for page pruning |
| * and table vacuuming, to allow old_snapshot_threshold to override the normal |
| * global xmin value. Actual testing for snapshot too old will be based on |
| * whether a snapshot timestamp is prior to the threshold timestamp set in |
| * this function. |
| * |
| * If the limited horizon allows a cleanup action that otherwise would not be |
| * possible, SetOldSnapshotThresholdTimestamp(*limit_ts, *limit_xid) needs to |
| * be called before that cleanup action. |
| */ |
| bool |
| TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, |
| Relation relation, |
| TransactionId *limit_xid, |
| TimestampTz *limit_ts) |
| { |
| TimestampTz ts; |
| TransactionId xlimit = recentXmin; |
| TransactionId latest_xmin; |
| TimestampTz next_map_update_ts; |
| TransactionId threshold_timestamp; |
| TransactionId threshold_xid; |
| |
| Assert(TransactionIdIsNormal(recentXmin)); |
| Assert(OldSnapshotThresholdActive()); |
| Assert(limit_ts != NULL && limit_xid != NULL); |
| |
| /* |
| * TestForOldSnapshot() assumes early pruning advances the page LSN, so we |
| * can't prune early when skipping WAL. |
| */ |
| if (!RelationAllowsEarlyPruning(relation) || !RelationNeedsWAL(relation)) |
| return false; |
| |
| ts = GetSnapshotCurrentTimestamp(); |
| |
| SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin); |
| latest_xmin = oldSnapshotControl->latest_xmin; |
| next_map_update_ts = oldSnapshotControl->next_map_update; |
| SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin); |
| |
| /* |
| * Zero threshold always overrides to latest xmin, if valid. Without some |
| * heuristic it will find its own snapshot too old on, for example, a |
| * simple UPDATE -- which would make it useless for most testing, but |
| * there is no principled way to ensure that it doesn't fail in this way. |
| * Use a five-second delay to try to get useful testing behavior, but this |
| * may need adjustment. |
| */ |
| if (old_snapshot_threshold == 0) |
| { |
| if (TransactionIdPrecedes(latest_xmin, MyProc->xmin) |
| && TransactionIdFollows(latest_xmin, xlimit)) |
| xlimit = latest_xmin; |
| |
| ts -= 5 * USECS_PER_SEC; |
| } |
| else |
| { |
| ts = AlignTimestampToMinuteBoundary(ts) |
| - (old_snapshot_threshold * USECS_PER_MINUTE); |
| |
| /* Check for fast exit without LW locking. */ |
| SpinLockAcquire(&oldSnapshotControl->mutex_threshold); |
| threshold_timestamp = oldSnapshotControl->threshold_timestamp; |
| threshold_xid = oldSnapshotControl->threshold_xid; |
| SpinLockRelease(&oldSnapshotControl->mutex_threshold); |
| |
| if (ts == threshold_timestamp) |
| { |
| /* |
| * Current timestamp is in same bucket as the last limit that was |
| * applied. Reuse. |
| */ |
| xlimit = threshold_xid; |
| } |
| else if (ts == next_map_update_ts) |
| { |
| /* |
| * FIXME: This branch is super iffy - but that should probably |
| * fixed separately. |
| */ |
| xlimit = latest_xmin; |
| } |
| else if (GetOldSnapshotFromTimeMapping(ts, &xlimit)) |
| { |
| } |
| |
| /* |
| * Failsafe protection against vacuuming work of active transaction. |
| * |
| * This is not an assertion because we avoid the spinlock for |
| * performance, leaving open the possibility that xlimit could advance |
| * and be more current; but it seems prudent to apply this limit. It |
| * might make pruning a tiny bit less aggressive than it could be, but |
| * protects against data loss bugs. |
| */ |
| if (TransactionIdIsNormal(latest_xmin) |
| && TransactionIdPrecedes(latest_xmin, xlimit)) |
| xlimit = latest_xmin; |
| } |
| |
| if (TransactionIdIsValid(xlimit) && |
| TransactionIdFollowsOrEquals(xlimit, recentXmin)) |
| { |
| *limit_ts = ts; |
| *limit_xid = xlimit; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* |
| * Take care of the circular buffer that maps time to xid. |
| */ |
| void |
| MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin) |
| { |
| TimestampTz ts; |
| TransactionId latest_xmin; |
| TimestampTz update_ts; |
| bool map_update_required = false; |
| |
| /* Never call this function when old snapshot checking is disabled. */ |
| Assert(old_snapshot_threshold >= 0); |
| |
| ts = AlignTimestampToMinuteBoundary(whenTaken); |
| |
| /* |
| * Keep track of the latest xmin seen by any process. Update mapping with |
| * a new value when we have crossed a bucket boundary. |
| */ |
| SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin); |
| latest_xmin = oldSnapshotControl->latest_xmin; |
| update_ts = oldSnapshotControl->next_map_update; |
| if (ts > update_ts) |
| { |
| oldSnapshotControl->next_map_update = ts; |
| map_update_required = true; |
| } |
| if (TransactionIdFollows(xmin, latest_xmin)) |
| oldSnapshotControl->latest_xmin = xmin; |
| SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin); |
| |
| /* We only needed to update the most recent xmin value. */ |
| if (!map_update_required) |
| return; |
| |
| /* No further tracking needed for 0 (used for testing). */ |
| if (old_snapshot_threshold == 0) |
| return; |
| |
| /* |
| * We don't want to do something stupid with unusual values, but we don't |
| * want to litter the log with warnings or break otherwise normal |
| * processing for this feature; so if something seems unreasonable, just |
| * log at DEBUG level and return without doing anything. |
| */ |
| if (whenTaken < 0) |
| { |
| elog(DEBUG1, |
| "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld", |
| (long) whenTaken); |
| return; |
| } |
| if (!TransactionIdIsNormal(xmin)) |
| { |
| elog(DEBUG1, |
| "MaintainOldSnapshotTimeMapping called with xmin = %lu", |
| (unsigned long) xmin); |
| return; |
| } |
| |
| LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE); |
| |
| Assert(oldSnapshotControl->head_offset >= 0); |
| Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES); |
| Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0); |
| Assert(oldSnapshotControl->count_used >= 0); |
| Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES); |
| |
| if (oldSnapshotControl->count_used == 0) |
| { |
| /* set up first entry for empty mapping */ |
| oldSnapshotControl->head_offset = 0; |
| oldSnapshotControl->head_timestamp = ts; |
| oldSnapshotControl->count_used = 1; |
| oldSnapshotControl->xid_by_minute[0] = xmin; |
| } |
| else if (ts < oldSnapshotControl->head_timestamp) |
| { |
| /* old ts; log it at DEBUG */ |
| LWLockRelease(OldSnapshotTimeMapLock); |
| elog(DEBUG1, |
| "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld", |
| (long) whenTaken); |
| return; |
| } |
| else if (ts <= (oldSnapshotControl->head_timestamp + |
| ((oldSnapshotControl->count_used - 1) |
| * USECS_PER_MINUTE))) |
| { |
| /* existing mapping; advance xid if possible */ |
| int bucket = (oldSnapshotControl->head_offset |
| + ((ts - oldSnapshotControl->head_timestamp) |
| / USECS_PER_MINUTE)) |
| % OLD_SNAPSHOT_TIME_MAP_ENTRIES; |
| |
| if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin)) |
| oldSnapshotControl->xid_by_minute[bucket] = xmin; |
| } |
| else |
| { |
| /* We need a new bucket, but it might not be the very next one. */ |
| int distance_to_new_tail; |
| int distance_to_current_tail; |
| int advance; |
| |
| /* |
| * Our goal is for the new "tail" of the mapping, that is, the entry |
| * which is newest and thus furthest from the "head" entry, to |
| * correspond to "ts". Since there's one entry per minute, the |
| * distance between the current head and the new tail is just the |
| * number of minutes of difference between ts and the current |
| * head_timestamp. |
| * |
| * The distance from the current head to the current tail is one less |
| * than the number of entries in the mapping, because the entry at the |
| * head_offset is for 0 minutes after head_timestamp. |
| * |
| * The difference between these two values is the number of minutes by |
| * which we need to advance the mapping, either adding new entries or |
| * rotating old ones out. |
| */ |
| distance_to_new_tail = |
| (ts - oldSnapshotControl->head_timestamp) / USECS_PER_MINUTE; |
| distance_to_current_tail = |
| oldSnapshotControl->count_used - 1; |
| advance = distance_to_new_tail - distance_to_current_tail; |
| Assert(advance > 0); |
| |
| if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES) |
| { |
| /* Advance is so far that all old data is junk; start over. */ |
| oldSnapshotControl->head_offset = 0; |
| oldSnapshotControl->count_used = 1; |
| oldSnapshotControl->xid_by_minute[0] = xmin; |
| oldSnapshotControl->head_timestamp = ts; |
| } |
| else |
| { |
| /* Store the new value in one or more buckets. */ |
| int i; |
| |
| for (i = 0; i < advance; i++) |
| { |
| if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES) |
| { |
| /* Map full and new value replaces old head. */ |
| int old_head = oldSnapshotControl->head_offset; |
| |
| if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1)) |
| oldSnapshotControl->head_offset = 0; |
| else |
| oldSnapshotControl->head_offset = old_head + 1; |
| oldSnapshotControl->xid_by_minute[old_head] = xmin; |
| oldSnapshotControl->head_timestamp += USECS_PER_MINUTE; |
| } |
| else |
| { |
| /* Extend map to unused entry. */ |
| int new_tail = (oldSnapshotControl->head_offset |
| + oldSnapshotControl->count_used) |
| % OLD_SNAPSHOT_TIME_MAP_ENTRIES; |
| |
| oldSnapshotControl->count_used++; |
| oldSnapshotControl->xid_by_minute[new_tail] = xmin; |
| } |
| } |
| } |
| } |
| |
| LWLockRelease(OldSnapshotTimeMapLock); |
| } |
| |
| |
| /* |
| * Setup a snapshot that replaces normal catalog snapshots that allows catalog |
| * access to behave just like it did at a certain point in the past. |
| * |
| * Needed for logical decoding. |
| */ |
| void |
| SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) |
| { |
| Assert(historic_snapshot != NULL); |
| |
| /* setup the timetravel snapshot */ |
| HistoricSnapshot = historic_snapshot; |
| |
| /* setup (cmin, cmax) lookup hash */ |
| tuplecid_data = tuplecids; |
| } |
| |
| |
| /* |
| * Make catalog snapshots behave normally again. |
| */ |
| void |
| TeardownHistoricSnapshot(bool is_error) |
| { |
| HistoricSnapshot = NULL; |
| tuplecid_data = NULL; |
| } |
| |
| bool |
| HistoricSnapshotActive(void) |
| { |
| return HistoricSnapshot != NULL; |
| } |
| |
| HTAB * |
| HistoricSnapshotGetTupleCids(void) |
| { |
| Assert(HistoricSnapshotActive()); |
| return tuplecid_data; |
| } |
| |
| /* |
| * EstimateSnapshotSpace |
| * Returns the size needed to store the given snapshot. |
| * |
| * We are exporting only required fields from the Snapshot, stored in |
| * SerializedSnapshotData. |
| */ |
| Size |
| EstimateSnapshotSpace(Snapshot snap) |
| { |
| Size size; |
| |
| Assert(snap != InvalidSnapshot); |
| Assert(snap->snapshot_type == SNAPSHOT_MVCC || gp_select_invisible); |
| |
| /* We allocate any XID arrays needed in the same palloc block. */ |
| size = add_size(sizeof(SerializedSnapshotData), |
| mul_size(snap->xcnt, sizeof(TransactionId))); |
| if (snap->subxcnt > 0 && |
| (!snap->suboverflowed || snap->takenDuringRecovery)) |
| size = add_size(size, |
| mul_size(snap->subxcnt, sizeof(TransactionId))); |
| |
| if (snap->haveDistribSnapshot && snap->distribSnapshotWithLocalMapping.ds.count > 0) |
| { |
| size = add_size(size, |
| mul_size(snap->distribSnapshotWithLocalMapping.ds.count, sizeof(DistributedTransactionId))); |
| if (snap->distribSnapshotWithLocalMapping.currentLocalXidsCount > 0) |
| { |
| size = add_size(size, |
| mul_size(snap->distribSnapshotWithLocalMapping.currentLocalXidsCount, sizeof(TransactionId))); |
| } |
| } |
| |
| return size; |
| } |
| |
| Size |
| EstimateSnapshotDataSpace(void) |
| { |
| return sizeof(SerializedSnapshotData); |
| } |
| |
| /* |
| * SerializeSnapshot |
| * Dumps the serialized snapshot (extracted from given snapshot) onto the |
| * memory location at start_address. |
| */ |
| void |
| SerializeSnapshot(Snapshot snapshot, char *start_address) |
| { |
| SerializedSnapshotData serialized_snapshot; |
| Size subxipoff = sizeof(SerializedSnapshotData) + snapshot->xcnt * sizeof(TransactionId); |
| Size dsoff = subxipoff + snapshot->subxcnt * sizeof(TransactionId); |
| Size dslmoff = dsoff + snapshot->distribSnapshotWithLocalMapping.ds.count * sizeof(DistributedTransactionId); |
| |
| Assert(snapshot->subxcnt >= 0); |
| |
| /* Copy all required fields */ |
| serialized_snapshot.xmin = snapshot->xmin; |
| serialized_snapshot.xmax = snapshot->xmax; |
| serialized_snapshot.xcnt = snapshot->xcnt; |
| serialized_snapshot.subxcnt = snapshot->subxcnt; |
| serialized_snapshot.suboverflowed = snapshot->suboverflowed; |
| serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery; |
| serialized_snapshot.curcid = snapshot->curcid; |
| serialized_snapshot.whenTaken = snapshot->whenTaken; |
| serialized_snapshot.lsn = snapshot->lsn; |
| |
| serialized_snapshot.haveDistribSnapshot = snapshot->haveDistribSnapshot; |
| |
| /* Copy fields for cdb distribute snapshot */ |
| serialized_snapshot.minCachedLocalXid = snapshot->distribSnapshotWithLocalMapping.minCachedLocalXid; |
| serialized_snapshot.maxCachedLocalXid = snapshot->distribSnapshotWithLocalMapping.maxCachedLocalXid; |
| serialized_snapshot.currentLocalXidsCount = snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount; |
| |
| serialized_snapshot.ds_xminAllDistributedSnapshots = snapshot->distribSnapshotWithLocalMapping.ds.xminAllDistributedSnapshots; |
| serialized_snapshot.ds_distribSnapshotId = snapshot->distribSnapshotWithLocalMapping.ds.distribSnapshotId; |
| serialized_snapshot.ds_xmin = snapshot->distribSnapshotWithLocalMapping.ds.xmin; |
| serialized_snapshot.ds_xmax = snapshot->distribSnapshotWithLocalMapping.ds.xmax; |
| serialized_snapshot.ds_count = snapshot->distribSnapshotWithLocalMapping.ds.count; |
| |
| /* |
| * Ignore the SubXID array if it has overflowed, unless the snapshot was |
| * taken during recovery - in that case, top-level XIDs are in subxip as |
| * well, and we mustn't lose them. |
| */ |
| if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery) |
| serialized_snapshot.subxcnt = 0; |
| |
| /* Copy struct to possibly-unaligned buffer */ |
| memcpy(start_address, |
| &serialized_snapshot, sizeof(SerializedSnapshotData)); |
| |
| /* Copy XID array */ |
| if (snapshot->xcnt > 0) |
| memcpy((TransactionId *) (start_address + |
| sizeof(SerializedSnapshotData)), |
| snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); |
| |
| /* |
| * Copy SubXID array. Don't bother to copy it if it had overflowed, |
| * though, because it's not used anywhere in that case. Except if it's a |
| * snapshot taken during recovery; all the top-level XIDs are in subxip as |
| * well in that case, so we mustn't lose them. |
| */ |
| if (serialized_snapshot.subxcnt > 0) |
| { |
| memcpy((TransactionId *) (start_address + subxipoff), |
| snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); |
| } |
| |
| if (snapshot->haveDistribSnapshot && |
| snapshot->distribSnapshotWithLocalMapping.ds.count > 0) |
| { |
| memcpy((DistributedTransactionId*) (start_address + dsoff), |
| snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray, |
| snapshot->distribSnapshotWithLocalMapping.ds.count * |
| sizeof(DistributedTransactionId)); |
| |
| if (snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount > 0) |
| { |
| memcpy((TransactionId*) (start_address + dslmoff), |
| snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids, |
| snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount * |
| sizeof(TransactionId)); |
| } |
| } |
| } |
| |
| void |
| ReadSharedLocalSnapshot(Snapshot snapshot) |
| { |
| char *start_address; |
| dsm_segment *snapshot_sgm; |
| TransactionId *serialized_xids; |
| TransactionId *serialized_subxids; |
| SerializedSnapshotData *serialized_snapshot; |
| |
| if (unlikely(SharedLocalSnapshotSlot->snapshot_handle == |
| DSM_HANDLE_INVALID)) |
| elog(ERROR, "QE reader can not sync shared snapshot."); |
| |
| snapshot_sgm = dsm_attach(SharedLocalSnapshotSlot->snapshot_handle); |
| start_address = dsm_segment_address(snapshot_sgm); |
| |
| serialized_snapshot = (SerializedSnapshotData *) start_address; |
| |
| snapshot->xmin = serialized_snapshot->xmin; |
| snapshot->xmax = serialized_snapshot->xmax; |
| snapshot->xcnt = serialized_snapshot->xcnt; |
| snapshot->subxcnt = serialized_snapshot->subxcnt; |
| snapshot->suboverflowed = serialized_snapshot->suboverflowed; |
| snapshot->curcid = serialized_snapshot->curcid; |
| |
| /* We now capture our current view of the xip/combocid arrays */ |
| serialized_xids = |
| (TransactionId *) (start_address + sizeof(SerializedSnapshotData)); |
| serialized_subxids = serialized_xids + snapshot->xcnt; |
| |
| if (snapshot->xcnt > 0) |
| memcpy(snapshot->xip, serialized_xids, |
| snapshot->xcnt * sizeof(TransactionId)); |
| |
| if (snapshot->subxcnt > 0) |
| memcpy(snapshot->subxip, serialized_subxids, |
| snapshot->subxcnt * sizeof(TransactionId)); |
| |
| dsm_detach(snapshot_sgm); |
| |
| if (TransactionIdPrecedes(snapshot->xmin, TransactionXmin)) |
| TransactionXmin = snapshot->xmin; |
| |
| ereport( |
| (Debug_print_snapshot_dtm ? LOG : DEBUG5), |
| (errmsg( |
| "Reader qExec setting shared local snapshot to: xmin: %d xmax: %d curcid: %d", |
| snapshot->xmin, snapshot->xmax, snapshot->curcid))); |
| |
| ereport( |
| (Debug_print_snapshot_dtm ? LOG : DEBUG5), |
| (errmsg( |
| "GetSnapshotData(): READER currentcommandid %d curcid %d segmatesync %d", |
| GetCurrentCommandId(false), snapshot->curcid, |
| SharedLocalSnapshotSlot->segmateSync))); |
| } |
| |
| CommandId |
| UpdateSharedLocalSnapshotCommandId(CommandId curcid) |
| { |
| char *start_address; |
| CommandId oldcid; |
| dsm_segment *snapshot_sgm; |
| SerializedSnapshotData *serialized_snapshot; |
| |
| if (unlikely(SharedLocalSnapshotSlot->snapshot_handle == DSM_HANDLE_INVALID)) |
| elog(ERROR, "Shared snapshot dsm handler is invalid"); |
| |
| snapshot_sgm = dsm_attach(SharedLocalSnapshotSlot->snapshot_handle); |
| start_address = dsm_segment_address(snapshot_sgm); |
| |
| serialized_snapshot = (SerializedSnapshotData *) start_address; |
| oldcid = serialized_snapshot->curcid; |
| serialized_snapshot->curcid = curcid; |
| |
| dsm_detach(snapshot_sgm); |
| |
| return oldcid; |
| } |
| /* |
| * RestoreSnapshot |
| * Restore a serialized snapshot from the specified address. |
| * |
| * The copy is palloc'd in TopTransactionContext and has initial refcounts set |
| * to 0. The returned snapshot has the copied flag set. |
| */ |
| Snapshot |
| RestoreSnapshot(char *start_address) |
| { |
| SerializedSnapshotData serialized_snapshot; |
| Size dsoff = 0; |
| Size dslmoff = 0; |
| Size size; |
| Snapshot snapshot; |
| TransactionId *serialized_xids; |
| |
| memcpy(&serialized_snapshot, start_address, |
| sizeof(SerializedSnapshotData)); |
| serialized_xids = (TransactionId *) |
| (start_address + sizeof(SerializedSnapshotData)); |
| |
| /* We allocate any XID arrays needed in the same palloc block. */ |
| size = sizeof(SnapshotData) |
| + serialized_snapshot.xcnt * sizeof(TransactionId) |
| + serialized_snapshot.subxcnt * sizeof(TransactionId); |
| dslmoff = dsoff = size; |
| |
| if (serialized_snapshot.haveDistribSnapshot && |
| serialized_snapshot.ds_count > 0) |
| { |
| size += serialized_snapshot.ds_count * |
| sizeof(DistributedTransactionId); |
| dslmoff = size; |
| size += serialized_snapshot.ds_count * |
| sizeof(TransactionId); |
| } |
| |
| /* Copy all required fields */ |
| snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); |
| snapshot->snapshot_type = SNAPSHOT_MVCC; |
| snapshot->xmin = serialized_snapshot.xmin; |
| snapshot->xmax = serialized_snapshot.xmax; |
| snapshot->xip = NULL; |
| snapshot->xcnt = serialized_snapshot.xcnt; |
| snapshot->subxip = NULL; |
| snapshot->subxcnt = serialized_snapshot.subxcnt; |
| snapshot->suboverflowed = serialized_snapshot.suboverflowed; |
| snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery; |
| snapshot->curcid = serialized_snapshot.curcid; |
| snapshot->whenTaken = serialized_snapshot.whenTaken; |
| snapshot->lsn = serialized_snapshot.lsn; |
| snapshot->snapXactCompletionCount = 0; |
| snapshot->haveDistribSnapshot = serialized_snapshot.haveDistribSnapshot; |
| |
| /* Copy all fields for cdb distributed snapshot */ |
| snapshot->distribSnapshotWithLocalMapping.minCachedLocalXid = serialized_snapshot.minCachedLocalXid; |
| snapshot->distribSnapshotWithLocalMapping.maxCachedLocalXid = serialized_snapshot.maxCachedLocalXid; |
| snapshot->distribSnapshotWithLocalMapping.currentLocalXidsCount = serialized_snapshot.currentLocalXidsCount; |
| |
| snapshot->distribSnapshotWithLocalMapping.ds.xminAllDistributedSnapshots = serialized_snapshot.ds_xminAllDistributedSnapshots; |
| snapshot->distribSnapshotWithLocalMapping.ds.distribSnapshotId = serialized_snapshot.ds_distribSnapshotId; |
| snapshot->distribSnapshotWithLocalMapping.ds.xmin = serialized_snapshot.ds_xmin; |
| snapshot->distribSnapshotWithLocalMapping.ds.xmax = serialized_snapshot.ds_xmax; |
| snapshot->distribSnapshotWithLocalMapping.ds.count = serialized_snapshot.ds_count; |
| |
| /* Copy XIDs, if present. */ |
| if (serialized_snapshot.xcnt > 0) |
| { |
| snapshot->xip = (TransactionId *) (snapshot + 1); |
| memcpy(snapshot->xip, serialized_xids, |
| serialized_snapshot.xcnt * sizeof(TransactionId)); |
| } |
| |
| /* Copy SubXIDs, if present. */ |
| if (serialized_snapshot.subxcnt > 0) |
| { |
| snapshot->subxip = ((TransactionId *) (snapshot + 1)) + |
| serialized_snapshot.xcnt; |
| memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt, |
| serialized_snapshot.subxcnt * sizeof(TransactionId)); |
| } |
| |
| /* Set the copied flag so that the caller will set refcounts correctly. */ |
| snapshot->regd_count = 0; |
| snapshot->active_count = 0; |
| snapshot->copied = true; |
| |
| snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray = NULL; |
| snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids = NULL; |
| /* Copy distribSnapshotWithLocalMapping. */ |
| if (serialized_snapshot.haveDistribSnapshot && |
| serialized_snapshot.ds_count > 0) |
| { |
| Size ds_off = sizeof(SerializedSnapshotData) + |
| serialized_snapshot.xcnt * sizeof(TransactionId) + |
| serialized_snapshot.subxcnt * sizeof(TransactionId); |
| Size ds_lmoff = ds_off + |
| serialized_snapshot.ds_count * sizeof(DistributedTransactionId); |
| |
| snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray = |
| (DistributedTransactionId*) ((char *) snapshot + dsoff); |
| snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids = |
| (TransactionId*) ((char *) snapshot + dslmoff); |
| |
| memcpy(snapshot->distribSnapshotWithLocalMapping.ds.inProgressXidArray, |
| (DistributedTransactionId*) (start_address + ds_off), |
| serialized_snapshot.ds_count * |
| sizeof(DistributedTransactionId)); |
| |
| if (serialized_snapshot.currentLocalXidsCount > 0) |
| { |
| memset(snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids, |
| 0, |
| serialized_snapshot.ds_count * sizeof(TransactionId)); |
| memcpy(snapshot->distribSnapshotWithLocalMapping.inProgressMappedLocalXids, |
| (TransactionId*) (start_address + ds_lmoff), |
| serialized_snapshot.currentLocalXidsCount * |
| sizeof(TransactionId)); |
| } |
| } |
| |
| return snapshot; |
| } |
| |
| /* |
| * Install a restored snapshot as the transaction snapshot. |
| * |
| * The second argument is of type void * so that snapmgr.h need not include |
| * the declaration for PGPROC. |
| */ |
| void |
| RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc) |
| { |
| SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc); |
| } |
| |
| /* |
| * XidInMVCCSnapshot |
| * Is the given XID still-in-progress according to the snapshot? |
| * |
| * GPDB: We have extended the return values to accommodate the case where |
| * we know for sure that the passed in xid has surely committed. This is |
| * to reduce subsequent calls to TransactionIdDidCommit() |
| */ |
| XidInMVCCSnapshotCheckResult |
| XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot, |
| bool distributedSnapshotIgnore, bool *setDistributedSnapshotIgnore) |
| { |
| Assert (setDistributedSnapshotIgnore != NULL); |
| *setDistributedSnapshotIgnore = false; |
| |
| /* |
| * If we have a distributed snapshot, it takes precedence over the local |
| * snapshot since it covers the correct past view of in-progress distributed |
| * transactions and also the correct future view of in-progress distributed |
| * transactions that may yet arrive. |
| * |
| * In the QD, the distributed transactions become visible at the same time |
| * as the corresponding local ones, so we can rely on the local XIDs. |
| */ |
| if (snapshot->haveDistribSnapshot && !distributedSnapshotIgnore && |
| !IS_QUERY_DISPATCHER()) |
| { |
| DistributedSnapshotCommitted distributedSnapshotCommitted; |
| |
| /* Special XIDs don't belong to snapshots, distributed or not. */ |
| if (!TransactionIdIsNormal(xid)) |
| return XID_NOT_IN_SNAPSHOT; |
| |
| /* |
| * A transaction's distributed snapshot always "lags behind" its local |
| * snapshot. So if the local snapshot still sees a transaction as |
| * in-progress, it must be in-progress for the distributed snapshot, |
| * too. Perform this quick xmax check first to avoid the more |
| * expensive distributed snapshot check, if possible. |
| */ |
| if (TransactionIdFollowsOrEquals(xid, snapshot->xmax)) |
| return XID_IN_SNAPSHOT; |
| |
| /* |
| * Check if this committed transaction is a distributed committed |
| * transaction and evaluate it against the distributed snapshot if |
| * it is. |
| */ |
| distributedSnapshotCommitted = |
| DistributedSnapshotWithLocalMapping_CommittedTest( |
| &snapshot->distribSnapshotWithLocalMapping, |
| xid, false); |
| |
| switch (distributedSnapshotCommitted) |
| { |
| case DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS: |
| return XID_IN_SNAPSHOT; |
| |
| case DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE: |
| return XID_SURELY_COMMITTED; |
| |
| case DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE: |
| /* |
| * We can safely skip both of these in the future for distributed |
| * snapshots. |
| */ |
| *setDistributedSnapshotIgnore = true; |
| break; |
| |
| case DISTRIBUTEDSNAPSHOT_COMMITTED_UNKNOWN: |
| /* |
| * The distributed log doesn't know anything about this XID. It may |
| * be a local-only transaction, or still in-progress. Proceed to |
| * perform a local visibility check. |
| */ |
| break; |
| |
| default: |
| elog(FATAL, "Unrecognized distributed committed test result: %d", |
| (int) distributedSnapshotCommitted); |
| break; |
| } |
| } |
| |
| return XidInMVCCSnapshot_Local(xid, snapshot) ? XID_IN_SNAPSHOT : XID_NOT_IN_SNAPSHOT; |
| } |
| |
| /* |
| * XidInMVCCSnapshot |
| * Is the given XID still-in-progress according to the local snapshot? |
| * |
| * Note: GetSnapshotData never stores either top xid or subxids of our own |
| * backend into a snapshot, so these xids will not be reported as "running" |
| * by this function. This is OK for current uses, because we always check |
| * TransactionIdIsCurrentTransactionId first, except when it's known the |
| * XID could not be ours anyway. |
| */ |
| bool |
| XidInMVCCSnapshot_Local(TransactionId xid, Snapshot snapshot) |
| { |
| uint32 i; |
| |
| /* |
| * Make a quick range check to eliminate most XIDs without looking at the |
| * xip arrays. Note that this is OK even if we convert a subxact XID to |
| * its parent below, because a subxact with XID < xmin has surely also got |
| * a parent with XID < xmin, while one with XID >= xmax must belong to a |
| * parent that was not yet committed at the time of this snapshot. |
| */ |
| |
| /* Any xid < xmin is not in-progress */ |
| if (TransactionIdPrecedes(xid, snapshot->xmin)) |
| return false; |
| /* Any xid >= xmax is in-progress */ |
| if (TransactionIdFollowsOrEquals(xid, snapshot->xmax)) |
| return true; |
| |
| /* |
| * Snapshot information is stored slightly differently in snapshots taken |
| * during recovery. |
| */ |
| if (!snapshot->takenDuringRecovery) |
| { |
| /* |
| * If the snapshot contains full subxact data, the fastest way to |
| * check things is just to compare the given XID against both subxact |
| * XIDs and top-level XIDs. If the snapshot overflowed, we have to |
| * use pg_subtrans to convert a subxact XID to its parent XID, but |
| * then we need only look at top-level XIDs not subxacts. |
| */ |
| if (!snapshot->suboverflowed) |
| { |
| /* we have full data, so search subxip */ |
| int32 j; |
| |
| for (j = 0; j < snapshot->subxcnt; j++) |
| { |
| if (TransactionIdEquals(xid, snapshot->subxip[j])) |
| return true; |
| } |
| |
| /* not there, fall through to search xip[] */ |
| } |
| else |
| { |
| /* |
| * Snapshot overflowed, so convert xid to top-level. This is safe |
| * because we eliminated too-old XIDs above. |
| */ |
| xid = SubTransGetTopmostTransaction(xid); |
| |
| /* |
| * If xid was indeed a subxact, we might now have an xid < xmin, |
| * so recheck to avoid an array scan. No point in rechecking |
| * xmax. |
| */ |
| if (TransactionIdPrecedes(xid, snapshot->xmin)) |
| return false; |
| } |
| |
| for (i = 0; i < snapshot->xcnt; i++) |
| { |
| if (TransactionIdEquals(xid, snapshot->xip[i])) |
| return true; |
| } |
| } |
| else |
| { |
| int32 j; |
| |
| /* |
| * In recovery we store all xids in the subxact array because it is by |
| * far the bigger array, and we mostly don't know which xids are |
| * top-level and which are subxacts. The xip array is empty. |
| * |
| * We start by searching subtrans, if we overflowed. |
| */ |
| if (snapshot->suboverflowed) |
| { |
| /* |
| * Snapshot overflowed, so convert xid to top-level. This is safe |
| * because we eliminated too-old XIDs above. |
| */ |
| xid = SubTransGetTopmostTransaction(xid); |
| |
| /* |
| * If xid was indeed a subxact, we might now have an xid < xmin, |
| * so recheck to avoid an array scan. No point in rechecking |
| * xmax. |
| */ |
| if (TransactionIdPrecedes(xid, snapshot->xmin)) |
| return false; |
| } |
| |
| /* |
| * We now have either a top-level xid higher than xmin or an |
| * indeterminate xid. We don't know whether it's top level or subxact |
| * but it doesn't matter. If it's present, the xid is visible. |
| */ |
| for (j = 0; j < snapshot->subxcnt; j++) |
| { |
| if (TransactionIdEquals(xid, snapshot->subxip[j])) |
| return true; |
| } |
| } |
| |
| return false; |
| } |