/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* appendonlyam.c
* append-only relation access method code
*
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2008-2009, Greenplum Inc.
*
*
* INTERFACE ROUTINES
* appendonly_beginscan - begin relation scan
* appendonly_rescan - restart a relation scan
* appendonly_endscan - end relation scan
* appendonly_getnext - retrieve next tuple in scan
* appendonly_insert_init - initialize an insert operation
* appendonly_insert - insert tuple into a relation
* appendonly_insert_finish - finish an insert operation
*
* NOTES
* This file contains the appendonly_ routines which implement
* the access methods used for all append-only relations.
*
* $Id: $
* $Change: $
* $DateTime: $
* $Author: $
*-------------------------------------------------------------------------
*/
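/*
 * Illustrative usage sketch (not taken from this file): a caller-side scan
 * loop over an append-only relation using the interface routines listed
 * above. How 'rel', 'snapshot', and 'slot' are obtained is assumed to be
 * handled by the caller (e.g. the executor); only forward scans are
 * supported by this access method.
 *
 *		AppendOnlyScanDesc scan;
 *		MemTuple tuple;
 *
 *		scan = appendonly_beginscan(rel, snapshot, 0, NULL);
 *		while ((tuple = appendonly_getnext(scan, ForwardScanDirection, slot)) != NULL)
 *		{
 *			;	// each returned tuple is also stored in 'slot'
 *		}
 *		appendonly_endscan(scan);
 */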
#include "postgres.h"
#include "miscadmin.h"
#ifndef WIN32
#include <fcntl.h>
#else
#include <io.h>
#endif
#include <sys/file.h>
#include <unistd.h>
#include "fmgr.h"
#include "access/tupdesc.h"
#include "access/appendonlytid.h"
#include "access/filesplit.h"
#include "cdb/cdbappendonlystorage.h"
#include "cdb/cdbappendonlystorageformat.h"
#include "cdb/cdbappendonlystoragelayer.h"
#include "access/aomd.h"
#include "access/heapam.h"
#include "access/hio.h"
#include "access/multixact.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xact.h"
#include "access/appendonlywriter.h"
#include "access/aosegfiles.h"
#include "catalog/catalog.h"
#include "catalog/pg_appendonly.h"
#include "catalog/pg_attribute_encoding.h"
#include "catalog/namespace.h"
#include "catalog/gp_fastsequence.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbappendonlyam.h"
#include "pgstat.h"
#include "storage/procarray.h"
#include "storage/gp_compress.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "storage/freespace.h"
#include "storage/smgr.h"
#include "storage/gp_compress.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#define SCANNED_SEGNO (&scan->aos_segfile_arr[ \
(scan->aos_segfiles_processed == 0 ? 0 : scan->aos_segfiles_processed - 1)])->segno
typedef enum AoExecutorBlockKind
{
AoExecutorBlockKind_None = 0,
AoExecutorBlockKind_VarBlock,
AoExecutorBlockKind_SingleRow,
MaxAoExecutorBlockKind /* must always be last */
} AoExecutorBlockKind;
static void
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
AppendOnlyExecutorReadBlock *executorReadBlock,
int segmentFileNum);
static void
AppendOnlyExecutionReadBlock_SetPositionInfo(
AppendOnlyExecutorReadBlock *executorReadBlock,
int64 blockFirstRowNum);
static void
AppendOnlyExecutorReadBlock_Init(
AppendOnlyExecutorReadBlock *executorReadBlock,
Relation relation,
MemoryContext memoryContext,
AppendOnlyStorageRead *storageRead,
int32 usableBlockSize);
static void
AppendOnlyExecutorReadBlock_Finish(
AppendOnlyExecutorReadBlock *executorReadBlock);
static void
AppendOnlyExecutorReadBlock_ResetCounts(
AppendOnlyExecutorReadBlock *executorReadBlock);
/* ----------------
* initscan - scan code common to appendonly_beginscan and appendonly_rescan
* ----------------
*/
static void
initscan(AppendOnlyScanDesc scan, ScanKey key)
{
/*
* copy the scan key, if appropriate
*/
if (key != NULL)
memcpy(scan->aos_key, key, scan->aos_nkeys * sizeof(ScanKeyData));
scan->aos_filenamepath[0] = '\0';
scan->aos_splits_processed = 0;
scan->aos_need_new_split = true; /* need to assign a file to be scanned */
scan->aos_done_all_splits = false;
scan->bufferDone = true;
if (scan->initedStorageRoutines)
AppendOnlyExecutorReadBlock_ResetCounts(
&scan->executorReadBlock);
pgstat_count_heap_scan(scan->aos_rd);
}
/*
* Open the next file segment to scan and allocate all resources needed for it.
*/
static bool
SetNextFileSegForRead(AppendOnlyScanDesc scan)
{
Relation reln = scan->aos_rd;
int segno = -1;
int64 eof = -1;
int64 end_of_split = -1;
int64 offset = 0;
bool finished_all_splits = true; /* assume */
int32 fileSegNo;
bool toOpenFile = true;
Assert(scan->aos_need_new_split); /* only call me when last segfile completed */
Assert(!scan->aos_done_all_splits); /* don't call me if I told you to stop */
if (!scan->initedStorageRoutines)
{
PGFunction *fns = NULL;
AppendOnlyStorageRead_Init(
&scan->storageRead,
scan->aoScanInitContext,
scan->usableBlockSize,
NameStr(scan->aos_rd->rd_rel->relname),
scan->title,
&scan->storageAttributes);
/*
* There is no guarantee that the current memory context will be preserved between calls,
* so switch to a safe memory context for retrieving compression information.
*/
MemoryContext oldMemoryContext = MemoryContextSwitchTo(scan->aoScanInitContext);
/* Get the relation specific compression functions */
fns = RelationGetRelationCompressionFuncs(reln);
scan->storageRead.compression_functions = fns;
if (scan->storageRead.compression_functions != NULL)
{
PGFunction cons = fns[COMPRESSION_CONSTRUCTOR];
CompressionState *cs;
StorageAttributes sa;
sa.comptype = scan->storageAttributes.compressType;
sa.complevel = scan->storageAttributes.compressLevel;
sa.blocksize = scan->usableBlockSize;
/*
* The relation's tuple descriptor allows the compression
* constructor to make decisions about how to compress or
* decompress the relation given its structure.
*/
cs = callCompressionConstructor(cons,
RelationGetDescr(reln),
&sa,
false /* decompress */);
scan->storageRead.compressionState = cs;
}
/* Switch back to caller's memory context. */
MemoryContextSwitchTo(oldMemoryContext);
AppendOnlyExecutorReadBlock_Init(
&scan->executorReadBlock,
scan->aos_rd,
scan->aoScanInitContext,
&scan->storageRead,
scan->usableBlockSize);
scan->bufferDone = true; /* so we read a new buffer right away */
scan->initedStorageRoutines = true;
}
/*
* Do we have more segment files to read or are we done?
*/
while(scan->aos_splits_processed < list_length(scan->splits))
{
/* still have more segment files to read. get info of the next one */
FileSplit split = (FileSplitNode *)list_nth(scan->splits, scan->aos_splits_processed);
/*
 * Random-read support: consecutive splits within the same segment file
 * do not need to reopen the file.
 */
if (scan->aos_splits_processed > 0) {
FileSplit lastSplit = (FileSplitNode *) list_nth(scan->splits,
scan->aos_splits_processed - 1);
if (split->segno == lastSplit->segno) {
toOpenFile = false;
}
}
scan->toCloseFile = true;
if (scan->aos_splits_processed + 1 < list_length(scan->splits)) {
FileSplit nextSplit = (FileSplitNode *) list_nth(scan->splits,
scan->aos_splits_processed + 1);
if (split->segno == nextSplit->segno) {
scan->toCloseFile = false;
}
}
// int continueLength = 0;
segno = split->segno;
end_of_split = split->offsets + split->lengths;
offset = split->offsets;
scan->aos_splits_processed++;
Assert(eof == -1 || eof == split->logiceof);
if (eof == -1)
eof = split->logiceof;
/* Workaround for the case where split->logiceof is not set. */
if (end_of_split > eof)
eof = end_of_split;
/* The optimizer already combines contiguous splits, so we don't need to
 * combine them again here. The old combining loop is kept below for reference:
 *
// combine contiguous splits
while(scan->aos_splits_processed + continueLength < list_length(scan->splits)){
FileSplit nextSplit = (FileSplitNode *) list_nth(scan->splits,
scan->aos_splits_processed + continueLength);
continueLength++;
if (split->segno == nextSplit->segno && end_of_split == nextSplit->offsets) {
end_of_split = end_of_split + nextSplit->lengths;
scan->aos_splits_processed++;
}else{
break;
}
}
*/
/*
* special case: we are the QD reading from an AO table in utility mode
* (gp_dump). We see entries in the aoseg table but no files or data
* actually exist. If we try to open this file we'll get an error, so
* we must skip to the next. For now, we can test if the file exists by
* looking at the end_of_split value - it's always 0 on the QD.
*/
if(end_of_split > 0)
{
/* Initialize the block directory for inserts if needed. */
if (scan->buildBlockDirectory)
{
ItemPointerData tid;
/*
* If building the block directory, we need to make sure the
* sequence starts higher than our highest tuple's rownum. In
* the case of upgraded blocks, the highest tuple will have
* tupCount as its row num. For non-upgrade cases, which use
* the sequence, it is enough to start at the end of the
* sequence. Note that this is not ideal -- if we are at
* least curSegInfo->tupcount + 1 then we don't even need to
* update the sequence value.
*/
int64 firstSequence =
GetFastSequences(scan->aoEntry->segrelid,
segno,
1,
NUM_FAST_SEQUENCES,
&tid);
AppendOnlyBlockDirectory_Init_forInsert(scan->blockDirectory,
scan->aoEntry,
scan->appendOnlyMetaDataSnapshot,
NULL,
0, /* lastSequence */
scan->aos_rd,
segno, /* segno */
1 /* columnGroupNo */);
Assert(!"need contentid here");
InsertFastSequenceEntry(scan->aoEntry->segrelid,
segno,
firstSequence,
/* TODO: needs change in HAWQ */
&tid);
}
finished_all_splits = false;
break;
}
}
if(finished_all_splits)
{
/* finished reading all segment files */
scan->aos_need_new_split = false;
scan->aos_done_all_splits = true;
return false;
}
MakeAOSegmentFileName(reln, segno, -1, &fileSegNo, scan->aos_filenamepath);
Assert(strlen(scan->aos_filenamepath) + 1 <= scan->aos_filenamepath_maxlen);
Assert(scan->initedStorageRoutines);
/* Previous implementation (kept for reference):
AppendOnlyStorageRead_OpenFile(
&scan->storageRead,
scan->aos_filenamepath,
end_of_split,
eof,
offset);
*/
/* Alternative implementation that avoids reopening the file (kept for reference):
if (toOpenFile) {
AppendOnlyStorageRead_OpenFile(&scan->storageRead,
scan->aos_filenamepath, end_of_split, eof, offset);
} else {
AppendOnlyStorageRead *storageRead = &scan->storageRead;
storageRead->logicalEof = end_of_split;
storageRead->bufferedRead.largeReadPosition = offset;
BufferedReadSetFile(&storageRead->bufferedRead, storageRead->file,
storageRead->segmentFileName, end_of_split);
}
*/
AppendOnlyStorageRead_OpenFile(&scan->storageRead,
scan->aos_filenamepath, end_of_split, eof, offset, toOpenFile);
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
&scan->executorReadBlock,
segno);
AppendOnlyExecutionReadBlock_SetPositionInfo(
&scan->executorReadBlock,
/* blockFirstRowNum */ 1);
/* ready to go! */
scan->aos_need_new_split = false;
if (Debug_appendonly_print_scan)
elog(LOG,"Append-only scan initialize for table '%s', %u/%u/%u, segment file %u, END_OF_SPLIT" INT64_FORMAT ", EOF " INT64_FORMAT ", "
"(compression = %s, usable blocksize %d)",
NameStr(scan->aos_rd->rd_rel->relname),
scan->aos_rd->rd_node.spcNode,
scan->aos_rd->rd_node.dbNode,
scan->aos_rd->rd_node.relNode,
segno,
end_of_split,
eof,
(scan->storageAttributes.compress ? "true" : "false"),
scan->usableBlockSize);
return true;
}
/*
* errcontext_appendonly_insert_block_user_limit
*
* Add an errcontext() line showing the table name but little else, because this is a
* user-caused error.
*/
static int
errcontext_appendonly_insert_block_user_limit(AppendOnlyInsertDesc aoInsertDesc)
{
char *relationName = NameStr(aoInsertDesc->aoi_rel->rd_rel->relname);
errcontext(
"Append-Only table '%s'",
relationName);
return 0;
}
/*
* errcontext_appendonly_insert_block
*
* Add an errcontext() line showing the table, segment file, offset in file, and block count of the block being inserted.
*/
static int
errcontext_appendonly_insert_block(AppendOnlyInsertDesc aoInsertDesc)
{
char *relationName = NameStr(aoInsertDesc->aoi_rel->rd_rel->relname);
int segmentFileNum = aoInsertDesc->cur_segno;
int64 headerOffsetInFile = AppendOnlyStorageWrite_CurrentPosition(&aoInsertDesc->storageWrite);
int64 blockFirstRowNum = aoInsertDesc->blockFirstRowNum;
int64 bufferCount = aoInsertDesc->bufferCount;
errcontext(
"Append-Only table '%s', segment file #%d, block header offset in file = " INT64_FORMAT ", "
"block first row number " INT64_FORMAT ", bufferCount " INT64_FORMAT ")",
relationName,
segmentFileNum,
headerOffsetInFile,
blockFirstRowNum,
bufferCount);
return 0;
}
/*
* errdetail_appendonly_insert_block_header
*
* Add an errdetail() line showing the Append-Only Storage block header for the block being inserted.
*/
static int
errdetail_appendonly_insert_block_header(AppendOnlyInsertDesc aoInsertDesc)
{
uint8 *header;
bool usingChecksum;
header = AppendOnlyStorageWrite_GetCurrentInternalBuffer(&aoInsertDesc->storageWrite);
usingChecksum = aoInsertDesc->usingChecksum;
return errdetail_appendonly_storage_content_header(header, usingChecksum, aoInsertDesc->storageAttributes.version);
}
/*
* Open the next file segment for write.
*/
static void
SetCurrentFileSegForWrite(AppendOnlyInsertDesc aoInsertDesc, ResultRelSegFileInfo *segfileinfo)
{
FileSegInfo *fsinfo;
int64 eof;
int64 eof_uncompressed;
int64 varblockcount;
int32 fileSegNo;
/* Make the 'segment' file name */
MakeAOSegmentFileName(aoInsertDesc->aoi_rel,
aoInsertDesc->cur_segno, -1,
&fileSegNo,
aoInsertDesc->appendFilePathName);
Assert(strlen(aoInsertDesc->appendFilePathName) + 1 <= aoInsertDesc->appendFilePathNameMaxLen);
/*
* In order to append to this file segment entry we must first
* acquire the relation Append-Only segment file (transaction-scope) lock (tag
* LOCKTAG_RELATION_APPENDONLY_SEGMENT_FILE) in order to guarantee
* stability of the pg_aoseg information on this segment file and exclusive right
* to append data to the segment file.
*
* NOTE: This is a transaction scope lock that must be held until commit / abort.
*/
LockRelationAppendOnlySegmentFile(
&aoInsertDesc->aoi_rel->rd_node,
aoInsertDesc->cur_segno,
AccessExclusiveLock,
/* dontWait */ false);
/* Now, get the information for the file segment we are going to append to. */
aoInsertDesc->fsInfo = (FileSegInfo *) palloc0(sizeof(FileSegInfo));
/*
* In HAWQ, we cannot insert a new catalog entry and then update it,
* since we cannot get the TID of the added tuple. Instead, we add the
* new catalog entry on the master and then dispatch it to the segments for update.
*/
fsinfo = aoInsertDesc->fsInfo;
Assert(segfileinfo->numfiles == 1);
fsinfo->segno = segfileinfo->segno;
fsinfo->eof = segfileinfo->eof[0];
fsinfo->eof_uncompressed = segfileinfo->uncompressed_eof[0];
fsinfo->varblockcount = segfileinfo->varblock;
fsinfo->tupcount = segfileinfo->tupcount;
eof = (int64)fsinfo->eof;
eof_uncompressed = (int64)fsinfo->eof_uncompressed;
varblockcount = (int64)fsinfo->varblockcount;
aoInsertDesc->rowCount = fsinfo->tupcount;
/*
* Open the existing file for write.
*/
AppendOnlyStorageWrite_OpenFile(
&aoInsertDesc->storageWrite,
aoInsertDesc->appendFilePathName,
eof,
eof_uncompressed,
&aoInsertDesc->aoi_rel->rd_node,
aoInsertDesc->cur_segno);
/* reset counts */
aoInsertDesc->insertCount = 0;
aoInsertDesc->varblockCount = 0;
/*
* Use the current block count from the segfile info so our system log error messages are
* accurate.
*/
aoInsertDesc->bufferCount = varblockcount;
}
/*
* Finished scanning this file segment. Close it.
*/
static void
CloseScannedFileSeg(AppendOnlyScanDesc scan)
{
AppendOnlyStorageRead_CloseFile(&scan->storageRead);
scan->aos_need_new_split = true;
}
/*
* Finished writing to this file segment. Update catalog and close file.
*/
static void
CloseWritableFileSeg(AppendOnlyInsertDesc aoInsertDesc)
{
int64 fileLen_uncompressed;
int64 fileLen;
AppendOnlyStorageWrite_TransactionFlushAndCloseFile(
&aoInsertDesc->storageWrite,
&fileLen,
&fileLen_uncompressed);
aoInsertDesc->sendback->segno = aoInsertDesc->cur_segno;
aoInsertDesc->sendback->varblock = aoInsertDesc->varblockCount;
aoInsertDesc->sendback->insertCount = aoInsertDesc->insertCount;
aoInsertDesc->sendback->eof = palloc(sizeof(int64));
aoInsertDesc->sendback->uncompressed_eof = palloc(sizeof(int64));
aoInsertDesc->sendback->numfiles = 1;
aoInsertDesc->sendback->eof[0] = fileLen;
aoInsertDesc->sendback->uncompressed_eof[0] = fileLen_uncompressed;
aoInsertDesc->sendback->nextFastSequence = aoInsertDesc->lastSequence + aoInsertDesc->numSequences - 1;
/*
* Update the AO segment info table with our new eof
*/
if (Gp_role != GP_ROLE_EXECUTE)
UpdateFileSegInfo(aoInsertDesc->aoi_rel,
aoInsertDesc->aoEntry,
aoInsertDesc->cur_segno,
fileLen,
fileLen_uncompressed,
aoInsertDesc->insertCount,
aoInsertDesc->varblockCount);
pfree(aoInsertDesc->fsInfo);
aoInsertDesc->fsInfo = NULL;
if (Debug_appendonly_print_insert)
elog(LOG,
"Append-only scan closed write file segment #%d for table %s "
"(file length " INT64_FORMAT ", insert count %f, VarBlock count %f",
aoInsertDesc->cur_segno,
NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
fileLen,
aoInsertDesc->insertCount,
aoInsertDesc->varblockCount);
}
//------------------------------------------------------------------------------
static void
AppendOnlyExecutorReadBlock_GetContents(
AppendOnlyExecutorReadBlock *executorReadBlock)
{
VarBlockCheckError varBlockCheckError;
if (!executorReadBlock->isCompressed)
{
if (!executorReadBlock->isLarge)
{
/*
* Small content.
*/
executorReadBlock->dataBuffer =
AppendOnlyStorageRead_GetBuffer(executorReadBlock->storageRead, true);
if (Debug_appendonly_print_scan)
elog(LOG,
"Append-only scan read small non-compressed block for table '%s' "
"(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
executorReadBlock->dataLen,
AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
executorReadBlock->headerOffsetInFile);
}
else
{
/*
* Large row.
*/
// UNDONE: Error out if NOTOAST isn't ON.
// UNDONE: Error out if it is not a single row
Assert(executorReadBlock->executorBlockKind == AoExecutorBlockKind_SingleRow);
/*
* Enough room in our private buffer?
* UNDONE: Is there a way to avoid the 2nd copy later doProcessTuple?
*/
if (executorReadBlock->largeContentBufferLen < executorReadBlock->dataLen)
{
MemoryContext oldMemoryContext;
/*
* Buffer too small.
*/
oldMemoryContext =
MemoryContextSwitchTo(executorReadBlock->memoryContext);
if (executorReadBlock->largeContentBuffer != NULL)
{
/*
* Make sure we set our pointer to NULL here in case
* the subsequent allocation fails. Otherwise cleanup will
* get confused.
*/
pfree(executorReadBlock->largeContentBuffer);
executorReadBlock->largeContentBuffer = NULL;
}
executorReadBlock->largeContentBuffer = (uint8*)palloc(executorReadBlock->dataLen);
executorReadBlock->largeContentBufferLen = executorReadBlock->dataLen;
/* Deallocation and allocation done. Go back to caller memory-context. */
MemoryContextSwitchTo(oldMemoryContext);
}
executorReadBlock->dataBuffer = executorReadBlock->largeContentBuffer;
AppendOnlyStorageRead_Content(
executorReadBlock->storageRead,
executorReadBlock->dataBuffer,
executorReadBlock->dataLen,
false);
if (Debug_appendonly_print_scan)
elog(LOG,
"Append-only scan read large row for table '%s' "
"(length = %d, segment file '%s', "
"block offset in file = " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
executorReadBlock->dataLen,
AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
executorReadBlock->headerOffsetInFile);
}
}
else
{
int32 compressedLen =
AppendOnlyStorageRead_CurrentCompressedLen(executorReadBlock->storageRead);
// AppendOnlyStorageWrite does not mark large-content metadata blocks as compressed.
Assert(!executorReadBlock->isLarge);
/*
* Decompress into our temporary buffer.
*/
executorReadBlock->dataBuffer = executorReadBlock->uncompressedBuffer;
AppendOnlyStorageRead_Content(
executorReadBlock->storageRead,
executorReadBlock->dataBuffer,
executorReadBlock->dataLen,
true);
if (Debug_appendonly_print_scan)
elog(LOG,
"Append-only scan read decompressed block for table '%s' "
"(compressed length %d, length = %d, segment file '%s', "
"block offset in file = " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
compressedLen,
executorReadBlock->dataLen,
AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead),
executorReadBlock->headerOffsetInFile);
}
/*
* The executorBlockKind value is what the executor -- i.e. the upper
* part of this appendonlyam module! -- has stored in the Append-Only
* Storage header. We interpret it here.
*/
switch (executorReadBlock->executorBlockKind)
{
case AoExecutorBlockKind_VarBlock:
varBlockCheckError = VarBlockIsValid(executorReadBlock->dataBuffer, executorReadBlock->dataLen);
if (varBlockCheckError != VarBlockCheckOk)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("VarBlock is not valid. "
"Valid block check error %d, detail '%s'",
varBlockCheckError,
VarBlockGetCheckErrorStr()),
errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));
/*
* Now use the VarBlock module to extract the items out.
*/
VarBlockReaderInit(&executorReadBlock->varBlockReader,
executorReadBlock->dataBuffer,
executorReadBlock->dataLen);
executorReadBlock->readerItemCount = VarBlockReaderItemCount(&executorReadBlock->varBlockReader);
executorReadBlock->currentItemCount = 0;
if (executorReadBlock->rowCount != executorReadBlock->readerItemCount)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Row count %d in append-only storage header does not match VarBlock item count %d",
executorReadBlock->rowCount,
executorReadBlock->readerItemCount),
errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));
}
if (Debug_appendonly_print_scan)
{
elog(LOG,"Append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
executorReadBlock->readerItemCount,
executorReadBlock->headerOffsetInFile);
}
break;
case AoExecutorBlockKind_SingleRow:
if (executorReadBlock->rowCount != 1)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Row count %d in append-only storage header is not 1 for single row",
executorReadBlock->rowCount),
errdetail_appendonly_read_storage_content_header(executorReadBlock->storageRead),
errcontext_appendonly_read_storage_block(executorReadBlock->storageRead)));
}
executorReadBlock->singleRow = executorReadBlock->dataBuffer;
executorReadBlock->singleRowLen = executorReadBlock->dataLen;
if (Debug_appendonly_print_scan)
{
elog(LOG,"Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
executorReadBlock->singleRowLen,
executorReadBlock->headerOffsetInFile);
}
break;
default:
elog(ERROR, "Unrecognized append-only executor block kind: %d",
executorReadBlock->executorBlockKind);
break;
}
}
static bool
AppendOnlyExecutorReadBlock_GetBlockInfo(
AppendOnlyStorageRead *storageRead,
AppendOnlyExecutorReadBlock *executorReadBlock,
bool isUseSplitLen)
{
int64 blockFirstRowNum = executorReadBlock->blockFirstRowNum;
if (!AppendOnlyStorageRead_GetBlockInfo(
storageRead,
&executorReadBlock->dataLen,
&executorReadBlock->executorBlockKind,
&executorReadBlock->blockFirstRowNum,
&executorReadBlock->rowCount,
&executorReadBlock->isLarge,
&executorReadBlock->isCompressed,
isUseSplitLen))
{
return false;
}
/*
 * If the firstRowNum is not stored in the AO block,
 * executorReadBlock->blockFirstRowNum is set to -1.
 * Since this value is properly maintained by the calling functions
 * AppendOnlyExecutionReadBlock_SetPositionInfo and
 * AppendOnlyExecutionReadBlock_FinishedScanBlock, we restore the
 * previous value when the block does not contain a firstRowNum.
 */
if (executorReadBlock->blockFirstRowNum < 0)
{
executorReadBlock->blockFirstRowNum = blockFirstRowNum;
}
executorReadBlock->headerOffsetInFile =
AppendOnlyStorageRead_CurrentHeaderOffsetInFile(storageRead);
// UNDONE: Check blockFirstRowNum
return true;
}
static void
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
AppendOnlyExecutorReadBlock *executorReadBlock,
int segmentFileNum)
{
executorReadBlock->segmentFileNum = segmentFileNum;
}
static void
AppendOnlyExecutionReadBlock_SetPositionInfo(
AppendOnlyExecutorReadBlock *executorReadBlock,
int64 blockFirstRowNum)
{
executorReadBlock->blockFirstRowNum = blockFirstRowNum;
}
static void
AppendOnlyExecutionReadBlock_FinishedScanBlock(
AppendOnlyExecutorReadBlock *executorReadBlock)
{
executorReadBlock->blockFirstRowNum += executorReadBlock->rowCount;
}
/*
* Initialize the ExecutorReadBlock once. Assumed to be zeroed out before the call.
*/
static void
AppendOnlyExecutorReadBlock_Init(
AppendOnlyExecutorReadBlock *executorReadBlock,
Relation relation,
MemoryContext memoryContext,
AppendOnlyStorageRead *storageRead,
int32 usableBlockSize)
{
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(memoryContext);
executorReadBlock->uncompressedBuffer = (uint8 *) palloc(usableBlockSize * sizeof(uint8));
executorReadBlock->mt_bind = create_memtuple_binding(RelationGetDescr(relation));
ItemPointerSet(&executorReadBlock->cdb_fake_ctid, 0, 0);
executorReadBlock->storageRead = storageRead;
executorReadBlock->memoryContext = memoryContext;
MemoryContextSwitchTo(oldcontext);
}
/*
* Free the space allocated inside the ExecutorReadBlock.
*/
static void
AppendOnlyExecutorReadBlock_Finish(
AppendOnlyExecutorReadBlock *executorReadBlock)
{
if (executorReadBlock->uncompressedBuffer)
{
pfree(executorReadBlock->uncompressedBuffer);
executorReadBlock->uncompressedBuffer = NULL;
}
if (executorReadBlock->mt_bind)
{
destroy_memtuple_binding(executorReadBlock->mt_bind);
executorReadBlock->mt_bind = NULL;
}
}
static void
AppendOnlyExecutorReadBlock_ResetCounts(
AppendOnlyExecutorReadBlock *executorReadBlock)
{
executorReadBlock->totalRowsScannned = 0;
}
static bool
AppendOnlyExecutorReadBlock_ProcessTuple(
AppendOnlyExecutorReadBlock *executorReadBlock,
int64 rowNum,
MemTuple tuple,
int32 tupleLen,
int nkeys,
ScanKey key,
TupleTableSlot *slot)
{
bool valid = true; /* assumed true; the HeapKeyTestUsingSlot macro may reset it */
AOTupleId *aoTupleId = (AOTupleId*)&executorReadBlock->cdb_fake_ctid;
AOTupleIdInit_Init(aoTupleId);
AOTupleIdInit_segmentFileNum(aoTupleId, executorReadBlock->segmentFileNum);
AOTupleIdInit_rowNum(aoTupleId, rowNum);
if(slot)
{
/*
* MPP-7372: If the AO table was created before the fix for this issue, it may
* contain tuples with misaligned bindings. Here we check if the stored memtuple
* is problematic and then create a clone of the tuple with properly aligned
* bindings to be used by the executor.
*/
if (!IsAOBlockAndMemtupleAlignmentFixed(executorReadBlock->storageRead->storageAttributes.version) &&
memtuple_has_misaligned_attribute(tuple, slot->tts_mt_bind))
{
/*
* Create a properly aligned clone of the memtuple.
* We palloc memory for the clone, so the slot is
* responsible for releasing the allocated memory.
*/
tuple = memtuple_aligned_clone(tuple, slot->tts_mt_bind, true /* upgrade */);
Assert(tuple);
ExecStoreMemTuple(tuple, slot, true /* shouldFree */);
}
else
{
ExecStoreMemTuple(tuple, slot, false);
}
slot_set_ctid(slot, &(executorReadBlock->cdb_fake_ctid));
}
/* skip visibility test, all tuples are visible */
if (key != NULL)
HeapKeyTestUsingSlot(slot, nkeys, key, valid);
if (Debug_appendonly_print_scan_tuple && valid)
elog(LOG,"Append-only scan tuple for table '%s' "
"(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file " INT64_FORMAT ")",
AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead),
AOTupleIdToString(aoTupleId),
tupleLen,
memtuple_get_size(tuple, executorReadBlock->mt_bind),
executorReadBlock->headerOffsetInFile);
return valid;
}
static MemTuple
AppendOnlyExecutorReadBlock_ScanNextTuple(
AppendOnlyExecutorReadBlock *executorReadBlock,
int nkeys,
ScanKey key,
TupleTableSlot *slot)
{
MemTuple tuple;
Assert(slot);
switch (executorReadBlock->executorBlockKind)
{
case AoExecutorBlockKind_VarBlock:
/*
* get the next item (tuple) from the varblock
*/
while (true)
{
int itemLen = 0;
uint8 *itemPtr;
int64 rowNum;
itemPtr = VarBlockReaderGetNextItemPtr(
&executorReadBlock->varBlockReader,
&itemLen);
if (itemPtr == NULL)
{
/* no more items in the varblock, get new buffer */
AppendOnlyExecutionReadBlock_FinishedScanBlock(
executorReadBlock);
return NULL;
}
executorReadBlock->currentItemCount++;
executorReadBlock->totalRowsScannned++;
if (itemLen > 0)
{
tuple = (MemTuple) itemPtr;
rowNum = executorReadBlock->blockFirstRowNum +
executorReadBlock->currentItemCount - INT64CONST(1);
if (AppendOnlyExecutorReadBlock_ProcessTuple(
executorReadBlock,
rowNum,
tuple,
itemLen,
nkeys,
key,
slot))
return TupGetMemTuple(slot);
}
}
/* varblock sanity check */
if (executorReadBlock->readerItemCount !=
executorReadBlock->currentItemCount)
elog(NOTICE, "Varblock mismatch: Reader count %d, found %d items\n",
executorReadBlock->readerItemCount,
executorReadBlock->currentItemCount);
break;
case AoExecutorBlockKind_SingleRow:
{
int32 singleRowLen;
if (executorReadBlock->singleRow == NULL)
{
AppendOnlyExecutionReadBlock_FinishedScanBlock(
executorReadBlock);
return NULL; // Force fetching new block.
}
Assert(executorReadBlock->singleRowLen != 0);
tuple = (MemTuple) executorReadBlock->singleRow;
singleRowLen = executorReadBlock->singleRowLen;
/*
* Indicate that the single row has been used up by this scan.
*/
executorReadBlock->singleRow = NULL;
executorReadBlock->singleRowLen = 0;
executorReadBlock->totalRowsScannned++;
if (AppendOnlyExecutorReadBlock_ProcessTuple(
executorReadBlock,
executorReadBlock->blockFirstRowNum,
tuple,
singleRowLen,
nkeys,
key,
slot))
return TupGetMemTuple(slot);
}
break;
default:
elog(ERROR, "Unrecognized append-only executor block kind: %d",
executorReadBlock->executorBlockKind);
break;
}
AppendOnlyExecutionReadBlock_FinishedScanBlock(
executorReadBlock);
return NULL; // No match.
}
static bool
AppendOnlyExecutorReadBlock_FetchTuple(
AppendOnlyExecutorReadBlock *executorReadBlock,
int64 rowNum,
int nkeys,
ScanKey key,
TupleTableSlot *slot)
{
MemTuple tuple;
int itemNum;
Assert(rowNum >= executorReadBlock->blockFirstRowNum);
Assert(rowNum <=
executorReadBlock->blockFirstRowNum +
executorReadBlock->rowCount - 1);
/*
* Get 0-based index to tuple.
*/
itemNum =
(int)(rowNum - executorReadBlock->blockFirstRowNum);
switch (executorReadBlock->executorBlockKind)
{
case AoExecutorBlockKind_VarBlock:
{
uint8 *itemPtr;
int itemLen;
itemPtr = VarBlockReaderGetItemPtr(
&executorReadBlock->varBlockReader,
itemNum,
&itemLen);
Assert(itemPtr != NULL);
tuple = (MemTuple) itemPtr;
if (AppendOnlyExecutorReadBlock_ProcessTuple(
executorReadBlock,
rowNum,
tuple,
itemLen,
nkeys,
key,
slot))
return true;
}
break;
case AoExecutorBlockKind_SingleRow:
{
Assert(itemNum == 0);
Assert (executorReadBlock->singleRow != NULL);
Assert(executorReadBlock->singleRowLen != 0);
tuple = (MemTuple) executorReadBlock->singleRow;
if (AppendOnlyExecutorReadBlock_ProcessTuple(
executorReadBlock,
rowNum,
tuple,
executorReadBlock->singleRowLen,
nkeys,
key,
slot))
return true;
}
break;
default:
elog(ERROR, "Unrecognized append-only executor block kind: %d",
executorReadBlock->executorBlockKind);
break;
}
return false; // No match.
}
//------------------------------------------------------------------------------
/*
* You can think of this scan routine as getting the next "executor" AO block.
*/
static bool
getNextBlock(
AppendOnlyScanDesc scan)
{
LABEL_START_GETNEXTBLOCK:
if (scan->aos_need_new_split)
{
/*
* Need to open a new segment file.
*/
if (!SetNextFileSegForRead(scan))
return false;
}
if (!AppendOnlyExecutorReadBlock_GetBlockInfo(
&scan->storageRead,
&scan->executorReadBlock,
true))
{
if (scan->buildBlockDirectory)
{
Assert(scan->blockDirectory != NULL);
AppendOnlyBlockDirectory_End_forInsert(scan->blockDirectory);
}
/* done reading the file */
if(scan->toCloseFile){
CloseScannedFileSeg(scan);
}
scan->aos_need_new_split = true;
return false;
}
if (scan->buildBlockDirectory)
{
Assert(scan->blockDirectory != NULL);
AppendOnlyBlockDirectory_InsertEntry(
scan->blockDirectory, 0,
scan->executorReadBlock.blockFirstRowNum,
scan->executorReadBlock.headerOffsetInFile,
scan->executorReadBlock.rowCount);
}
/* Skip invalid small-content blocks. */
if (!scan->executorReadBlock.isLarge
&& scan->executorReadBlock.executorBlockKind == AoExecutorBlockKind_SingleRow
&& scan->executorReadBlock.rowCount == 0)
{
/* Skip the current block. */
AppendOnlyStorageRead_SkipCurrentBlock(&scan->storageRead, true);
goto LABEL_START_GETNEXTBLOCK;
}
else
{
AppendOnlyExecutorReadBlock_GetContents(
&scan->executorReadBlock);
}
return true;
}
/* ----------------
* appendonlygettup - fetch next heap tuple
*
* Initialize the scan if not already done; then advance to the next
* tuple in forward direction; return the next tuple in scan->aos_ctup,
* or set scan->aos_ctup.t_data = NULL if no more tuples.
*
* Note: the reason nkeys/key are passed separately, even though they are
* kept in the scan descriptor, is that the caller may not want us to check
* the scankeys.
* ----------------
*/
static MemTuple
appendonlygettup(AppendOnlyScanDesc scan,
ScanDirection dir __attribute__((unused)),
int nkeys,
ScanKey key,
TupleTableSlot *slot)
{
MemTuple tuple;
Assert(ScanDirectionIsForward(dir));
Assert(scan->usableBlockSize > 0);
for(;;)
{
if(scan->bufferDone)
{
/*
* Get the next block. We call this function until we
* successfully get a block to process, or finished reading
* all the data (all 'segment' files) for this relation.
*/
while(!getNextBlock(scan))
{
/* have we read all this relation's data. done! */
if(scan->aos_done_all_splits)
return NULL;
}
scan->bufferDone = false;
}
tuple = AppendOnlyExecutorReadBlock_ScanNextTuple(
&scan->executorReadBlock,
nkeys,
key,
slot);
if (tuple != NULL)
{
return tuple;
}
/* no more items in the varblock, get new buffer */
scan->bufferDone = true;
}
}
static void
cancelLastBuffer(AppendOnlyInsertDesc aoInsertDesc)
{
if (aoInsertDesc->nonCompressedData != NULL)
{
Assert(AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
AppendOnlyStorageWrite_CancelLastBuffer(&aoInsertDesc->storageWrite);
aoInsertDesc->nonCompressedData = NULL;
}
else
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
}
static void
setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc)
{
Assert(aoInsertDesc->nonCompressedData == NULL);
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
/*
* When fetching the next write block, we first check whether
* this new write block would cross a split boundary.
*/
AppendOnlyStorageWrite_PadOutForSplit(&aoInsertDesc->storageWrite, aoInsertDesc->usableBlockSize);
/* Set the firstRowNum for the block */
aoInsertDesc->blockFirstRowNum = aoInsertDesc->lastSequence + 1;
AppendOnlyStorageWrite_SetFirstRowNum(&aoInsertDesc->storageWrite,
aoInsertDesc->blockFirstRowNum);
if(!aoInsertDesc->shouldCompress)
{
aoInsertDesc->nonCompressedData =
AppendOnlyStorageWrite_GetBuffer(
&aoInsertDesc->storageWrite,
AoHeaderKind_SmallContent);
/*
* Prepare our VarBlock for items. Leave room for the Append-Only
* Storage header.
*/
VarBlockMakerInit(&aoInsertDesc->varBlockMaker,
aoInsertDesc->nonCompressedData,
aoInsertDesc->maxDataLen,
aoInsertDesc->tempSpace,
aoInsertDesc->tempSpaceLen);
}
else
{
/*
* Block oriented compression. We also restrict the size of the
* buffer to leave room for the Append-Only Storage header in case the
* block cannot be compressed by the compress library.
*/
VarBlockMakerInit(&aoInsertDesc->varBlockMaker,
aoInsertDesc->uncompressedBuffer,
aoInsertDesc->maxDataLen,
aoInsertDesc->tempSpace,
aoInsertDesc->tempSpaceLen);
}
aoInsertDesc->bufferCount++;
}
static void
finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc)
{
int executorBlockKind;
int itemCount;
int32 dataLen;
executorBlockKind = AoExecutorBlockKind_VarBlock; // Assume.
itemCount = VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker);
if (itemCount == 0)
{
/*
* "Cancel" the last block allocation, if one.
*/
cancelLastBuffer(aoInsertDesc);
return;
}
dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker);
aoInsertDesc->varblockCount++;
if(!aoInsertDesc->shouldCompress)
{
if (itemCount == 1)
{
dataLen = VarBlockCollapseToSingleItem(
/* target */ aoInsertDesc->nonCompressedData,
/* source */ aoInsertDesc->nonCompressedData,
/* sourceLen */ dataLen);
executorBlockKind = AoExecutorBlockKind_SingleRow;
}
AppendOnlyStorageWrite_FinishBuffer(
&aoInsertDesc->storageWrite,
dataLen,
executorBlockKind,
itemCount);
aoInsertDesc->nonCompressedData = NULL;
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
if (Debug_appendonly_print_insert)
elog(LOG,
"Append-only insert finished uncompressed block for table '%s' "
"(length = %d, application specific %d, item count %d, block count " INT64_FORMAT ")",
NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
dataLen,
executorBlockKind,
itemCount,
aoInsertDesc->bufferCount);
}
else
{
if (itemCount == 1)
{
dataLen = VarBlockCollapseToSingleItem(
/* target */ aoInsertDesc->uncompressedBuffer,
/* source */ aoInsertDesc->uncompressedBuffer,
/* sourceLen */ dataLen);
executorBlockKind = AoExecutorBlockKind_SingleRow;
}
else
{
Assert(executorBlockKind == AoExecutorBlockKind_VarBlock);
/*
* Just before attempting to compress the VarBlock, verify its integrity.
*/
if (gp_appendonly_verify_write_block)
{
VarBlockCheckError varBlockCheckError;
varBlockCheckError = VarBlockIsValid(aoInsertDesc->uncompressedBuffer, dataLen);
if (varBlockCheckError != VarBlockCheckOk)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("Verify block during write found VarBlock is not valid. "
"Valid block check error %d, detail '%s'",
varBlockCheckError,
VarBlockGetCheckErrorStr()),
errdetail_appendonly_insert_block_header(aoInsertDesc),
errcontext_appendonly_insert_block(aoInsertDesc)));
}
}
AppendOnlyStorageWrite_Content(
&aoInsertDesc->storageWrite,
aoInsertDesc->uncompressedBuffer,
dataLen,
executorBlockKind,
itemCount);
}
/* Insert an entry to the block directory */
AppendOnlyBlockDirectory_InsertEntry(
&aoInsertDesc->blockDirectory,
0,
aoInsertDesc->blockFirstRowNum,
AppendOnlyStorageWrite_LastWriteBeginPosition(&aoInsertDesc->storageWrite),
itemCount);
Assert(aoInsertDesc->nonCompressedData == NULL);
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
}
/* ----------------------------------------------------------------
* append-only access method interface
* ----------------------------------------------------------------
*/
/* ----------------
* appendonly_beginscan - begin relation scan
* ----------------
*/
AppendOnlyScanDesc
appendonly_beginscan(Relation relation, Snapshot appendOnlyMetaDataSnapshot, int nkeys, ScanKey key)
{
AppendOnlyScanDesc scan;
AppendOnlyEntry *aoentry;
char* comptype;
int complevel;
AppendOnlyStorageAttributes *attr;
StringInfoData titleBuf;
ValidateAppendOnlyMetaDataSnapshot(&appendOnlyMetaDataSnapshot);
/*
* increment relation ref count while scanning relation
*
* This is just to make really sure the relcache entry won't go away while
* the scan has a pointer to it. Caller should be holding the rel open
* anyway, so this is redundant in all normal scenarios...
*/
RelationIncrementReferenceCount(relation);
/*
* allocate scan descriptor
*/
scan = (AppendOnlyScanDesc) palloc0(sizeof(AppendOnlyScanDescData));
/*
* Get the pg_appendonly information for this table
*/
aoentry = GetAppendOnlyEntry(RelationGetRelid(relation), appendOnlyMetaDataSnapshot);
scan->aoEntry = aoentry;
complevel = aoentry->compresslevel;
comptype = aoentry->compresstype;
/*
* initialize the scan descriptor
*/
scan->aos_filenamepath_maxlen = AOSegmentFilePathNameLen(relation) + 1;
scan->aos_filenamepath = (char*)palloc(scan->aos_filenamepath_maxlen);
scan->aos_filenamepath[0] = '\0';
scan->usableBlockSize = aoentry->blocksize; /*AppendOnlyStorage_GetUsableBlockSize(aoentry->blocksize); */
scan->aos_rd = relation;
scan->appendOnlyMetaDataSnapshot = appendOnlyMetaDataSnapshot;
scan->aos_nkeys = nkeys;
scan->aoScanInitContext = CurrentMemoryContext;
initStringInfo(&titleBuf);
appendStringInfo(&titleBuf, "Scan of Append-Only Row-Oriented relation '%s'",
RelationGetRelationName(relation));
scan->title = titleBuf.data;
/*
* Fill in Append-Only Storage layer attributes.
*/
attr = &scan->storageAttributes;
/*
* These attributes describe the AppendOnly format to be scanned.
*/
if (aoentry->compresstype == NULL || pg_strcasecmp(aoentry->compresstype, "none") == 0)
attr->compress = false;
else
attr->compress = true;
if (aoentry->compresstype != NULL)
attr->compressType = aoentry->compresstype;
else
attr->compressType = "none";
attr->compressLevel = aoentry->compresslevel;
attr->checksum = aoentry->checksum;
attr->safeFSWriteSize = aoentry->safefswritesize;
attr->splitsize = aoentry->splitsize;
attr->version = aoentry->version;
AORelationVersion_CheckValid(attr->version);
/*
* Adding a NOTOAST table attribute in 3.3.3 would require a catalog change,
* so in the interim we will test this with a GUC.
*
* This GUC must have the same value on write and read.
*/
// scan->aos_notoast = aoentry->notoast;
scan->aos_notoast = Debug_appendonly_use_no_toast;
// UNDONE: We are calling the static header length routine here.
scan->maxDataLen =
scan->usableBlockSize -
AppendOnlyStorageFormat_RegularHeaderLenNeeded(scan->storageAttributes.checksum);
/*
* we do this here instead of in initscan() because appendonly_rescan also calls
* initscan() and we don't want to allocate memory again
*/
if (nkeys > 0)
scan->aos_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
else
scan->aos_key = NULL;
//pgstat_initstats(relation);
initscan(scan, key);
scan->buildBlockDirectory = false;
scan->blockDirectory = NULL;
return scan;
}
/* ----------------
* appendonly_rescan - restart a relation scan
*
*
* TODO: instead of freeing resources here and reallocating them in initscan
* over and over, see which of them can be refactored into appendonly_beginscan
* and persist there until endscan is finally reached. For now this will do.
* ----------------
*/
void
appendonly_rescan(AppendOnlyScanDesc scan,
ScanKey key)
{
CloseScannedFileSeg(scan);
AppendOnlyStorageRead_FinishSession(&scan->storageRead);
scan->initedStorageRoutines = false;
AppendOnlyExecutorReadBlock_Finish(&scan->executorReadBlock);
scan->aos_need_new_split = true;
/*
* reinitialize scan descriptor
*/
initscan(scan, key);
}
/* ----------------
* appendonly_endscan - end relation scan
* ----------------
*/
void
appendonly_endscan(AppendOnlyScanDesc scan)
{
RelationDecrementReferenceCount(scan->aos_rd);
if (scan->aos_key)
pfree(scan->aos_key);
CloseScannedFileSeg(scan);
AppendOnlyStorageRead_FinishSession(&scan->storageRead);
scan->initedStorageRoutines = false;
AppendOnlyExecutorReadBlock_Finish(&scan->executorReadBlock);
pfree(scan->aos_filenamepath);
pfree(scan->aoEntry);
pfree(scan->title);
pfree(scan);
}
/* ----------------
* appendonly_getnext - retrieve next tuple in scan
* ----------------
*/
MemTuple
appendonly_getnext(AppendOnlyScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
MemTuple tup = appendonlygettup(scan, direction, scan->aos_nkeys, scan->aos_key, slot);
if (tup == NULL)
{
if(slot)
ExecClearTuple(slot);
return NULL;
}
pgstat_count_heap_getnext(scan->aos_rd);
return tup;
}
static void
closeFetchSegmentFile(
AppendOnlyFetchDesc aoFetchDesc)
{
Assert(aoFetchDesc->currentSegmentFile.isOpen);
AppendOnlyStorageRead_CloseFile(&aoFetchDesc->storageRead);
aoFetchDesc->currentSegmentFile.isOpen = false;
}
static bool
openFetchSegmentFile(
AppendOnlyFetchDesc aoFetchDesc,
int openSegmentFileNum)
{
int i;
FileSegInfo *fsInfo;
int segmentFileNum;
int64 logicalEof;
int32 fileSegNo;
Assert(!aoFetchDesc->currentSegmentFile.isOpen);
i = 0;
while (true)
{
if (i >= aoFetchDesc->totalSegfiles)
return false; // Segment file not visible in catalog information.
fsInfo = aoFetchDesc->segmentFileInfo[i];
segmentFileNum = fsInfo->segno;
if (openSegmentFileNum == segmentFileNum)
{
logicalEof = (int64)fsInfo->eof;
break;
}
i++;
}
/*
* Don't try to open a segment file when its EOF is 0, since the file may not
* exist. See MPP-8280.
*/
if (logicalEof == 0)
return false;
MakeAOSegmentFileName(
aoFetchDesc->relation,
openSegmentFileNum, -1,
&fileSegNo,
aoFetchDesc->segmentFileName);
Assert(strlen(aoFetchDesc->segmentFileName) + 1 <=
aoFetchDesc->segmentFileNameMaxLen);
// UNDONE: Appropriate to use Try here?
if (!AppendOnlyStorageRead_TryOpenFile(
&aoFetchDesc->storageRead,
aoFetchDesc->segmentFileName,
logicalEof,
-1))
return false;
aoFetchDesc->currentSegmentFile.num = openSegmentFileNum;
aoFetchDesc->currentSegmentFile.logicalEof = logicalEof;
aoFetchDesc->currentSegmentFile.isOpen = true;
return true;
}
static bool
fetchNextBlock(
AppendOnlyFetchDesc aoFetchDesc)
{
AppendOnlyExecutorReadBlock *executorReadBlock =
&aoFetchDesc->executorReadBlock;
/*
* Try to read next block.
*/
if (!AppendOnlyExecutorReadBlock_GetBlockInfo(
&aoFetchDesc->storageRead,
&aoFetchDesc->executorReadBlock,
true))
return false; // Hit end of range.
/*
* Unpack information into member variables.
*/
aoFetchDesc->currentBlock.have = true;
aoFetchDesc->currentBlock.fileOffset =
executorReadBlock->headerOffsetInFile;
aoFetchDesc->currentBlock.overallBlockLen =
AppendOnlyStorageRead_OverallBlockLen(
&aoFetchDesc->storageRead);
aoFetchDesc->currentBlock.firstRowNum =
executorReadBlock->blockFirstRowNum;
aoFetchDesc->currentBlock.lastRowNum =
executorReadBlock->blockFirstRowNum +
executorReadBlock->rowCount - 1;
aoFetchDesc->currentBlock.isCompressed =
executorReadBlock->isCompressed;
aoFetchDesc->currentBlock.isLargeContent =
executorReadBlock->isLarge;
aoFetchDesc->currentBlock.gotContents = false;
return true;
}
static bool
fetchFromCurrentBlock(
AppendOnlyFetchDesc aoFetchDesc,
int64 rowNum,
TupleTableSlot *slot)
{
Assert(aoFetchDesc->currentBlock.have);
Assert(rowNum >= aoFetchDesc->currentBlock.firstRowNum);
Assert(rowNum <= aoFetchDesc->currentBlock.lastRowNum);
if (!aoFetchDesc->currentBlock.gotContents)
{
/*
* Do decompression if necessary and get contents.
*/
AppendOnlyExecutorReadBlock_GetContents(
&aoFetchDesc->executorReadBlock);
aoFetchDesc->currentBlock.gotContents = true;
}
return AppendOnlyExecutorReadBlock_FetchTuple(
&aoFetchDesc->executorReadBlock,
rowNum,
/* nkeys */ 0,
/* key */ NULL,
slot);
}
static void
positionFirstBlockOfRange(
AppendOnlyFetchDesc aoFetchDesc)
{
AppendOnlyBlockDirectoryEntry_GetBeginRange(
&aoFetchDesc->currentBlock.blockDirectoryEntry,
&aoFetchDesc->scanNextFileOffset,
&aoFetchDesc->scanNextRowNum);
}
static void
positionLimitToEndOfRange(
AppendOnlyFetchDesc aoFetchDesc)
{
AppendOnlyBlockDirectoryEntry_GetEndRange(
&aoFetchDesc->currentBlock.blockDirectoryEntry,
&aoFetchDesc->scanAfterFileOffset,
&aoFetchDesc->scanLastRowNum);
}
static void
positionSkipCurrentBlock(
AppendOnlyFetchDesc aoFetchDesc)
{
aoFetchDesc->scanNextFileOffset =
aoFetchDesc->currentBlock.fileOffset +
aoFetchDesc->currentBlock.overallBlockLen;
aoFetchDesc->scanNextRowNum = aoFetchDesc->currentBlock.lastRowNum + 1;
}
/*
* Scan through blocks to find row.
*
* If row is not represented in any of the blocks covered by the Block Directory, then the row
* falls into a row gap. The row must have been aborted or deleted and reclaimed.
*/
static bool
scanToFetchTuple(
AppendOnlyFetchDesc aoFetchDesc,
int64 rowNum,
TupleTableSlot *slot)
{
if (aoFetchDesc->scanNextFileOffset >=
aoFetchDesc->scanAfterFileOffset)
return false; // No more blocks requested for range.
if (aoFetchDesc->currentSegmentFile.logicalEof ==
aoFetchDesc->scanNextFileOffset)
return false; // No more blocks in this file.
if (aoFetchDesc->currentSegmentFile.logicalEof <
aoFetchDesc->scanNextFileOffset)
return false; // UNDONE: Why does our next scan position go beyond logical EOF?
/*
* Temporarily restrict our reading to just the range.
*/
AppendOnlyStorageRead_SetTemporaryRange(
&aoFetchDesc->storageRead,
aoFetchDesc->scanNextFileOffset,
aoFetchDesc->scanAfterFileOffset);
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
&aoFetchDesc->executorReadBlock,
aoFetchDesc->currentSegmentFile.num);
AppendOnlyExecutionReadBlock_SetPositionInfo(
&aoFetchDesc->executorReadBlock,
aoFetchDesc->scanNextRowNum);
aoFetchDesc->skipBlockCount = 0;
while (true)
{
/*
* Fetch block starting at scanNextFileOffset.
*/
if (!fetchNextBlock(aoFetchDesc))
return false; // No more blocks.
/*
* Examine new current block header information.
*/
if (rowNum < aoFetchDesc->currentBlock.firstRowNum)
{
/*
* Since we have read a new block, the temporary
* range for the read needs to be adjusted
* accordingly. Otherwise, the underlying bufferedRead
* may stop reading more data because of the
* previously-set smaller temporary range.
*/
int64 beginFileOffset = aoFetchDesc->currentBlock.fileOffset;
int64 afterFileOffset = aoFetchDesc->currentBlock.fileOffset +
aoFetchDesc->currentBlock.overallBlockLen;
AppendOnlyStorageRead_SetTemporaryRange(
&aoFetchDesc->storageRead,
beginFileOffset,
afterFileOffset);
return false; // Row fell in gap between blocks.
}
if (rowNum <= aoFetchDesc->currentBlock.lastRowNum)
return fetchFromCurrentBlock(aoFetchDesc, rowNum, slot);
/*
* Update information to get next block.
*/
Assert(!aoFetchDesc->currentBlock.gotContents);
/* MPP-17061: reach the end of range covered by block directory entry */
if ((aoFetchDesc->currentBlock.fileOffset +
aoFetchDesc->currentBlock.overallBlockLen) >=
aoFetchDesc->scanAfterFileOffset)
{
return false;
}
AppendOnlyExecutionReadBlock_FinishedScanBlock(
&aoFetchDesc->executorReadBlock);
AppendOnlyStorageRead_SkipCurrentBlock(
&aoFetchDesc->storageRead,true);
aoFetchDesc->skipBlockCount++;
}
}
AppendOnlyFetchDesc
appendonly_fetch_init(
Relation relation,
Snapshot appendOnlyMetaDataSnapshot)
{
AppendOnlyFetchDesc aoFetchDesc;
AppendOnlyEntry *aoentry;
AppendOnlyStorageAttributes *attr;
ValidateAppendOnlyMetaDataSnapshot(&appendOnlyMetaDataSnapshot);
PGFunction *fns = NULL;
StringInfoData titleBuf;
/*
* increment relation ref count while scanning relation
*
* This is just to make really sure the relcache entry won't go away while
* the scan has a pointer to it. Caller should be holding the rel open
* anyway, so this is redundant in all normal scenarios...
*/
RelationIncrementReferenceCount(relation);
/*
* allocate scan descriptor
*/
aoFetchDesc = (AppendOnlyFetchDesc) palloc0(sizeof(AppendOnlyFetchDescData));
aoFetchDesc->relation = relation;
aoFetchDesc->appendOnlyMetaDataSnapshot = appendOnlyMetaDataSnapshot;
aoFetchDesc->initContext = CurrentMemoryContext;
aoFetchDesc->segmentFileNameMaxLen = AOSegmentFilePathNameLen(relation) + 1;
aoFetchDesc->segmentFileName =
(char*)palloc(aoFetchDesc->segmentFileNameMaxLen);
aoFetchDesc->segmentFileName[0] = '\0';
initStringInfo(&titleBuf);
appendStringInfo(&titleBuf, "Fetch of Append-Only Row-Oriented relation '%s'",
RelationGetRelationName(relation));
aoFetchDesc->title = titleBuf.data;
/*
* Get the pg_appendonly information for this table
*/
aoentry = GetAppendOnlyEntry(RelationGetRelid(relation), appendOnlyMetaDataSnapshot);
aoFetchDesc->aoEntry = aoentry;
/*
* Fill in Append-Only Storage layer attributes.
*/
attr = &aoFetchDesc->storageAttributes;
/*
* These attributes describe the AppendOnly format to be scanned.
*/
if (aoentry->compresstype == NULL || pg_strcasecmp(aoentry->compresstype, "none") == 0)
attr->compress = false;
else
attr->compress = true;
if (aoentry->compresstype != NULL)
attr->compressType = aoentry->compresstype;
else
attr->compressType = "none";
attr->compressLevel = aoentry->compresslevel;
attr->checksum = aoentry->checksum;
attr->safeFSWriteSize = aoentry->safefswritesize;
attr->splitsize = aoentry->splitsize;
attr->version = aoentry->version;
AORelationVersion_CheckValid(attr->version);
aoFetchDesc->usableBlockSize = aoentry->blocksize;
/* AppendOnlyStorage_GetUsableBlockSize(aoentry->blocksize); */
/*
* Get information about all the file segments we need to scan.
* Currently, the fetch operation is disabled, so we just set
* segmentFileInfo to NULL.
*/
aoFetchDesc->segmentFileInfo = NULL;
/*
GetAllFileSegInfo(
relation,
aoentry,
appendOnlyMetaDataSnapshot,
false,
&aoFetchDesc->totalSegfiles);
*/
AppendOnlyStorageRead_Init(
&aoFetchDesc->storageRead,
aoFetchDesc->initContext,
aoFetchDesc->usableBlockSize,
NameStr(aoFetchDesc->relation->rd_rel->relname),
aoFetchDesc->title,
&aoFetchDesc->storageAttributes);
fns = RelationGetRelationCompressionFuncs(relation);
aoFetchDesc->storageRead.compression_functions = fns;
if (fns)
{
PGFunction cons = fns[COMPRESSION_CONSTRUCTOR];
CompressionState *cs;
StorageAttributes sa;
sa.comptype = aoentry->compresstype;
sa.complevel = aoentry->compresslevel;
sa.blocksize = aoentry->blocksize;
cs = callCompressionConstructor(cons, RelationGetDescr(relation),
&sa,
false /* decompress */);
aoFetchDesc->storageRead.compressionState = cs;
}
AppendOnlyExecutorReadBlock_Init(
&aoFetchDesc->executorReadBlock,
aoFetchDesc->relation,
aoFetchDesc->initContext,
&aoFetchDesc->storageRead,
aoFetchDesc->usableBlockSize);
AppendOnlyBlockDirectory_Init_forSearch(
&aoFetchDesc->blockDirectory,
aoentry,
appendOnlyMetaDataSnapshot,
aoFetchDesc->segmentFileInfo,
aoFetchDesc->totalSegfiles,
aoFetchDesc->relation,
1);
return aoFetchDesc;
}
/*
* appendonly_fetch -- fetch the tuple for a given tid.
*
* If the 'slot' is not NULL, the fetched tuple will be assigned to the slot.
*
* Return true if such a tuple is found. Otherwise, return false.
*/
bool
appendonly_fetch(
AppendOnlyFetchDesc aoFetchDesc,
AOTupleId *aoTupleId,
TupleTableSlot *slot)
{
int segmentFileNum = AOTupleIdGet_segmentFileNum(aoTupleId);
int64 rowNum = AOTupleIdGet_rowNum(aoTupleId);
/*
* Do we have a current block? If it has the requested tuple,
* that would be a great performance optimization.
*/
if (aoFetchDesc->currentBlock.have)
{
if (segmentFileNum == aoFetchDesc->currentSegmentFile.num &&
segmentFileNum == aoFetchDesc->blockDirectory.currentSegmentFileNum)
{
if (rowNum >= aoFetchDesc->currentBlock.firstRowNum &&
rowNum <= aoFetchDesc->currentBlock.lastRowNum)
return fetchFromCurrentBlock(aoFetchDesc, rowNum, slot);
/*
* Otherwise, if the current Block Directory entry covers the requested tuple,
* let's use its information as another performance optimization.
*/
if (AppendOnlyBlockDirectoryEntry_RangeHasRow(
&aoFetchDesc->currentBlock.blockDirectoryEntry,
rowNum))
{
/*
* The tuple is covered by the current Block Directory entry, but is it
* before or after our current block?
*/
if (rowNum < aoFetchDesc->currentBlock.firstRowNum)
{
/*
* XXX This could happen when an insert is cancelled. In that case, we
* fetched the next block that has a higher firstRowNum when we
* try to find the first cancelled row. So for the second or any
* cancelled row, we enter here, and re-read the previous block.
* This seems inefficient.
*
* We may be able to fix this by adding an entry to the block
* directory for those cancelled inserts.
*/
/*
* Set scan range to prior blocks.
*/
positionFirstBlockOfRange(aoFetchDesc);
// Set limit to before current block.
aoFetchDesc->scanAfterFileOffset =
aoFetchDesc->currentBlock.fileOffset;
aoFetchDesc->scanLastRowNum =
aoFetchDesc->currentBlock.firstRowNum - 1;
}
else
{
/*
* Set scan range to following blocks.
*/
positionSkipCurrentBlock(aoFetchDesc);
positionLimitToEndOfRange(aoFetchDesc);
}
if (scanToFetchTuple(aoFetchDesc, rowNum, slot))
return true;
if (slot != NULL)
ExecClearTuple(slot);
return false; /* Row falls in a gap; it must have been aborted or deleted and reclaimed. */
}
}
}
// resetCurrentBlockInfo(aoFetchDesc);
/*
* Open or switch open, if necessary.
*/
if (aoFetchDesc->currentSegmentFile.isOpen &&
segmentFileNum != aoFetchDesc->currentSegmentFile.num)
{
#ifdef USE_ASSERT_CHECKING
if (segmentFileNum < aoFetchDesc->currentSegmentFile.num)
ereport(WARNING,
(errmsg("Append-only fetch requires scan prior segment file: "
"segmentFileNum %d, rowNum " INT64_FORMAT
", currentSegmentFileNum %d",
segmentFileNum, rowNum, aoFetchDesc->currentSegmentFile.num)));
#endif
closeFetchSegmentFile(aoFetchDesc);
Assert(!aoFetchDesc->currentSegmentFile.isOpen);
}
if (!aoFetchDesc->currentSegmentFile.isOpen)
{
if (!openFetchSegmentFile(
aoFetchDesc,
segmentFileNum))
{
if (slot != NULL)
ExecClearTuple(slot);
return false; // Segment file not in aoseg table.
// Must be aborted or deleted and reclaimed.
}
}
/*
* Need to get the Block Directory entry that covers the TID.
*/
if (!AppendOnlyBlockDirectory_GetEntry(
&aoFetchDesc->blockDirectory,
aoTupleId,
0,
&aoFetchDesc->currentBlock.blockDirectoryEntry))
{
if (slot != NULL)
{
ExecClearTuple(slot);
}
return false; /* Row not represented in Block Directory. */
/* Must be aborted or deleted and reclaimed. */
}
/*
* Set scan range covered by new Block Directory entry.
*/
positionFirstBlockOfRange(aoFetchDesc);
positionLimitToEndOfRange(aoFetchDesc);
if (scanToFetchTuple(aoFetchDesc, rowNum, slot))
return true;
if (slot != NULL)
ExecClearTuple(slot);
return false; /* Row not found in the covered range; must be aborted or deleted and reclaimed. */
}
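/*
 * Illustrative usage sketch for appendonly_fetch() (comment only, not
 * compiled).  'fetchDesc', 'tid' and 'slot' are caller-supplied placeholders;
 * the descriptor is assumed to come from the fetch initialization routine
 * above and the slot from the executor.
 *
 *		AOTupleId *aoTid = (AOTupleId *) tid;	// TID taken from an index entry
 *
 *		if (appendonly_fetch(fetchDesc, aoTid, slot))
 *		{
 *			// 'slot' now holds the tuple for this TID.
 *		}
 *		else
 *		{
 *			// No tuple: the row was aborted, or deleted and reclaimed;
 *			// 'slot' has been cleared.
 *		}
 */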
void
appendonly_fetch_detail(
AppendOnlyFetchDesc aoFetchDesc,
AppendOnlyFetchDetail *aoFetchDetail)
{
aoFetchDetail->rangeFileOffset =
aoFetchDesc->currentBlock.blockDirectoryEntry.range.fileOffset;
aoFetchDetail->rangeFirstRowNum =
aoFetchDesc->currentBlock.blockDirectoryEntry.range.firstRowNum;
aoFetchDetail->rangeAfterFileOffset =
aoFetchDesc->currentBlock.blockDirectoryEntry.range.afterFileOffset;
aoFetchDetail->rangeLastRowNum =
aoFetchDesc->currentBlock.blockDirectoryEntry.range.lastRowNum;
aoFetchDetail->skipBlockCount = aoFetchDesc->skipBlockCount;
aoFetchDetail->blockFileOffset = aoFetchDesc->currentBlock.fileOffset;
aoFetchDetail->blockOverallLen = aoFetchDesc->currentBlock.overallBlockLen;
aoFetchDetail->blockFirstRowNum = aoFetchDesc->currentBlock.firstRowNum;
aoFetchDetail->blockLastRowNum = aoFetchDesc->currentBlock.lastRowNum;
aoFetchDetail->isCompressed = aoFetchDesc->currentBlock.isCompressed;
aoFetchDetail->isLargeContent = aoFetchDesc->currentBlock.isLargeContent;
}
void
appendonly_fetch_finish(AppendOnlyFetchDesc aoFetchDesc)
{
RelationDecrementReferenceCount(aoFetchDesc->relation);
AppendOnlyStorageRead_CloseFile(&aoFetchDesc->storageRead);
AppendOnlyStorageRead_FinishSession(&aoFetchDesc->storageRead);
AppendOnlyExecutorReadBlock_Finish(&aoFetchDesc->executorReadBlock);
AppendOnlyBlockDirectory_End_forSearch(&aoFetchDesc->blockDirectory);
if (aoFetchDesc->segmentFileInfo)
{
FreeAllSegFileInfo(aoFetchDesc->segmentFileInfo, aoFetchDesc->totalSegfiles);
pfree(aoFetchDesc->segmentFileInfo);
aoFetchDesc->segmentFileInfo = NULL;
}
pfree(aoFetchDesc->aoEntry);
aoFetchDesc->aoEntry = NULL;
pfree(aoFetchDesc->segmentFileName);
aoFetchDesc->segmentFileName = NULL;
pfree(aoFetchDesc->title);
}
/*
 * appendonly_insert_init
 *
 * Before using appendonly_insert() to insert tuples, we need to call
 * this function to initialize our varblock and bufferedAppend structures
 * and the memory used for appending data into the relation file.
 *
 * See appendonly_insert() for more specifics about inserting tuples into
 * append-only tables.
 */
AppendOnlyInsertDesc
appendonly_insert_init(Relation rel, ResultRelSegFileInfo *segfileinfo)
{
AppendOnlyInsertDesc aoInsertDesc;
AppendOnlyEntry *aoentry;
int maxtupsize;
int64 firstSequence = 0;
PGFunction *fns;
int desiredOverflowBytes = 0;
size_t (*desiredCompressionSize)(size_t input);
AppendOnlyStorageAttributes *attr;
StringInfoData titleBuf;
/*
* Get the pg_appendonly information for this table
*/
aoentry = GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
/*
* allocate and initialize the insert descriptor
*/
aoInsertDesc = (AppendOnlyInsertDesc) palloc0(sizeof(AppendOnlyInsertDescData));
aoInsertDesc->aoi_rel = rel;
aoInsertDesc->appendOnlyMetaDataSnapshot = SnapshotNow;
// Writers use this since they have exclusive access to the segment file, via the
// lock acquired with LockRelationAppendOnlySegmentFile.
aoInsertDesc->mt_bind = create_memtuple_binding(RelationGetDescr(rel));
aoInsertDesc->appendFile = -1;
aoInsertDesc->appendFilePathNameMaxLen = AOSegmentFilePathNameLen(rel) + 1;
aoInsertDesc->appendFilePathName = (char*)palloc(aoInsertDesc->appendFilePathNameMaxLen);
aoInsertDesc->appendFilePathName[0] = '\0';
aoInsertDesc->bufferCount = 0;
aoInsertDesc->blockFirstRowNum = 0;
aoInsertDesc->insertCount = 0;
aoInsertDesc->varblockCount = 0;
aoInsertDesc->rowCount = 0;
Assert(segfileinfo->segno >= 0);
aoInsertDesc->cur_segno = segfileinfo->segno;
aoInsertDesc->aoEntry = aoentry;
/*
* Adding a NOTOAST table attribute in 3.3.3 would require a catalog change,
* so in the interim we will test this with a GUC.
*
* This GUC must have the same value on write and read.
*/
// aoInsertDesc->useNoToast = aoentry->notoast;
aoInsertDesc->useNoToast = Debug_appendonly_use_no_toast;
aoInsertDesc->usableBlockSize = aoentry->blocksize; /* AppendOnlyStorage_GetUsableBlockSize(aoentry->blocksize); */
attr = &aoInsertDesc->storageAttributes;
/*
 * These attributes describe the AppendOnly format to be written.
 */
if (aoentry->compresstype == NULL || pg_strcasecmp(aoentry->compresstype, "none") == 0)
attr->compress = false;
else
attr->compress = true;
if (aoentry->compresstype != NULL)
attr->compressType = aoentry->compresstype;
else
attr->compressType = "none";
attr->compressLevel = aoentry->compresslevel;
attr->checksum = aoentry->checksum;
attr->safeFSWriteSize = aoentry->safefswritesize;
attr->splitsize = aoentry->splitsize;
attr->version = aoentry->version;
AORelationVersion_CheckValid(attr->version);
fns = RelationGetRelationCompressionFuncs(rel);
CompressionState *cs = NULL;
CompressionState *verifyCs = NULL;
if (fns)
{
PGFunction cons = fns[COMPRESSION_CONSTRUCTOR];
StorageAttributes sa;
sa.comptype = aoentry->compresstype;
sa.complevel = aoentry->compresslevel;
sa.blocksize = aoentry->blocksize;
cs = callCompressionConstructor(cons, RelationGetDescr(rel),
&sa,
true /* compress */);
if (gp_appendonly_verify_write_block == true)
{
verifyCs = callCompressionConstructor(cons, RelationGetDescr(rel),
&sa,
false /* decompress */);
}
desiredCompressionSize = cs->desired_sz;
if (desiredCompressionSize != NULL)
{
/*
* Call the compression's desired size function to find out what additional
* space it requires for our block size.
*/
desiredOverflowBytes =
(int)(desiredCompressionSize)(aoInsertDesc->usableBlockSize)
- aoInsertDesc->usableBlockSize;
Assert(desiredOverflowBytes >= 0);
}
}
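/*
 * Worked example (hypothetical numbers): if the constructor's desired_sz
 * callback reports that compressing a 32768-byte input may need up to
 * 32773 bytes of output, desiredOverflowBytes is 32773 - 32768 = 5, and
 * the write buffers below are sized with those 5 extra bytes of headroom.
 */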
aoInsertDesc->storageAttributes.overflowSize = desiredOverflowBytes;
initStringInfo(&titleBuf);
appendStringInfo(&titleBuf, "Write of Append-Only Row-Oriented relation '%s'",
RelationGetRelationName(aoInsertDesc->aoi_rel));
aoInsertDesc->title = titleBuf.data;
AppendOnlyStorageWrite_Init(
&aoInsertDesc->storageWrite,
NULL,
aoInsertDesc->usableBlockSize,
RelationGetRelationName(aoInsertDesc->aoi_rel),
aoInsertDesc->title,
&aoInsertDesc->storageAttributes);
aoInsertDesc->storageWrite.compression_functions = fns;
aoInsertDesc->storageWrite.compressionState = cs;
aoInsertDesc->storageWrite.verifyWriteCompressionState = verifyCs;
if (Debug_appendonly_print_insert)
elog(LOG,"Append-only insert initialize for table '%s' segment file %u "
"(compression = %s, compression type %s, compression level %d)",
NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
aoInsertDesc->cur_segno,
(attr->compress ? "true" : "false"),
(aoentry->compresstype ? aoentry->compresstype : "<none>"),
attr->compressLevel);
/*
* Temporarily set the firstRowNum for the block so that we can
* calculate the correct header length.
*/
AppendOnlyStorageWrite_SetFirstRowNum(&aoInsertDesc->storageWrite,
1);
aoInsertDesc->completeHeaderLen =
AppendOnlyStorageWrite_CompleteHeaderLen(
&aoInsertDesc->storageWrite,
AoHeaderKind_SmallContent);
aoInsertDesc->maxDataLen =
aoInsertDesc->usableBlockSize -
aoInsertDesc->completeHeaderLen;
aoInsertDesc->tempSpaceLen = aoInsertDesc->usableBlockSize / 8; /* TODO - come up with a more efficient calculation */
aoInsertDesc->tempSpace = (uint8 *) palloc(aoInsertDesc->tempSpaceLen * sizeof(uint8));
maxtupsize = aoInsertDesc->maxDataLen - VARBLOCK_HEADER_LEN - 4;
aoInsertDesc->toast_tuple_threshold = maxtupsize / 4; /* see tuptoaster.h for more information */
aoInsertDesc->toast_tuple_target = maxtupsize / 4;
/* open our current relation file segment for write */
SetCurrentFileSegForWrite(aoInsertDesc, segfileinfo);
Assert(aoInsertDesc->tempSpaceLen > 0);
/*
 * Obtain the next list of fast sequences for this relation.
 *
 * Even in the case of no indexes, we need to update the fast
 * sequences, since the table may contain indexes at some
 * point in time.
 */
Assert(!ItemPointerIsValid(&aoInsertDesc->fsInfo->sequence_tid));
Assert(aoInsertDesc->fsInfo->segno == segfileinfo->segno);
/*
firstSequence =
GetFastSequences(aoInsertDesc->aoEntry->segrelid,
segfileinfo->segno,
aoInsertDesc->rowCount + 1,
NUM_FAST_SEQUENCES,
&aoInsertDesc->fsInfo->sequence_tid);
*/
firstSequence = aoInsertDesc->rowCount + 1;
aoInsertDesc->numSequences = NUM_FAST_SEQUENCES;
/* Set last_sequence value */
Assert(firstSequence > aoInsertDesc->rowCount);
aoInsertDesc->lastSequence = firstSequence - 1;
setupNextWriteBlock(aoInsertDesc);
/* Initialize the block directory. */
AppendOnlyBlockDirectory_Init_forInsert(
&(aoInsertDesc->blockDirectory),
aoentry,
aoInsertDesc->appendOnlyMetaDataSnapshot, // CONCERN: Safe to assume all block directory entries for segment are "covered" by same exclusive lock.
aoInsertDesc->fsInfo, aoInsertDesc->lastSequence,
rel, segfileinfo->segno, 1);
return aoInsertDesc;
}
/*
 * appendonly_insert - insert tuple into a varblock
 *
 * Note the following major differences from heap_insert:
 *
 * - WAL is always bypassed here.
 * - transaction information is of no interest.
 * - tuples are inserted into varblocks, not via the PostgreSQL buf/page manager.
 * - there is no need to pin buffers.
 *
 * The output parameter tupleOid is the OID assigned to the tuple (either here or by the
 * caller), or InvalidOid if no OID. The header fields of *instup are updated
 * to match the stored tuple.
 */
void
appendonly_insert(
AppendOnlyInsertDesc aoInsertDesc,
MemTuple instup,
Oid *tupleOid,
AOTupleId *aoTupleId)
{
Relation relation = aoInsertDesc->aoi_rel;
VarBlockByteLen itemLen;
uint8 *itemPtr;
MemTuple tup = NULL;
bool need_toast;
bool isLargeContent;
Assert(aoInsertDesc->usableBlockSize > 0 && aoInsertDesc->tempSpaceLen > 0);
Assert(aoInsertDesc->toast_tuple_threshold > 0 && aoInsertDesc->toast_tuple_target > 0);
Insist(RelationIsAoRows(relation));
if (relation->rd_rel->relhasoids)
{
/*
* If the object id of this tuple has already been assigned, trust the
* caller. There are a couple of ways this can happen. At initial db
* creation, the backend program sets oids for tuples. When we define
* an index, we set the oid. Finally, in the future, we may allow
* users to set their own object ids in order to support a persistent
* object store (objects need to contain pointers to one another).
*/
if (!OidIsValid(MemTupleGetOid(instup, aoInsertDesc->mt_bind)))
MemTupleSetOid(instup, aoInsertDesc->mt_bind, GetNewOid(relation));
}
else
{
/* check that the tuple has no space reserved for an OID */
MemTupleNoOidSpace(instup);
}
if (aoInsertDesc->useNoToast)
need_toast = false;
else
need_toast = (MemTupleHasExternal(instup, aoInsertDesc->mt_bind) ||
memtuple_get_size(instup, aoInsertDesc->mt_bind) >
aoInsertDesc->toast_tuple_threshold);
/*
* If the new tuple is too big for storage or contains already toasted
* out-of-line attributes from some other relation, invoke the toaster.
*
* Note: below this point, tup is the data we actually intend to store
* into the relation; instup is the caller's original untoasted data.
*/
if (need_toast)
tup = (MemTuple) toast_insert_or_update(relation, (HeapTuple) instup,
NULL, aoInsertDesc->mt_bind,
aoInsertDesc->toast_tuple_target,
false /* errtbl is never AO */);
else
tup = instup;
/*
* MPP-7372: If the AO table was created before the fix for this issue, it may contain
* tuples with misaligned bindings. Here we check if the memtuple to be stored is
* problematic and then create a clone of the tuple with the old (misaligned) bindings
* to preserve consistency.
*/
if (!IsAOBlockAndMemtupleAlignmentFixed(aoInsertDesc->storageAttributes.version) &&
memtuple_has_misaligned_attribute(tup, aoInsertDesc->mt_bind))
{
/* Create a clone of the memtuple using misaligned bindings. */
MemTuple tuple = memtuple_aligned_clone(tup, aoInsertDesc->mt_bind, false /* downgrade */);
Assert(tuple);
if(tup != instup)
{
pfree(tup);
}
tup = tuple;
}
/*
* get space to insert our next item (tuple)
*/
itemLen = memtuple_get_size(tup, aoInsertDesc->mt_bind);
isLargeContent = false;
/*
* If we are at the limit for append-only storage header's row count,
* force this VarBlock to finish.
*/
if (VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker) >= AOSmallContentHeader_MaxRowCount)
itemPtr = NULL;
else
itemPtr = VarBlockMakerGetNextItemPtr(&aoInsertDesc->varBlockMaker, itemLen);
/*
 * If there is no more room to place items in the current varblock,
 * finish it and start inserting into the next one.
 */
if (itemPtr == NULL)
{
if (VarBlockMakerItemCount(&aoInsertDesc->varBlockMaker) == 0)
{
/*
* Case #1. The entire tuple cannot fit within a VarBlock. It is too large.
*/
if (aoInsertDesc->useNoToast)
{
/*
* Indicate we need to write the large tuple as a large content
* multiple-block set.
*/
isLargeContent = true;
}
else
{
/*
 * Use a different errcontext when user input (tuple contents) causes the
 * error.
 */
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("Item too long (check #1): length %d, maxBufferLen %d",
itemLen, aoInsertDesc->varBlockMaker.maxBufferLen),
errcontext_appendonly_insert_block_user_limit(aoInsertDesc)));
}
}
else
{
/*
* Write out the current VarBlock to make room.
*/
finishWriteBlock(aoInsertDesc);
Assert(aoInsertDesc->nonCompressedData == NULL);
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
/*
 * Set up a new VarBlock.
 */
setupNextWriteBlock(aoInsertDesc);
itemPtr = VarBlockMakerGetNextItemPtr(&aoInsertDesc->varBlockMaker, itemLen);
if (itemPtr == NULL)
{
/*
* Case #2. The entire tuple cannot fit within a VarBlock. It is too large.
*/
if (aoInsertDesc->useNoToast)
{
/*
* Indicate we need to write the large tuple as a large content
* multiple-block set.
*/
isLargeContent = true;
}
else
{
/*
 * Use a different errcontext when user input (tuple contents) causes the
 * error.
 */
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("Item too long (check #2): length %d, maxBufferLen %d",
itemLen, aoInsertDesc->varBlockMaker.maxBufferLen),
errcontext_appendonly_insert_block_user_limit(aoInsertDesc)));
}
}
}
}
if (!isLargeContent)
{
/*
* We have room in the current VarBlock for the new tuple.
*/
Assert(itemPtr != NULL);
if (itemLen > 0)
memcpy(itemPtr, tup, itemLen);
}
else
{
/*
* Write the large tuple as a large content multiple-block set.
*/
Assert(itemPtr == NULL);
Assert(!need_toast);
Assert(instup == tup);
/*
 * "Cancel" the last block allocation, if any.
 */
cancelLastBuffer(aoInsertDesc);
Assert(aoInsertDesc->nonCompressedData == NULL);
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
/*
* Write large content.
*/
AppendOnlyStorageWrite_Content(
&aoInsertDesc->storageWrite,
(uint8*)tup,
itemLen,
AoExecutorBlockKind_SingleRow,
/* rowCount */ 1);
Assert(aoInsertDesc->nonCompressedData == NULL);
Assert(!AppendOnlyStorageWrite_IsBufferAllocated(&aoInsertDesc->storageWrite));
setupNextWriteBlock(aoInsertDesc);
}
aoInsertDesc->insertCount++;
aoInsertDesc->lastSequence++;
if (aoInsertDesc->numSequences > 0)
(aoInsertDesc->numSequences)--;
Assert(aoInsertDesc->numSequences >= 0);
pgstat_count_heap_insert(relation);
*tupleOid = MemTupleGetOid(tup, aoInsertDesc->mt_bind);
AOTupleIdInit_Init(aoTupleId);
AOTupleIdInit_segmentFileNum(aoTupleId, aoInsertDesc->cur_segno);
AOTupleIdInit_rowNum(aoTupleId, aoInsertDesc->lastSequence);
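/*
 * For example (hypothetical values): a row assigned sequence number 42 in
 * segment file 3 gets an AOTupleId of (segmentFileNum 3, rowNum 42), which
 * appendonly_fetch() later decodes with AOTupleIdGet_segmentFileNum() and
 * AOTupleIdGet_rowNum().
 */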
/*
 * If the allocated fast sequence numbers are used up, request the
 * next list of fast sequence numbers.
 */
if (aoInsertDesc->numSequences == 0)
{
int64 firstSequence;
/*
 * In HAWQ, catalog tables are in-memory heap tables, so the
 * ItemPointer of the tuple is invalid.
 */
if (Gp_role == GP_ROLE_EXECUTE)
{
/*
firstSequence = GetFastSequences(aoInsertDesc->aoEntry->segrelid,
aoInsertDesc->cur_segno, aoInsertDesc->lastSequence + 1,
NUM_FAST_SEQUENCES, &aoInsertDesc->fsInfo->sequence_tid);
*/
firstSequence = aoInsertDesc->lastSequence + 1;
} else {
firstSequence = GetFastSequencesByTid(
&aoInsertDesc->fsInfo->sequence_tid,
aoInsertDesc->lastSequence + 1, NUM_FAST_SEQUENCES);
}
Assert(firstSequence == aoInsertDesc->lastSequence + 1);
aoInsertDesc->numSequences = NUM_FAST_SEQUENCES;
}
if (Debug_appendonly_print_insert_tuple)
{
elog(LOG,"Append-only insert tuple for table '%s' "
"(AOTupleId %s, memtuple length %d, isLargeRow %s, block count " INT64_FORMAT ")",
NameStr(aoInsertDesc->aoi_rel->rd_rel->relname),
AOTupleIdToString(aoTupleId),
itemLen,
(isLargeContent ? "true" : "false"),
aoInsertDesc->bufferCount);
}
if(tup != instup)
pfree(tup);
}
/*
 * appendonly_insert_finish
 *
 * When done inserting all the data via appendonly_insert(), we need to call
 * this function to flush all remaining data in the buffer into the file.
 */
void
appendonly_insert_finish(AppendOnlyInsertDesc aoInsertDesc)
{
/*
* Finish up that last varblock.
*/
finishWriteBlock(aoInsertDesc);
CloseWritableFileSeg(aoInsertDesc);
AppendOnlyBlockDirectory_End_forInsert(&(aoInsertDesc->blockDirectory));
AppendOnlyStorageWrite_FinishSession(&aoInsertDesc->storageWrite);
pfree(aoInsertDesc->aoEntry);
pfree(aoInsertDesc->title);
pfree(aoInsertDesc);
}
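/*
 * Illustrative insert lifecycle sketch (comment only, not compiled), tying
 * together appendonly_insert_init(), appendonly_insert() and
 * appendonly_insert_finish().  'rel', 'segfileinfo' and 'mtuple' are
 * caller-supplied placeholders.
 *
 *		AppendOnlyInsertDesc insertDesc;
 *		Oid			tupleOid;
 *		AOTupleId	aoTupleId;
 *
 *		insertDesc = appendonly_insert_init(rel, segfileinfo);
 *		// for each MemTuple 'mtuple' to be stored:
 *		appendonly_insert(insertDesc, mtuple, &tupleOid, &aoTupleId);
 *		// aoTupleId (segment file number + row number) can now be handed to
 *		// index code or, later, to appendonly_fetch().
 *		appendonly_insert_finish(insertDesc);	// flushes the last varblock
 */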
/*
 * RelationGuessNumberOfBlocks
 *
 * Has the same meaning as RelationGetNumberOfBlocks has for heap relations;
 * however, it uses an estimate, since AO relations use variable-length blocks,
 * which are meaningless to the optimizer.
 *
 * This function, in other words, answers the following question - "If
 * I were a heap relation, about how many blocks would I have had?"
 */
BlockNumber
RelationGuessNumberOfBlocks(double totalbytes)
{
/* for now it's very simple */
return (BlockNumber)(totalbytes/BLCKSZ) + 1;
}
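/*
 * For example (hypothetical numbers): with BLCKSZ = 32768,
 * RelationGuessNumberOfBlocks(1000000) returns
 * (BlockNumber) (1000000 / 32768) + 1, i.e. 30 + 1 = 31 heap-equivalent blocks.
 */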
/*
 * AppendOnlyStorageWrite_PadOutForSplit - pad with zeros up to the next split boundary.
 */
void
AppendOnlyStorageWrite_PadOutForSplit(
AppendOnlyStorageWrite *storageWrite,
int32 varblocksize)
{
int64 nextWritePosition;
int64 nextBoundaryPosition;
int32 splitWriteRemainder;
bool crossSplitBoundary;
uint8 *buffer;
int32 safeWriteSplit = storageWrite->storageAttributes.splitsize;
nextWritePosition = BufferedAppendNextBufferPosition(&storageWrite->bufferedAppend);
crossSplitBoundary = ((nextWritePosition / safeWriteSplit) != ((nextWritePosition + varblocksize) / safeWriteSplit));
if (!crossSplitBoundary)
{
return;
}
nextBoundaryPosition = ((nextWritePosition + safeWriteSplit - 1) / safeWriteSplit) * safeWriteSplit;
splitWriteRemainder = (int32)(nextBoundaryPosition - nextWritePosition);
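/*
 * Worked example (hypothetical numbers): with splitsize = 67108864 (64MB),
 * nextWritePosition = 67108000 and varblocksize = 2000, the block would
 * straddle a split boundary (67108000 / 67108864 is 0, but 67110000 / 67108864
 * is 1), so nextBoundaryPosition = 67108864 and splitWriteRemainder = 864
 * bytes of zeros are appended before the block is written at the start of the
 * next split.
 */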
if (splitWriteRemainder <= 0)
{
return;
}
/*
 * Get a buffer covering the remainder so we can pad it with zeros.
 */
buffer = BufferedAppendGetBuffer(&storageWrite->bufferedAppend, splitWriteRemainder);
if (buffer == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("We do not expect files to have a maximum length")));
}
memset(buffer, 0, splitWriteRemainder);
BufferedAppendFinishBuffer(&storageWrite->bufferedAppend,
splitWriteRemainder,
splitWriteRemainder);
if (Debug_appendonly_print_insert)
{
elog(LOG, "Append-only insert zero padded splitWriteRemainder for table '%s' (nextWritePosition = " INT64_FORMAT ", splitWriteRemainder = %d)",
storageWrite->relationName,
nextWritePosition,
splitWriteRemainder);
}
}