| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbpersistentstore.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "utils/palloc.h" |
| |
| #include "cdb/cdbpersistentstore.h" |
| |
| #include "storage/itemptr.h" |
| #include "access/genam.h" |
| #include "access/heapam.h" |
| #include "access/transam.h" |
| #include "utils/guc.h" |
| #include "utils/faultinjector.h" |
| #include "storage/smgr.h" |
| #include "storage/ipc.h" |
| #include "cdb/cdbglobalsequence.h" |
| #include "cdb/cdbpersistentfilesysobj.h" |
| #include "cdb/cdbpersistentrecovery.h" |
| |
| void PersistentStore_InitShared( |
| PersistentStoreSharedData *storeSharedData) |
| { |
| MemSet(storeSharedData, 0, sizeof(PersistentStoreSharedData)); |
| |
| strcpy(storeSharedData->eyecatcher, PersistentStoreSharedDataEyecatcher); |
| |
| storeSharedData->needToScanIntoSharedMemory = true; |
| } |
| |
| void PersistentStore_Init( |
| PersistentStoreData *storeData, |
| |
| char *tableName, |
| |
| GpGlobalSequence gpGlobalSequence, |
| |
| PersistentStoreOpenRel openRel, |
| |
| PersistentStoreCloseRel closeRel, |
| |
| PersistentStoreScanTupleCallback scanTupleCallback, |
| |
| PersistentStorePrintTupleCallback printTupleCallback, |
| |
| int numAttributes, |
| |
| int attNumPersistentSerialNum, |
| |
| int attNumPreviousFreeTid) |
| { |
| MemSet(storeData, 0, sizeof(PersistentStoreData)); |
| |
| storeData->tableName = tableName; |
| storeData->gpGlobalSequence = gpGlobalSequence; |
| storeData->openRel = openRel; |
| storeData->closeRel = closeRel; |
| storeData->scanTupleCallback = scanTupleCallback; |
| storeData->printTupleCallback = printTupleCallback; |
| storeData->numAttributes = numAttributes; |
| storeData->attNumPersistentSerialNum = attNumPersistentSerialNum; |
| storeData->attNumPreviousFreeTid = attNumPreviousFreeTid; |
| } |
| |
| void PersistentStore_ResetFreeList( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData) |
| { |
| int64 persistentSerialNum; |
| |
| persistentSerialNum = |
| GlobalSequence_Current( |
| storeData->gpGlobalSequence); |
| |
| storeSharedData->maxInUseSerialNum = persistentSerialNum; |
| storeSharedData->inUseCount = 0; |
| storeSharedData->maxFreeOrderNum = 0; |
| MemSet(&storeSharedData->freeTid, 0, sizeof(ItemPointerData)); |
| MemSet(&storeSharedData->maxTid, 0, sizeof(ItemPointerData)); |
| } |
| |
| static void PersistentStore_DeformTuple( |
| PersistentStoreData *storeData, |
| |
| TupleDesc tupleDesc, |
| /* tuple descriptor */ |
| |
| HeapTuple tuple, |
| |
| Datum *values) |
| { |
| bool *nulls; |
| |
| int i; |
| |
| nulls = (bool*)palloc(storeData->numAttributes * sizeof(bool)); |
| |
| heap_deform_tuple(tuple, tupleDesc, values, nulls); |
| |
| for (i = 1; i <= storeData->numAttributes; i++) |
| Assert(!nulls[i - 1]); |
| |
| pfree(nulls); |
| } |
| |
| static void PersistentStore_ExtractOurTupleData( |
| PersistentStoreData *storeData, |
| |
| Datum *values, |
| |
| int64 *persistentSerialNum, |
| |
| ItemPointer previousFreeTid) |
| { |
| *persistentSerialNum = DatumGetInt64(values[storeData->attNumPersistentSerialNum - 1]); |
| |
| *previousFreeTid = *((ItemPointer) DatumGetPointer(values[storeData->attNumPreviousFreeTid - 1])); |
| } |
| |
| static void PersistentStore_FormTupleSetOurs( |
| PersistentStoreData *storeData, |
| |
| TupleDesc tupleDesc, |
| /* Tuple descriptor. */ |
| |
| Datum *values, |
| |
| int64 persistentSerialNum, |
| |
| ItemPointerData *previousFreeTid, |
| |
| HeapTuple *tuple) |
| /* The formed tuple. */ |
| { |
| int natts = 0; |
| |
| bool *nulls; |
| |
| natts = tupleDesc->natts; |
| Assert(natts == storeData->numAttributes); |
| |
| /* |
| * In order to keep the tuples the exact same size to enable direct reuse of |
| * free tuples, we do not use NULLs. |
| */ |
| nulls = (bool*)palloc0(storeData->numAttributes * sizeof(bool)); |
| |
| values[storeData->attNumPersistentSerialNum - 1] = |
| Int64GetDatum(persistentSerialNum); |
| |
| values[storeData->attNumPreviousFreeTid - 1] = |
| PointerGetDatum(previousFreeTid); |
| |
| /* |
| * Form the tuple. |
| */ |
| *tuple = heap_form_tuple(tupleDesc, values, nulls); |
| if (!HeapTupleIsValid(*tuple)) |
| elog(ERROR, "Failed to build persistent relation node tuple"); |
| |
| } |
| |
| static HTAB* freeEntryHashTable = NULL; |
| |
| typedef struct PersistentFreeEntryKey |
| { |
| ItemPointerData persistentTid; |
| |
| } PersistentFreeEntryKey; |
| |
| typedef struct PersistentFreeEntry |
| { |
| PersistentFreeEntryKey key; |
| |
| ItemPointerData previousFreeTid; |
| int64 freeOrderNum; |
| |
| } PersistentFreeEntry; |
| |
| static void |
| PersistentStore_FreeEntryHashTableInit(void) |
| { |
| HASHCTL info; |
| int hash_flags; |
| |
| /* Set key and entry sizes. */ |
| MemSet(&info, 0, sizeof(info)); |
| info.keysize = sizeof(PersistentFreeEntryKey); |
| info.entrysize = sizeof(PersistentFreeEntry); |
| info.hash = tag_hash; |
| |
| hash_flags = (HASH_ELEM | HASH_FUNCTION); |
| |
| freeEntryHashTable = hash_create("Persistent Free Entry", 10, &info, hash_flags); |
| } |
| |
| static void PersistentStore_InitScanAddFreeEntry( |
| ItemPointer persistentTid, |
| |
| ItemPointer previousFreeTid, |
| |
| int64 freeOrderNum) |
| { |
| PersistentFreeEntryKey key; |
| |
| PersistentFreeEntry *entry; |
| |
| bool found; |
| |
| if (PersistentStore_IsZeroTid(persistentTid)) |
| elog(ERROR, "Expected persistent TID to not be (0,0)"); |
| |
| if (PersistentStore_IsZeroTid(previousFreeTid)) |
| elog(ERROR, "Expected previous free TID to not be (0,0)"); |
| |
| if (freeEntryHashTable == NULL) |
| PersistentStore_FreeEntryHashTableInit(); |
| |
| MemSet(&key, 0, sizeof(key)); |
| key.persistentTid = *persistentTid; |
| |
| entry = |
| (PersistentFreeEntry*) |
| hash_search(freeEntryHashTable, |
| (void *) &key, |
| HASH_ENTER, |
| &found); |
| |
| if (found) |
| elog(ERROR, "Duplicate free persistent TID entry %s", |
| ItemPointerToString(persistentTid)); |
| |
| entry->previousFreeTid = *previousFreeTid; |
| entry->freeOrderNum = freeOrderNum; |
| } |
| |
| static void PersistentStore_InitScanVerifyFreeEntries( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData) |
| { |
| int64 freeOrderNum; |
| ItemPointerData freeTid; |
| |
| PersistentFreeEntry *entry; |
| |
| HASH_SEQ_STATUS iterateStatus; |
| |
| freeOrderNum = storeSharedData->maxFreeOrderNum; |
| freeTid = storeSharedData->freeTid; |
| |
| if (freeOrderNum == 0) |
| { |
| if (!PersistentStore_IsZeroTid(&freeTid)) |
| elog(ERROR, "Expected free TID to be (0,0) when free order number is 0 in '%s'", |
| storeData->tableName); |
| } |
| else |
| { |
| PersistentFreeEntryKey key; |
| |
| PersistentFreeEntry *removeEntry; |
| |
| if (freeEntryHashTable == NULL) |
| elog(ERROR, "Looking for free order number " INT64_FORMAT " and the free entry hash table is empty for '%s'", |
| freeOrderNum, |
| storeData->tableName); |
| |
| while (true) |
| { |
| MemSet(&key, 0, sizeof(key)); |
| key.persistentTid = freeTid; |
| |
| entry = |
| (PersistentFreeEntry*) |
| hash_search(freeEntryHashTable, |
| (void *) &key, |
| HASH_FIND, |
| NULL); |
| if (entry == NULL) |
| elog(ERROR, |
| "Did not find free entry for free TID %s (free order number " INT64_FORMAT ") for '%s'", |
| ItemPointerToString(&freeTid), |
| freeOrderNum, |
| storeData->tableName); |
| |
| if (PersistentStore_IsZeroTid(&entry->previousFreeTid)) |
| elog(ERROR, |
| "Previous free TID not expected to be (0,0) -- persistent Free Entry hashtable corrupted for '%s' " |
| "(expected free order number " INT64_FORMAT ", entry free order number " INT64_FORMAT ")", |
| storeData->tableName, |
| freeOrderNum, |
| entry->freeOrderNum); |
| |
| if (freeOrderNum != entry->freeOrderNum) |
| elog(ERROR, |
| "Free entry for free TID %s has wrong free order number (expected free order number " INT64_FORMAT ", found free order number " INT64_FORMAT ") for '%s'", |
| ItemPointerToString(&freeTid), |
| freeOrderNum, |
| entry->freeOrderNum, |
| storeData->tableName); |
| |
| if (Debug_persistent_recovery_print) |
| elog(PersistentRecovery_DebugPrintLevel(), |
| "PersistentStore_InitScanVerifyFreeEntries ('%s'): Free order number " INT64_FORMAT ", free TID %s, previous free TID %s", |
| storeData->tableName, |
| freeOrderNum, |
| ItemPointerToString(&freeTid), |
| ItemPointerToString2(&entry->previousFreeTid)); |
| |
| freeTid = entry->previousFreeTid; |
| Insist(!PersistentStore_IsZeroTid(&freeTid)); // Note the error check above. |
| if (freeOrderNum == 1) |
| { |
| /* |
| * The last free entry uses its own TID in previous_free_tid. |
| */ |
| if (ItemPointerCompare( |
| &entry->key.persistentTid, |
| &freeTid) != 0) |
| { |
| elog(ERROR, "Expected previous_free_tid %s to match the persistent TID %s for the last free entry (free order number 1) for '%s'", |
| ItemPointerToString(&freeTid), |
| ItemPointerToString2(&entry->key.persistentTid), |
| storeData->tableName); |
| |
| } |
| } |
| |
| removeEntry = hash_search( |
| freeEntryHashTable, |
| (void *) &entry->key, |
| HASH_REMOVE, |
| NULL); |
| if (removeEntry == NULL) |
| elog(ERROR, "Persistent Free Entry hashtable corrupted for '%s'", |
| storeData->tableName); |
| entry = NULL; |
| |
| freeOrderNum--; |
| |
| if (freeOrderNum == 0) |
| break; |
| |
| } |
| } |
| |
| if (freeEntryHashTable != NULL) |
| { |
| hash_seq_init( |
| &iterateStatus, |
| freeEntryHashTable); |
| |
| /* |
| * Verify the hash table has no free entries left. |
| */ |
| while ((entry = hash_seq_search(&iterateStatus)) != NULL) |
| { |
| elog(ERROR, "Found at least one unaccounted for free entry for '%s'. Example: free order number " INT64_FORMAT ", free TID %s, previous free TID %s", |
| storeData->tableName, |
| entry->freeOrderNum, |
| ItemPointerToString(&entry->key.persistentTid), |
| ItemPointerToString2(&entry->previousFreeTid)); |
| } |
| |
| hash_destroy(freeEntryHashTable); |
| freeEntryHashTable = NULL; |
| } |
| |
| if (Debug_persistent_recovery_print) |
| elog(PersistentRecovery_DebugPrintLevel(), |
| "PersistentStore_InitScanVerifyFreeEntries ('%s'): Successfully verified " INT64_FORMAT " free entries", |
| storeData->tableName, |
| storeSharedData->maxFreeOrderNum); |
| } |
| |
| static void PersistentStore_DoInitScan( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData) |
| { |
| PersistentStoreScan storeScan; |
| |
| ItemPointerData persistentTid; |
| int64 persistentSerialNum; |
| |
| ItemPointerData previousFreeTid; |
| |
| Datum *values; |
| |
| int64 globalSequenceNum; |
| |
| values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum)); |
| |
| MemSet(&storeSharedData->maxTid, 0, sizeof(ItemPointerData)); |
| |
| PersistentStore_BeginScan( |
| storeData, |
| storeSharedData, |
| &storeScan); |
| |
| while (PersistentStore_GetNext( |
| &storeScan, |
| values, |
| &persistentTid, |
| &persistentSerialNum)) |
| { |
| /* |
| * We are scanning from low to high TID. |
| */ |
| Assert( |
| PersistentStore_IsZeroTid(&storeSharedData->maxTid) |
| || |
| ItemPointerCompare( |
| &storeSharedData->maxTid, |
| &persistentTid) == -1); // Less-Than. |
| |
| storeSharedData->maxTid = persistentTid; |
| |
| PersistentStore_ExtractOurTupleData( |
| storeData, |
| values, |
| &persistentSerialNum, |
| &previousFreeTid); |
| |
| if (Debug_persistent_recovery_print) |
| (*storeData->printTupleCallback)( |
| PersistentRecovery_DebugPrintLevel(), |
| "SCAN", |
| &persistentTid, |
| values); |
| |
| if (!PersistentStore_IsZeroTid(&previousFreeTid)) |
| { |
| /* |
| * Non-zero previousFreeTid implies a free entry. |
| */ |
| if (storeSharedData->maxFreeOrderNum < persistentSerialNum) |
| { |
| storeSharedData->maxFreeOrderNum = persistentSerialNum; |
| storeSharedData->freeTid = persistentTid; |
| } |
| |
| if (!gp_persistent_skip_free_list) |
| { |
| PersistentStore_InitScanAddFreeEntry( |
| &persistentTid, |
| &previousFreeTid, |
| /* freeOrderNum */ persistentSerialNum); |
| } |
| } |
| else |
| { |
| storeSharedData->inUseCount++; |
| |
| if (storeSharedData->maxInUseSerialNum < persistentSerialNum) |
| { |
| storeSharedData->maxInUseSerialNum = persistentSerialNum; |
| storeData->myHighestSerialNum = storeSharedData->maxInUseSerialNum; |
| } |
| } |
| |
| if (storeData->scanTupleCallback != NULL) |
| (*storeData->scanTupleCallback)( |
| &persistentTid, |
| persistentSerialNum, |
| values); |
| |
| } |
| |
| PersistentStore_EndScan(&storeScan); |
| |
| pfree(values); |
| |
| globalSequenceNum = GlobalSequence_Current(storeData->gpGlobalSequence); |
| |
| /* |
| * Note: Originally the below IF STMT was guarded with a InRecovery flag check. |
| * However, this routine should not be called during recovery since the entries are |
| * not consistent... |
| */ |
| Assert(!InRecovery); |
| |
| if (globalSequenceNum < storeSharedData->maxInUseSerialNum) |
| { |
| /* |
| * We seem to have a corruption problem. |
| * |
| * Use the gp_persistent_repair_global_sequence GUC to get the system up. |
| */ |
| |
| if (gp_persistent_repair_global_sequence) |
| { |
| elog(LOG, "Need to Repair global sequence number " INT64_FORMAT " so use scanned maximum value " INT64_FORMAT " ('%s')", |
| globalSequenceNum, |
| storeSharedData->maxInUseSerialNum, |
| storeData->tableName); |
| } |
| else |
| { |
| elog(ERROR, "Global sequence number " INT64_FORMAT " less than maximum value " INT64_FORMAT " found in scan ('%s')", |
| globalSequenceNum, |
| storeSharedData->maxInUseSerialNum, |
| storeData->tableName); |
| } |
| |
| } |
| else |
| { |
| storeSharedData->maxCachedSerialNum = globalSequenceNum; |
| } |
| |
| if (Debug_persistent_recovery_print) |
| elog(PersistentRecovery_DebugPrintLevel(), |
| "PersistentStore_DoInitScan ('%s'): maximum in-use serial number " INT64_FORMAT ", maximum free order number " INT64_FORMAT ", free TID %s, maximum known TID %s", |
| storeData->tableName, |
| storeSharedData->maxInUseSerialNum, |
| storeSharedData->maxFreeOrderNum, |
| ItemPointerToString(&storeSharedData->freeTid), |
| ItemPointerToString2(&storeSharedData->maxTid)); |
| |
| if (!gp_persistent_skip_free_list) |
| { |
| PersistentStore_InitScanVerifyFreeEntries( |
| storeData, |
| storeSharedData); |
| } |
| else |
| { |
| if (Debug_persistent_recovery_print) |
| elog(PersistentRecovery_DebugPrintLevel(), |
| "PersistentStore_DoInitScan ('%s'): Skipping verification because gp_persistent_skip_free_list GUC is ON", |
| storeData->tableName); |
| } |
| } |
| |
| void PersistentStore_InitScanUnderLock( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData) |
| { |
| if (!storeSharedData->needToScanIntoSharedMemory) |
| { |
| return; // Someone else got in first. |
| } |
| |
| PersistentStore_DoInitScan( |
| storeData, |
| storeSharedData); |
| |
| storeSharedData->needToScanIntoSharedMemory = false; |
| } |
| |
| void PersistentStore_Scan( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| PersistentStoreScanTupleCallback scanTupleCallback) |
| { |
| WRITE_PERSISTENT_STATE_ORDERED_LOCK_DECLARE; |
| |
| PersistentStoreScan storeScan; |
| |
| ItemPointerData persistentTid; |
| int64 persistentSerialNum = 0; |
| |
| Datum *values; |
| |
| values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum)); |
| |
| WRITE_PERSISTENT_STATE_ORDERED_LOCK; |
| |
| PersistentStore_BeginScan( |
| storeData, |
| storeSharedData, |
| &storeScan); |
| |
| while (PersistentStore_GetNext( |
| &storeScan, |
| values, |
| &persistentTid, |
| &persistentSerialNum)) |
| { |
| bool okToContinue; |
| |
| okToContinue = (*scanTupleCallback)( |
| &persistentTid, |
| persistentSerialNum, |
| values); |
| |
| if (!okToContinue) |
| break; |
| |
| } |
| |
| PersistentStore_EndScan(&storeScan); |
| |
| WRITE_PERSISTENT_STATE_ORDERED_UNLOCK; |
| |
| pfree(values); |
| } |
| |
| void PersistentStore_BeginScan( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| PersistentStoreScan *storeScan) |
| { |
| MemSet(storeScan, 0, sizeof(PersistentStoreScan)); |
| |
| storeScan->storeData = storeData; |
| storeScan->storeSharedData = storeSharedData; |
| |
| storeScan->persistentRel = (*storeData->openRel)(); |
| |
| storeScan->scan = heap_beginscan( |
| storeScan->persistentRel, |
| SnapshotNow, |
| 0, |
| NULL); |
| |
| storeScan->tuple = NULL; |
| } |
| |
| bool PersistentStore_GetNext( |
| PersistentStoreScan *storeScan, |
| |
| Datum *values, |
| |
| ItemPointer persistentTid, |
| |
| int64 *persistentSerialNum) |
| { |
| ItemPointerData previousFreeTid; |
| |
| storeScan->tuple = heap_getnext(storeScan->scan, ForwardScanDirection); |
| if (storeScan->tuple == NULL) |
| return false; |
| |
| PersistentStore_DeformTuple( |
| storeScan->storeData, |
| storeScan->persistentRel->rd_att, |
| storeScan->tuple, |
| values); |
| PersistentStore_ExtractOurTupleData( |
| storeScan->storeData, |
| values, |
| persistentSerialNum, |
| &previousFreeTid); |
| |
| *persistentTid = storeScan->tuple->t_self; |
| |
| return true; |
| } |
| |
| HeapTuple PersistentStore_GetScanTupleCopy( |
| PersistentStoreScan *storeScan) |
| { |
| return heaptuple_copy_to(storeScan->tuple, NULL, NULL); |
| } |
| |
| void PersistentStore_EndScan( |
| PersistentStoreScan *storeScan) |
| { |
| heap_endscan(storeScan->scan); |
| |
| (*storeScan->storeData->closeRel)(storeScan->persistentRel); |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Helpers |
| // ----------------------------------------------------------------------------- |
| |
| static XLogRecPtr nowaitXLogEndLoc; |
| |
| inline static void XLogRecPtr_Zero(XLogRecPtr *xlogLoc) |
| { |
| MemSet(xlogLoc, 0, sizeof(XLogRecPtr)); |
| } |
| |
| void PersistentStore_FlushXLog(void) |
| { |
| if (nowaitXLogEndLoc.xlogid != 0 || |
| nowaitXLogEndLoc.xrecoff != 0) |
| { |
| XLogFlush(nowaitXLogEndLoc); |
| XLogRecPtr_Zero(&nowaitXLogEndLoc); |
| } |
| } |
| |
| int64 PersistentStore_MyHighestSerialNum( |
| PersistentStoreData *storeData) |
| { |
| return storeData->myHighestSerialNum; |
| } |
| |
| int64 PersistentStore_CurrentMaxSerialNum( |
| PersistentStoreSharedData *storeSharedData) |
| { |
| return storeSharedData->maxInUseSerialNum; |
| } |
| |
| PersistentTidIsKnownResult PersistentStore_TidIsKnown( |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer persistentTid, |
| |
| ItemPointer maxTid) |
| { |
| *maxTid = storeSharedData->maxTid; |
| |
| Assert(!PersistentStore_IsZeroTid(persistentTid)); |
| |
| // UNDONE: I think the InRecovery test only applies to physical Master Mirroring on Standby. |
| /* Only test this outside of recovery scenarios */ |
| if (Persistent_BeforePersistenceWork()) |
| return PersistentTidIsKnownResult_BeforePersistenceWork; |
| |
| if (storeSharedData->needToScanIntoSharedMemory) |
| return PersistentTidIsKnownResult_ScanNotPerformedYet; |
| |
| if (PersistentStore_IsZeroTid(&storeSharedData->maxTid)) |
| return PersistentTidIsKnownResult_MaxTidIsZero; |
| |
| if (ItemPointerCompare( |
| persistentTid, |
| &storeSharedData->maxTid) <= 0) // Less-than or equal. |
| return PersistentTidIsKnownResult_Known; |
| else |
| return PersistentTidIsKnownResult_NotKnown; |
| } |
| |
| static void PersistentStore_DoInsertTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| Relation persistentRel, |
| /* The persistent table relation. */ |
| |
| Datum *values, |
| |
| bool flushToXLog, |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| ItemPointer persistentTid) |
| /* TID of the stored tuple. */ |
| |
| { |
| bool *nulls; |
| HeapTuple persistentTuple = NULL; |
| XLogRecPtr xlogInsertEndLoc; |
| |
| /* |
| * In order to keep the tuples the exact same size to enable direct reuse of |
| * free tuples, we do not use NULLs. |
| */ |
| nulls = (bool*)palloc0(storeData->numAttributes * sizeof(bool)); |
| |
| /* |
| * Form the tuple. |
| */ |
| persistentTuple = heap_form_tuple(persistentRel->rd_att, values, nulls); |
| if (!HeapTupleIsValid(persistentTuple)) |
| elog(ERROR, "Failed to build persistent tuple ('%s')", |
| storeData->tableName); |
| |
| /* |
| * (We have an exclusive lock (higher up) here so we can direct the insert to the last page.) |
| */ |
| { |
| // Do not assert valid ItemPointer -- it is ok if it is (0,0)... |
| BlockNumber blockNumber = |
| BlockIdGetBlockNumber( |
| &storeSharedData->maxTid.ip_blkid); |
| |
| frozen_heap_insert_directed( |
| persistentRel, |
| persistentTuple, |
| blockNumber); |
| } |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_DoInsertTuple: old maximum known TID %s, new insert TID %s ('%s')", |
| ItemPointerToString(&storeSharedData->maxTid), |
| ItemPointerToString2(&persistentTuple->t_self), |
| storeData->tableName); |
| if (ItemPointerCompare( |
| &storeSharedData->maxTid, |
| &persistentTuple->t_self) == -1) |
| { |
| // Current max is Less-Than. |
| storeSharedData->maxTid = persistentTuple->t_self; |
| } |
| |
| /* |
| * Return the TID of the INSERT tuple. |
| * Return the XLOG location of the INSERT tuple's XLOG record. |
| */ |
| *persistentTid = persistentTuple->t_self; |
| |
| xlogInsertEndLoc = XLogLastInsertEndLoc(); |
| |
| heap_freetuple(persistentTuple); |
| |
| if (flushToXLog) |
| { |
| XLogFlush(xlogInsertEndLoc); |
| XLogRecPtr_Zero(&nowaitXLogEndLoc); |
| } |
| else |
| nowaitXLogEndLoc = xlogInsertEndLoc; |
| |
| pfree(nulls); |
| |
| } |
| |
| static void PersistentStore_InsertTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| Datum *values, |
| |
| bool flushToXLog, |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| ItemPointer persistentTid) |
| /* TID of the stored tuple. */ |
| |
| { |
| Relation persistentRel; |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_InsertTuple: Going to insert new tuple ('%s', shared data %p)", |
| storeData->tableName, |
| storeSharedData); |
| |
| persistentRel = (*storeData->openRel)(); |
| |
| PersistentStore_DoInsertTuple( |
| storeData, |
| storeSharedData, |
| persistentRel, |
| values, |
| flushToXLog, |
| persistentTid); |
| |
| #ifdef FAULT_INJECTOR |
| if (FaultInjector_InjectFaultIfSet(SyncPersistentTable, |
| DDLNotSpecified, |
| "" /* databaseName */, |
| "" /* tableName */)== FaultInjectorTypeSkip) |
| { |
| FlushRelationBuffers(persistentRel); |
| smgrimmedsync(persistentRel->rd_smgr); |
| } |
| #endif |
| |
| (*storeData->closeRel)(persistentRel); |
| |
| if (Debug_persistent_store_print) |
| { |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_InsertTuple: Inserted new tuple at TID %s ('%s')", |
| ItemPointerToString(persistentTid), |
| storeData->tableName); |
| |
| (*storeData->printTupleCallback)( |
| PersistentStore_DebugPrintLevel(), |
| "STORE INSERT TUPLE", |
| persistentTid, |
| values); |
| } |
| |
| } |
| |
| void PersistentStore_UpdateTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer persistentTid, |
| /* TID of the stored tuple. */ |
| |
| Datum *values, |
| |
| bool flushToXLog) |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| { |
| Relation persistentRel; |
| bool *nulls; |
| HeapTuple persistentTuple = NULL; |
| XLogRecPtr xlogUpdateEndLoc; |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_ReplaceTuple: Going to update whole tuple at TID %s ('%s', shared data %p)", |
| ItemPointerToString(persistentTid), |
| storeData->tableName, |
| storeSharedData); |
| |
| persistentRel = (*storeData->openRel)(); |
| |
| /* |
| * In order to keep the tuples the exact same size to enable direct reuse of |
| * free tuples, we do not use NULLs. |
| */ |
| nulls = (bool*)palloc0(storeData->numAttributes * sizeof(bool)); |
| |
| /* |
| * Form the tuple. |
| */ |
| persistentTuple = heap_form_tuple(persistentRel->rd_att, values, nulls); |
| if (!HeapTupleIsValid(persistentTuple)) |
| elog(ERROR, "Failed to build persistent tuple ('%s')", |
| storeData->tableName); |
| |
| persistentTuple->t_self = *persistentTid; |
| |
| frozen_heap_inplace_update(persistentRel, persistentTuple); |
| |
| /* |
| * Return the XLOG location of the UPDATE tuple's XLOG record. |
| */ |
| xlogUpdateEndLoc = XLogLastInsertEndLoc(); |
| |
| heap_freetuple(persistentTuple); |
| |
| #ifdef FAULT_INJECTOR |
| if (FaultInjector_InjectFaultIfSet(SyncPersistentTable, |
| DDLNotSpecified, |
| "" /* databaseName */, |
| "" /* tableName */)== FaultInjectorTypeSkip) |
| { |
| FlushRelationBuffers(persistentRel); |
| smgrimmedsync(persistentRel->rd_smgr); |
| } |
| #endif |
| |
| (*storeData->closeRel)(persistentRel); |
| |
| if (Debug_persistent_store_print) |
| { |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_UpdateTuple: Updated whole tuple at TID %s ('%s')", |
| ItemPointerToString(persistentTid), |
| storeData->tableName); |
| |
| (*storeData->printTupleCallback)( |
| PersistentStore_DebugPrintLevel(), |
| "STORE UPDATED TUPLE", |
| persistentTid, |
| values); |
| } |
| |
| if (flushToXLog) |
| { |
| XLogFlush(xlogUpdateEndLoc); |
| XLogRecPtr_Zero(&nowaitXLogEndLoc); |
| } |
| else |
| nowaitXLogEndLoc = xlogUpdateEndLoc; |
| } |
| |
| void PersistentStore_ReplaceTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer persistentTid, |
| /* TID of the stored tuple. */ |
| |
| HeapTuple tuple, |
| |
| Datum *newValues, |
| |
| bool *replaces, |
| |
| bool flushToXLog) |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| { |
| Relation persistentRel; |
| bool *nulls; |
| HeapTuple replacementTuple = NULL; |
| XLogRecPtr xlogUpdateEndLoc; |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_ReplaceTuple: Going to replace set of columns in tuple at TID %s ('%s', shared data %p)", |
| ItemPointerToString(persistentTid), |
| storeData->tableName, |
| storeSharedData); |
| |
| persistentRel = (*storeData->openRel)(); |
| |
| /* |
| * In order to keep the tuples the exact same size to enable direct reuse of |
| * free tuples, we do not use NULLs. |
| */ |
| nulls = (bool*)palloc0(storeData->numAttributes * sizeof(bool)); |
| |
| /* |
| * Modify the tuple. |
| */ |
| replacementTuple = heap_modify_tuple(tuple, persistentRel->rd_att, |
| newValues, nulls, replaces); |
| |
| replacementTuple->t_self = *persistentTid; |
| |
| frozen_heap_inplace_update(persistentRel, replacementTuple); |
| |
| /* |
| * Return the XLOG location of the UPDATE tuple's XLOG record. |
| */ |
| xlogUpdateEndLoc = XLogLastInsertEndLoc(); |
| |
| heap_freetuple(replacementTuple); |
| pfree(nulls); |
| |
| if (Debug_persistent_store_print) |
| { |
| Datum *readValues; |
| bool *readNulls; |
| HeapTupleData readTuple; |
| Buffer buffer; |
| HeapTuple readTupleCopy; |
| |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_ReplaceTuple: Replaced set of columns in tuple at TID %s ('%s')", |
| ItemPointerToString(persistentTid), |
| storeData->tableName); |
| |
| readValues = (Datum*)palloc(storeData->numAttributes * sizeof(Datum)); |
| readNulls = (bool*)palloc(storeData->numAttributes * sizeof(bool)); |
| |
| readTuple.t_self = *persistentTid; |
| |
| if (!heap_fetch(persistentRel, SnapshotAny, |
| &readTuple, &buffer, false, NULL)) |
| { |
| elog(ERROR, "Failed to fetch persistent tuple at %s ('%s')", |
| ItemPointerToString(&readTuple.t_self), |
| storeData->tableName); |
| } |
| |
| |
| readTupleCopy = heaptuple_copy_to(&readTuple, NULL, NULL); |
| |
| ReleaseBuffer(buffer); |
| |
| heap_deform_tuple(readTupleCopy, persistentRel->rd_att, readValues, readNulls); |
| |
| (*storeData->printTupleCallback)( |
| PersistentStore_DebugPrintLevel(), |
| "STORE REPLACED TUPLE", |
| persistentTid, |
| readValues); |
| |
| heap_freetuple(readTupleCopy); |
| pfree(readValues); |
| pfree(readNulls); |
| } |
| |
| (*storeData->closeRel)(persistentRel); |
| |
| if (flushToXLog) |
| { |
| XLogFlush(xlogUpdateEndLoc); |
| XLogRecPtr_Zero(&nowaitXLogEndLoc); |
| } |
| else |
| nowaitXLogEndLoc = xlogUpdateEndLoc; |
| } |
| |
| |
| void PersistentStore_ReadTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer readTid, |
| |
| Datum *values, |
| |
| HeapTuple *tupleCopy) |
| { |
| Relation persistentRel; |
| |
| HeapTupleData tuple; |
| Buffer buffer; |
| |
| bool *nulls; |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_ReadTuple: Going to read tuple at TID %s ('%s', shared data %p)", |
| ItemPointerToString(readTid), |
| storeData->tableName, |
| storeSharedData); |
| |
| if (PersistentStore_IsZeroTid(readTid)) |
| elog(ERROR, "TID for fetch persistent tuple is invalid (0,0) ('%s')", |
| storeData->tableName); |
| |
| // UNDONE: I think the InRecovery test only applies to physical Master Mirroring on Standby. |
| /* Only test this outside of recovery scenarios */ |
| if (!InRecovery |
| && |
| (PersistentStore_IsZeroTid(&storeSharedData->maxTid) |
| || |
| ItemPointerCompare( |
| readTid, |
| &storeSharedData->maxTid) == 1 // Greater-than. |
| )) |
| { |
| elog(ERROR, "TID %s for fetch persistent tuple is greater than the last known TID %s ('%s')", |
| ItemPointerToString(readTid), |
| ItemPointerToString2(&storeSharedData->maxTid), |
| storeData->tableName); |
| } |
| |
| persistentRel = (*storeData->openRel)(); |
| |
| tuple.t_self = *readTid; |
| |
| if (!heap_fetch(persistentRel, SnapshotAny, |
| &tuple, &buffer, false, NULL)) |
| { |
| elog(ERROR, "Failed to fetch persistent tuple at %s (maximum known TID %s, '%s')", |
| ItemPointerToString(&tuple.t_self), |
| ItemPointerToString2(&storeSharedData->maxTid), |
| storeData->tableName); |
| } |
| |
| |
| *tupleCopy = heaptuple_copy_to(&tuple, NULL, NULL); |
| |
| ReleaseBuffer(buffer); |
| |
| /* |
| * In order to keep the tuples the exact same size to enable direct reuse of |
| * free tuples, we do not use NULLs. |
| */ |
| nulls = (bool*)palloc(storeData->numAttributes * sizeof(bool)); |
| |
| heap_deform_tuple(*tupleCopy, persistentRel->rd_att, values, nulls); |
| |
| (*storeData->closeRel)(persistentRel); |
| |
| if (Debug_persistent_store_print) |
| { |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_ReadTuple: Successfully read tuple at TID %s ('%s')", |
| ItemPointerToString(readTid), |
| storeData->tableName); |
| |
| (*storeData->printTupleCallback)( |
| PersistentStore_DebugPrintLevel(), |
| "STORE READ TUPLE", |
| readTid, |
| values); |
| } |
| |
| pfree(nulls); |
| } |
| |
| |
| static bool PersistentStore_GetFreeTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer freeTid) |
| { |
| Datum *values; |
| HeapTuple tupleCopy; |
| |
| int64 persistentSerialNum; |
| ItemPointerData previousFreeTid; |
| |
| MemSet(freeTid, 0, sizeof(ItemPointerData)); |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_GetFreeTuple: Enter: maximum free order number " INT64_FORMAT ", free TID %s ('%s')", |
| storeSharedData->maxFreeOrderNum, |
| ItemPointerToString(&storeSharedData->freeTid), |
| storeData->tableName); |
| |
| if (storeSharedData->maxFreeOrderNum == 0) |
| { |
| return false; // No free tuples. |
| } |
| |
| if (gp_persistent_skip_free_list) |
| { |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_GetFreeTuple: Skipping because gp_persistent_skip_free_list GUC is ON ('%s')", |
| storeData->tableName); |
| return false; // Pretend no free tuples. |
| } |
| |
| Assert(storeSharedData->freeTid.ip_posid != 0); |
| |
| /* |
| * Read the current last free tuple. |
| */ |
| values = (Datum*)palloc(storeData->numAttributes * sizeof(Datum)); |
| |
| PersistentStore_ReadTuple( |
| storeData, |
| storeSharedData, |
| &storeSharedData->freeTid, |
| values, |
| &tupleCopy); |
| |
| PersistentStore_ExtractOurTupleData( |
| storeData, |
| values, |
| &persistentSerialNum, |
| &previousFreeTid); |
| |
| if (PersistentStore_IsZeroTid(&previousFreeTid)) |
| elog(ERROR, "Expected persistent store tuple at %s to be free ('%s')", |
| ItemPointerToString(&storeSharedData->freeTid), |
| storeData->tableName); |
| |
| if (storeSharedData->maxFreeOrderNum == 1) |
| Assert(ItemPointerCompare(&previousFreeTid, &storeSharedData->freeTid) == 0); |
| |
| if (persistentSerialNum != storeSharedData->maxFreeOrderNum) |
| elog(ERROR, "Expected persistent store tuple at %s to have order number " INT64_FORMAT " (found " INT64_FORMAT ", '%s')", |
| ItemPointerToString(&storeSharedData->freeTid), |
| storeSharedData->maxFreeOrderNum, |
| persistentSerialNum, |
| storeData->tableName); |
| |
| *freeTid = storeSharedData->freeTid; |
| storeSharedData->maxFreeOrderNum--; |
| storeSharedData->freeTid = previousFreeTid; |
| |
| pfree(values); |
| |
| heap_freetuple(tupleCopy); |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_GetFreeTuple: Exit: maximum free order number " INT64_FORMAT ", free TID %s ('%s')", |
| storeSharedData->maxFreeOrderNum, |
| ItemPointerToString(&storeSharedData->freeTid), |
| storeData->tableName); |
| |
| return true; |
| } |
| |
| void PersistentStore_AddTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| Datum *values, |
| |
| bool flushToXLog, |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| ItemPointer persistentTid, |
| /* TID of the stored tuple. */ |
| |
| int64 *persistentSerialNum) |
| |
| { |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_AddTuple: Going to add tuple ('%s', shared data %p)", |
| storeData->tableName, |
| storeSharedData); |
| /* allocate new sequence numbers if cached sequence numbers are exhausted */ |
| if (storeSharedData->maxInUseSerialNum >= storeSharedData->maxCachedSerialNum) |
| { |
| storeSharedData->maxCachedSerialNum += |
| PersistentStoreSharedDataMaxCachedSerialNum; |
| GlobalSequence_Set( |
| storeData->gpGlobalSequence, |
| storeSharedData->maxCachedSerialNum); |
| } |
| |
| *persistentSerialNum = ++storeSharedData->maxInUseSerialNum; |
| storeData->myHighestSerialNum = storeSharedData->maxInUseSerialNum; |
| |
| // Overwrite with the new serial number value. |
| values[storeData->attNumPersistentSerialNum - 1] = |
| Int64GetDatum(*persistentSerialNum); |
| |
| if (PersistentStore_GetFreeTuple( |
| storeData, |
| storeSharedData, |
| persistentTid)) |
| { |
| Assert(persistentTid->ip_posid != 0); |
| PersistentStore_UpdateTuple( |
| storeData, |
| storeSharedData, |
| persistentTid, |
| values, |
| flushToXLog); |
| } |
| else |
| { |
| /* |
| * Add new tuple. |
| */ |
| |
| PersistentStore_InsertTuple( |
| storeData, |
| storeSharedData, |
| values, |
| flushToXLog, |
| persistentTid); |
| Assert(persistentTid->ip_posid != 0); |
| } |
| |
| storeSharedData->inUseCount++; |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_AddTuple: Added tuple ('%s', in use count " INT64_FORMAT ", shared data %p)", |
| storeData->tableName, |
| storeSharedData->inUseCount, |
| storeSharedData); |
| } |
| |
| void PersistentStore_FreeTuple( |
| PersistentStoreData *storeData, |
| |
| PersistentStoreSharedData *storeSharedData, |
| |
| ItemPointer persistentTid, |
| /* TID of the stored tuple. */ |
| |
| Datum *freeValues, |
| |
| bool flushToXLog) |
| /* When true, the XLOG record for this change will be flushed to disk. */ |
| |
| { |
| Relation persistentRel; |
| HeapTuple persistentTuple = NULL; |
| ItemPointerData prevFreeTid; |
| XLogRecPtr xlogEndLoc; |
| /* The end location of the UPDATE XLOG record. */ |
| |
| #ifdef USE_ASSERT_CHECKING |
| if (storeSharedData == NULL || |
| !PersistentStoreSharedData_EyecatcherIsValid(storeSharedData)) |
| elog(ERROR, "Persistent store shared-memory not valid"); |
| #endif |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_FreeTuple: Going to free tuple at TID %s ('%s', shared data %p)", |
| ItemPointerToString(persistentTid), |
| storeData->tableName, |
| storeSharedData); |
| |
| Assert(persistentTid->ip_posid != 0); |
| |
| persistentRel = (*storeData->openRel)(); |
| |
| storeSharedData->maxFreeOrderNum++; |
| if (storeSharedData->maxFreeOrderNum == 1) |
| prevFreeTid = *persistentTid; // So non-zero PreviousFreeTid indicates free. |
| else |
| prevFreeTid = storeSharedData->freeTid; |
| storeSharedData->freeTid = *persistentTid; |
| |
| PersistentStore_FormTupleSetOurs( |
| storeData, |
| persistentRel->rd_att, |
| freeValues, |
| storeSharedData->maxFreeOrderNum, |
| &prevFreeTid, |
| &persistentTuple); |
| |
| persistentTuple->t_self = *persistentTid; |
| |
| frozen_heap_inplace_update(persistentRel, persistentTuple); |
| |
| /* |
| * XLOG location of the UPDATE tuple's XLOG record. |
| */ |
| xlogEndLoc = XLogLastInsertEndLoc(); |
| |
| heap_freetuple(persistentTuple); |
| |
| (*storeData->closeRel)(persistentRel); |
| |
| storeSharedData->inUseCount--; |
| |
| if (Debug_persistent_store_print) |
| elog(PersistentStore_DebugPrintLevel(), |
| "PersistentStore_FreeTuple: Freed tuple at TID %s. Maximum free order number " INT64_FORMAT ", in use count " INT64_FORMAT " ('%s')", |
| ItemPointerToString(&storeSharedData->freeTid), |
| storeSharedData->maxFreeOrderNum, |
| storeSharedData->inUseCount, |
| storeData->tableName); |
| |
| if (flushToXLog) |
| { |
| XLogFlush(xlogEndLoc); |
| XLogRecPtr_Zero(&nowaitXLogEndLoc); |
| } |
| else |
| nowaitXLogEndLoc = xlogEndLoc; |
| } |