| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbappendonlystoragewrite.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| #ifndef WIN32 |
| #include <sys/fcntl.h> |
| #else |
| #include <io.h> |
| #endif |
| #include <sys/file.h> |
| #include <unistd.h> |
| |
| #include "catalog/heap.h" |
| #include "catalog/pg_compression.h" |
| #include "commands/tablespace.h" |
| #include "cdb/cdbappendonlyam.h" |
| #include "cdb/cdbappendonlystorageread.h" |
| #include "cdb/cdbappendonlystorage.h" |
| #include "cdb/cdbappendonlystoragelayer.h" |
| #include "cdb/cdbappendonlystorageformat.h" |
| #include "cdb/cdbappendonlystoragewrite.h" |
| #include "cdb/cdbmirroredfilesysobj.h" |
| #include "cdb/cdbpersistentfilesysobj.h" |
| #include "utils/guc.h" |
| |
| |
| // ----------------------------------------------------------------------------- |
| // Initialization |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Initialize AppendOnlyStorageWrite. |
| * |
| * The AppendOnlyStorageWrite data structure is initialized |
| * once for a append session and can be used to add |
| * Append-Only Storage Blocks to 1 or more segment files. |
| * |
| * The current file to write to is opened with the |
| * AppendOnlyStorageWrite_OpenFile routine. |
| */ |
void AppendOnlyStorageWrite_Init(
	AppendOnlyStorageWrite *storageWrite,
	/* The data structure to initialize. */

	MemoryContext memoryContext,
	/*
	 * The memory context to use for buffers and
	 * other memory needs.  When NULL, the
	 * current memory context is used.
	 */
	int32 maxBufferLen,
	/*
	 * The maximum Append-Only Storage Block
	 * length including all storage headers.
	 */
	char *relationName,
	/*
	 * Name of the relation to use in system
	 * logging and error messages.
	 */

	char *title,
	/*
	 * A phrase that better describes the purpose of the this open.
	 *
	 * The caller manages the storage for this.
	 */

	AppendOnlyStorageAttributes *storageAttributes)
	/*
	 * The Append-Only Storage Attributes
	 * from relation creation.
	 */
{
	int relationNameLen;
	uint8 *memory;
	int32 memoryLen;
	MemoryContext oldMemoryContext;

	Assert(storageWrite != NULL);

	// UNDONE: Range check maxBufferLen

	Assert(relationName != NULL);
	Assert(storageAttributes != NULL);

	// UNDONE: Range check fields in storageAttributes

	/* Start from a clean slate; all pointer members become NULL. */
	MemSet(storageWrite, 0, sizeof(AppendOnlyStorageWrite));

	storageWrite->maxBufferLen = maxBufferLen;

	if (memoryContext == NULL)
		storageWrite->memoryContext = CurrentMemoryContext;
	else
		storageWrite->memoryContext = memoryContext;

	/* All allocations below must live in the session's memory context. */
	oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext);

	memcpy(
		&storageWrite->storageAttributes,
		storageAttributes,
		sizeof(AppendOnlyStorageAttributes));

	/*
	 * Determine the fixed header length based on the checksum flag.
	 * With checksums enabled, two CRC-32 values are added to the header.
	 */
	storageWrite->regularHeaderLen = AoHeader_RegularSize;
	if (storageWrite->storageAttributes.checksum)
		storageWrite->regularHeaderLen += 2 * sizeof(pg_crc32);

	/* Keep a private copy of the relation name (including the NUL). */
	relationNameLen = strlen(relationName);
	storageWrite->relationName = (char *) palloc(relationNameLen + 1);
	memcpy(storageWrite->relationName, relationName, relationNameLen + 1);

	/* NOTE: title is NOT copied; the caller manages its storage. */
	storageWrite->title = title;

	/*
	 * Set up extra buffers for compression.
	 */
	storageWrite->compressionOverrunLen = storageAttributes->overflowSize;

	elog(DEBUG2, "Requested compression overflow bytes = %d.", storageAttributes->overflowSize);

	if (storageWrite->storageAttributes.compress)
	{
		/* Content is first staged uncompressed here before compressing. */
		storageWrite->uncompressedBuffer = (uint8 *) palloc(storageWrite->maxBufferLen * sizeof(uint8));
	}
	else
	{
		Assert(storageWrite->uncompressedBuffer == NULL);
	}

	/*
	 * Now that we have determined the compression overrun, we can now
	 * initialize BufferedAppend with the correct maxBufferLen + compressionOverrunLen.
	 */
	storageWrite->maxBufferWithCompressionOverrrunLen =
		storageWrite->maxBufferLen + storageWrite->compressionOverrunLen;
	storageWrite->largeWriteLen = 2 * storageWrite->maxBufferLen;
	Assert(storageWrite->maxBufferWithCompressionOverrrunLen <= storageWrite->largeWriteLen);

	memoryLen =
		BufferedAppendMemoryLen(
			/* maxBufferLen */ storageWrite->maxBufferWithCompressionOverrrunLen,
			storageWrite->largeWriteLen);

	memory = (uint8*)palloc(memoryLen);

	BufferedAppendInit(&storageWrite->bufferedAppend,
					   memory,
					   memoryLen,
					   /* maxBufferLen */ storageWrite->maxBufferWithCompressionOverrrunLen,
					   storageWrite->largeWriteLen,
					   relationName);

	if (Debug_appendonly_print_insert || Debug_appendonly_print_append_block)
		elog(LOG,"Append-Only Storage Write initialize for table '%s' "
			 "(compression = %s, compression level %d, maximum buffer length %d, large write length %d)",
			 storageWrite->relationName,
			 (storageWrite->storageAttributes.compress ? "true" : "false"),
			 storageWrite->storageAttributes.compressLevel,
			 storageWrite->maxBufferWithCompressionOverrrunLen,
			 storageWrite->largeWriteLen);

	/*
	 * When doing VerifyBlock, allocate the extra buffers.
	 */
	if (gp_appendonly_verify_write_block && storageWrite->storageAttributes.compress)
	{
		storageWrite->verifyWriteBuffer = (uint8 *) palloc(storageWrite->maxBufferLen * sizeof(uint8));
	}
	else
	{
		Assert(storageWrite->verifyWriteBuffer == NULL);
	}

	/* No segment file open yet; see AppendOnlyStorageWrite_OpenFile. */
	storageWrite->file = -1;

	MemoryContextSwitchTo(oldMemoryContext);

	storageWrite->isActive = true;

	storageWrite->bufferedAppend.mirroredOpen.isActive = FALSE;
	storageWrite->bufferedAppend.mirroredOpen.segmentFileNum = 0;
	storageWrite->bufferedAppend.mirroredOpen.primaryFile = -1;
}
| |
| |
| /* |
| * Return (read-only) pointer to relation name. |
| */ |
| char *AppendOnlyStorageWrite_RelationName( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| return storageWrite->relationName; |
| } |
| |
| |
/*
 * Finish using the AppendOnlyStorageWrite session created with ~Init.
 *
 * Releases the buffered-append memory, the private copies of the relation
 * and segment file names, the compression buffers, and the compression
 * state.  Safe to call on an already-finished session (returns immediately).
 */
void AppendOnlyStorageWrite_FinishSession(
	AppendOnlyStorageWrite *storageWrite)
	/* The data structure to finish. */
{
	MemoryContext oldMemoryContext;

	/* Idempotent: nothing to do if never initialized or already finished. */
	if(!storageWrite->isActive)
		return;

	/* Free in the same context used for the allocations in ~Init. */
	oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext);

	// UNDONE: This expects the MemoryContent to be what was used for the 'memory' in ~Init
	BufferedAppendFinish(&storageWrite->bufferedAppend);

	if (storageWrite->relationName != NULL)
	{
		pfree(storageWrite->relationName);
		storageWrite->relationName = NULL;
	}

	if (storageWrite->uncompressedBuffer != NULL)
	{
		pfree(storageWrite->uncompressedBuffer);
		storageWrite->uncompressedBuffer = NULL;
	}

	if (storageWrite->verifyWriteBuffer != NULL)
	{
		pfree(storageWrite->verifyWriteBuffer);
		storageWrite->verifyWriteBuffer = NULL;
	}

	if (storageWrite->segmentFileName != NULL)
	{
		pfree(storageWrite->segmentFileName);
		storageWrite->segmentFileName = NULL;
	}

	if (storageWrite->compression_functions != NULL)
	{
		/*
		 * Run the compressor's destructor on both compression states.
		 *
		 * NOTE(review): compressionState is pfree'd after its destructor runs,
		 * but verifyWriteCompressionState is only destructed, never pfree'd,
		 * and neither pointer is reset to NULL -- confirm the destructor owns
		 * that memory (potential leak / dangling pointer otherwise).
		 */
		callCompressionDestructor(storageWrite->compression_functions[COMPRESSION_DESTRUCTOR], storageWrite->compressionState);
		pfree(storageWrite->compressionState);
		if (storageWrite->verifyWriteCompressionState != NULL)
		{
			callCompressionDestructor(storageWrite->compression_functions[COMPRESSION_DESTRUCTOR], storageWrite->verifyWriteCompressionState);
		}
	}

	/* Deallocation is done.  Go back to caller memory-context. */
	MemoryContextSwitchTo(oldMemoryContext);

	storageWrite->isActive = false;
}
| |
| // ----------------------------------------------------------------------------- |
| // Open and FlushAndClose |
| // ----------------------------------------------------------------------------- |
| |
/*
 * Creates an on-demand Append-Only segment file under transaction.
 *
 * Only called when the committed logical EOF is 0 (file never written);
 * if a gp_relfile_node entry already exists, the file was created earlier
 * and nothing is done.
 */
void AppendOnlyStorageWrite_TransactionCreateFile(
	char *relname,

	int64 logicalEof,
	/*
	 * The last committed write transaction's EOF
	 * value to use as the end of the segment
	 * file.
	 *
	 * If the EOF is 0, we will create the file
	 * if necessary.  Otherwise, it must already
	 * exist.
	 */
	RelFileNode *relFileNode,
	int32 segmentFileNum,
	ItemPointer persistentTid,
	/* Filled in by the callees below -- presumably the persistent TID of the file-system object; confirm in their headers. */
	int64 *persistentSerialNum)
	/* Filled in by the callees below; read when inserting the catalog tuple. */
{
	Relation gp_relfile_node;

	Assert (segmentFileNum > 0);
	Assert (logicalEof == 0);

	/*
	 * We may or may not have a gp_relfile_node entry when the EOF is 0.
	 * If an entry exists, the file has already been created -- done.
	 */
	if (ReadGpRelfileNode(
			relFileNode->relNode,
			segmentFileNum,
			persistentTid,
			persistentSerialNum))
	{
		// UNDONE: Verify the gp_persistent_relation_node Append-Only EOFs are zero.
		return;
	}

	/* Create the file-system object under transaction (mirror-aware). */
	MirroredFileSysObj_TransactionCreateAppendOnlyFile(
		relFileNode,
		segmentFileNum,
		relname,
		/* doJustInTimeDirCreate */ false,
		persistentTid,
		persistentSerialNum);

	/* Record the new segment file in the gp_relfile_node catalog. */
	gp_relfile_node = heap_open(GpRelfileNodeRelationId, RowExclusiveLock);

	InsertGpRelfileNodeTuple(
		gp_relfile_node,
		/* relationId */ 0, // UNDONE: Don't have this value here -- currently only used for tracing...
		relname,
		relFileNode->relNode,
		segmentFileNum,
		/* updateIndex */ true,
		persistentTid,
		*persistentSerialNum);

	heap_close(gp_relfile_node, RowExclusiveLock);
}
| |
| /* |
| * Opens the next segment file to write. The file must already exist. |
| * |
| * This routine is responsible for seeking to the proper |
| * write location given the logical EOF. |
| */ |
| void AppendOnlyStorageWrite_OpenFile( |
| AppendOnlyStorageWrite *storageWrite, |
| |
| char *filePathName, |
| /* The name of the segment file to open. */ |
| |
| int64 logicalEof, |
| /* |
| * The last committed write transaction's EOF |
| * value to use as the end of the segment |
| * file. |
| */ |
| int64 fileLen_uncompressed, |
| |
| RelFileNode *relFileNode, |
| int32 segmentFileNum |
| /*ItemPointer persistentTid, |
| int64 persistentSerialNum*/) |
| { |
| int primaryError; |
| |
| File file; |
| |
| int64 seekResult; |
| |
| MemoryContext oldMemoryContext; |
| |
| int segmentFileNameLen; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| Assert(filePathName != NULL); |
| |
| /* |
| * Open or create the file for write. |
| */ |
| MirroredAppendOnly_OpenReadWrite( |
| &storageWrite->bufferedAppend.mirroredOpen, |
| relFileNode, |
| segmentFileNum, |
| storageWrite->relationName, |
| logicalEof, |
| false, |
| &primaryError); |
| if (primaryError != 0) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("Append-only Storage Write could not open segment file '%s' for relation '%s': %s" |
| , filePathName, storageWrite->relationName, strerror(primaryError)), |
| errdetail("%s", HdfsGetLastError()))); |
| |
| file = storageWrite->bufferedAppend.mirroredOpen.primaryFile; |
| |
| seekResult = FileNonVirtualTell(file); |
| |
| if (seekResult < 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("Append-only Storage Tell error on segment file '%s' for relation '%s'. FileSeek offset = " INT64_FORMAT ". Error code = %d (%s)", |
| filePathName, storageWrite->relationName, logicalEof, (int)errno, strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| if (seekResult > logicalEof) |
| { |
| elog(LOG, "Truncate file %s for table %s to length " INT64_FORMAT, |
| filePathName, storageWrite->relationName, logicalEof); |
| /* |
| * previous transaction is aborted |
| * truncate file |
| */ |
| if (FileTruncate(file, logicalEof)) |
| { |
| char * msg = pstrdup(HdfsGetLastError()); |
| int errorno = errno; |
| |
| MirroredAppendOnly_Close(&storageWrite->bufferedAppend.mirroredOpen); |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("Append-only Storage truncate error on segment file '%s' to position " INT64_FORMAT " for relation '%s' Error code = %d (%s)", |
| filePathName, logicalEof, storageWrite->relationName, (int)errorno, strerror(errorno)), |
| errdetail("%s", msg))); |
| |
| } |
| } |
| /* |
| * file length is less then the logic eof, we lost some data |
| */ |
| else if (seekResult < logicalEof) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("Table %s, file %s length " INT64_FORMAT " is less then logic EOF recorded in metadata " INT64_FORMAT, |
| storageWrite->relationName, filePathName, seekResult, logicalEof))); |
| } |
| |
| storageWrite->file = file; |
| storageWrite->startEof = logicalEof; |
| storageWrite->relFileNode = *relFileNode; |
| storageWrite->segmentFileNum = segmentFileNum; |
| |
| /* |
| * When writing multiple segment files, we throw away the old segment file name strings. |
| */ |
| oldMemoryContext = MemoryContextSwitchTo(storageWrite->memoryContext); |
| |
| if (storageWrite->segmentFileName != NULL) |
| pfree(storageWrite->segmentFileName); |
| |
| segmentFileNameLen = strlen(filePathName); |
| storageWrite->segmentFileName = (char *) palloc(segmentFileNameLen + 1); |
| memcpy(storageWrite->segmentFileName, filePathName, segmentFileNameLen + 1); |
| |
| /* Allocation is done. Go back to caller memory-context. */ |
| MemoryContextSwitchTo(oldMemoryContext); |
| |
| /* |
| * Tell the BufferedAppend module about the file we just opened. |
| */ |
| BufferedAppendSetFile(&storageWrite->bufferedAppend, |
| storageWrite->file, |
| storageWrite->segmentFileName, |
| logicalEof, |
| fileLen_uncompressed); |
| |
| } |
| |
| /* |
| * Optionally pad out to next page boundary. |
| * |
| * Since we do not do typical recovery processing of append-only file-system |
| * pages, we pad out the last file-system byte with zeroes. The number of bytes |
| * that are padded with zero's is determined by safefswritesize. |
| * This function pads with 0's of length padLen or pads the whole remainder |
| * of the safefswritesize size with 0's if padLen is -1. |
| */ |
| static void |
| AppendOnlyStorageWrite_DoPadOutRemainder( |
| AppendOnlyStorageWrite *storageWrite, |
| int32 padLen) |
| { |
| int64 nextWritePosition; |
| int64 nextBoundaryPosition; |
| int32 safeWrite = storageWrite->storageAttributes.safeFSWriteSize; |
| int32 safeWriteRemainder; |
| bool doPad; |
| uint8 *buffer; |
| |
| /* early exit if no pad needed */ |
| if(safeWrite == 0) |
| return; |
| |
| nextWritePosition = BufferedAppendNextBufferPosition(&storageWrite->bufferedAppend); |
| nextBoundaryPosition = |
| ((nextWritePosition + safeWrite - 1)/safeWrite)*safeWrite; |
| safeWriteRemainder = (int32)(nextBoundaryPosition - nextWritePosition); |
| |
| if (safeWriteRemainder <= 0) |
| doPad = false; |
| else if (padLen == -1) |
| { |
| /* |
| * Pad to end of page. |
| */ |
| doPad = true; |
| padLen = safeWriteRemainder; |
| } |
| else |
| doPad = (safeWriteRemainder < padLen); |
| |
| if (doPad) |
| { |
| /* |
| * Get buffer of the remainder to pad. |
| */ |
| buffer = BufferedAppendGetBuffer(&storageWrite->bufferedAppend, |
| safeWriteRemainder); |
| |
| if (buffer == NULL) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("We do not expect files to be have a maximum length"))); |
| } |
| |
| memset(buffer, 0, safeWriteRemainder); |
| BufferedAppendFinishBuffer(&storageWrite->bufferedAppend, |
| safeWriteRemainder, |
| safeWriteRemainder); |
| |
| if (Debug_appendonly_print_insert) |
| elog(LOG,"Append-only insert zero padded safeWriteRemainder for table '%s' (nextWritePosition = " INT64_FORMAT ", safeWriteRemainder = %d)", |
| storageWrite->relationName, |
| nextWritePosition, |
| safeWriteRemainder); |
| |
| } |
| } |
| |
/*
 * Flush and close the current segment file.
 *
 * Pads the final page, lets BufferedAppend release the file, then fsyncs
 * and closes it.  Returns the new logical EOF and uncompressed length
 * through the out parameters.
 *
 * No error if the current is already closed (returns zeros).
 */
void AppendOnlyStorageWrite_FlushAndCloseFile(
	AppendOnlyStorageWrite *storageWrite,

	int64 *newLogicalEof,
	/* The new EOF for the segment file. */

	int64 *fileLen_uncompressed)
	/* Output: the uncompressed byte count written to the file. */
{
	int primaryError;

	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	/* No file open -- report empty results and succeed. */
	if (storageWrite->file == -1)
	{
		*newLogicalEof = 0;
		*fileLen_uncompressed = 0;
		return;
	}

	/*
	 * We pad out append commands to the page boundary.
	 */
	AppendOnlyStorageWrite_DoPadOutRemainder(
		storageWrite,
		/* indicate till end of page */ -1);

	/*
	 * Have the BufferedAppend module let go, but this does not close the file.
	 */
	BufferedAppendCompleteFile(&storageWrite->bufferedAppend,
							   newLogicalEof,
							   fileLen_uncompressed);

	/*
	 * We must take care of fsynching to disk ourselves since
	 * the fd API won't do it for us.
	 */
	MirroredAppendOnly_FlushAndClose(
		&storageWrite->bufferedAppend.mirroredOpen,
		&primaryError);
	if (primaryError != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("Could not flush (fsync) Append-Only segment file '%s' to disk for relation '%s': %s"
						, storageWrite->segmentFileName, storageWrite->relationName, strerror(primaryError)),
				 errdetail("%s", HdfsGetLastError())));

	/* Mark the session as having no open file. */
	storageWrite->file = -1;

	MemSet(&storageWrite->relFileNode, 0, sizeof(RelFileNode));
	storageWrite->segmentFileNum = 0;
}
| |
| /* |
| * Flush and close the current segment file under a transaction. |
| * |
| * Handles mirror loss end transaction work. |
| * |
| * No error if the current is already closed. |
| */ |
| void AppendOnlyStorageWrite_TransactionFlushAndCloseFile( |
| AppendOnlyStorageWrite *storageWrite, |
| |
| int64 *newLogicalEof, |
| /* The new EOF for the segment file. */ |
| |
| int64 *fileLen_uncompressed) |
| { |
| RelFileNode relFileNode; |
| int32 segmentFileNum; |
| |
| int64 startEof; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| if (storageWrite->file == -1) |
| { |
| *newLogicalEof = 0; |
| *fileLen_uncompressed = 0; |
| return; |
| } |
| |
| relFileNode = storageWrite->relFileNode; |
| segmentFileNum = storageWrite->segmentFileNum; |
| |
| startEof = storageWrite->startEof; |
| |
| AppendOnlyStorageWrite_FlushAndCloseFile( |
| storageWrite, |
| newLogicalEof, |
| fileLen_uncompressed); |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Usable Block Length |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Returns the Append-Only Storage Block fixed header length in bytes. |
| */ |
| int32 AppendOnlyStorageWrite_FixedHeaderLen( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| return storageWrite->regularHeaderLen; |
| } |
| |
| /* |
| * Returns the Append-Only Storage Block complete header length in bytes. |
| * |
| * Call this routine after specifying all optional header information for the current block |
| * begin written. |
| */ |
| int32 AppendOnlyStorageWrite_CompleteHeaderLen( |
| AppendOnlyStorageWrite *storageWrite, |
| AoHeaderKind aoHeaderKind) |
| { |
| int32 completeHeaderLen; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| completeHeaderLen = storageWrite->regularHeaderLen; |
| if (AoHeader_IsLong(aoHeaderKind)) |
| { |
| completeHeaderLen += (AoHeader_LongSize - AoHeader_RegularSize); |
| } |
| |
| return completeHeaderLen; |
| } |
| |
| /* |
| * Returns the Append-Only Storage large content metadata header length in bytes. |
| * |
| * Call this routine after specifying all optional header information for the current block |
| * begin written. |
| */ |
| static int32 AppendOnlyStorageWrite_LargeContentHeaderLen( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| int32 completeHeaderLen; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| completeHeaderLen = storageWrite->regularHeaderLen; |
| |
| // UNDONE: Right alignment? |
| |
| return completeHeaderLen; |
| } |
| |
/*
 * Build a context string of the form
 * "<title>. Append-Only segment file '<name>', header offset in file <n>"
 * describing the current write position, for use in log/error messages.
 *
 * The returned buffer is allocated by initStringInfo; the caller is
 * responsible for freeing it.
 */
char *AppendOnlyStorageWrite_ContextStr(
	AppendOnlyStorageWrite *storageWrite)
{
	int64 headerOffsetInFile =
		BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend);

	StringInfoData buf;

	initStringInfo(&buf);
	appendStringInfo(
		&buf,
		"%s. Append-Only segment file '%s', header offset in file " INT64_FORMAT,
		storageWrite->title,
		storageWrite->segmentFileName,
		headerOffsetInFile);

	/* Hand the StringInfo's buffer directly to the caller. */
	return buf.data;
}
| |
| static char *AppendOnlyStorageWrite_BlockHeaderStr( |
| AppendOnlyStorageWrite *storageWrite, |
| uint8 *header) |
| { |
| return AppendOnlyStorageFormat_BlockHeaderStr( |
| header, |
| storageWrite->storageAttributes.checksum, |
| storageWrite->storageAttributes.version); |
| } |
| |
| |
| static void AppendOnlyStorageWrite_LogBlockHeader( |
| AppendOnlyStorageWrite *storageWrite, |
| int64 headerOffsetInFile, |
| uint8 *header) |
| { |
| char *blockHeaderStr; |
| |
| blockHeaderStr = AppendOnlyStorageWrite_BlockHeaderStr( |
| storageWrite, |
| header); |
| ereport(LOG, |
| (errmsg("%s. Append-Only segment file '%s', header offset in file " INT64_FORMAT ". %s", |
| storageWrite->title, |
| storageWrite->segmentFileName, |
| headerOffsetInFile, |
| blockHeaderStr))); |
| |
| pfree(blockHeaderStr); |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // errcontext and errdetail |
| // ----------------------------------------------------------------------------- |
| |
/*
 * errcontext_appendonly_write_storage_block
 *
 * Add an errcontext() line showing the table, segment file, offset in file,
 * and block count of the storage block being written.
 *
 * Returns 0 so it can be used directly inside an ereport() argument list.
 */
static int
errcontext_appendonly_write_storage_block(AppendOnlyStorageWrite *storageWrite)
{
	int64 headerOffsetInFile;

	headerOffsetInFile = BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend);

	errcontext(
		"Append-Only table '%s', segment file '%s', block header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
		storageWrite->relationName,
		storageWrite->segmentFileName,
		headerOffsetInFile,
		storageWrite->bufferCount);

	return 0;
}
| |
| /* |
| * errdetail_appendonly_write_storage_block_header |
| * |
| * Add an errdetail() line showing the Append-Only Storage header being written. |
| */ |
| static int |
| errdetail_appendonly_write_storage_block_header(AppendOnlyStorageWrite *storageWrite) |
| { |
| uint8 *header; |
| |
| bool checksum; |
| |
| header = BufferedAppendGetCurrentBuffer(&storageWrite->bufferedAppend); |
| |
| checksum = storageWrite->storageAttributes.checksum; |
| |
| return errdetail_appendonly_storage_smallcontent_header(header, checksum, storageWrite->storageAttributes.version); |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Writing Small Content Efficiently that is not being Bulk Compressed |
| // ----------------------------------------------------------------------------- |
| |
/*
 * Get a pointer to next maximum length buffer space for
 * appending small content.
 *
 * You must decide whether you are supplying the optional first row number
 * BEFORE calling this routine!
 *
 * NOTE: The maximum length buffer space =
 *              maxBufferLen +
 *              AppendOnlyStorageWrite_CompleteHeaderLen(...)
 *
 * When compression is not being used, this interface provides a pointer
 * directly into the write buffer for efficient data generation.  Otherwise,
 * a pointer to a temporary buffer to collect the uncompressed contents will
 * be provided.
 *
 * Returns NULL when the current file does not have enough
 * room for another buffer.
 */
uint8 *AppendOnlyStorageWrite_GetBuffer(
	AppendOnlyStorageWrite *storageWrite,
	int aoHeaderKind)
{
	Assert(storageWrite != NULL);
	Assert(storageWrite->isActive);

	/* Only these content kinds are valid for this entry point. */
	Assert(aoHeaderKind == AoHeaderKind_SmallContent ||
		   aoHeaderKind == AoHeaderKind_NonBulkDenseContent ||
		   aoHeaderKind == AoHeaderKind_BulkDenseContent);

	/* Remember the kind; presumably consumed when the buffer is finished -- confirm in the Finish routine. */
	storageWrite->getBufferAoHeaderKind = aoHeaderKind;

	/*
	 * Both headers (Small and NonBulkDense) have the same length.
	 * BulkDense is a long header.
	 */
	storageWrite->currentCompleteHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(
			storageWrite,
			aoHeaderKind);

	/*
	 * If compression configured, then supply the temporary buffer instead.
	 */
	if (storageWrite->storageAttributes.compress)
	{
		/* No direct write buffer; data is staged in uncompressedBuffer. */
		storageWrite->currentBuffer = NULL;

		return storageWrite->uncompressedBuffer;
	}
	else
	{
		storageWrite->currentBuffer =
			BufferedAppendGetMaxBuffer(&storageWrite->bufferedAppend);

		/* Skip past the header; the caller writes content after it. */
		return &storageWrite->currentBuffer[storageWrite->currentCompleteHeaderLen];
	}
}
| |
| /* |
| * Test if a buffer is currently allocated. |
| */ |
| bool AppendOnlyStorageWrite_IsBufferAllocated( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| return (storageWrite->currentCompleteHeaderLen > 0); |
| } |
| |
| /* |
| * Return the beginning of the last write position of |
| * the write buffer. |
| */ |
| int64 AppendOnlyStorageWrite_LastWriteBeginPosition( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| return storageWrite->lastWriteBeginPosition; |
| } |
| |
| /* |
| * Return the position of the current write buffer. |
| */ |
| int64 AppendOnlyStorageWrite_CurrentPosition( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| return BufferedAppendCurrentBufferPosition( |
| &storageWrite->bufferedAppend); |
| } |
| |
| /* |
| * Return the internal current write buffer that includes the header. |
| * UNDONE: Fix this interface privacy violation... |
| */ |
| uint8 *AppendOnlyStorageWrite_GetCurrentInternalBuffer( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| return BufferedAppendGetCurrentBuffer( |
| &storageWrite->bufferedAppend); |
| } |
| |
| |
| static void AppendOnlyStorageWrite_VerifyWriteBlock( |
| AppendOnlyStorageWrite *storageWrite, |
| int64 headerOffsetInFile, |
| int32 bufferLen, |
| uint8 *expectedContent, |
| int32 expectedUncompressedLen, |
| int expectedExecutorBlockKind, |
| int expectedRowCount, |
| int32 expectedCompressedLen) |
| { |
| uint8 *header; |
| uint8 *content; |
| |
| AOHeaderCheckError checkError; |
| AoHeaderKind headerKind; |
| int32 actualHeaderLen; |
| int32 offset; |
| int32 uncompressedLen; |
| bool isCompressed; |
| int32 overallBlockLen; |
| int32 compressedLen; |
| int executorBlockKind; |
| bool hasFirstRowNum; |
| int64 firstRowNum; |
| int rowCount; |
| pg_crc32 storedChecksum; |
| pg_crc32 computedChecksum; |
| |
| if (storageWrite->storageAttributes.compress && storageWrite->verifyWriteBuffer == NULL) |
| return; // GUC must have been turned on mid-transaction. |
| |
| if (gp_appendonly_verify_write_block == false) |
| elog(WARNING, "The GUC gp_appendonly_verify_write_block is false. Compressed write not checked."); |
| |
| header = BufferedAppendGetCurrentBuffer(&storageWrite->bufferedAppend); |
| |
| /* |
| * Code is similar to that in getNextStorageBlockInFile routine. |
| */ |
| |
| /* |
| * Proceed very carefully: |
| * [ 1. Verify header checksum ] |
| * 2. Examine (basic) header. |
| * 3. Examine specific header. |
| * [ 4. Verify the block checksum ] |
| */ |
| if (storageWrite->storageAttributes.checksum) |
| { |
| if (!AppendOnlyStorageFormat_VerifyHeaderChecksum( |
| header, |
| &storedChecksum, |
| &computedChecksum)) |
| ereport(ERROR, |
| (errmsg("Verify block during write found header checksum does not match. Expected 0x%X and found 0x%X", |
| storedChecksum, |
| computedChecksum), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| } |
| |
| /* |
| * Check the (basic) header information. |
| */ |
| checkError = AppendOnlyStorageFormat_GetHeaderInfo( |
| header, |
| storageWrite->storageAttributes.checksum, |
| &headerKind, |
| &actualHeaderLen); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Verify block during write found bad append-only storage header. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| switch (headerKind) |
| { |
| case AoHeaderKind_SmallContent: |
| /* |
| * Check the Block header information. |
| */ |
| checkError = |
| AppendOnlyStorageFormat_GetSmallContentHeaderInfo( |
| header, |
| actualHeaderLen, |
| storageWrite->storageAttributes.checksum, |
| bufferLen, |
| &overallBlockLen, |
| &offset, // Offset to data. |
| &uncompressedLen, |
| &executorBlockKind, |
| &hasFirstRowNum, |
| storageWrite->storageAttributes.version, |
| &firstRowNum, |
| &rowCount, |
| &isCompressed, |
| &compressedLen); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Verify block during write found bad append-only storage block header. " |
| "Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| if (uncompressedLen != expectedUncompressedLen) |
| ereport(ERROR, |
| (errmsg("Verify block during write found append-only storage block header. " |
| "DataLen %d does not equal expected length %d, ", |
| uncompressedLen, |
| expectedUncompressedLen), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| |
| if (compressedLen != expectedCompressedLen) |
| ereport(ERROR, |
| (errmsg("Verify block during write found append-only storage block header. " |
| "CompressedLen %d does not equal expected length %d", |
| compressedLen, |
| expectedCompressedLen), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| /* |
| * Now verify the executor portion of the block. |
| */ |
| |
| if (executorBlockKind != expectedExecutorBlockKind) |
| ereport(ERROR, |
| (errmsg("Verify block during write found append-only storage block header. " |
| "ExecutorBlockKind %d does not equal expected value %d.", |
| executorBlockKind, |
| expectedExecutorBlockKind), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| // UNDONE: Check hasFirstRowNum |
| |
| if (rowCount != expectedRowCount) |
| ereport(ERROR, |
| (errmsg("Verify block during write found append-only storage block header. " |
| "RowCount %d does not equal expected value %d", |
| rowCount, |
| expectedRowCount), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| if (Debug_appendonly_print_verify_write_block) |
| { |
| AppendOnlyStorageWrite_LogBlockHeader( |
| storageWrite, |
| headerOffsetInFile, |
| header); |
| } |
| |
| |
| if (isCompressed) |
| { |
| int test; |
| PGFunction decompressor; |
| PGFunction *cfns = storageWrite->compression_functions; |
| |
| Assert(gp_appendonly_verify_write_block == true); |
| Assert(storageWrite->verifyWriteCompressionState != NULL); |
| |
| if (cfns == NULL) |
| decompressor = NULL; |
| else |
| decompressor = cfns[COMPRESSION_DECOMPRESS]; |
| |
| gp_decompress_new(&header[offset], // Compressed data in block. |
| compressedLen, |
| storageWrite->verifyWriteBuffer, // Temporary buffer to hold uncompressed data. |
| uncompressedLen, |
| decompressor, |
| storageWrite->verifyWriteCompressionState, |
| storageWrite->bufferCount); |
| |
| /* |
| * Compare. |
| */ |
| test = memcmp(expectedContent, |
| storageWrite->verifyWriteBuffer, |
| uncompressedLen); |
| |
| if (test != 0) |
| ereport(ERROR, |
| (errmsg("Verify block during write found decompress did not produce the exact same bits passed to compress! " |
| "Memcmp result %d", |
| test), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| } |
| else |
| { |
| content = &header[offset]; |
| |
| // UNDONE: Do comparison here |
| } |
| break; |
| |
| case AoHeaderKind_LargeContent: |
| /* |
| * Check the LargeContent header information. |
| */ |
| checkError = AppendOnlyStorageFormat_GetLargeContentHeaderInfo( |
| header, |
| actualHeaderLen, |
| storageWrite->storageAttributes.checksum, |
| &uncompressedLen, |
| &executorBlockKind, |
| &hasFirstRowNum, |
| &firstRowNum, |
| &rowCount); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header of type LargeContent. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| // UNDONE: Needs checks on uncompressedLen, executorBlockKind, hasFirstRowNum, |
| // UNDONE: and rowCount... |
| |
| // UNDONE: Aren't we done here? |
| break; |
| |
| default: |
| elog(ERROR, "Unexpected Append-Only header kind %d", |
| headerKind); |
| break; |
| } |
| |
| #ifdef NeedCallBack |
| if (executorBlockKind == AoExecutorBlockKind_VarBlock) |
| { |
| VarBlockCheckError varBlockCheckError; |
| VarBlockReader varBlockReader; |
| int readerItemCount; |
| |
| varBlockCheckError = VarBlockIsValid(data, uncompressedLen); |
| if (varBlockCheckError != VarBlockCheckOk) |
| ereport(ERROR, |
| (errmsg("Verify block during write found VarBlock is not valid " |
| "Valid block check error %d, detail '%s'", |
| varBlockCheckError, |
| VarBlockGetCheckErrorStr()), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| |
| /* |
| * Now use the VarBlock module to extract the items out. |
| */ |
| VarBlockReaderInit(&varBlockReader, |
| data, |
| uncompressedLen); |
| |
| readerItemCount = VarBlockReaderItemCount(&varBlockReader); |
| |
| if (rowCount != readerItemCount) |
| { |
| ereport(ERROR, |
| (errmsg("Verify block during write found row count %d in append-only storage header does not match VarBlock item count %d", |
| rowCount, |
| readerItemCount), |
| errdetail_appendonly_write_storage_block_header(storageWrite), |
| errcontext_appendonly_write_storage_block(storageWrite))); |
| } |
| } |
| #endif |
| } |
| |
/*
 * Compress sourceData and append it as a complete Append-Only Storage Block
 * (header plus data) into the current BufferedAppend buffer.
 *
 * The data is written directly after the complete block header.  When the
 * compressor cannot make the output smaller than the input, the block is
 * stored uncompressed instead and *compressedLen is reported as 0.
 *
 * storageWrite       - writer whose BufferedAppend buffer receives the block.
 * sourceData         - the uncompressed content to store.
 * sourceLen          - byte length of sourceData.
 * executorBlockKind  - executor-defined value recorded in the block header.
 * itemCount          - row/item count recorded in the block header.
 * compressedLen      - output: length of the compressed data, or 0 when the
 *                      block was stored uncompressed.
 * bufferLen          - output: total used buffer length (complete header
 *                      length plus the version-rounded data length).
 */
static void
AppendOnlyStorageWrite_CompressAppend(
	AppendOnlyStorageWrite *storageWrite,
	uint8 *sourceData,
	int32 sourceLen,
	int executorBlockKind,
	int itemCount,
	int32 *compressedLen,
	int32 *bufferLen)
{
	uint8 *header;
	uint8 *dataBuffer;
	int res;
	int32 dataRoundedUpLen = 0; /* assigned on both branches below; init quiets the compiler */
	int32 dataBufferWithOverrrunLen;
	PGFunction *cfns = storageWrite->compression_functions;
	PGFunction compressor;

	/* No compression function table means no external compressor. */
	if (cfns == NULL)
		compressor = NULL;
	else
		compressor = cfns[COMPRESSION_COMPRESS];

	// UNDONE: This can be a duplicate call...
	storageWrite->currentCompleteHeaderLen =
		AppendOnlyStorageWrite_CompleteHeaderLen(
			storageWrite,
			storageWrite->getBufferAoHeaderKind);

	header = BufferedAppendGetMaxBuffer(&storageWrite->bufferedAppend);
	if (header == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERNAL_ERROR),
				 errmsg("We do not expect files to be have a maximum length"),
				 errcontext_appendonly_write_storage_block(storageWrite)));

	/*
	 * Data lands right after the complete header (which includes the
	 * optional checksums).  The "overrun" length allows the compressor
	 * scratch room beyond the normal maximum buffer length.
	 */
	dataBuffer = &header[storageWrite->currentCompleteHeaderLen];
	dataBufferWithOverrrunLen =
		storageWrite->maxBufferWithCompressionOverrrunLen
		- storageWrite->currentCompleteHeaderLen;

	/*
	 * Compress into the BufferedAppend buffer after the large header (and
	 * optional checksum, etc.
	 */
	res = gp_trycompress_new(
		sourceData,
		sourceLen,
		dataBuffer,
		dataBufferWithOverrrunLen,
		sourceLen, // Limit compression to be no more than the input size.
		compressedLen,
		storageWrite->storageAttributes.compressLevel,
		compressor,
		storageWrite->compressionState);
	/*
	 * We always store the data compressed if the compressed length is less
	 * than the uncompressed length.
	 *
	 * TODO: this is a weak assumption. It doesn't account for the fact that
	 * it's not worth paying the CPU cost of decompression for a potentially
	 * trivial saving.
	 *
	 * The best solution to this seems to be to make the threshold at which we
	 * compress data user configurable.
	 */
	if (*compressedLen < sourceLen)
	{
		/*
		 * Compression successful.
		 */
		dataRoundedUpLen = AOStorage_RoundUp(*compressedLen, storageWrite->storageAttributes.version);

		/* Zero the pad bytes between the compressed data and the rounded-up end. */
		AOStorage_ZeroPad(
			dataBuffer,
			*compressedLen,
			dataRoundedUpLen);

		switch (storageWrite->getBufferAoHeaderKind)
		{
		case AoHeaderKind_SmallContent:
			/*
			 * Make the header and compute the checksum if necessary.
			 */
			AppendOnlyStorageFormat_MakeSmallContentHeader(
				header,
				storageWrite->storageAttributes.checksum,
				false,
				storageWrite->storageAttributes.version,
				1,
				executorBlockKind,
				itemCount,
				sourceLen,
				*compressedLen);
			break;

		case AoHeaderKind_BulkDenseContent:
			/*
			 * Make the header and compute the checksum if necessary.
			 */
			AppendOnlyStorageFormat_MakeBulkDenseContentHeader(
				header,
				storageWrite->storageAttributes.checksum,
				false,
				storageWrite->storageAttributes.version,
				1,
				executorBlockKind,
				itemCount,
				sourceLen,
				*compressedLen);
			break;

		default:
			elog(ERROR, "Unexpected Append-Only header kind %d",
				 storageWrite->getBufferAoHeaderKind);
			break;
		}

		if (Debug_appendonly_print_storage_headers)
		{
			AppendOnlyStorageWrite_LogBlockHeader(
				storageWrite,
				BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
				header);
		}

		if (Debug_appendonly_print_insert)
			elog(LOG,"Append-only insert finished compressed block for table '%s' "
				 "(segment file '%s', header offset in file " INT64_FORMAT ", "
				 "length = %d, compressed length %d, item count %d, block count " INT64_FORMAT ")",
				 storageWrite->relationName,
				 storageWrite->segmentFileName,
				 BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
				 sourceLen,
				 *compressedLen,
				 itemCount,
				 storageWrite->bufferCount);
	}
	else
	{
		int32 overallBlockLen;

		/*
		 * Unable to compress the data to smaller the input size.
		 * Solution: Indicate in the header we are storing an non-compressed
		 * block.
		 */
		*compressedLen = 0;

		dataRoundedUpLen = AOStorage_RoundUp(sourceLen, storageWrite->storageAttributes.version);

		overallBlockLen = storageWrite->currentCompleteHeaderLen + dataRoundedUpLen;

		/*
		 * Copy non-compressed data in after the header information.
		 */
		memcpy(dataBuffer, sourceData, sourceLen);

		/* Zero the pad bytes between the data and the rounded-up end. */
		AOStorage_ZeroPad(
			dataBuffer,
			sourceLen,
			dataRoundedUpLen);

		/*
		 * Make the header and compute the checksum if necessary.
		 *
		 * NOTE(review): this fallback always writes a SmallContent header,
		 * even when getBufferAoHeaderKind is BulkDenseContent -- confirm
		 * that readers accept this for dense-content blocks.
		 */
		AppendOnlyStorageFormat_MakeSmallContentHeader(
			header,
			storageWrite->storageAttributes.checksum,
			false,
			storageWrite->storageAttributes.version,
			1,
			executorBlockKind,
			itemCount,
			sourceLen,
			/* compressedLen */ 0);

		if (Debug_appendonly_print_storage_headers)
		{
			AppendOnlyStorageWrite_LogBlockHeader(
				storageWrite,
				BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
				header);
		}

		if (Debug_appendonly_print_insert)
			elog(LOG,"Append-only insert could not compress block for table '%s' smaller -- non-compressed block stored "
				 "(segment file '%s', header offset in file " INT64_FORMAT ", "
				 "length = %d, item count %d, block count " INT64_FORMAT ")",
				 storageWrite->relationName,
				 storageWrite->segmentFileName,
				 BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend),
				 sourceLen,
				 itemCount,
				 storageWrite->bufferCount);
	}

	/* Report the total used length: complete header plus rounded-up data. */
	*bufferLen = storageWrite->currentCompleteHeaderLen + dataRoundedUpLen;

}
| |
| /* |
| * Mark the current buffer "small" buffer as finished. |
| * |
| * If compression is configured, we will try to compress the contents in |
| * the temporary uncompressed buffer into the write buffer. |
| * |
| * The buffer can be scheduled for writing and reused. |
| */ |
| void AppendOnlyStorageWrite_FinishBuffer( |
| AppendOnlyStorageWrite *storageWrite, |
| |
| int32 contentLen, |
| /* |
| * The byte length of the content generated |
| * directly into the buffer returned by |
| * AppendOnlyStorageWrite_GetBuffer. |
| */ |
| int executorBlockKind, |
| /* |
| * A value defined externally by the executor |
| * that describes in content stored in the |
| * Append-Only Storage Block. |
| */ |
| int rowCount) |
| /* The number of rows stored in the content. */ |
| { |
| int64 headerOffsetInFile; |
| int32 bufferLen; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| Assert(storageWrite->currentCompleteHeaderLen > 0); |
| |
| if (contentLen > |
| storageWrite->maxBufferLen - storageWrite->currentCompleteHeaderLen) |
| elog(ERROR, |
| "Append-only content too large AO storage block (table '%s', " |
| "content length = %d, maximum buffer length %d, complete header length %d, first row number is set %s)", |
| storageWrite->relationName, |
| contentLen, |
| storageWrite->maxBufferLen, |
| storageWrite->currentCompleteHeaderLen, |
| (false ? "true" : "false")); |
| |
| |
| headerOffsetInFile = BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend); |
| |
| if(!storageWrite->storageAttributes.compress) |
| { |
| uint8 *nonCompressedHeader; |
| uint8 *nonCompressedData; |
| int32 dataRoundedUpLen; |
| int32 uncompressedlen; |
| |
| nonCompressedHeader = storageWrite->currentBuffer; |
| |
| nonCompressedData = &nonCompressedHeader[storageWrite->currentCompleteHeaderLen]; |
| |
| dataRoundedUpLen = AOStorage_RoundUp(contentLen, storageWrite->storageAttributes.version); |
| |
| AOStorage_ZeroPad( |
| nonCompressedData, |
| contentLen, |
| dataRoundedUpLen); |
| |
| switch (storageWrite->getBufferAoHeaderKind) |
| { |
| case AoHeaderKind_SmallContent: |
| /* |
| * Make the header and compute the checksum if necessary. |
| */ |
| AppendOnlyStorageFormat_MakeSmallContentHeader( |
| nonCompressedHeader, |
| storageWrite->storageAttributes.checksum, |
| false, |
| storageWrite->storageAttributes.version, |
| 1, |
| executorBlockKind, |
| rowCount, |
| contentLen, |
| /* compressedLength */ 0); |
| break; |
| |
| case AoHeaderKind_NonBulkDenseContent: |
| /* |
| * Make the header and compute the checksum if necessary. |
| */ |
| AppendOnlyStorageFormat_MakeNonBulkDenseContentHeader( |
| nonCompressedHeader, |
| storageWrite->storageAttributes.checksum, |
| false, |
| storageWrite->storageAttributes.version, |
| 1, |
| executorBlockKind, |
| rowCount, |
| contentLen); |
| break; |
| |
| default: |
| elog(ERROR, "Unexpected Append-Only header kind %d", |
| storageWrite->getBufferAoHeaderKind); |
| break; |
| } |
| |
| if (Debug_appendonly_print_storage_headers) |
| { |
| AppendOnlyStorageWrite_LogBlockHeader( |
| storageWrite, |
| headerOffsetInFile, |
| nonCompressedHeader); |
| } |
| |
| |
| bufferLen = storageWrite->currentCompleteHeaderLen + dataRoundedUpLen; |
| uncompressedlen = bufferLen; /* since there's no compression.. */ |
| |
| /* |
| * Just before finishing the AO Storage buffer with our non-compressed content, let's verify it. |
| */ |
| if (gp_appendonly_verify_write_block) |
| AppendOnlyStorageWrite_VerifyWriteBlock( |
| storageWrite, |
| headerOffsetInFile, |
| bufferLen, |
| nonCompressedData, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| /* expectedCompressedLen */ 0); |
| |
| storageWrite->lastWriteBeginPosition = |
| BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend)); |
| |
| BufferedAppendFinishBuffer( |
| &storageWrite->bufferedAppend, |
| bufferLen, |
| uncompressedlen); |
| |
| // Declare it finished. |
| storageWrite->currentCompleteHeaderLen = 0; |
| |
| if (Debug_appendonly_print_insert) |
| elog(LOG, |
| "Append-only insert finished uncompressed block for table '%s' " |
| "(length = %d, executor block kind %d, item count %d, block count " INT64_FORMAT ")", |
| storageWrite->relationName, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| storageWrite->bufferCount); |
| |
| /* For debug */ |
| if (Debug_appendonly_print_insert) |
| { |
| elog(LOG, |
| "Append-only insert finished uncompressed block for table '%s' " |
| "(lastWriteBeginPosition " INT64_FORMAT ")", |
| storageWrite->relationName, |
| storageWrite->lastWriteBeginPosition); |
| } |
| |
| } |
| else |
| { |
| int32 compressedLen = 0; |
| |
| AppendOnlyStorageWrite_CompressAppend( |
| storageWrite, |
| storageWrite->uncompressedBuffer, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| &compressedLen, |
| &bufferLen); |
| |
| /* |
| * Just before finishing the AO Storage buffer with our non-compressed content, let's verify it. |
| */ |
| if (gp_appendonly_verify_write_block) |
| AppendOnlyStorageWrite_VerifyWriteBlock( |
| storageWrite, |
| headerOffsetInFile, |
| bufferLen, |
| storageWrite->uncompressedBuffer, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| compressedLen); |
| |
| storageWrite->lastWriteBeginPosition = |
| BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend)); |
| |
| /* |
| * Finish the current buffer by specifying the used length. |
| */ |
| BufferedAppendFinishBuffer( |
| &storageWrite->bufferedAppend, |
| bufferLen, |
| storageWrite->currentCompleteHeaderLen + |
| AOStorage_RoundUp(contentLen, storageWrite->storageAttributes.version) /* non-compressed size */); |
| // Declare it finished. |
| storageWrite->currentCompleteHeaderLen = 0; |
| } |
| |
| Assert(storageWrite->currentCompleteHeaderLen == 0); |
| storageWrite->currentBuffer = NULL; |
| } |
| |
| /* |
| * Cancel the last ~GetBuffer call. |
| * |
| * This will also turn off the firstRowNum flag. |
| */ |
| void AppendOnlyStorageWrite_CancelLastBuffer( |
| AppendOnlyStorageWrite *storageWrite) |
| { |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| Assert(storageWrite->currentCompleteHeaderLen > 0); |
| |
| if (storageWrite->currentBuffer != NULL) |
| { |
| BufferedAppendCancelLastBuffer(&storageWrite->bufferedAppend); |
| storageWrite->currentBuffer = NULL; |
| } |
| |
| storageWrite->currentCompleteHeaderLen = 0; |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Writing "Large" Content |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Write content up to 1Gb. |
| * |
| * Large content will be writen in fragment blocks by |
| * the Append-Only Storage Layer. |
| * |
| * If compression is configured, then the content will be |
| * compressed in fragments. |
| * |
| * Returns NULL when the current file does not have enough |
| * room for another buffer. |
| */ |
| void AppendOnlyStorageWrite_Content( |
| AppendOnlyStorageWrite *storageWrite, |
| |
| uint8 *content, |
| /* The content to store. All contiguous. */ |
| |
| int32 contentLen, |
| /* The byte length of the data to store. */ |
| |
| int executorBlockKind, |
| /* |
| * A value defined externally by the executor |
| * that describes in content stored in the |
| * Append-Only Storage Block. |
| */ |
| int rowCount) |
| /* The number of rows stored in the content. */ |
| { |
| int32 completeHeaderLen; |
| int32 compressedLen; |
| int32 bufferLen; |
| uint8 *data; |
| |
| Assert(storageWrite != NULL); |
| Assert(storageWrite->isActive); |
| |
| completeHeaderLen = |
| AppendOnlyStorageWrite_CompleteHeaderLen( |
| storageWrite, |
| AoHeaderKind_SmallContent); |
| if (contentLen <= storageWrite->maxBufferLen - completeHeaderLen) |
| { |
| /* |
| * This is "small" content. |
| */ |
| if(!storageWrite->storageAttributes.compress) |
| { |
| /* |
| * Verify the split boundary. |
| */ |
| AppendOnlyStorageWrite_PadOutForSplit(storageWrite, storageWrite->maxBufferLen); |
| |
| data = AppendOnlyStorageWrite_GetBuffer( |
| storageWrite, |
| AoHeaderKind_SmallContent); |
| |
| memcpy( |
| data, |
| content, |
| contentLen); |
| |
| storageWrite->lastWriteBeginPosition = |
| BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend)); |
| |
| AppendOnlyStorageWrite_FinishBuffer( |
| storageWrite, |
| contentLen, |
| executorBlockKind, |
| rowCount); |
| Assert(storageWrite->currentCompleteHeaderLen == 0); |
| } |
| else |
| { |
| |
| /* |
| * Verify the split boundary. |
| */ |
| AppendOnlyStorageWrite_PadOutForSplit(storageWrite, storageWrite->maxBufferLen); |
| |
| /* |
| * Since ~_GetBuffer now takes in a specification of the header kind, we |
| * need to set the header kind so general routines like ~_CompressAppend |
| * will work correctly when writing the small "fragments |
| */ |
| storageWrite->getBufferAoHeaderKind = AoHeaderKind_SmallContent; |
| AppendOnlyStorageWrite_CompressAppend( |
| storageWrite, |
| content, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| &compressedLen, |
| &bufferLen); |
| |
| /* |
| * Just before finishing the AO Storage buffer with our non-compressed content, let's verify it. |
| */ |
| if (gp_appendonly_verify_write_block) |
| AppendOnlyStorageWrite_VerifyWriteBlock( |
| storageWrite, |
| BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend), |
| bufferLen, |
| content, |
| contentLen, |
| executorBlockKind, |
| rowCount, |
| compressedLen); |
| |
| storageWrite->lastWriteBeginPosition = |
| BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend)); |
| |
| /* |
| * Finish the current buffer by specifying the used length. |
| */ |
| BufferedAppendFinishBuffer( |
| &storageWrite->bufferedAppend, |
| bufferLen, |
| storageWrite->currentCompleteHeaderLen + |
| AOStorage_RoundUp(contentLen, storageWrite->storageAttributes.version) /* non-compressed size */); |
| |
| // Declare it finished. |
| storageWrite->currentCompleteHeaderLen = 0; |
| } |
| } |
| else |
| { |
| int32 largeContentHeaderLen; |
| uint8 *largeContentHeader; |
| int32 smallContentHeaderLen; |
| int32 maxSmallContentLen; |
| int32 countdownContentLen; |
| uint8 *contentNext; |
| int32 smallContentLen; |
| |
| /* |
| * Write the "Large" content in fragments. |
| */ |
| |
| storageWrite->lastWriteBeginPosition = |
| BufferedAppendNextBufferPosition(&(storageWrite->bufferedAppend)); |
| |
| /* |
| * First is a LargeContent header-only block that has the "large" content length and |
| * "large" row count. |
| */ |
| largeContentHeaderLen = AppendOnlyStorageWrite_LargeContentHeaderLen(storageWrite); |
| |
| /* |
| * Verify the split boundary. |
| */ |
| AppendOnlyStorageWrite_PadOutForSplit(storageWrite, storageWrite->maxBufferLen); |
| |
| largeContentHeader = |
| BufferedAppendGetBuffer( |
| &storageWrite->bufferedAppend, |
| largeContentHeaderLen); |
| |
| AppendOnlyStorageFormat_MakeLargeContentHeader( |
| largeContentHeader, |
| storageWrite->storageAttributes.checksum, |
| false, |
| storageWrite->storageAttributes.version, |
| 1, |
| executorBlockKind, |
| rowCount, |
| contentLen); |
| |
| BufferedAppendFinishBuffer( |
| &storageWrite->bufferedAppend, |
| largeContentHeaderLen, |
| largeContentHeaderLen); |
| |
| // Declare it finished. |
| storageWrite->currentCompleteHeaderLen = 0; |
| |
| smallContentHeaderLen = |
| AppendOnlyStorageWrite_CompleteHeaderLen( |
| storageWrite, |
| AoHeaderKind_SmallContent); |
| maxSmallContentLen = storageWrite->maxBufferLen - smallContentHeaderLen; |
| countdownContentLen = contentLen; |
| contentNext = content; |
| while (true) |
| { |
| if (countdownContentLen <= maxSmallContentLen) |
| smallContentLen = countdownContentLen; |
| else |
| smallContentLen = maxSmallContentLen; |
| |
| /* |
| * Verify the split boundary. |
| */ |
| AppendOnlyStorageWrite_PadOutForSplit(storageWrite, storageWrite->maxBufferLen); |
| |
| if(!storageWrite->storageAttributes.compress) |
| { |
| data = AppendOnlyStorageWrite_GetBuffer( |
| storageWrite, |
| AoHeaderKind_SmallContent); |
| |
| memcpy( |
| data, |
| contentNext, |
| smallContentLen); |
| |
| AppendOnlyStorageWrite_FinishBuffer( |
| storageWrite, |
| smallContentLen, |
| executorBlockKind, |
| /* rowCount */ 0); |
| } |
| else |
| { |
| /* |
| * Since ~_GetBuffer now takes in a specification of the header kind, we |
| * need to set the header kind so general routines like ~_CompressAppend |
| * will work correctly when writing the small "fragments |
| */ |
| storageWrite->getBufferAoHeaderKind = AoHeaderKind_SmallContent; |
| |
| AppendOnlyStorageWrite_CompressAppend( |
| storageWrite, |
| contentNext, |
| smallContentLen, |
| executorBlockKind, |
| /* rowCount */ 0, |
| &compressedLen, |
| &bufferLen); |
| |
| /* |
| * Just before finishing the AO Storage buffer with our non-compressed content, let's verify it. |
| */ |
| if (gp_appendonly_verify_write_block) |
| AppendOnlyStorageWrite_VerifyWriteBlock( |
| storageWrite, |
| BufferedAppendCurrentBufferPosition(&storageWrite->bufferedAppend), |
| bufferLen, |
| contentNext, |
| smallContentLen, |
| executorBlockKind, |
| /* rowCount */ 0, |
| compressedLen); |
| |
| /* |
| * Finish the current buffer by specifying the used length. |
| */ |
| BufferedAppendFinishBuffer( |
| &storageWrite->bufferedAppend, |
| bufferLen, |
| smallContentHeaderLen + |
| AOStorage_RoundUp(smallContentLen, storageWrite->storageAttributes.version) /* non-compressed size */); |
| |
| // Declare it finished. |
| storageWrite->currentCompleteHeaderLen = 0; |
| } |
| |
| countdownContentLen -= smallContentLen; |
| if (countdownContentLen == 0) |
| break; |
| |
| contentNext += smallContentLen; |
| } |
| } |
| |
| // Verify we have no buffer allocated. |
| Assert(storageWrite->currentCompleteHeaderLen == 0); |
| } |