| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * cdbappendonlystorageread.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| #ifndef WIN32 |
| #include <sys/fcntl.h> |
| #else |
| #include <io.h> |
| #endif |
| #include <sys/file.h> |
| #include <unistd.h> |
| |
| #include "access/filesplit.h" |
| #include "catalog/pg_compression.h" |
| #include "cdb/cdbappendonlystorage.h" |
| #include "cdb/cdbappendonlystoragelayer.h" |
| #include "cdb/cdbappendonlystorageformat.h" |
| #include "cdb/cdbappendonlystorageread.h" |
| #include "utils/guc.h" |
| #include "cdb/cdbvars.h" |
| |
| |
| // ----------------------------------------------------------------------------- |
| // Initialization |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Initialize AppendOnlyStorageRead. |
| * |
| * The AppendOnlyStorageRead data structure is initialized |
| * once for a read session and can be used to read |
| * Append-Only Storage Blocks from 1 or more segment files. |
| * |
| * The current file to read to is opened with the |
| * AppendOnlyStorageRead_OpenFile routine. |
| */ |
| void AppendOnlyStorageRead_Init( |
| AppendOnlyStorageRead *storageRead, |
| /* The data structure to initialize. */ |
| |
| MemoryContext memoryContext, |
| /* |
| * The memory context to use for buffers and |
| * other memory needs. When NULL, the |
| * current memory context is used. |
| */ |
| int32 maxBufferLen, |
| /* |
| * The maximum Append-Only Storage Block |
| * length including all storage headers. |
| */ |
| char *relationName, |
| /* |
| * Name of the relation to use in system |
| * logging and error messages. |
| */ |
| |
| char *title, |
| /* |
| * A phrase that better describes the purpose of the this open. |
| * |
| * The caller manages the storage for this. |
| */ |
| |
| AppendOnlyStorageAttributes *storageAttributes) |
| /* |
| * The Append-Only Storage Attributes |
| * from relation creation. |
| */ |
| { |
| int relationNameLen; |
| uint8 *memory; |
| int32 memoryLen; |
| MemoryContext oldMemoryContext; |
| |
| Assert(storageRead != NULL); |
| |
| // UNDONE: Range check maxBufferLen |
| |
| Assert(relationName != NULL); |
| Assert(storageAttributes != NULL); |
| |
| // UNDONE: Range check fields in storageAttributes |
| |
| MemSet(storageRead, 0, sizeof(AppendOnlyStorageRead)); |
| |
| storageRead->maxBufferLen = maxBufferLen; |
| |
| if (memoryContext == NULL) |
| storageRead->memoryContext = CurrentMemoryContext; |
| else |
| storageRead->memoryContext = memoryContext; |
| |
| oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext); |
| |
| memcpy( |
| &storageRead->storageAttributes, |
| storageAttributes, |
| sizeof(AppendOnlyStorageAttributes)); |
| |
| relationNameLen = strlen(relationName); |
| storageRead->relationName = (char *) palloc(relationNameLen + 1); |
| memcpy(storageRead->relationName, relationName, relationNameLen + 1); |
| |
| storageRead->title = title; |
| |
| storageRead->minimumHeaderLen = |
| AppendOnlyStorageFormat_RegularHeaderLenNeeded( |
| storageRead->storageAttributes.checksum); |
| |
| /* |
| * Initialize BufferedRead. |
| */ |
| storageRead->largeReadLen = 2 * storageRead->maxBufferLen; |
| |
| memoryLen = |
| BufferedReadMemoryLen( |
| storageRead->maxBufferLen, |
| storageRead->largeReadLen); |
| |
| Assert(CurrentMemoryContext == storageRead->memoryContext); |
| memory = (uint8*)palloc(memoryLen); |
| |
| BufferedReadInit(&storageRead->bufferedRead, |
| memory, |
| memoryLen, |
| storageRead->maxBufferLen, |
| storageRead->largeReadLen, |
| relationName); |
| |
| if (Debug_appendonly_print_scan || Debug_appendonly_print_read_block) |
| elog(LOG,"Append-Only Storage Read initialize for table '%s' " |
| "(compression = %s, compression level %d, maximum buffer length %d, large read length %d)", |
| storageRead->relationName, |
| (storageRead->storageAttributes.compress ? "true" : "false"), |
| storageRead->storageAttributes.compressLevel, |
| storageRead->maxBufferLen, |
| storageRead->largeReadLen); |
| |
| storageRead->file = -1; |
| |
| MemoryContextSwitchTo(oldMemoryContext); |
| |
| storageRead->isActive = true; |
| |
| } |
| |
| /* |
| * Return (read-only) pointer to relation name. |
| */ |
| char *AppendOnlyStorageRead_RelationName( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return storageRead->relationName; |
| } |
| |
| /* |
| * Return (read-only) pointer to relation name. |
| */ |
| char *AppendOnlyStorageRead_SegmentFileName( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return storageRead->segmentFileName; |
| } |
| |
| /* |
| * Finish using the AppendOnlyStorageRead session created with ~Init. |
| */ |
| void AppendOnlyStorageRead_FinishSession( |
| AppendOnlyStorageRead *storageRead) |
| /* The data structure to finish. */ |
| { |
| MemoryContext oldMemoryContext; |
| |
| if(!storageRead->isActive) |
| return; |
| |
| oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext); |
| |
| // UNDONE: This expects the MemoryContext to be what was used for the 'memory' in ~Init |
| BufferedReadFinish(&storageRead->bufferedRead); |
| |
| if (storageRead->relationName != NULL) |
| { |
| pfree(storageRead->relationName); |
| storageRead->relationName = NULL; |
| } |
| |
| if (storageRead->segmentFileName != NULL) |
| { |
| pfree(storageRead->segmentFileName); |
| storageRead->segmentFileName = NULL; |
| } |
| |
| if (storageRead->compression_functions != NULL) |
| { |
| callCompressionDestructor(storageRead->compression_functions[COMPRESSION_DESTRUCTOR], storageRead->compressionState); |
| pfree(storageRead->compressionState); |
| } |
| |
| /* Deallocation is done. Go back to caller memory-context. */ |
| MemoryContextSwitchTo(oldMemoryContext); |
| |
| storageRead->isActive = false; |
| |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Open and Close |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Do open the next segment file to read, but don't do error processing. |
| * |
| * This routine is responsible for seeking to the proper |
| * read location given the logical EOF. |
| */ |
| static File AppendOnlyStorageRead_DoOpenFile( |
| AppendOnlyStorageRead *storageRead, |
| |
| char *filePathName) |
| /* The name of the segment file to open. */ |
| { |
| int fileFlags = O_RDONLY | PG_BINARY; |
| int fileMode = 0400; |
| /* File mode is S_IRUSR 00400 user has read permission */ |
| |
| File file; |
| |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| Assert(filePathName != NULL); |
| |
| if (Debug_appendonly_print_read_block) |
| { |
| elog(LOG, |
| "Append-Only storage read: opening table '%s', segment file '%s', fileFlags 0x%x, fileMode 0x%x", |
| storageRead->relationName, |
| storageRead->segmentFileName, |
| fileFlags, |
| fileMode); |
| } |
| /* |
| * Open the file for read. |
| */ |
| file = PathNameOpenFile(filePathName, fileFlags, fileMode); |
| return file; |
| } |
| |
| /* |
| * Finish the open by positioning the next read and saving information. |
| */ |
| static void AppendOnlyStorageRead_FinishOpenFile( |
| AppendOnlyStorageRead *storageRead, |
| |
| File file, |
| /* The open file. */ |
| |
| char *filePathName, |
| /* The name of the segment file to open. */ |
| int64 splitLen, |
| int64 logicalEof, |
| int64 offset) |
| /* |
| * The snapshot version of the EOF |
| * value to use as the read end of the segment |
| * file. |
| */ |
| { |
| int64 seekResult; |
| |
| MemoryContext oldMemoryContext; |
| |
| int segmentFileNameLen; |
| |
| seekResult = FileSeek(file, offset, SEEK_SET); |
| if (seekResult != offset) |
| { |
| FileClose(file); |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("Append-only Storage Read error on segment file '%s' for relation '%s'. FileSeek offset = " INT64_FORMAT ". Error code = %d (%s)" |
| , filePathName, storageRead->relationName, offset, errno, strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| storageRead->file = file; |
| |
| /* |
| * When reading multiple segment files, we throw away the old segment file name strings. |
| */ |
| oldMemoryContext = MemoryContextSwitchTo(storageRead->memoryContext); |
| |
| if (storageRead->segmentFileName != NULL) |
| pfree(storageRead->segmentFileName); |
| |
| segmentFileNameLen = strlen(filePathName); |
| storageRead->segmentFileName = (char *) palloc(segmentFileNameLen + 1); |
| memcpy(storageRead->segmentFileName, filePathName, segmentFileNameLen + 1); |
| |
| /* Allocation is done. Go back to caller memory-context. */ |
| MemoryContextSwitchTo(oldMemoryContext); |
| |
| storageRead->logicalEof = logicalEof; |
| storageRead->bufferedRead.largeReadPosition = offset; |
| |
| BufferedReadSetFile( |
| &storageRead->bufferedRead, |
| storageRead->file, |
| storageRead->segmentFileName, |
| splitLen, |
| logicalEof); |
| |
| } |
| |
| /* |
| * Open the next segment file to read. |
| * |
| * This routine is responsible for seeking to the proper |
| * read location given the logical EOF. |
| */ |
| void AppendOnlyStorageRead_OpenFile( |
| AppendOnlyStorageRead *storageRead, |
| |
| char *filePathName, |
| /* The name of the segment file to open. */ |
| int64 splitLen, |
| int64 logicalEof, |
| int64 offset, |
| bool toOpenFile) |
| /* |
| * The snapshot version of the EOF |
| * value to use as the read end of the segment |
| * file. |
| */ |
| { |
| File file; |
| |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| Assert(filePathName != NULL); |
| |
| /* |
| * The EOF must be be greater than 0, otherwise we risk transactionally created |
| * segment files from disappearing if a concurrent write transaction aborts. |
| */ |
| if (logicalEof <= 0 && splitLen <= 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Append-only Storage Read segment file '%s' EOF must be > 0 for relation '%s'", |
| filePathName, |
| storageRead->relationName))); |
| if(toOpenFile || storageRead->file < 0 ){ |
| if (debug_print_split_alloc_result) { |
| elog(LOG, "reopen file"); |
| } |
| file = AppendOnlyStorageRead_DoOpenFile( |
| storageRead, |
| filePathName); |
| } |
| else |
| { |
| if (debug_print_split_alloc_result) { |
| elog(LOG, "avoid reopen file"); |
| } |
| file = storageRead->file; |
| } |
| if(file < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("Append-Only Storage Read could not open segment file '%s' for relation '%s'", filePathName, storageRead->relationName), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| AppendOnlyStorageRead_FinishOpenFile( |
| storageRead, |
| file, |
| filePathName, |
| splitLen, |
| logicalEof, |
| offset); |
| } |
| |
| /* |
| * Try opening the next segment file to read. |
| * |
| * This routine is responsible for seeking to the proper |
| * read location given the logical EOF. |
| */ |
| bool AppendOnlyStorageRead_TryOpenFile( |
| AppendOnlyStorageRead *storageRead, |
| |
| char *filePathName, |
| /* The name of the segment file to open. */ |
| int64 splitLen, |
| int64 logicalEof) |
| /* |
| * The snapshot version of the EOF |
| * value to use as the read end of the segment |
| * file. |
| */ |
| { |
| File file; |
| |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| Assert(filePathName != NULL); |
| // UNDONE: Range check logicalEof |
| |
| file = AppendOnlyStorageRead_DoOpenFile( |
| storageRead, |
| filePathName); |
| if(file < 0) |
| { |
| return false; |
| } |
| |
| AppendOnlyStorageRead_FinishOpenFile( |
| storageRead, |
| file, |
| filePathName, |
| splitLen, |
| logicalEof, |
| 0); |
| |
| return true; |
| } |
| |
| /* |
| * Set a temporary read range in the current open segment file. |
| * |
| * The beginFileOffset must be to the beginning of an Append-Only Storage block. |
| * |
| * The afterFileOffset serves as the temporary EOF. It will cause ~_GetBlockInfo to return |
| * false (no more blocks) when reached. It must be at the end of an Append-Only Storage |
| * block. |
| * |
| * When ~_GetBlockInfo returns false (no more blocks), the temporary read range is forgotten. |
| */ |
| void AppendOnlyStorageRead_SetTemporaryRange( |
| AppendOnlyStorageRead *storageRead, |
| int64 beginFileOffset, |
| int64 afterFileOffset) |
| { |
| Assert(storageRead->isActive); |
| Assert(storageRead->file != -1); |
| Assert(beginFileOffset >= 0); |
| Assert(beginFileOffset <= storageRead->logicalEof); |
| Assert(afterFileOffset >= 0); |
| Assert(afterFileOffset <= storageRead->logicalEof); |
| |
| BufferedReadSetTemporaryRange(&storageRead->bufferedRead, |
| beginFileOffset, |
| afterFileOffset); |
| } |
| |
| /* |
| * Close the current segment file. |
| * |
| * No error if the current is already closed. |
| */ |
| void AppendOnlyStorageRead_CloseFile( |
| AppendOnlyStorageRead *storageRead) |
| { |
| if(!storageRead->isActive) |
| return; |
| |
| if (storageRead->file == -1) |
| return; |
| |
| FileClose(storageRead->file); |
| |
| storageRead->file = -1; |
| |
| storageRead->logicalEof = INT64CONST(0); |
| |
| if(storageRead->bufferedRead.file >= 0) |
| BufferedReadCompleteFile(&storageRead->bufferedRead); |
| } |
| |
| // ----------------------------------------------------------------------------- |
| // Reading Content |
| // ----------------------------------------------------------------------------- |
| |
| /* |
| * Skip zero padding to next page boundary, if necessary. |
| * |
| * This function is called when the file system block we are scanning has |
| * no more valid data but instead is padded with zero's from the position |
| * we are currently in until the end of the block. The function will skip |
| * to the end of block if skipLen is -1 or skip skipLen bytes otherwise. |
| */ |
| static void |
| AppendOnlyStorageRead_DoSkipPadding( |
| AppendOnlyStorageRead *storageRead, |
| int32 skipLen, |
| bool isUseSplitLen) |
| { |
| int64 nextReadPosition; |
| int64 nextBoundaryPosition; |
| int32 safeWriteRemainder; |
| bool doSkip; |
| uint8 *buffer; |
| int32 availableLen; |
| int32 safewrite = storageRead->storageAttributes.safeFSWriteSize; |
| |
| /* early exit if no pad used */ |
| if(safewrite == 0) |
| return; |
| |
| nextReadPosition = BufferedReadNextBufferPosition(&storageRead->bufferedRead); |
| nextBoundaryPosition = |
| ((nextReadPosition + safewrite - 1)/safewrite)*safewrite; |
| safeWriteRemainder = (int32)(nextBoundaryPosition - nextReadPosition); |
| |
| if (safeWriteRemainder <= 0) |
| doSkip = false; |
| else if (skipLen == -1) |
| { |
| /* |
| * Skip to end of page. |
| */ |
| doSkip = true; |
| skipLen = safeWriteRemainder; |
| } |
| else |
| doSkip = (safeWriteRemainder < skipLen); |
| |
| if (doSkip) |
| { |
| /* |
| * Read through the remainder. |
| */ |
| buffer = BufferedReadGetNextBuffer(&storageRead->bufferedRead, |
| safeWriteRemainder, |
| &availableLen, |
| isUseSplitLen); |
| |
| /* |
| * Since our file EOF should always be a multiple of the file-system |
| * page, we do not expect a short read here. |
| */ |
| if (buffer == NULL) |
| availableLen = 0; |
| if (buffer == NULL || safeWriteRemainder != availableLen) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Unexpected end of file. Expected to read %d bytes after position " INT64_FORMAT " but found %d bytes (bufferCount " INT64_FORMAT ")\n", |
| safeWriteRemainder, |
| nextReadPosition, |
| availableLen, |
| storageRead->bufferCount))); |
| } |
| |
| // UNDONE: For verification purposes, we should verify the |
| // UNDONE: reminder is all zeroes. |
| |
| if (Debug_appendonly_print_scan) |
| elog(LOG,"Append-only scan skipping zero padded remainder for table '%s' (nextReadPosition = " INT64_FORMAT ", safeWriteRemainder = %d)", |
| storageRead->relationName, |
| nextReadPosition, |
| safeWriteRemainder); |
| } |
| } |
| |
| /* |
| * Skip zero padding to next page boundary, if necessary. |
| * |
| * This function is called when the file system block we are scanning has |
| * no more valid data but instead is padded with zero's from the position |
| * we are currently in until the end of the block. The function will skip |
| * to the end of block if skipLen is -1 or skip skipLen bytes otherwise. |
| */ |
| static bool |
| AppendOnlyStorageRead_PositionToNextBlock( |
| AppendOnlyStorageRead *storageRead, |
| int64 *headerOffsetInFile, |
| uint8 **header, |
| int32 *blockLimitLen, |
| bool isUseSplitLen) |
| { |
| int32 availableLen; |
| int i; |
| int64 fileRemainderLen; |
| |
| Assert(storageRead != NULL); |
| Assert(header != NULL); |
| |
| /* |
| * Peek ahead just enough so we can see the Append-Only storage |
| * header. |
| * |
| * However, we need to honor the file-system page boundaries here |
| * since we do not let the length information cross the boundary. |
| */ |
| AppendOnlyStorageRead_DoSkipPadding(storageRead, storageRead->minimumHeaderLen,isUseSplitLen); |
| |
| *headerOffsetInFile = BufferedReadNextBufferPosition(&storageRead->bufferedRead); |
| |
| *header = BufferedReadGetNextBuffer(&storageRead->bufferedRead, |
| storageRead->minimumHeaderLen, |
| &availableLen, |
| isUseSplitLen); |
| |
| if (*header == NULL) |
| { |
| /* done reading the file */ |
| return false; |
| } |
| |
| storageRead->bufferCount++; |
| |
| if ((availableLen != storageRead->minimumHeaderLen) |
| && (storageRead->bufferedRead.largeReadPosition + storageRead->bufferedRead.largeReadLen != storageRead->bufferedRead.splitLen)) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Expected %d bytes and got %d bytes in table %s " |
| "(segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")\n", |
| storageRead->minimumHeaderLen, |
| availableLen, |
| storageRead->relationName, |
| storageRead->segmentFileName, |
| *headerOffsetInFile, |
| storageRead->bufferCount))); |
| |
| /* |
| * First check for zero padded page remainder. |
| */ |
| i = 0; |
| while (true) |
| { |
| if ((*header)[i] != 0) |
| break; |
| |
| i++; |
| if (i >= storageRead->minimumHeaderLen) |
| { |
| /* |
| * Skip over zero padding caused when the append command |
| * left a partially full page. |
| */ |
| AppendOnlyStorageRead_DoSkipPadding(storageRead, /* indicated till end of page */ -1, isUseSplitLen); |
| |
| /* |
| * Now try to get the peek data from the new page. |
| */ |
| *headerOffsetInFile = BufferedReadNextBufferPosition(&storageRead->bufferedRead); |
| |
| *header = BufferedReadGetNextBuffer(&storageRead->bufferedRead, |
| storageRead->minimumHeaderLen, |
| &availableLen, |
| isUseSplitLen); |
| |
| if (*header == NULL) |
| { |
| /* done reading the file */ |
| return false; |
| } |
| |
| if ((availableLen != storageRead->minimumHeaderLen) |
| && (storageRead->bufferedRead.largeReadPosition + storageRead->bufferedRead.largeReadLen != storageRead->bufferedRead.splitLen)) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Expected %d bytes and found %d bytes in table %s " |
| "(segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")", |
| storageRead->minimumHeaderLen, |
| availableLen, |
| storageRead->relationName, |
| storageRead->segmentFileName, |
| *headerOffsetInFile, |
| storageRead->bufferCount))); |
| i = 0; |
| } |
| } |
| |
| /* |
| * Check for zero padded page remainder. |
| * If i >= storageRead->minimumHeaderLen (16 or 8), we skip storageRead->minimumHeaderLen zeros as mentioned above. |
| * If there are more zeros but less than storageRead->minimumHeaderLen, we skip them as well. |
| * But it must be a multiple of 8 in version 2, or a multiple of 4 in version 1. If not, we should align it. |
| */ |
| if (i > 0) |
| { |
| if (storageRead->storageAttributes.version == AORelationVersion_Original) |
| { |
| i = i / 4 * 4; |
| } |
| else if (storageRead->storageAttributes.version == AORelationVersion_Aligned64bit) |
| { |
| i = i / 8 * 8; |
| } |
| *headerOffsetInFile += i; |
| *header += i; |
| storageRead->bufferedRead.bufferOffset += i; |
| } |
| |
| /* |
| * Determine the maximum boundary of the block. |
| * UNDONE: When we have a block directory, we will tighten the limit down. |
| */ |
| if (isUseSplitLen) |
| fileRemainderLen = storageRead->bufferedRead.splitLen - *headerOffsetInFile; |
| else |
| fileRemainderLen = storageRead->bufferedRead.fileLen - *headerOffsetInFile; |
| |
| if (storageRead->maxBufferLen > fileRemainderLen) |
| *blockLimitLen = (int32)fileRemainderLen; |
| else |
| *blockLimitLen = storageRead->maxBufferLen; |
| |
| return (*blockLimitLen > 0); |
| } |
| |
| |
| char * |
| AppendOnlyStorageRead_ContextStr(AppendOnlyStorageRead *storageRead) |
| { |
| StringInfoData buf; |
| |
| int64 headerOffsetInFile; |
| |
| headerOffsetInFile = BufferedReadCurrentPosition(&storageRead->bufferedRead); |
| |
| initStringInfo(&buf); |
| appendStringInfo( |
| &buf, |
| "%s. Append-Only segment file '%s', block header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT, |
| storageRead->title, |
| storageRead->segmentFileName, |
| headerOffsetInFile, |
| storageRead->bufferCount); |
| |
| return buf.data; |
| } |
| |
| /* |
| * errcontext_appendonly_read_storage_block |
| * |
| * Add an errcontext() line showing the table, segment file, offset in file, block count of |
| * the storage block being read. |
| */ |
| int |
| errcontext_appendonly_read_storage_block(AppendOnlyStorageRead *storageRead) |
| { |
| char *str; |
| |
| str = AppendOnlyStorageRead_ContextStr(storageRead); |
| |
| errcontext("%s", str); |
| |
| pfree(str); |
| |
| return 0; |
| } |
| |
| char * |
| AppendOnlyStorageRead_StorageContentHeaderStr(AppendOnlyStorageRead *storageRead) |
| { |
| uint8 *header; |
| |
| header = BufferedReadGetCurrentBuffer(&storageRead->bufferedRead); |
| |
| return AppendOnlyStorageFormat_BlockHeaderStr( |
| header, |
| storageRead->storageAttributes.checksum, |
| storageRead->storageAttributes.version); |
| } |
| |
| /* |
| * errdetail_appendonly_read_storage_content_header |
| * |
| * Add an errdetail() line showing the Append-Only Storage header being read. |
| */ |
| int |
| errdetail_appendonly_read_storage_content_header(AppendOnlyStorageRead *storageRead) |
| { |
| char *str; |
| |
| str = AppendOnlyStorageRead_StorageContentHeaderStr(storageRead); |
| |
| errdetail("%s", str); |
| |
| pfree(str); |
| |
| return 0; |
| } |
| |
| static void AppendOnlyStorageRead_LogBlockHeader( |
| AppendOnlyStorageRead *storageRead, |
| uint8 *header) |
| { |
| char *contextStr; |
| char *blockHeaderStr; |
| |
| contextStr = AppendOnlyStorageRead_ContextStr(storageRead); |
| |
| blockHeaderStr = |
| AppendOnlyStorageFormat_SmallContentHeaderStr( |
| header, |
| storageRead->storageAttributes.checksum, |
| storageRead->storageAttributes.version); |
| ereport(LOG, |
| (errmsg("%s. %s", |
| contextStr, |
| blockHeaderStr))); |
| |
| pfree(contextStr); |
| pfree(blockHeaderStr); |
| } |
| |
| /* |
| * Get information on the next Append-Only Storage Block. |
| * |
| * Return true if another block was found. Otherwise, |
| * when we have reached the end of the current segment |
| * file. |
| */ |
| static bool AppendOnlyStorageRead_InternalGetBlockInfo( |
| AppendOnlyStorageRead *storageRead, |
| bool isUseSplitLen) |
| { |
| uint8 *header; |
| AOHeaderCheckError checkError; |
| int32 blockLimitLen = 0; // Shutup compiler. |
| pg_crc32 storedChecksum; |
| pg_crc32 computedChecksum; |
| |
| /* |
| * Reset current* variables. |
| */ |
| |
| // For efficiency, zero out. Comment out lines that set fields to 0. |
| memset(&storageRead->current, 0, sizeof(AppendOnlyStorageReadCurrent)); |
| |
| // storageRead->current.headerOffsetInFile = 0; |
| storageRead->current.headerKind = AoHeaderKind_None; |
| // storageRead->current.actualHeaderLen = 0; |
| // storageRead->current.contentLen = 0; |
| // storageRead->current.overallBlockLen = 0; |
| // storageRead->current.contentOffset = 0; |
| // storageRead->current.executorBlockKind = 0; |
| // storageRead->current.hasFirstRowNum = false; |
| storageRead->current.firstRowNum = INT64CONST(-1); |
| // storageRead->current.rowCount = 0; |
| // storageRead->current.isLarge = false; |
| // storageRead->current.isCompressed = false; |
| // storageRead->current.compressedLen = 0; |
| |
| if(Debug_appendonly_print_datumstream) |
| elog(LOG, "before AppendOnlyStorageRead_PositionToNextBlock, storageRead->current.headerOffsetInFile is" INT64_FORMAT "storageRead->current.overallBlockLen is %d", storageRead->current.headerOffsetInFile, storageRead->current.overallBlockLen); |
| |
| if (!AppendOnlyStorageRead_PositionToNextBlock( |
| storageRead, |
| &storageRead->current.headerOffsetInFile, |
| &header, |
| &blockLimitLen, |
| isUseSplitLen)) |
| { |
| /* Done reading the file */ |
| return false; |
| } |
| |
| if(Debug_appendonly_print_datumstream) |
| elog(LOG, "after AppendOnlyStorageRead_PositionToNextBlock, storageRead->current.headerOffsetInFile is" INT64_FORMAT "storageRead->current.overallBlockLen is %d", storageRead->current.headerOffsetInFile, storageRead->current.overallBlockLen); |
| |
| /* |
| * Proceed very carefully: |
| * [ 1. Verify header checksum ] |
| * 2. Examine (basic) header. |
| * 3. Examine specific header. |
| * [ 4. Verify the block checksum ] |
| */ |
| if (storageRead->storageAttributes.checksum) |
| { |
| if (!AppendOnlyStorageFormat_VerifyHeaderChecksum( |
| header, |
| &storedChecksum, |
| &computedChecksum)) |
| ereport(ERROR, |
| (errmsg("Header checksum does not match. Expected 0x%X and found 0x%X headerOffsetInFile is" INT64_FORMAT " overallBlockLen is %d", |
| storedChecksum, |
| computedChecksum, |
| storageRead->current.headerOffsetInFile, |
| storageRead->current.overallBlockLen), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| } |
| |
| /* |
| * Check the (basic) header information. |
| */ |
| checkError = AppendOnlyStorageFormat_GetHeaderInfo( |
| header, |
| storageRead->storageAttributes.checksum, |
| &storageRead->current.headerKind, |
| &storageRead->current.actualHeaderLen); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| |
| /* |
| * Get more header since AppendOnlyStorageRead_PositionToNextBlock only gets minimum. |
| */ |
| if (storageRead->minimumHeaderLen < storageRead->current.actualHeaderLen) |
| { |
| int32 availableLen; |
| |
| header = BufferedReadGrowBuffer(&storageRead->bufferedRead, |
| storageRead->current.actualHeaderLen, |
| &availableLen, |
| isUseSplitLen); |
| |
| if (header == NULL || |
| availableLen != storageRead->current.actualHeaderLen) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Expected %d bytes and found %d bytes in table %s " |
| "(segment file '%s', header offset in file = " INT64_FORMAT ", bufferCount " INT64_FORMAT ")", |
| storageRead->current.actualHeaderLen, |
| availableLen, |
| storageRead->relationName, |
| storageRead->segmentFileName, |
| storageRead->current.headerOffsetInFile, |
| storageRead->bufferCount))); |
| } |
| |
| /* |
| * Based on the kind of header, we either have small or large content. |
| */ |
| switch (storageRead->current.headerKind) |
| { |
| case AoHeaderKind_SmallContent: |
| /* |
| * Check the SmallContent header information. |
| */ |
| checkError = |
| AppendOnlyStorageFormat_GetSmallContentHeaderInfo( |
| header, |
| storageRead->current.actualHeaderLen, |
| storageRead->storageAttributes.checksum, |
| blockLimitLen, |
| &storageRead->current.overallBlockLen, |
| &storageRead->current.contentOffset, |
| &storageRead->current.uncompressedLen, |
| &storageRead->current.executorBlockKind, |
| &storageRead->current.hasFirstRowNum, |
| storageRead->storageAttributes.version, |
| &storageRead->current.firstRowNum, |
| &storageRead->current.rowCount, |
| &storageRead->current.isCompressed, |
| &storageRead->current.compressedLen); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header of type small content. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| break; |
| |
| case AoHeaderKind_LargeContent: |
| /* |
| * Check the LargeContent metadata header information. |
| */ |
| checkError = AppendOnlyStorageFormat_GetLargeContentHeaderInfo( |
| header, |
| storageRead->current.actualHeaderLen, |
| storageRead->storageAttributes.checksum, |
| &storageRead->current.uncompressedLen, |
| &storageRead->current.executorBlockKind, |
| &storageRead->current.hasFirstRowNum, |
| &storageRead->current.firstRowNum, |
| &storageRead->current.rowCount); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header of type large content. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| storageRead->current.isLarge = true; |
| break; |
| |
| case AoHeaderKind_NonBulkDenseContent: |
| /* |
| * Check the NonBulkDense header information. |
| */ |
| checkError = |
| AppendOnlyStorageFormat_GetNonBulkDenseContentHeaderInfo( |
| header, |
| storageRead->current.actualHeaderLen, |
| storageRead->storageAttributes.checksum, |
| blockLimitLen, |
| &storageRead->current.overallBlockLen, |
| &storageRead->current.contentOffset, |
| &storageRead->current.uncompressedLen, |
| &storageRead->current.executorBlockKind, |
| &storageRead->current.hasFirstRowNum, |
| storageRead->storageAttributes.version, |
| &storageRead->current.firstRowNum, |
| &storageRead->current.rowCount); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header of type non-bulk dense content. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| break; |
| |
| case AoHeaderKind_BulkDenseContent: |
| /* |
| * Check the BulkDenseContent header information. |
| */ |
| checkError = |
| AppendOnlyStorageFormat_GetBulkDenseContentHeaderInfo( |
| header, |
| storageRead->current.actualHeaderLen, |
| storageRead->storageAttributes.checksum, |
| blockLimitLen, |
| &storageRead->current.overallBlockLen, |
| &storageRead->current.contentOffset, |
| &storageRead->current.uncompressedLen, |
| &storageRead->current.executorBlockKind, |
| &storageRead->current.hasFirstRowNum, |
| storageRead->storageAttributes.version, |
| &storageRead->current.firstRowNum, |
| &storageRead->current.rowCount, |
| &storageRead->current.isCompressed, |
| &storageRead->current.compressedLen); |
| if (checkError != AOHeaderCheckOk) |
| ereport(ERROR, |
| (errmsg("Bad append-only storage header of type bulk dense content. Header check error %d, detail '%s'", |
| (int)checkError, |
| AppendOnlyStorageFormat_GetHeaderCheckErrorStr()), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| break; |
| |
| default: |
| elog(ERROR, "Unexpected Append-Only header kind %d", |
| storageRead->current.headerKind); |
| break; |
| } |
| |
| if (Debug_appendonly_print_storage_headers) |
| { |
| AppendOnlyStorageRead_LogBlockHeader(storageRead, header); |
| } |
| |
| if (storageRead->current.hasFirstRowNum) |
| { |
| // UNDONE: Grow buffer and read the value into firstRowNum. |
| } |
| |
| if (storageRead->current.headerKind == AoHeaderKind_LargeContent) |
| { |
| // UNDONE: Finish the read for the information only header. |
| } |
| |
| return true; |
| } |
| |
| /* |
| * Get information on the next Append-Only Storage Block. |
| * |
| * Return true if another block was found. Otherwise, |
| * when we have reached the end of the current segment |
| * file. |
| */ |
| bool AppendOnlyStorageRead_GetBlockInfo( |
| AppendOnlyStorageRead *storageRead, |
| |
| int32 *contentLen, |
| /* The total byte length of the content. */ |
| |
| int *executorBlockKind, |
| /* |
| * The executor supplied value stored in the |
| * Append-Only Storage Block header. |
| */ |
| int64 *firstRowNum, |
| /* |
| * When the first row number for this block |
| * was explicitly set, that value is |
| * returned here. Otherwise, INT64CONST(-1) |
| * is returned. |
| */ |
| int *rowCount, |
| /* The number of rows in the content. */ |
| |
| bool *isLarge, |
| /* |
| * When true, the content was longer than the |
| * maxBufferLen (i.e. blocksize) minus |
| * Append-Only Storage Block header and had |
| * to be stored in more than one storage block. |
| */ |
| bool *isCompressed, |
| /* |
| * When true, the content is compressed and |
| * cannot be looked at directly in the buffer. |
| */ |
| bool isUseSplitLen) |
| { |
| bool isNext; |
| |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| /* |
| * If isUseSplitLen= true and readPosition>splitLen,then this block should not belong to this split. |
| * We should read segment file for a new split. |
| * This situation will occur when the previous block is the last block of a big tuple which is larger than read split size(128MB). |
| * It also means the last tuple cross splits and the rest of this split should handle for other vSeg not this vSeg. |
| */ |
| if (isUseSplitLen && storageRead->bufferedRead.largeReadPosition >= storageRead->bufferedRead.splitLen) |
| return false; |
| |
| isNext = AppendOnlyStorageRead_InternalGetBlockInfo(storageRead, isUseSplitLen); |
| |
| /* |
| * The current* variables have good values even when there is no next block. |
| */ |
| *contentLen = storageRead->current.uncompressedLen; |
| *executorBlockKind = storageRead->current.executorBlockKind; |
| *firstRowNum = storageRead->current.firstRowNum; |
| *rowCount = storageRead->current.rowCount; |
| *isLarge = storageRead->current.isLarge; |
| *isCompressed = storageRead->current.isCompressed; |
| |
| return isNext; |
| } |
| |
| /* |
| * Return the current Append-Only Storage Block buffer. |
| */ |
| uint8 *AppendOnlyStorageRead_CurrentBuffer( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return BufferedReadGetCurrentBuffer(&storageRead->bufferedRead); |
| } |
| |
| /* |
| * Return the file offset of the current Append-Only Storage Block. |
| */ |
| int64 AppendOnlyStorageRead_CurrentHeaderOffsetInFile( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return storageRead->current.headerOffsetInFile; |
| } |
| |
| /* |
| * Return the compressed length of the content of the current Append-Only Storage Block. |
| */ |
| int64 AppendOnlyStorageRead_CurrentCompressedLen( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return storageRead->current.compressedLen; |
| } |
| |
| /* |
| * Return the overall block length of the current Append-Only Storage Block. |
| */ |
| int64 AppendOnlyStorageRead_OverallBlockLen( |
| AppendOnlyStorageRead *storageRead) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| return storageRead->current.overallBlockLen; |
| } |
| |
| /* |
| * Internal routine to grow the BufferedRead buffer to be the whole current block and |
| * to get header and content pointers of current block. |
| * |
| * Since we are growing the BufferedRead buffer to the whole block, old pointers to |
| * the header must be abandoned. |
| * |
| * Header to current block was read and verified by AppendOnlyStorageRead_InternalGetBlockInfo. |
| */ |
| static void AppendOnlyStorageRead_InternalGetBuffer( |
| AppendOnlyStorageRead *storageRead, |
| uint8 **header, |
| uint8 **content, |
| bool isUseSplitLen) |
| { |
| int32 availableLen; |
| pg_crc32 storedChecksum; |
| pg_crc32 computedChecksum; |
| |
| /* |
| * Verify next block is type Block. |
| */ |
| Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent || |
| storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent || |
| storageRead->current.headerKind == AoHeaderKind_BulkDenseContent); |
| |
| /* |
| * Grow the buffer to the full block length to avoid any |
| * unnecessary copying by BufferedRead. |
| * |
| * Since the BufferedRead module may have to copy information around, |
| * we do not save any pointers to the prior buffer call. This why |
| * AppendOnlyStorageFormat_GetHeaderInfo passes back the offset to the data, |
| * not a pointer. |
| */ |
| *header = BufferedReadGrowBuffer(&storageRead->bufferedRead, |
| storageRead->current.overallBlockLen, |
| &availableLen, |
| isUseSplitLen); |
| |
| if (storageRead->current.overallBlockLen != availableLen) |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Wrong buffer length. Expected %d byte length buffer and got %d ", |
| storageRead->current.overallBlockLen, |
| availableLen), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| |
| if (storageRead->storageAttributes.checksum && |
| gp_appendonly_verify_block_checksums) |
| { |
| /* |
| * Now that the header has been verified, verify the block checksum |
| * in the header with the checksum of the data portion. |
| */ |
| if (!AppendOnlyStorageFormat_VerifyBlockChecksum( |
| *header, |
| storageRead->current.overallBlockLen, |
| &storedChecksum, |
| &computedChecksum)) |
| ereport(ERROR, |
| (errmsg("Block checksum does not match. Expected 0x%X and found 0x%X", |
| storedChecksum, |
| computedChecksum), |
| errdetail_appendonly_read_storage_content_header(storageRead), |
| errcontext_appendonly_read_storage_block(storageRead))); |
| } |
| |
| *content = &((*header)[storageRead->current.contentOffset]); |
| } |
| |
| /* |
| * Get a pointer to the small non-compressed content. |
| * |
| * This interface provides a pointer directly into the |
| * read buffer for efficient data use. |
| * |
| */ |
| uint8 *AppendOnlyStorageRead_GetBuffer( |
| AppendOnlyStorageRead *storageRead, |
| bool isUseSplitLen) |
| { |
| uint8 *header; |
| uint8 *content; |
| |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| /* |
| * Verify next block is a "small" non-compressed block. |
| */ |
| Assert(storageRead->current.headerKind == AoHeaderKind_SmallContent || |
| storageRead->current.headerKind == AoHeaderKind_NonBulkDenseContent || |
| storageRead->current.headerKind == AoHeaderKind_BulkDenseContent); |
| Assert(!storageRead->current.isLarge); |
| Assert(!storageRead->current.isCompressed); |
| |
| /* |
| * Fetch pointers to content. |
| */ |
| AppendOnlyStorageRead_InternalGetBuffer( |
| storageRead, |
| &header, |
| &content, |
| isUseSplitLen); |
| |
| return content; |
| } |
| |
| /* |
| * Copy the large and/or decompressed content out. |
| * |
| * The contentOutLen parameter value must match the contentLen |
| * from the AppendOnlyStorageReadGetBlockInfo call. |
| * |
| * Note this routine will work for small non-compressed content, too. |
| */ |
| void AppendOnlyStorageRead_Content( |
| AppendOnlyStorageRead *storageRead, |
| |
| uint8 *contentOut, |
| /* The memory to receive the contiguous content. */ |
| |
| int32 contentOutLen, |
| /* The byte length of the contentOut buffer. */ |
| bool isUseSplitLen) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| Assert(contentOutLen == storageRead->current.uncompressedLen); |
| |
| if (storageRead->current.isLarge) |
| { |
| int64 largeContentPosition; |
| // Position of the large content metadata block. |
| |
| int32 largeContentLen; |
| // Total length of the large content. |
| |
| int32 remainingLargeContentLen; |
| // The remaining number of bytes to read for the large content. |
| |
| uint8 *contentNext; |
| // Pointer inside the contentOut buffer to put the next byte. |
| |
| int32 regularBlockReadCount; |
| // Number of regular blocks read after the metadata block. |
| |
| int32 regularContentLen; |
| // Length of the current regular block's content. |
| |
| /* |
| * Large content. |
| * |
| * We have the LargeContent "metadata" AO block with the total length (already |
| * read) followed by N SmallContent blocks with the fragments of the large content. |
| */ |
| |
| |
| /* |
| * Save any values needed from the current* members since they will be modifed |
| * as we read the regular blocks. |
| */ |
| largeContentPosition = storageRead->current.headerOffsetInFile; |
| largeContentLen = storageRead->current.uncompressedLen; |
| |
| /* |
| * Loop to read regular blocks. |
| */ |
| contentNext = contentOut; |
| remainingLargeContentLen = largeContentLen; |
| regularBlockReadCount = 0; |
| while (true) |
| { |
| /* |
| * Read next regular block. |
| */ |
| regularBlockReadCount++; |
| if (!AppendOnlyStorageRead_InternalGetBlockInfo(storageRead, false)) |
| { |
| /* |
| * Unexpected end of file. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Unexpected end of file trying to read block %d of large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen))); |
| } |
| if (storageRead->current.headerKind != AoHeaderKind_SmallContent) |
| { |
| /* |
| * Unexpected headerKind. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Expected header kind 'Block' for block %d of large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen))); |
| } |
| Assert(!storageRead->current.isLarge); |
| |
| regularContentLen = storageRead->current.uncompressedLen; |
| remainingLargeContentLen -= regularContentLen; |
| if (remainingLargeContentLen < 0) |
| { |
| /* |
| * Too much data found??? |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Too much data found after reading %d blocks for large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d; extra data length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen, |
| -remainingLargeContentLen))); |
| } |
| |
| /* |
| * We can safely recurse one level here. |
| */ |
| AppendOnlyStorageRead_Content( |
| storageRead, |
| contentNext, |
| regularContentLen, |
| isUseSplitLen); |
| |
| if (remainingLargeContentLen == 0) |
| break; |
| |
| /* |
| * Advance our pointer inside the contentOut buffer to put the next bytes. |
| */ |
| contentNext += regularContentLen; |
| } |
| } |
| else |
| { |
| uint8 *header; |
| uint8 *content; |
| |
| /* |
| * "Small" content in one regular block. |
| */ |
| |
| /* |
| * Fetch pointers to content. |
| */ |
| AppendOnlyStorageRead_InternalGetBuffer( |
| storageRead, |
| &header, |
| &content, |
| isUseSplitLen); |
| |
| if (!storageRead->current.isCompressed) |
| { |
| /* |
| * Not compressed. |
| */ |
| memcpy( |
| contentOut, |
| content, |
| storageRead->current.uncompressedLen); |
| |
| if (Debug_appendonly_print_scan) |
| elog(LOG, |
| "Append-only Storage Read non-compressed block for table '%s' " |
| "(length = %d, segment file '%s', header offset in file = " INT64_FORMAT ", block count " INT64_FORMAT ")", |
| storageRead->relationName, |
| storageRead->current.uncompressedLen, |
| storageRead->segmentFileName, |
| storageRead->current.headerOffsetInFile, |
| storageRead->bufferCount); |
| } |
| else |
| { |
| /* |
| * Compressed. |
| */ |
| |
| PGFunction decompressor; |
| PGFunction *cfns = storageRead->compression_functions; |
| |
| if (cfns == NULL) |
| decompressor = NULL; |
| else |
| decompressor = cfns[COMPRESSION_DECOMPRESS]; |
| |
| gp_decompress_new( |
| content, // Compressed data in block. |
| storageRead->current.compressedLen, |
| contentOut, |
| storageRead->current.uncompressedLen, |
| decompressor, |
| storageRead->compressionState, |
| storageRead->bufferCount); |
| |
| if (Debug_appendonly_print_scan) |
| elog(LOG, |
| "Append-only Storage Read decompressed block for table '%s' " |
| "(compressed length %d, uncompressed length = %d, segment file '%s', " |
| "header offset in file = " INT64_FORMAT ", block count " INT64_FORMAT ")", |
| storageRead->relationName, |
| AppendOnlyStorageFormat_GetCompressedLen(header), |
| storageRead->current.uncompressedLen, |
| storageRead->segmentFileName, |
| storageRead->current.headerOffsetInFile, |
| storageRead->bufferCount); |
| } |
| } |
| |
| |
| } |
| |
| /* |
| * Skip the current block found with ~_GetBlockInfo. |
| * |
| * Do not decompress the block contents. |
| * |
| * Call this routine instead of calling ~_GetBuffer or ~_Contents that look at contents. Useful |
| * when the desired row(s) are not within the row range of the current block. |
| * |
| */ |
| void AppendOnlyStorageRead_SkipCurrentBlock( |
| AppendOnlyStorageRead *storageRead, bool isUseSplitLen) |
| { |
| Assert(storageRead != NULL); |
| Assert(storageRead->isActive); |
| |
| if (storageRead->current.isLarge) |
| { |
| int64 largeContentPosition; |
| // Position of the large content metadata block. |
| |
| int32 largeContentLen; |
| // Total length of the large content. |
| |
| int32 remainingLargeContentLen; |
| // The remaining number of bytes to read for the large content. |
| |
| int32 regularBlockReadCount; |
| // Number of regular blocks read after the metadata block. |
| |
| int32 regularContentLen; |
| // Length of the current regular block's content. |
| |
| /* |
| * Large content. |
| * |
| * We have the LargeContent "metadata" AO block with the total length (already |
| * read) followed by N SmallContent blocks with the fragments of the large content. |
| */ |
| |
| |
| /* |
| * Save any values needed from the current* members since they will be modifed |
| * as we read the regular blocks. |
| */ |
| largeContentPosition = storageRead->current.headerOffsetInFile; |
| largeContentLen = storageRead->current.uncompressedLen; |
| |
| /* |
| * Loop to read regular blocks. |
| */ |
| remainingLargeContentLen = largeContentLen; |
| regularBlockReadCount = 0; |
| while (true) |
| { |
| /* |
| * Read next regular block. |
| */ |
| regularBlockReadCount++; |
| if (!AppendOnlyStorageRead_InternalGetBlockInfo(storageRead, isUseSplitLen)) |
| { |
| /* |
| * Unexpected end of file. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Unexpected end of file trying to read block %d of large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen))); |
| } |
| if (storageRead->current.headerKind != AoHeaderKind_SmallContent) |
| { |
| /* |
| * Unexpected headerKind. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Expected header kind 'Block' for block %d of large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen))); |
| } |
| Assert(!storageRead->current.isLarge); |
| |
| regularContentLen = storageRead->current.uncompressedLen; |
| remainingLargeContentLen -= regularContentLen; |
| if (remainingLargeContentLen < 0) |
| { |
| /* |
| * Too much data found??? |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("Too much data found after reading %d blocks for large content in segment file '%s' of table '%s'. " |
| "Large content metadata block is at position " INT64_FORMAT " " |
| "Large content length %d; extra data length %d", |
| regularBlockReadCount, |
| storageRead->segmentFileName, |
| storageRead->relationName, |
| largeContentPosition, |
| largeContentLen, |
| -remainingLargeContentLen))); |
| } |
| |
| /* |
| * Since we are skipping, we do not use the compressed or uncompressed |
| * content. |
| */ |
| |
| if (remainingLargeContentLen == 0) |
| break; |
| } |
| } |
| else |
| { |
| uint8 *header; |
| uint8 *content; |
| |
| /* |
| * "Small" content in one regular block. |
| */ |
| |
| /* |
| * Fetch pointers to content. |
| * |
| * Since we are skipping, we do not look at the content. |
| */ |
| AppendOnlyStorageRead_InternalGetBuffer( |
| storageRead, |
| &header, |
| &content, |
| true); |
| } |
| |
| } |