/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: cluster.C
* Description: Methods for Cluster and Buckets of a hash join
*
*
*
* Created: 08/14/96
* Language: C++
*
*
*
*
******************************************************************************
*/
// beginning of regular compilation
#include "cluster.h"
#ifndef __EID
#include "memorymonitor.h"
#endif
#include "ExStats.h"
#include "ComResourceInfo.h"
#include "logmxevent.h"
#include "SortError.h"
HashBufferHeader::HashBufferHeader()
: rowCount_(0),
bucketCount_(0)
{
};
/////////////////////////////////////////////////////////////////////////////
HashBuffer::HashBuffer (Cluster * cluster)
: cluster_(NULL),
bufferSize_(0),
maxRowLength_(0),
rows_(NULL),
maxNumFullRowsSer_(0),
freeSpace_(0),
currRow_(NULL),
nextAvailRow_(NULL),
next_(NULL),
prev_(NULL),
heap_(NULL),
data_(NULL)
{
ClusterDB * clusterDb = cluster->clusterDb_;
bufferSize_ = clusterDb->bufferSize_;
data_ = (char *)clusterDb->bufferHeap_->allocateAlignedHeapMemory((UInt32)bufferSize_, 512, FALSE);
if ( !data_ ) return; // memory allocation failed
// adjust memory usage statistics
clusterDb->availableMemory_ -= bufferSize_;
clusterDb->memoryUsed_ += bufferSize_;
if ( clusterDb->hashOperStats_ ) clusterDb->updateMemoryStats();
// if we have not seen pressure yet, the maximum cluster
// size is as big as the total memory used
#ifndef __EID
if ( clusterDb->memMonitor_ && !clusterDb->sawPressure_ )
clusterDb->maxClusterSize_ = clusterDb->memoryUsed_;
#endif
// initialize this buffer
init(cluster);
if (clusterDb->bmoStats_)
clusterDb->bmoStats_->updateBMOHeapUsage((NAHeap *)clusterDb->collHeap());
};
#ifndef __EID
// A constructor for cases when the HashBuffer is used without a
// Cluster object.
// Currently, this is only used by the UniqueHashJoin (ExUniqueHashJoinTcb).
//
HashBuffer::HashBuffer(ULng32 bufferSize,
ULng32 rowSize,
NABoolean useVariableLength,
CollHeap *heap,
ClusterDB * clusterDb,
ExeErrorCode * rc)
: cluster_(NULL),
bufferSize_(bufferSize),
maxRowLength_(rowSize),
isVariableLength_(useVariableLength),
rows_(NULL),
maxNumFullRowsSer_(0),
freeSpace_(0),
currRow_(NULL),
nextAvailRow_(NULL),
next_(NULL),
prev_(NULL),
heap_(heap),
data_(NULL)
{
// assume success
*rc = EXE_OK;
data_ = (char *)((NAHeap *)heap)->allocateAlignedHeapMemory((UInt32)bufferSize, 512, FALSE);
if (!data_) {
*rc = EXE_NO_MEM_TO_EXEC;
return;
};
if ( clusterDb ) {
// adjust memory usage statistics
clusterDb->availableMemory_ -= bufferSize;
clusterDb->memoryUsed_ += bufferSize;
if ( clusterDb->hashOperStats_ ) clusterDb->updateMemoryStats();
if (clusterDb->bmoStats_)
clusterDb->bmoStats_->updateBMOHeapUsage((NAHeap *)clusterDb->collHeap());
}
// initialize this buffer
// Make sure that all rows in buffer are aligned.
//
maxRowLength_ = ROUND4(rowSize);
if(maxRowLength_ < sizeof(HashRow) + sizeof(UInt32)) {
isVariableLength_ = false;
}
// first row starts after the buffer header
rows_ = data_ + ROUND8(sizeof(HashBufferHeader));
nextAvailRow_ = rows_;
// The number of allocated rows from this buffer.
header_->setRowCount(0);
// Determine the number of rows that can be allocated from this buffer.
//
maxNumFullRowsSer_ = ((bufferSize_ - ROUND8(sizeof(HashBufferHeader)) - 8 )
/ maxRowLength_);
freeSpace_ = bufferSize_ - ROUND8(sizeof(HashBufferHeader)) - 8;
};
#endif
HashBufferSerial::HashBufferSerial (Cluster * cluster)
: HashBuffer(cluster)
{};
#ifndef __EID
// A constructor for cases when the HashBuffer is used without a
// Cluster object.
// Currently, this is only used by the UniqueHashJoin (ExUniqueHashJoinTcb).
//
/*
HashBufferSerial::HashBufferSerial(ULng32 bufferSize,
ULng32 rowSize,
NABoolean useVariableLength,
CollHeap *heap,
ExeErrorCode * rc)
: HashBuffer(bufferSize, rowSize, useVariableLength, heap, rc)
{};
*/
#endif
/////////////////////////////////////////////////////////////////////////////
void HashBuffer::init(Cluster * cluster) {
cluster_ = cluster;
maxRowLength_ = ROUND4(cluster->rowLength_);
bufferSize_ = cluster_->clusterDb_->bufferSize_;
isVariableLength_ = cluster->useVariableLength_;
considerBufferDefrag_ = cluster->considerBufferDefrag_;
if(maxRowLength_ < sizeof(HashRow) + sizeof(UInt32)) {
isVariableLength_ = false;
considerBufferDefrag_ = FALSE;
}
// first row starts after the buffer header
rows_ = data_ + ROUND8(sizeof(HashBufferHeader));
nextAvailRow_ = rows_;
header_->setRowCount(0);
maxNumFullRowsSer_ = ((bufferSize_ - ROUND8(sizeof(HashBufferHeader)) - 8 )
/ maxRowLength_);
freeSpace_ = bufferSize_ - ROUND8(sizeof(HashBufferHeader)) - 8;
};
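// Note (illustrative, not in the original source): the serial buffer layout
// set up by init() above is:
//
//   data_ -> [ HashBufferHeader, padded to 8 bytes ]
//   rows_ -> [ row 0 ][ row 1 ] ... each ROUND4(rowLength_) bytes
//            [ 8 bytes of slack, excluded from freeSpace_ ]
//
// For example, assuming bufferSize_ = 57344 (56K), rowLength_ = 100 and an
// 8-byte header: maxNumFullRowsSer_ = (57344 - 8 - 8) / 100 = 573 rows.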
/////////////////////////////////////////////////////////////////////////////
HashBuffer::~HashBuffer() {
if (data_) {
if ( ! cluster_ ) {
heap_->deallocateMemory(data_); // LCOV_EXCL_LINE
return;
}
// NOTE: we do NOT adjust the memory usage statistics for the cluster,
// because totalClusterSize_ denotes the overall size of a cluster.
// This is later used in PHASE 3 to decide if we switch the roles of
// the inner and the outer cluster and to determine if hash loops
// are required.
// But we adjust the overall availableMemory_, because it denotes
// the currently available main memory.
ClusterDB * clusterDb = cluster_->clusterDb_;
NAHeap *heap = clusterDb->bufferHeap_;
heap->deallocateHeapMemory(data_);
clusterDb->availableMemory_ += bufferSize_;
clusterDb->memoryUsed_ -= bufferSize_;
if (clusterDb->bmoStats_)
clusterDb->bmoStats_->updateBMOHeapUsage((NAHeap *)clusterDb->collHeap());
};
// unchain buffer from the list of buffers
if (cluster_) {
cluster_->bufferPool_ = next_;
cluster_->numInMemBuffers_-- ; // one less in-memory buffer
#ifdef DO_DEBUG
if ( ! next_ && ! cluster_->keepRecentBuffer_ ) { // DEBUG
char msg[256];
sprintf(msg,
"NULL bufferPool_ for (%lu %lu). recent %lu inMemBuffs %lu ",
(ULng32)cluster_ & 0x0FFF,
(ULng32)cluster_->clusterDb_ & 0x0FFF,
(ULng32)cluster_->keepRecentBuffer_ & 0x0FFF,
cluster_->numInMemBuffers_);
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL,0,msg,cluster_->clusterDb_->explainNodeId_);
}
//ex_assert( next_ || cluster_->numInMemBuffers_ == 0 || // ** DEBUG **
// cluster_->keepRecentBuffer_ && cluster_->numInMemBuffers_ == 1,
// "Buffer pool is empty, but not num-in-mem");
#endif
}
};
/////////////////////////////////////////////////////////////////////////////
// LCOV_EXCL_START
Bucket::Bucket() {
init();
};
// LCOV_EXCL_STOP
void Bucket::init() {
innerCluster_ = NULL;
outerCluster_ = NULL;
rowCount_ = 0;
};
/////////////////////////////////////////////////////////////////////////////
ClusterDB::ClusterDB(HashOperator hashOperator,
ULng32 bufferSize,
atp_struct * workAtp,
Lng32 explainNodeId,
short hashTableRowAtpIndex1,
short hashTableRowAtpIndex2,
ex_expr * searchExpr,
Bucket * buckets,
ULng32 bucketCount,
ULng32 availableMemory,
#ifndef __EID
MemoryMonitor * memMonitor,
short pressureThreshold,
ExExeStmtGlobals * stmtGlobals,
#endif
ExeErrorCode *rc,
NABoolean noOverFlow,
NABoolean isPartialGroupBy,
#ifndef __EID
unsigned short minBuffersToFlush,
ULng32 numInBatch,
UInt16 forceOverflowEvery,
UInt16 forceHashLoopAfterNumBuffers,
UInt16 forceClusterSplitAfterMB,
ExSubtask * ioEventHandler,
ex_tcb * callingTcb,
UInt16 scratchThresholdPct,
NABoolean doLog,
NABoolean bufferedWrites,
NABoolean disableCmpHintsOverflow,
ULng32 memoryQuotaMB,
ULng32 minMemoryQuotaMB,
ULng32 minMemBeforePressureCheck,
Float32 bmoCitizenshipFactor,
Int32 pMemoryContingencyMB,
Float32 estimateErrorPenalty,
Float32 hashMemEstInMbPerCpu,
#endif
ULng32 initialHashTableSize,
ExOperStats * hashOperStats
)
: hashOperator_(hashOperator),
bufferSize_(bufferSize),
bufferHeap_(NULL),
workAtp_(workAtp),
explainNodeId_(explainNodeId),
hashTableRowAtpIndex1_(hashTableRowAtpIndex1),
hashTableRowAtpIndex2_(hashTableRowAtpIndex2),
searchExpr_(searchExpr),
buckets_(buckets),
bucketCount_(bucketCount),
memoryUsed_(0),
#ifndef __EID
memMonitor_(memMonitor),
pressureThreshold_(pressureThreshold),
sawPressure_(FALSE),
stmtGlobals_(stmtGlobals),
#endif
hashLoop_(FALSE),
noOverFlow_(noOverFlow),
isPartialGroupBy_(isPartialGroupBy),
availableMemory_(availableMemory),
maxClusterSize_(0),
clusterToFlush_(NULL),
clusterToProbe_(NULL),
clusterToRead_(NULL),
clusterReturnRightRows_(NULL),
clusterList_(NULL),
#ifndef __EID
tempFile_(NULL),
minBuffersToFlush_(minBuffersToFlush),
minNumWriteOuterBatch_((UInt16)(numInBatch ? (numInBatch/100) % 100
: minBuffersToFlush)),
maxNumWriteOuterBatch_((UInt16)(numInBatch/10000)),
numReadOuterBatch_((UInt16)( numInBatch ? numInBatch % 100 :
minBuffersToFlush ) ),
forceOverflowEvery_(forceOverflowEvery),
forceOverflowCounter_(forceOverflowEvery), // initialize counter
forceHashLoopAfterNumBuffers_(forceHashLoopAfterNumBuffers),
forceClusterSplitAfterMB_(forceClusterSplitAfterMB),
ioEventHandler_(ioEventHandler),
callingTcb_(callingTcb),
scratchThresholdPct_(scratchThresholdPct),
sequenceGenerator_(0),
outerReadBuffers_(NULL),
doLog_(doLog),
bufferedWrites_(bufferedWrites),
disableCmpHintsOverflow_(disableCmpHintsOverflow),
memoryQuotaMB_(memoryQuotaMB),
minMemoryQuotaMB_(minMemoryQuotaMB),
minMemBeforePressureCheck_(minMemBeforePressureCheck),
bmoCitizenshipFactor_(bmoCitizenshipFactor),
pMemoryContingencyMB_(pMemoryContingencyMB),
estimateErrorPenalty_(estimateErrorPenalty),
hashMemEstInMbPerCpu_(hashMemEstInMbPerCpu),
totalPhase3TimeNoHL_(0),
maxPhase3Time_(0),
minPhase3Time_(0), // zero means not set yet!
numClustersNoHashLoop_(0),
totalIOCnt_(0),
earlyOverflowStarted_(FALSE),
bmoMaxMemThresholdMB_(0),
#endif
hashOperStats_(NULL),
bmoStats_(NULL),
initialHashTableSize_(initialHashTableSize),
scratchIOVectorSize_(0),
overFlowMode_(SCRATCH_DISK)
{
if (hashOperStats)
{
bmoStats_ = hashOperStats->castToExBMOStats();
if ( bmoStats_ && bmoStats_->statType() != ExOperStats::BMO_STATS)
hashOperStats_ = hashOperStats;
else
hashOperStats_ = NULL;
}
else
{
hashOperStats_ = NULL;
bmoStats_ = NULL;
}
// assume success
*rc = EXE_OK;
// set up the bufferHeap. The default is collHeap(), which is just
// the regular statement heap. We want this in DP2. If we are not running
// in DP2, we set up a separate heap (see below)
bufferHeap_ = (NAHeap*)collHeap();
#ifndef __EID
// we are not running in DP2. Set up our own bufferHeap. We want at least
// 10 buffers in each block of this heap. Also add a few bytes to the buffer
// size to account for some memory management overhead.
bufferHeap_ = new(collHeap()) NAHeap("Buffer Heap",
bufferHeap_,
10 * ((Lng32)bufferSize_ + 20));
#endif
// These fields are used to ensure that #buckets and #hash-table-entries
// have no common prime factors (to make even use of the hash table entries)
evenFactor_ = 0 == bucketCount % 2 ;
primeFactor_ = bucketCount_ ;
// If needed (HJ), remove factor of buckets-per-cluster (always a power of 2)
if ( primeFactor_ ) // skip it for UHJ
while ( 0 == primeFactor_ % 2 ) primeFactor_ /= 2 ;
};
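// Worked example for the factor fields above (numbers are hypothetical):
// with bucketCount == 24, evenFactor_ is TRUE (the hash table entry count
// may be even) and primeFactor_ starts at 24; dividing out the powers of 2
// leaves primeFactor_ == 3, so the hash table will avoid entry counts
// divisible by 3 and rows spread evenly across the table entries.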
/////////////////////////////////////////////////////////////////////////////
ClusterDB::~ClusterDB() {
#ifndef __EID
// there is no point in checking pending I/O, as either there shouldn't
// be any pending I/O, or we should abandon pending I/Os (by closing the
// file at delete) in case of error.
if (tempFile_) {
SortError *sortError = tempFile_->getSortError();
delete tempFile_; // this dtor does not delete the sort error
if ( sortError ) delete sortError;
tempFile_ = NULL;
};
// deallocate the read buffers
for ( HashBuffer *tmpB = outerReadBuffers_ ; tmpB; tmpB = outerReadBuffers_){
outerReadBuffers_ = outerReadBuffers_->getNext() ;
tmpB->deallocateData(collHeap()); // must deallocate before calling dtor
delete tmpB;
memoryUsed_ -= bufferSize_;
availableMemory_ += bufferSize_;
};
#endif
while (clusterList_) {
Cluster * p = clusterList_->next_;
Cluster * q;
if (clusterList_->isInner_)
q = clusterList_->buckets_->getOuterCluster();
else
q = clusterList_->buckets_->getInnerCluster();
if (q)
delete q;
delete clusterList_;
clusterList_ = p;
};
// if we allocated a separate heap for buffers, get rid of it now
if (bufferHeap_ != collHeap())
delete bufferHeap_;
};
/////////////////////////////////////////////////////////////////////////////
#ifndef __EID
// LCOV_EXCL_START
// given the time it took some cluster to run phase 3, add to local stats
void ClusterDB::updatePhase3Time(Int64 someClusterTime)
{
totalPhase3TimeNoHL_ += someClusterTime;
if ( ! minPhase3Time_ || // first time
minPhase3Time_ > someClusterTime ) // found smaller time
minPhase3Time_ = someClusterTime ;
if ( maxPhase3Time_ < someClusterTime ) maxPhase3Time_ = someClusterTime ;
++numClustersNoHashLoop_;
}
// LCOV_EXCL_STOP
////// Q U O T A /////////
// Return all the memory quota allocation for this operator to the global pool
// (Not exactly "all"; keep at least minMemoryQuotaMB_ )
void ClusterDB::yieldAllMemoryQuota()
{
if ( memoryQuotaMB_ == 0 || memoryQuotaMB_ <= minMemoryQuotaMB_ ) return;
stmtGlobals_->yieldMemoryQuota( memoryQuotaMB_ - minMemoryQuotaMB_ );
if ( doLog_ ) { // LOG -- to show that memory was yielded
char msg[256];
sprintf(msg,
"YIELDED ALL MEMORY ALLOWED: %u MB (%u). Unused pool %u MB",
memoryQuotaMB_-minMemoryQuotaMB_,
0, // NA_64BIT, use instance id later
stmtGlobals_->unusedMemoryQuota() );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
memoryQuotaMB_ = minMemoryQuotaMB_ ;
}
////////
void ClusterDB::YieldQuota(UInt32 memNeeded)
{
// Convert to MegaBytes, upper ceiling
ULng32 memNeededMB = memNeeded / ONE_MEG ;
if ( memNeeded % ONE_MEG ) memNeededMB++ ; // one more
// Do not get below the minimum quota
memNeededMB = MAXOF(memNeededMB, minMemoryQuotaMB_);
// mem quota to yield (could be negative; e.g. with a big flushed cluster)
Lng32 memToYieldMB = (Lng32) memoryQuotaMB_ - (Lng32) memNeededMB ;
// if there is no memory to yield - then return
if ( memToYieldMB <= 1 ) return; // 1 MB - to avoid thrashing
stmtGlobals_->yieldMemoryQuota( memToYieldMB ); // Now yield
if ( doLog_ ) { // LOG -- to show that memory was yielded
char msg[256], msg1[64];
unsigned short id = (unsigned short)((ULong)this & 0x0FFF);
if ( memoryUsed_ ) sprintf(msg1,"Memory used %d, ",memoryUsed_);
else msg1[0] = (char) 0;
sprintf(msg, "%s YIELDED %d MB (%u). %s needed %u MB, unused pool %u",
hashOperator_ == HASH_GROUP_BY ? "HGB" :
hashOperator_ == SEQUENCE_OLAP ? "OLAP" : "HJ",
memToYieldMB, id, msg1, memNeededMB,
stmtGlobals_->unusedMemoryQuota());
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
memoryQuotaMB_ = memNeededMB ; // adjust the new limit
}
// At the end of HJ phase 1, check quota to see if there's enough for (the
// outer clusters of) phase-2. If not then return TRUE (and some in-memory
// inner may get flushed to free some memory).
NABoolean ClusterDB::checkQuotaOrYield(UInt32 numFlushed, UInt32 maxSizeMB)
{
if ( memoryQuotaMB_ == 0 ) return FALSE; // memory quota not used
UInt32 sizeNeededOuter = bufferSize_ * numFlushed * minNumWriteOuterBatch_;
if ( sizeNeededOuter + memoryUsed_ > memoryQuotaMB_ * ONE_MEG )
return TRUE ; // need to flush another cluster to free some memory
UInt32 sizeDesiredOuter = bufferSize_ * numFlushed * maxNumWriteOuterBatch_;
UInt32 sizeNeededInPhase2 =
memoryUsed_ + MAXOF( sizeNeededOuter, sizeDesiredOuter );
UInt32 sizeNeededInPhase3 =
bufferSize_ * numReadOuterBatch_ +
ONE_MEG * (maxSizeMB + 1) ; // + 1 to round up
UInt32 memNeeded = MAXOF(sizeNeededInPhase3, sizeNeededInPhase2 );
YieldQuota(memNeeded);
return FALSE;
}
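// Hypothetical example for checkQuotaOrYield() (all numbers illustrative):
// with bufferSize_ = 56K, numFlushed = 4 and minNumWriteOuterBatch_ = 2,
// sizeNeededOuter = 448K. If memoryUsed_ + 448K fits under
// memoryQuotaMB_ * ONE_MEG, no extra inner cluster is flushed, and
// YieldQuota() trims the quota down to the larger of the phase-2 and
// phase-3 estimates computed above.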
// Return some memory allocation of this operator to the global pool.
// HGB gives theOFList (HJ keeps OF clusters with the regular cluster list)
// Calculate memory needed - max of what is in memory, or the largest on
// disk plus those extra buffers - and yield the rest.
void ClusterDB::yieldUnusedMemoryQuota(Cluster * theOFList,
ULng32 extraBuffers )
{
if ( memoryQuotaMB_ == 0 ) return; // nothing to yield
// Need to keep the greater of what is currently used, or the largest
// flushed (inner) cluster plus a needed hash table
Int64 maxFlushedClusterSize = 0; // find max size of flushed cluster
ULng32 maxRowCount = 0;
// HGB uses a separate list for the overflowed clusters
Cluster * clusters = theOFList ? theOFList : clusterList_ ;
for (Cluster * cl = clusters ; cl ; cl = cl->getNext() ) {
if ( cl->getState() == Cluster::FLUSHED && cl->isInner_ ) {
if ( maxFlushedClusterSize < cl->clusterSize() ) {
maxFlushedClusterSize = cl->clusterSize() ;
maxRowCount = cl->getRowCount();
}
}
}
// add the size of the needed hash table
maxFlushedClusterSize += maxRowCount * sizeof(HashTableHeader) * 3 / 2;
// Needed is max: either what's in memory now, or the largest flushed cluster
ULng32 memNeeded
= MAXOF(memoryUsed_, (ULng32) maxFlushedClusterSize) ;
// add buffers (either one for the outer for HJ, or one per each new
// cluster for HGB)
memNeeded += extraBuffers * bufferSize_ ;
// Convert to MegaBytes, upper ceiling
ULng32 memNeededMB = memNeeded / ONE_MEG ;
if ( memNeeded % ONE_MEG ) memNeededMB++ ; // one more
// Do not get below the minimum quota
memNeededMB = MAXOF(memNeededMB, minMemoryQuotaMB_);
// mem quota to yield (could be negative; e.g. with a big flushed cluster)
Lng32 memToYieldMB = (Lng32) memoryQuotaMB_ - (Lng32) memNeededMB ;
// if there is no memory to yield - then return
if ( memToYieldMB <= 1 ) return; // 1 MB - to avoid thrashing
stmtGlobals_->yieldMemoryQuota( memToYieldMB ); // Now yield
if ( doLog_ ) { // LOG -- to show that memory was yielded
char msg[256], msg1[64];
if ( memoryUsed_ ) sprintf(msg1,"Memory used %d, ",memoryUsed_);
else msg1[0] = (char) 0;
sprintf(msg, "%s YIELDED %d MB (%u). %s needed %u MB, unused pool %u",
extraBuffers == 1 ? "HJ" : "HGB", memToYieldMB,
0, // NA_64BIT, use instance id later
msg1, memNeededMB,
stmtGlobals_->unusedMemoryQuota());
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
memoryQuotaMB_ = memNeededMB ; // adjust the new limit
}
//
// Return a prime number equal to or greater than the given value (# clusters)
// (Used to avoid common prime factors with the hash table size)
// (Smallest value returned == 3)
//
static const ULng32 primes[70] = {
3 /*2*/, 3, 5, 7, 11, 13, 17, 19, 23, 29,
31, 37, 41, 43, 47, 53, 59, 61, 67, 71,
73, 79, 83, 89, 97, 101, 103, 107, 109, 113,
127, 131, 137, 139, 149, 151, 157, 163, 167, 173,
179, 181, 191, 193, 197, 199, 211, 223, 227, 229,
233, 239, 241, 251, 257, 263, 269, 271, 277, 281,
283, 293, 307, 311, 313, 317, 331, 337, 347, 349 };
ULng32 ClusterDB::roundUpToPrime(ULng32 noOfClusters)
{
ULng32 ind = noOfClusters / 5 ; // approximate (floor) index
if ( ind >= 70 ) return noOfClusters; // handle absurd case: #clusters >= 350
while ( primes[ind] < noOfClusters ) ind++ ; // find next prime
return primes[ind] ;
}
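// Example trace: roundUpToPrime(10) starts at ind = 10/5 = 2 (primes[2] == 5),
// advances past 5 and 7, and returns 11. Note that for some large inputs the
// floor-based start index can overshoot the smallest qualifying prime, so the
// result is guaranteed to be a listed prime >= noOfClusters, though not
// necessarily the smallest such prime.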
#endif
/////////////////////////////////////////////////////////////////////////////
// Perform checks to find if memory allocation of reqSize bytes is possible
// Return FALSE if allocation is not possible.
// Checks made:
// 1. Are we within the memory quota system limits?
// 2. Is the process' memory "crowded"?
// 3. Is there system memory pressure?
// 4. Do the hints from the compiler suggest we start overflow early?
/////////////////////////////////////////////////////////////////////////////
NABoolean ClusterDB::enoughMemory(ULng32 reqSize, NABoolean checkCompilerHints)
{
#ifndef __EID
char msg[512]; // for logging messages
// For testing overflow only -- Simulate extreme memory conditions
// Force overflow after every "count" of requests
if ( forceOverflowEvery_ )
if ( ! --forceOverflowCounter_ ) {
sawPressure_ = TRUE;
forceOverflowCounter_ = forceOverflowEvery_; // reinitialize the counter
return FALSE;
}
// Check the forced memory limit
if (memoryQuotaMB_ && // a forced max memory allowed was set
// would we exceed the max memory allowed ?
memoryQuotaMB_ * ONE_MEG < memoryUsed_ + reqSize ) {
// calculate how much memory (in MB) is needed
UInt32 memNeededMB = 1 ; // usually a small request - just take 1 MB
if ( reqSize > ONE_MEG ) {
memNeededMB =
(memoryUsed_ + reqSize - memoryQuotaMB_ * ONE_MEG) / ONE_MEG ;
if ( reqSize % ONE_MEG ) memNeededMB++ ; // upper ceiling
}
// Try to increase the memory quota (from the global "pool") to meet need
if ( stmtGlobals_->grabMemoryQuotaIfAvailable(memNeededMB) ) {
memoryQuotaMB_ += memNeededMB ; // got it
// Even though we saw pressure before; having more memory now may mean
// we can have bigger inner clusters (useful only for HJ in phase 1)
if ( sawPressure_ && memoryQuotaMB_ * ONE_MEG > maxClusterSize_ )
maxClusterSize_ = memoryQuotaMB_ * ONE_MEG ;
if ( doLog_ && memNeededMB > 1 ) { // LOG -- only for more than a buffer
sprintf(msg,
"GRABBED %u MB (%u). Memory used %u, now allowed %u MB, request size %u, unused pool %u",
memNeededMB, 0, // NA_64BIT, use instance id later
memoryUsed_,
memoryQuotaMB_, reqSize,stmtGlobals_->unusedMemoryQuota() );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
}
else { // sorry - not enough memory
if ( ! sawPressure_ && doLog_ ) { // log first time quota overflow
sprintf(msg,
"QUOTA LIMIT OVERFLOW started. Total memory used %u, quota allowed %u MB",
memoryUsed_, memoryQuotaMB_ );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
sawPressure_ = TRUE;
return FALSE;
}
}
// Check if we are running out of address space or swap space.
// getUsage() would return TRUE if and only if memory gets crowded (i.e. we
// failed at least once to allocate a desired flat segment size, and the
// free memory is less than half the total memory allocated.)
size_t lastSegSize, freeSize, totalSize;
if ( collHeap()->getUsage(&lastSegSize, &freeSize, &totalSize) ) {
// Another safety check - are we using at least half of the memory in the
// last flat segment allocated ?
if (lastSegSize / 2 < memoryUsed_) {
if ( ! sawPressure_ && doLog_ ) { // log first time
sprintf(msg,
"USAGE OVERFLOW started. Memory used %u, last seg size %u, free size %u, total size %u",
memoryUsed_, (ULng32)lastSegSize,
(ULng32)freeSize, (ULng32)totalSize );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
sawPressure_ = TRUE;
return FALSE;
}
}
if (memMonitor_ && memoryUsed_ >= minMemBeforePressureCheck_ ) {
NABoolean pressure = (memMonitor_->memoryPressure() > pressureThreshold_);
// the first time we see pressure, we set the sawPressure flag.
// The total memory used at this point determines the maximum
// cluster size.
if (pressure) {
if ( ! sawPressure_ && doLog_ ) { // first time system memory pressure
sprintf(msg,
"PRESSURE OVERFLOW started. Total memory used %u",
memoryUsed_);
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
sawPressure_ = TRUE;
}
if ( pressure ) return FALSE; // cannot allocate memory
if ( isPartialGroupBy_ ) return availableMemory_ >= reqSize ;
// -----------------------------------------------------------------
// Below - Check compiler hints to trigger a possible early overflow
// -----------------------------------------------------------------
// Can be disabled with the CQD EXE_BMO_DISABLE_CMP_HINTS_OVERFLOW
if ( ! disableCmpHintsOverflow_ ) {
/*
Compiler hints:
================
Compiler provides two hints to BMO operators.
1. bmoExpectedSize:
The compiler's estimate of total memory usage by the BMO operator
2. bmoEstErrorPenalty
The fractional penalty factor that can be used at runtime to adjust
the estimated bmo size as described below.
Explanation:
-------------
bmoEstErrorPenalty can also be viewed as an uncertainty penalty factor.
The higher the uncertainty of the expected memory consumption,
the better it is to overflow early.
bmoEstErrorPenalty is derived by the following equation:
uncertainty = { [max potential cardinality - expected cardinality] /
expected cardinality } * 100%
bmoEstErrorPenalty is set to 10% if uncertainty <= 100%,
else it is set to 25%.
Executor usage:
---------------
On Seaquest, operators overflow once memory quota is reached or
physical memory pressure is detected. Memory pressure is currently
disabled but eventually memory pressure detection mechanisms would be
enabled.
Memory quota limits and memory pressure checks do help enforce limits,
however BMO operators could utilize compiler hints to trigger overflow
and behave as a good citizen even before quota limits and memory
pressure enforcements kick in.
BMO operators often consume small chunks of memory or buffer, in
repeated cycles until memory quota limit is reached. Compiler hints
would be exercised each time a buffer allocation is required. However
the size of buffer allocation is not relevant in the calculation below.
C = Total consumed memory by BMO operator. Initially this is zero.
E = Total estimated size of memory that will be consumed by BMO.
This is initially the bmoExpectedSize hint given by compiler,
but adjusted if C started exceeding the estimate:
E = max(bmoExpectedSize, C * (1+bmoEstErrorPenalty) )
This allows us to modify the estimate at runtime after receiving
more data and to penalize operators that have a chance of being much
bigger than initially thought. Note, it's possible for the runtime
logic to increase the value of bmoEstErrorPenalty on its own after a
certain threshold (e.g. C > 10*bmoExpectedSize), but bmoEstErrorPenalty
should not exceed 1, i.e. 100%.
Z = CitizenshipFactor which is a parameter representing how much of
the available physical memory an operator can assume for itself
as compared to leaving it to other potential players. It can have
a value in the range 1 >= Z > 0. While we can suggest an initial value,
e.g. 0.5, we think it is better to set this after tuning based on
some performance tests. In the future this may also be adjusted
via input from WMS based on existing concurrency on the system.
M = Available physical memory at that instant.
U = Contingency physical memory (e.g. 10% of the physical memory in
the node; the point is to always keep physical memory available for
executables, buffers, non-BMO ops, etc.)
m = [E - C], or in other words the estimated delta memory required
by the BMO to complete processing.
If ( m < Z * (M-U)) then continue memory allocation else overflow.
*/
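// A worked instance of the rule above (all numbers hypothetical):
// E = 100 MB, C = 120 MB, bmoEstErrorPenalty = 0.1. Since C > E, E is
// adjusted to 120 * 1.1 = 132 MB, so m = E - C = 12 MB. With M = 500 MB
// free, U = 50 MB contingency and Z = 0.5, Z * (M - U) = 225 MB;
// since m (12 MB) < 225 MB, allocation continues. Only if m exceeded
// 225 MB would this operator start overflowing early.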
// free physical memory in MB
Float32 M = (Float32) memMonitor_->availablePhyMemKb()/1024;
// consumed memory in MB
Float32 C = memoryUsed_ / ONE_MEG;
// U: minimum free physical memory to be spared for other players
// on the node (other processes, etc.)
Float32 U = pMemoryContingencyMB_ ;
// Z: fraction of free physical memory to assume for myself, thereby
// leaving the remaining free space for other operators in my ESP process.
// WMS could set this based on concurrency, thereby increasing the
// density of BMO operators residing on the node.
Float32 Z = bmoCitizenshipFactor_ ;
Float32 m = 0; //delta memory required to avoid overflow.
// do the following check if HJ still in phase 1.
if ( checkCompilerHints )
{
Float32 E = hashMemEstInMbPerCpu_ ; //expected memory consumption
#ifdef FUTURE_WORK
//check extreme case first. Expected cannot be more than
//available quota.
if( memoryQuotaMB_ && E > memoryQuotaMB_ )
{
if ( ! earlyOverflowStarted_ && doLog_ ) { // log first time
sprintf(msg,
"Estimate %ld MB exceeded quota %ld MB: OVERFLOW started. Total memory used %lu",
(Lng32) E, (Lng32) memoryQuotaMB_,
memoryUsed_);
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, explainNodeId_);
}
sawPressure_ = TRUE;
earlyOverflowStarted_ = TRUE;
return FALSE;
}
#endif
//general comments pasted above for explanation of this field.
Float32 estimateErrorPenalty = estimateErrorPenalty_ ;
/*
Adjust the expected memory to be higher than the consumed memory, so
that the target expected memory is readjusted based on
estimateErrorPenalty. estimateErrorPenalty is high if the confidence
level of E is low, and vice versa. This creates a large delta memory
requirement for the operator, which may trigger overflow.
*/
if ( C > E ) // consumed memory exceeded the expected -- adjust E
{
E = C * ( 1 + estimateErrorPenalty ) ;
hashMemEstInMbPerCpu_ = E ;
}
m = E - C; // delta memory required to avoid overflow
// if delta memory required is more than physical memory available
// then overflow.
if( m > ( Z * (M -U))) {
if ( ! earlyOverflowStarted_ && doLog_ ) { // log first time
sprintf(msg,
"Hash encountered memory pressure [m > (Z*(M-U))]: memReq m=%f, memFree M=%f, "
"memExpected E=%f, memConsumed C=%f, estimateErrorPenalty e=%f,"
"memContingency U=%f, citizenshipFactor Z=%f", m,M,E,C,estimateErrorPenalty,U,Z);
SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
}
sawPressure_ = TRUE;
earlyOverflowStarted_ = TRUE;
return FALSE;
}
} // if ( checkCompilerHints )
else
{
// if HJ is in phase 2, the check done here purely compares the
// memory requirement against the available physical memory.
m = (Float32)reqSize/ONE_MEG;
if( m > ( Z * (M - U))) {
if ( doLog_ ) {
sprintf(msg,
"HJ phase 2/3 encountered memory pressure [m > (Z*(M-U))]: memReq m=%f, memFree M=%f, memContingency U=%f, citizenshipFactor Z=%f", m,M,U,Z);
SQLMXLoggingArea::logExecRtInfo(NULL, 0,msg, explainNodeId_);
}
return FALSE;
}
} // else -- if ( checkCompilerHints )
} // if ( ! disableCmpHintsOverflow_ )
// ---------------------------------------------------------------
// End of hints' check
// ---------------------------------------------------------------
// The following checks any threshold limit set by the user. This
// check is static in nature and directly controlled by the CQDs
// SSD_BMO_MAX_MEM_THRESHOLD_MB (if the SSD disk type is in use) or
// EXE_MEMORY_AVAILABLE_IN_MB (if the regular disk type is in use).
if( (memoryUsed_ / ONE_MEG) >= bmoMaxMemThresholdMB_)
return FALSE;
} // if (memMonitor_ && memoryUsed_ >= minMemBeforePressureCheck_ )
#endif // not EID
// Always return TRUE, but for Partial HGB there's one more check
return (!isPartialGroupBy_ || availableMemory_ >= reqSize);
};
/////////////////////////////////////////////////////////////////////////////
#ifndef __EID
// A superset of setClusterToRead, for an outer cluster.
// Also allocates the global list of buffers (the first time) and
// initializes the outer cluster to start reading more buffers.
NABoolean ClusterDB::setOuterClusterToRead(Cluster * oCluster,
ExeErrorCode * rc)
{
* rc = EXE_OK;
// In case oCluster has its own hash buffers allocated, remove them
oCluster->releaseAllHashBuffers();
// when called first time for this ClusterDB, allocate the buffer list
if ( NULL == outerReadBuffers_ ) {
// Allocate a list of hash buffers to be kept at the ClusterDB
for ( Int32 bnum = 0; bnum < numReadOuterBatch_; bnum++ ) {
HashBuffer * newBuffer = NULL;
newBuffer = new(collHeap(), FALSE)
HashBuffer(bufferSize_, oCluster->rowLength_, oCluster->useVariableLength_, collHeap(), this, rc );
// if buffer allocation failed
if ( !newBuffer || *rc ) {
if (newBuffer) delete newBuffer;
if ( ! bnum ) { // we must get at least one buffer, else fail
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
};
break; // we'd have a shorter list
}; // if failed
memoryUsed_ += bufferSize_; // the above ctor does not add
availableMemory_ -= bufferSize_;
// chain buffer into the list
newBuffer->setNext(outerReadBuffers_) ;
outerReadBuffers_ = newBuffer ;
}; // for bnum
}; // buffer list is allocated
// Mark each buffer as having zero rows, in case we finish reading in the
// middle of the buffer list
for ( HashBuffer * tmpB = outerReadBuffers_; tmpB ; tmpB = tmpB->getNext() )
tmpB->castToSerial()->clearRowCount();
// start reading into the first buffer in the list.
oCluster->nextBufferToRead_ = outerReadBuffers_ ;
oCluster->completeCurrentRead_ = FALSE;
oCluster->buffersRead_ = 0;
setClusterToRead(oCluster);
return FALSE;
};
////////////////////////////////////////////////////////////////////////
ClusterBitMap::ClusterBitMap(ULng32 size, ExeErrorCode * rc)
: size_(size) {
// assume success
*rc = EXE_OK;
ULng32 charCount = size_/8;
if (size_ % 8)
charCount++;
bitMap_ = (char *) collHeap()->allocateMemory((size_t)charCount,FALSE);
if (!bitMap_) {
*rc = EXE_NO_MEM_TO_EXEC;
return;
};
// initialize bitMap to zero
for (ULng32 i = 0; i < charCount; i++)
bitMap_[i] = 0;
};
ClusterBitMap::~ClusterBitMap() {
if (bitMap_)
collHeap()->deallocateMemory((void *)bitMap_);
};
void ClusterBitMap::setBit(ULng32 bitIndex) {
ex_assert ((bitIndex < size_), "ClusterBitMap::setBit() ot of bounds");
char bitMask = (char) (1 << (bitIndex & 7));
bitMap_[bitIndex >> 3] |= bitMask;
};
NABoolean ClusterBitMap::testBit(ULng32 bitIndex) {
ex_assert ((bitIndex < size_), "ClusterBitMap::testBit() ot of bounds");
char bitMask = (char) (1 << (bitIndex & 7));
return (NABoolean)(bitMap_[bitIndex >> 3] & bitMask);
};
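// Example (illustration only): setBit(10) sets bit 2 of bitMap_[1], since
// 10 >> 3 == 1 and 10 & 7 == 2; testBit(10) then reads back that same bit.
// The map holds one bit per outer-cluster row (see Cluster::setHashLoop()
// below, which allocates it with the outer cluster's rowCount_).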
#endif
/////////////////////////////////////////////////////////////////////////////
// Internal utility: Calculate memory and create a hash table
HashTable * Cluster::createHashTable(UInt32 memForHashTable,
ExeErrorCode * rc,
NABoolean checkMemory // enough avail ?
)
{
ex_assert(!hashTable_, "hash table exists already");
// do we have enough memory for this hash table ?
if ( checkMemory &&
!clusterDb_->enoughMemory(memForHashTable) ) return NULL;
UInt32 hashTableEntryCount = memForHashTable / sizeof(HashTableHeader);
NABoolean doResize = clusterDb_->hashOperator_ == ClusterDB::HASH_GROUP_BY ;
NABoolean noHVDups =
// only real HJ enforces no-Hash-Value-duplicates in the Hash Table chains
// HGB has no value duplicates, and X-product uses a single chain
! ( clusterDb_->hashOperator_ == ClusterDB::HASH_GROUP_BY ||
clusterDb_->hashOperator_ == ClusterDB::CROSS_PRODUCT ) ;
//
// create the hash table
//
hashTable_ = new(collHeap(), FALSE)
HashTable(hashTableEntryCount,
clusterDb_->evenFactor_, // can #entries be even ?
clusterDb_->primeFactor_, // #entries not divisible by this PF
noHVDups,
doResize );
// If ctor failed, or could not even get a table of MIN_HEADER_COUNT
// then we return (with error code in case we ignore memory pressure,
// otherwise no error as we would overflow to rectify the situation.)
if ( ! checkMemory )
*rc = EXE_NO_MEM_TO_EXEC; // just in case we return below
if ( !hashTable_ ) return NULL;
else if ( hashTable_->noTableAllocated() ) {
delete hashTable_;
hashTable_ = NULL;
return NULL;
}
// we got a new hash table. The constructor of HashTable might
// have adjusted the size of the table. Report the memory usage
clusterDb_->memoryUsed_ +=
(hashTable_->getHeaderCount() * sizeof(HashTableHeader));
if ( clusterDb_->hashOperStats_ ) clusterDb_->updateMemoryStats();
*rc = EXE_OK; // no error
if (clusterDb_->bmoStats_)
clusterDb_->bmoStats_->updateBMOHeapUsage((NAHeap *)clusterDb_->collHeap());
return hashTable_;
}
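// Sizing note (illustrative, assuming an 8-byte HashTableHeader): a 1 MB
// memForHashTable yields about 131072 candidate entries; the HashTable ctor
// may then adjust that count, using the evenFactor_/primeFactor_ flags
// passed above, so the final entry count shares no unwanted prime factors --
// which is why the code above re-reads getHeaderCount() when charging
// memoryUsed_.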
Cluster::Cluster(ClusterState state,
ClusterDB * clusterDb,
Bucket * buckets,
ULng32 bucketCount,
ULng32 rowLength,
NABoolean useVariableLength,
NABoolean considerBufferDefrag,
short insertAtpIndex,
NABoolean isInner,
NABoolean bitMapEnable,
Cluster * next,
ExeErrorCode * rc,
HashBuffer * bufferPool)
: state_(state),
clusterDb_(clusterDb),
buckets_(buckets),
bucketCount_(bucketCount),
rowLength_(rowLength),
useVariableLength_(useVariableLength),
considerBufferDefrag_(considerBufferDefrag),
insertAtpIndex_(insertAtpIndex),
isInner_(isInner),
bitMapEnable_(bitMapEnable),
next_(next),
ioPending_(FALSE),
totalClusterSize_(0),
#ifndef __EID
outerBitMap_(NULL),
tempFile_(NULL),
readHandle_(NULL),
completeCurrentRead_(FALSE),
buffersRead_(0),
#endif
hashTable_(NULL),
rowCount_(0),
readCount_(0),
writeIOCnt_(0),
readIOCnt_(0),
bufferPool_(bufferPool),
numInMemBuffers_(0),
scanPosition_(NULL)
#ifndef __EID
,returnOverflowRows_(FALSE)
,batchCountDown_(clusterDb->numReadOuterBatch_)
,lastSqueezedBuffer_(NULL) // no buffer squeezed yet
,dontSplit_(FALSE)
,nextBufferToFlush_(NULL)
,firstBufferInList_(NULL)
,afterLastBufferToFlush_(NULL)
,nextBufferToRead_(NULL)
,afterLastBufferToRead_(NULL)
,numLoops_(0)
,startTimePhase3_(0)
,keepRecentBuffer_(FALSE)
,flushMe_(FALSE)
#endif
,defragBuffer_(NULL)
{
// assume success
*rc = EXE_OK;
if (bufferPool_) {
// reset the given buffer to this cluster
bufferPool_->init(this);
// the cluster got a buffer. Thus, it used memory
totalClusterSize_ += clusterDb_->bufferSize_;
numInMemBuffers_++ ;
};
// set up pointers from buckets to clusters
ULng32 i = 0;
for (; i < bucketCount_; i++)
if (isInner) {
// determine the rowCount_. For the initial clusters this will
// be 0. But in case of a cluster split, we have to sum up
// the rows in the corresponding buckets. We do this only for
// inner clusters. New outer clusters are always empty.
buckets_[i].setInnerCluster(this);
rowCount_ += buckets_[i].getRowCount();
}
else
buckets_[i].setOuterCluster(this);
// in case of the partial group by we allocate one buffer, to
// make sure we can group at least a few rows. If we can't
// allocate this one buffer, we give up. In all other cases, the
// buffer is only allocated if it really gets a row. Note that in
// in case of a partial group by we always have only one cluster
// and therefore allocate only one buffer.
if (clusterDb_->isPartialGroupBy_) {
HashBuffer * newBuffer = NULL;
newBuffer = new(collHeap(), FALSE) HashBuffer(this);
if (!newBuffer || !newBuffer->getDataPointer() ) {
if (newBuffer)
delete newBuffer;
*rc = EXE_NO_MEM_TO_EXEC;
return;
};
bufferPool_ = newBuffer;
totalClusterSize_ += clusterDb_->bufferSize_;
numInMemBuffers_++;
};
// if the initial state is CHAINED (i.e. hash groupby), then allocate a
// hash table of the initial size (to be resized later, as needed).
if (state_ == CHAINED) {
hashTable_ = createHashTable(clusterDb_->initialHashTableSize_, rc);
if (!hashTable_) return;
};
#ifndef __EID
// set up a unique sequence ID for this cluster to use for overflow
maxSeqIDIndex_ = seqIDIndex_ = 0 ;
seqID_[seqIDIndex_] = clusterDb_->generateSequenceID();
seqID_[1] = seqID_[2] = 0 ; // 0 is an invalid seq value
#endif
if (considerBufferDefrag_)
{
defragBuffer_ = (char *)clusterDb->bufferHeap_->allocateAlignedHeapMemory((UInt32)rowLength_, 512, FALSE);
}
}; // Cluster::Cluster()
/////////////////////////////////////////////////////////////////////////////
Cluster::~Cluster() {
releaseAllHashBuffers();
#ifndef __EID
if (outerBitMap_) {
delete outerBitMap_;
outerBitMap_ = NULL;
};
if ( readHandle_ ) delete readHandle_;
#endif
removeHashTable();
// remove references to this cluster from the buckets
if ( buckets_ )
for (ULng32 i = 0; i < bucketCount_; i++)
if (isInner_)
buckets_[i].setInnerCluster(NULL);
else
buckets_[i].setOuterCluster(NULL);
if (defragBuffer_)
{
NAHeap *heap = clusterDb_->bufferHeap_;
heap->deallocateHeapMemory(defragBuffer_);
}
defragBuffer_ = NULL;
}
/////////////////////////////////////////////////////////////////////////////
void Cluster::removeHashTable() {
if (hashTable_) {
clusterDb_->memoryUsed_ -=
(hashTable_->getHeaderCount() * sizeof(HashTableHeader));
delete hashTable_;
hashTable_ = NULL;
};
if (clusterDb_->bmoStats_)
clusterDb_->bmoStats_->updateBMOHeapUsage((NAHeap *)clusterDb_->collHeap());
};
/////////////////////////////////////////////////////////////////////////////
#ifndef __EID
// Decide which cluster to flush: Find the FLUSHED cluster with the max
// #buffers, and use that one if that #buffers > minimum ; else pick one of
// the non-flushed clusters, and if none -- use the "last resort" cluster
void ClusterDB::chooseClusterToFlush(Cluster * lastResort)
{
clusterToFlush_ = NULL ;
Cluster * maxBuffsFlushedCluster = NULL;
Cluster * anyNonFlushedCluster = NULL;
ULng32 maxBuffers = 0;
NABoolean outersOnly = ! lastResort->isInner_ ; // HJ in phase 2
unsigned short minNumBuffersToFlush =
outersOnly ? minNumWriteOuterBatch_ : minBuffersToFlush_ ;
// search thru all clusters, find the flushed cluster with max buffers
// (in case a "flush me" cluster is found, just return this one)
for ( Cluster * currCluster = clusterList_ ;
currCluster ;
currCluster = currCluster->next_ ) {
// When HJ in phase 2 -- only select outer clusters
if ( outersOnly && currCluster->isInner_ ) continue;
// Skip clusters with fewer (full) buffers than the minimum
// (We use "<=" below because the last buffer is typically not full)
if ( currCluster->numInMemBuffers_ <= minNumBuffersToFlush ) continue;
if ( currCluster->flushMe_ ) { // higher priority
currCluster->flushMe_ = FALSE;
clusterToFlush_ = currCluster ;
break;
}
if ( currCluster->state_ == Cluster::FLUSHED ) {
if ( maxBuffers < currCluster->numInMemBuffers_ ) {
maxBuffsFlushedCluster = currCluster;
maxBuffers = currCluster->numInMemBuffers_ ;
}
else // lastResort gets some priority (as its recent buffer is full)
// (when it is flushed, and has the min # buffers, and 1 <= the max)
if ( lastResort == currCluster &&
lastResort->numInMemBuffers_ + 1 >= maxBuffers )
maxBuffsFlushedCluster = lastResort;
}
else // pick a "random" non flushed cluster
anyNonFlushedCluster = currCluster ;
}
if ( ! clusterToFlush_ )
clusterToFlush_ =
maxBuffsFlushedCluster ? maxBuffsFlushedCluster :
anyNonFlushedCluster ? anyNonFlushedCluster :
lastResort ;
// Keep most recent buffer if not full
if ( clusterToFlush_->bufferPool_->castToSerial()->notFull() ) {
clusterToFlush_->keepRecentBuffer_ = clusterToFlush_->bufferPool_ ;
// start flushing from the second most recent buffer
clusterToFlush_->bufferPool_ = clusterToFlush_->bufferPool_->getNext();
// the buffer to keep would become the new buffer list
clusterToFlush_->keepRecentBuffer_->setNext(NULL);
}
else // in case it is not NULL from a previous flush
clusterToFlush_->keepRecentBuffer_ = NULL;
// start with the first (also most recent) buffer in chain
clusterToFlush_->nextBufferToFlush_ = clusterToFlush_->bufferPool_ ;
#ifdef DO_DEBUG
if ( doLog_ && clusterToFlush_->seqIDIndex_ > 0 ) { // DEBUG
char msg[256];
sprintf(msg,
"chooseClusterToFlush() (%lu %lu). next buff %lu inMemBuffs %lu ",
(ULng32)clusterToFlush_ & 0x0FFF,
(ULng32)this & 0x0FFF,
(ULng32)clusterToFlush_->nextBufferToFlush_ & 0x0FFF,
clusterToFlush_->numInMemBuffers_);
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL,0,msg,explainNodeId_);
}
#endif
}
#endif
/////////////////////////////////////////////////////////////////////////////
NABoolean Cluster::insert(atp_struct * newEntry,
ex_expr * moveExpr,
SimpleHashValue hashValue,
ExeErrorCode * rc,
NABoolean skipMemoryCheck) {
// assume success
*rc = EXE_OK;
HashRow *dataPointer = NULL;
// make sure that we have a buffer
if (!bufferPool_) {
// no buffer means, we don't have rows yet
ex_assert(!rowCount_,
"cluster without a buffer has rows");
HashBuffer * newBuffer = new(collHeap(), FALSE) HashBuffer(this);
if ( !newBuffer || !newBuffer->getDataPointer() ) {
// this is the first buffer for this cluster. If we couldn't
// allocate it, we give up
if (newBuffer)
delete newBuffer;
*rc = EXE_NO_MEM_TO_EXEC;
return FALSE;
};
bufferPool_ = newBuffer;
numInMemBuffers_ = 1; // the first in-memory buffer
totalClusterSize_ += clusterDb_->bufferSize_;
};
#ifndef __EID
lastDataPointer_ = NULL;
#endif
NABoolean defragmented = FALSE;
// allocate space for the new row in the buffer pool
if ((dataPointer = bufferPool_->castToSerial()->getFreeRow()) == NULL)
{
if (defragBuffer_)
{
#if defined(_DEBUG)
assert(defragBuffer_);
#endif
atp_struct * workAtp = clusterDb_->workAtp_;
workAtp->getTupp(insertAtpIndex_).setDataPointer(defragBuffer_);
UInt32 maxDataLen = bufferPool_->getMaxRowLength() - sizeof(HashRow);
UInt32 rowLen = maxDataLen;
UInt32 *rowLenPtr = &rowLen;
if (moveExpr)
{
if(moveExpr->eval(newEntry, workAtp, 0, -1, rowLenPtr) == ex_expr::EXPR_ERROR)
{
*rc = (ExeErrorCode)(-newEntry->getDiagsArea()->mainSQLCODE());
return FALSE;
}
}
if(rowLen != maxDataLen)
{
UInt32 len = ROUND4(rowLen) + sizeof(HashRow);
if ((dataPointer = bufferPool_->castToSerial()->getFreeRow(len)) != NULL)
{
// we got a row
defragmented = TRUE;
char *rowPointer = dataPointer->getData();
//set work atp (insertAtpIndex_) data address -- needed for hash group by
workAtp->getTupp(insertAtpIndex_).setDataPointer(rowPointer);
str_cpy_all(rowPointer,
defragBuffer_,
rowLen);
//rows are variable-- set row length
bufferPool_->castToSerial()->setRowLength(dataPointer, rowLen);
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
char txt[] = "Cluster::insert";
sql_buffer_pool::logDefragInfo(txt,bufferPool_->getMaxRowLength(),
ROUND4(rowLen) + sizeof(HashRow),
bufferPool_->getFreeSpace(),
bufferPool_,
bufferPool_->getRowCount());
#endif
}
}
}
}
if (dataPointer == NULL)
{
// old buffer is full -- allocate a new buffer
if ( skipMemoryCheck || // first check the memory
clusterDb_->enoughMemory(clusterDb_->bufferSize_, isInner_) ) {
// either we generally do not overflow, or
// just for this row we should avoid overflow, or
// we have enough memory to allocate another buffer,
// so create a new buffer and chain it into the bufferlist
HashBuffer * newBuffer = new(collHeap(), FALSE) HashBuffer(this);
if ( !newBuffer || !newBuffer->getDataPointer() ) {
if (skipMemoryCheck) {
// we skipped the memory pressure check (e.g., due to no-overflow)
// and did not get a new buffer, so we must fail
*rc = EXE_NO_MEM_TO_EXEC;
return FALSE;
};
// reset rc in case it was set in the constructor of HashBuffer
*rc = EXE_OK;
#ifndef __EID
// set saw pressure as we indeed can not allocate memory
// fix for CR 10-071012-6254
clusterDb_->sawPressure_ = TRUE;
// our estimate for available memory was wrong. We don't
// have memory left. Select a cluster for flushing and return
// if we are in DP2 and it is a partial group by, DP2 starts
// returning partial groups
if (!clusterDb_->isPartialGroupBy_)
clusterDb_->chooseClusterToFlush(this);
#endif
return FALSE;
};
totalClusterSize_ += clusterDb_->bufferSize_;
numInMemBuffers_++;
// we got a new buffer. Chain it into the list of buffers
newBuffer->setNext(bufferPool_);
bufferPool_ = newBuffer;
// allocate space for new row
dataPointer = bufferPool_->castToSerial()->getFreeRow();
}
else {
// not enough memory available. Select a cluster
// for flushing
#ifndef __EID
if (!clusterDb_->isPartialGroupBy_)
clusterDb_->chooseClusterToFlush(this);
#endif
return FALSE;
};
};
if (!defragmented)
{
char *rowPointer = dataPointer->getData();
// now we have a slot in a buffer and dataPointer points to it
// move input row to pool
atp_struct * workAtp = clusterDb_->workAtp_;
workAtp->getTupp(insertAtpIndex_).setDataPointer(rowPointer);
UInt32 maxDataLen = bufferPool_->getMaxRowLength() - sizeof(HashRow);
UInt32 rowLen = maxDataLen;
UInt32 *rowLenPtr = &rowLen;
if (moveExpr) {
if(moveExpr->eval(newEntry, workAtp, 0, -1, rowLenPtr) == ex_expr::EXPR_ERROR)
{
*rc = (ExeErrorCode)(-newEntry->getDiagsArea()->mainSQLCODE());
return FALSE;
}
}
if(bufferPool_->isVariableLength()) {
bufferPool_->castToSerial()->setRowLength(dataPointer, rowLen);
if(rowLen != maxDataLen) {
bufferPool_->castToSerial()->resizeLastRow(rowLen, dataPointer);
}
}
}
#ifndef __EID
lastDataPointer_ = dataPointer; // keep location of most recent row
#endif
// set the hash value in the row header
dataPointer->setHashValueRaw(hashValue);
// SimpleHashValue * headerHashValue = (SimpleHashValue *)dataPointer;
// * headerHashValue = hashValue;
// For hash-groupby, insert the row into the hash chain.
if ( clusterDb_->hashOperator_ == ClusterDB::HASH_GROUP_BY ) {
NABoolean resizeNeeded;
// We know that we don't have duplicates (each group is unique
// with respect to the grouping (hash) columns. Just insert
// the row at the beginning of the chain.
resizeNeeded = hashTable_->insert(dataPointer);
if ( resizeNeeded ) {
NABoolean enoughMem =
clusterDb_->enoughMemory(hashTable_->resizeTo());
ULng32 memAdded = hashTable_->resize( enoughMem );
// if Hash-Table was resized up, then update memory use
clusterDb_->memoryUsed_ += memAdded ;
#ifndef __EID
// if could not create a new HT, better pick this cluster for a flush
if ( ! memAdded ) flushMe_ = TRUE;
#endif
}
}
rowCount_++;
return TRUE;
}
/////////////////////////////////////////////////////////////////////////////
#ifndef __EID
// initialize the cluster's scratch file and read handler (and the clusterDB
// global scratch file, if needed). Return TRUE if error.
NABoolean Cluster::initScratch(ExeErrorCode * rc)
{
// First time -- need to create the scratch space for all the clusters
if ( ! clusterDb_->tempFile_ ) {
ExExeStmtGlobals * stmtGlobals = clusterDb_->stmtGlobals_ ;
const ExScratchFileOptions *sfo = stmtGlobals->getScratchFileOptions();
Lng32 numOfInstances = stmtGlobals->getNumOfInstances();
Lng32 myInstanceNumber = stmtGlobals->getMyInstanceNumber();
SortError * sortError = new(collHeap(), FALSE) SortError();
HashScratchSpace * tempFile = new(collHeap(), FALSE)
HashScratchSpace(collHeap(),
sortError,
clusterDb_->explainNodeId_,
clusterDb_->bufferSize_,
clusterDb_->doLog_,
sfo->getScratchMgmtOption());
if ( ! tempFile ){
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
};
if (sortError->getSortError() != 0) {
*rc = EXE_SORT_ERROR;
return TRUE;
};
tempFile->setScratchDirListSpec(sfo->getSpecifiedScratchDirs());
tempFile->setNumDirsSpec(sfo->getNumSpecifiedDirs());
tempFile->setNumEsps(numOfInstances);
tempFile->setEspInstance(myInstanceNumber);
tempFile->setPreallocateExtents(sfo->getScratchPreallocateExtents());
tempFile->setScratchMaxOpens(sfo->getScratchMaxOpensHash());
tempFile->setScratchDiskLogging(sfo->getScratchDiskLogging());
tempFile->setIoEventHandler(clusterDb_->ioEventHandler_);
tempFile->setCallingTcb(clusterDb_->callingTcb_);
tempFile->setIpcEnvironment(stmtGlobals->getIpcEnvironment());
tempFile->setScratchThreshold(clusterDb_->scratchThresholdPct_);
tempFile->setScratchIOVectorSize(clusterDb_->getScratchIOVectorSize());
tempFile->setScratchOverflowMode(clusterDb_->getScratchOverflowMode());
clusterDb_->tempFile_ = tempFile ;
};
// initialize for this cluster
tempFile_ = clusterDb_->tempFile_ ;
readHandle_ = new(collHeap(), FALSE) ClusterPassBack();
if ( ! readHandle_ ) {
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
};
return FALSE;
}
// Flush the in-memory buffers of this cluster
// Returns: FALSE - either error, or need to call again
// TRUE -- flush is done
NABoolean Cluster::flush(ExeErrorCode * rc) {
// assume success
*rc = EXE_OK;
RESULT writeStatus = SCRATCH_SUCCESS;
ULng32 bufferSize = clusterDb_->bufferSize_;
// ensure that the scratch file was created
if ( ! tempFile_ )
if ( initScratch(rc) ) return FALSE; // return if there was an error
#ifdef DO_DEBUG
if ( clusterDb_->doLog_ && seqIDIndex_ > 0 ) { // DEBUG
char msg[256];
sprintf(msg,
"Cluster::flush() (%lu %lu). next buff %lu writeIOCnt %lu ",
(ULong)this & 0x0FFF,
(ULong)clusterDb_ & 0x0FFF,
(ULong)nextBufferToFlush_ & 0x0FFF,
writeIOCnt_);
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL,0,msg,clusterDb_->explainNodeId_);
}
#endif
// while there are buffers, and the IO is ready, write them to disk
// ( afterLastBufferToFlush_ is usually NULL ; it's only used by OLAP, where
// it is guaranteed to point to one of the following buffers.)
while ( nextBufferToFlush_ != afterLastBufferToFlush_ ) {
// are the previous I/Os still in flight, and we can not issue a new I/O ?
if ( IONotReady(rc) ) return FALSE; // return to scheduler (or error)
// first make sure that this buffer has rows
ex_assert( clusterDb_->hashOperator_ == ClusterDB::SEQUENCE_OLAP ||
nextBufferToFlush_->getRowCount(),
"flushed an empty buffer" );
// fire the I/O
writeStatus = tempFile_->writeThru(nextBufferToFlush_->getDataPointer(),
seqID_[seqIDIndex_] );
if ( writeStatus != SCRATCH_SUCCESS && writeStatus != IO_NOT_COMPLETE ) {
*rc = EXE_SORT_ERROR;
return FALSE;
}
// and do the remaining book-keeping
writeIOCnt_++;
clusterDb_->totalIOCnt_++;
if ( clusterDb_->hashOperStats_ ) clusterDb_->updateIOStats();
nextBufferToFlush_ = nextBufferToFlush_->getNext() ;
} // while ( nextBufferToFlush_ )
// No more buffers to flush -- ensure that all I/Os are done
if ( IONotComplete(rc) ) return FALSE; // return to scheduler (or error)
state_ = FLUSHED;
// For OLAP -- do not deallocate buffers (anyway bufferPool is not used).
if ( clusterDb_->hashOperator_ == ClusterDB::SEQUENCE_OLAP ) return TRUE;
// All writes completed -- remove the written buffers
while ( bufferPool_->getNext() ) { // remove all but the last buffer
delete bufferPool_;
}
// if the recent buffer was kept, then the last written buffer can be removed
// and the kept recent buffer would be used instead
if ( keepRecentBuffer_ ) { // make the kept buffer to be the new list
delete bufferPool_;
bufferPool_ = keepRecentBuffer_;
keepRecentBuffer_ = NULL ;
}
else { // the last buffer was flushed; reuse it as the new buffer list
// we are done with the cluster flush. reset the last buffer
bufferPool_->castToSerial()->clearRowCount();
// increase totalClusterSize_ because bufferPool_ is now empty but
// will be reused when inserting new rows.
totalClusterSize_ += bufferSize;
}
numInMemBuffers_ = 1; // only 1 in-memory buffer now
// Check if a CLUSTER-SPLIT is needed, and perform the split if possible.
// Return FALSE if error ( *rc != 0 ) or if still processing the most
// recent (not full) buffer (the scheduler would return us here later)
if ( checkAndSplit( rc ) ) return FALSE;
return TRUE; // Done writing this list of buffers.
}; // Cluster::flush()
/////////////////////////////////////////////////////////////////////////////
NABoolean Cluster::spill(ExeErrorCode * rc, NABoolean noAggregates) {
// assume success
*rc = EXE_OK;
NABoolean firstFlushDistinctGroupby = state_ == CHAINED && noAggregates ;
if ( firstFlushDistinctGroupby && keepRecentBuffer_ ) {
// first spill for distinct must flush that remaining not-full buffer
// because it contains groups that were already returned !!
// So re-chain the recent buffer into the buffer pool to be flushed
keepRecentBuffer_->setNext(bufferPool_);
bufferPool_ = keepRecentBuffer_;
keepRecentBuffer_ = NULL;
// start with the (just restored) first buffer in chain
nextBufferToFlush_ = bufferPool_ ;
}
if ( flush(rc) ) {
// the flush is done; reinitialize/renew the hash-table
if ( hashTable_->originalSize() ) {
// the old HT is at the original size; just re-initialize it
hashTable_->init();
}
else {
// Reallocate a (smaller) hash table.
removeHashTable();
// after a flush, a cluster starts with the minimal hash table
hashTable_ = createHashTable(clusterDb_->initialHashTableSize_, rc);
if (!hashTable_) return FALSE;
}
// If last buffer was kept, add its rows to the new hash-table
if ( keepRecentBuffer_ ) {
HashBuffer *hb = bufferPool_;
HashRow *dataPointer = hb->castToSerial()->getFirstRow();
for ( ULng32 rowIndex = 0 ;
rowIndex < hb->getRowCount() ;
rowIndex++ ) {
hashTable_->insert(dataPointer); // no resize
dataPointer = hb->castToSerial()->getNextRow(dataPointer);
}
}
if ( firstFlushDistinctGroupby ) {
ex_assert( 0 == maxSeqIDIndex_ ,"First time spill, index should be 0");
// For non-blocking, rows up to here were returned up, while the
// following rows would only be returned when the overflowed cluster
// is read from file, hence we keep the two lists separate
// start a new sequence for the current cluster -- but at same index 0
// while the previous sequence-num is pushed to index 1 (because later
// we read first the buffers from index 1)
maxSeqIDIndex_++ ; // update the max
seqID_[1 + seqIDIndex_] = seqID_[seqIDIndex_] ; // push up
seqID_[seqIDIndex_] = clusterDb_->generateSequenceID();
}
return TRUE;
}
else
return FALSE;
};
// internal method: Set HL, create bitmap, return TRUE if no memory error
NABoolean Cluster::setHashLoop(ExeErrorCode * rc) {
if ( clusterDb_->doLog_ && ! clusterDb_->hashLoop_ ) { // first time
char msg[256];
sprintf(msg,
"HASH LOOP started (%lu %lu). Total memory used %u, cluster size " PF64 " ",
(ULong)this & 0x0FFF, (ULong)clusterDb_ & 0x0FFF,
clusterDb_->memoryUsed_, totalClusterSize_ );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, clusterDb_->explainNodeId_);
}
clusterDb_->hashLoop_ = TRUE ;
// we might have to allocate the bitmap for the outer cluster;
// this is only done if the outer cluster doesn't have a
// bitmap already
Cluster * oCluster = getOuterCluster();
if ( oCluster && // right-HJ may have an empty outer cluster
oCluster->bitMapEnable_ &&
!oCluster->outerBitMap_ ) {
oCluster->outerBitMap_ =
new(collHeap(), FALSE) ClusterBitMap(oCluster->rowCount_, rc);
if (!oCluster->outerBitMap_ || *rc) {
*rc = EXE_NO_MEM_TO_EXEC;
return FALSE;
};
}
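// Descriptive note: the bitmap is indexed by the outer row position
// (readCount_); updateBitMap()/testBitMap() below set/test one bit per
// outer row, so matches can be remembered across hash-loop iterations.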
completeCurrentRead_ = TRUE; // End of a loop is like an end of cluster
state_ = IN_MEMORY; // the (read part of the) cluster is IN_MEMORY
if ( IONotComplete(rc) ) return FALSE; // return to scheduler (or error)
// squeeze out rows of other clusters (not needed for the first sequence)
if ( seqIDIndex_ < maxSeqIDIndex_ ) // if beyond the first sequence
if ( squeezeOutRowsOfOtherClusters(rc) ) return FALSE; // return if error
return TRUE;
}
/////////////////////////////////////////////////////////////////////////////
// internal method: If the cluster contains variable length rows, move
// rows belonging to this cluster from the original buffers to new
// buffers. All rows of this cluster must be moved. Buffers can be
// reused after they have been processed.
//
// If the cluster contains fixed length rows, move rows from the last
// read buffer(s) into locations of rows that belong to other clusters.
//
// (Used only after a cluster split.) Works with _multiple_ mixed
// buffers !! Returns TRUE if error.
/////////////////////////////////////////////////////////////////////////////
NABoolean Cluster::squeezeOutRowsOfOtherClusters(ExeErrorCode * rc)
{
if(bufferPool_->isVariableLength()) {
// If the rows in these buffers are variable length, then we
// cannot do this squeeze operation in place. We must copy each
// row from the existing buffers to the new buffers. However, we
// can reuse the buffers that we have already processed.
// The source data
HashBuffer *oldBuffs = bufferPool_;
// The destination. Only rows that belong to this cluster will be
// moved here.
HashBuffer *newBuffs = NULL;
// Keep track of the buffers that have been processed. They can
// be reused.
HashBuffer *freeBuffs = NULL;
// Process all the buffers
while(oldBuffs) {
HashBuffer *oldBuff = oldBuffs;
oldBuff->castToSerial()->fixup();
// Check each row of the current buffer
HashRow *row = oldBuff->castToSerial()->getFirstRow();
for(UInt32 i = 0; i < oldBuff->getRowCount(); i++) {
ULng32 bucketId = row->hashValue() % clusterDb_->bucketCount_;
// If this row belongs to this cluster, copy it to the
// destination. Otherwise, ignore it.
if ( clusterDb_->buckets_[bucketId].getInnerCluster() == this ) {
// If we need a new buffer, allocate one or take one off of
// the free list.
if(!newBuffs || !newBuffs->castToSerial()->notFull()) {
HashBuffer *newBuff = NULL;
// If there are free buffers, use one
if(freeBuffs) {
newBuff = freeBuffs;
freeBuffs = freeBuffs->getNext();
newBuff->setNext(NULL);
} else {
// Otherwise, we must allocate a new one.
newBuff = new(collHeap(), FALSE) HashBuffer(this);
if ( ! newBuff || ! newBuff->getDataPointer() ) {
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
}
}
// Place new buffer at the head of the new buffers list.
newBuff->setNext(newBuffs);
newBuffs = newBuff;
}
// Copy the row from the source (oldBuff) to the destination
// (newBuffs). Resize the row based on the size of the source row.
HashRow *dstRow = newBuffs->castToSerial()->getFreeRow();
UInt32 len = oldBuff->castToSerial()->getRowLength(row);
memcpy((char *)dstRow, (char *)row, len + sizeof(HashRow));
newBuffs->castToSerial()->resizeLastRow(len, dstRow);
}
// Move to the next row of the source.
row = oldBuff->castToSerial()->getNextRow(row);
}
// Move to the next buffer of the source.
// Move the buffer we just finished onto the free list.
oldBuffs = oldBuff->getNext();
oldBuff->setNext(freeBuffs);
freeBuffs = oldBuff;
freeBuffs->init(this);
}
// If there are any remaining free buffers, delete them.
while (freeBuffs) {
HashBuffer *freeBuff = freeBuffs;
freeBuffs = freeBuffs->getNext();
delete freeBuff;
}
bufferPool_ = newBuffs;
lastSqueezedBuffer_ = bufferPool_;
return FALSE;
} else {
// If this cluster contains fixed length rows, then we can move
// rows from the source into vacant slots in the destination.
// assume lastSqueezedBuffer_ points to the last squeezed buffer in the
// buffer list, and the buffers ahead of it hold mixed rows (including
// other clusters' rows)
// lastSqueezedBuffer_
// |
// V
// ---- ---- ---- ---- ---------------
// bufferPool_ -> | | -> | | -> | | -> | | --> | RRRRRRR| -> ...
// ---- ---- ---- ---- ---------------
// When no lastSqueezedBuffer_ exists (e.g., with hash-loop, or if no input
// rows came after the split), then create an empty one and chain it at the
// end of the buffers list.
if ( ! lastSqueezedBuffer_ ) {
// no memory pressure check
lastSqueezedBuffer_ = new(collHeap(), FALSE) HashBuffer(this);
if ( ! lastSqueezedBuffer_ || ! lastSqueezedBuffer_->getDataPointer() ) {
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
};
// chain the new buffer at the end of the buffer pool list
HashBuffer * checkBuff = bufferPool_;
HashBuffer * nextBuff = checkBuff->getNext() ;
while ( nextBuff != NULL ) {
checkBuff = nextBuff;
nextBuff = nextBuff->getNext() ;
}
checkBuff->setNext(lastSqueezedBuffer_) ;
}
// ========================================================================
// The squeeze algorithm: First find the bottommost free slot for a row
// (either in the lastSqueezedBuffer_, or if that one is full, in the
// buffer above it), then find the topmost row (of this cluster) to move
// into that slot. If both are found -- move that row into that slot.
// If not (e.g., reached the end of the buffer) then rectify and try again.
// Invariants to keep:
// 1. lastSqueezedBuffer_ is always in a complete state (i.e., its row count
//    is correct; no rows in that buffer above that count).
// 2. When returning, bufferPool_ equals lastSqueezedBuffer_ .
// ========================================================================
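// Illustrative sketch (hypothetical buffer contents; R = a row of this
// cluster, x = a row that belongs to another cluster after the split):
//
//   fromBuff == bufferPool_ :  [ x R x R ]   fromRowPos scans top-down,
//                                            picking up R rows to move
//   toBuff (closer to tail) :  [ R R x _ ]   toRowPos scans bottom-up,
//                                            finding x or free slots
//
// Each R found is memcpy'd into the slot found; an emptied fromBuff is
// deleted (its successor becomes the new bufferPool_), and a filled
// toBuff advances one buffer toward the head, until the cursors meet.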
// Rows that belong to this cluster are moved from the fromBuff to the
// toBuff, and when fromBuff gets empty, it is deallocated.
HashBuffer * toBuff = lastSqueezedBuffer_ ;
HashBuffer * fromBuff = bufferPool_ ; // fromBuff is always == bufferPool_
// invariant - when toSlot/fromRow is NULL, then toRowPos/fromRowPos has
// the position to check next; when not null, then they have the
// current position of that slot/row.
char * toSlot = NULL, * fromRow = NULL;
UInt32 numConfirmedRowsInBuff = lastSqueezedBuffer_->getRowCount() ;
Int32 toRowPos = (Int32) toBuff->getRowCount(),
fromRowPos = (Int32) fromBuff->getRowCount() - 1;
const UInt32 rowLength = lastSqueezedBuffer_->getMaxRowLength() ;
const UInt32 maxRowsInBuffer = lastSqueezedBuffer_->castToSerial()->getMaxNumFullRows();
//
// Search for a slot, then for a row to move, then if possible move that
// row down.
//
while ( TRUE ) { // while there are still rows to move
// ******* find a free toSlot in toBuff *******
while ( ! toSlot ) {
// update toBuff to next buffer, if needed
if ( toRowPos >= (Int32) maxRowsInBuffer ) { // is this "to" buffer full?
lastSqueezedBuffer_ = toBuff ; // toBuff is all squeezed
lastSqueezedBuffer_->castToSerial()->setRowCount(maxRowsInBuffer);
// rare case: the fully squeezed buffer is already the head of the pool
if ( bufferPool_ == lastSqueezedBuffer_ ) return FALSE;
HashBuffer * checkBuff = bufferPool_;
HashBuffer * nextBuff = checkBuff->getNext() ;
while ( nextBuff != lastSqueezedBuffer_ ) {
checkBuff = nextBuff;
nextBuff = nextBuff->getNext() ;
}
toBuff = checkBuff;
toRowPos = 0; // start from row position zero
numConfirmedRowsInBuff = 0;
} // if ( toRowPos >= maxRowsInBuffer ) // is this "to" buffer full
char * toDataPointer = toBuff->getFirstRow() + toRowPos * rowLength ;
HashRow * row = (HashRow *) toDataPointer;
ULng32 bucketId = row->hashValue() % clusterDb_->bucketCount_;
// Are we done ? If end of this batch -- finish up and return
if ( toBuff == fromBuff && // only check when "to" and "from" are same
( row->bitSet() || // was this new row already checked by "from" ?
toRowPos >= fromRowPos ) ) { // "from" hasn't checked this row yet
// handle case of a row not yet checked by "from"
if ( ! row->bitSet() &&
clusterDb_->buckets_[bucketId].getInnerCluster() == this )
++numConfirmedRowsInBuff;
if ( numConfirmedRowsInBuff ) { // there are rows in this buffer
lastSqueezedBuffer_ = toBuff ; // toBuff is all squeezed
lastSqueezedBuffer_->castToSerial()->setRowCount(numConfirmedRowsInBuff);
}
else { // another rare case: just crossed over to an empty bufferPool
delete bufferPool_ ;
}
ex_assert ( bufferPool_ == lastSqueezedBuffer_ ,
"bufferPool is not the last squeezed buffer");
return FALSE;
} else { // find an available slot
if ( toRowPos >= (Int32) toBuff->getRowCount()
// this buffer was not full, and we reached over the top row
|| // or
clusterDb_->buckets_[bucketId].getInnerCluster() != this
// This slot was held by another cluster's row; reuse it
)
toSlot = toDataPointer;
else {
++toRowPos; // prepare for the next one
++numConfirmedRowsInBuff;
}
}
} // while ( ! toSlot )
// **** find the last row (of this cluster) in fromBuff and move it ****
while ( ! fromRow ) {
// if no more rows in this buffer, deallocate and use next buffer
if ( fromRowPos < 0 ) {
ex_assert ( bufferPool_ != lastSqueezedBuffer_ ,
"Empty bufferPool is the last squeezed buffer");
delete bufferPool_; // the dtor removes the first buffer in list
fromBuff = bufferPool_;
fromRowPos = (Int32) fromBuff->getRowCount() - 1;
// just in case: if we crossed into the last squeezed buffer, we are done
if ( fromBuff == lastSqueezedBuffer_ ) return FALSE;
}
// If row at from-pos belongs to this cluster -- use it !
char * fromDataPointer = fromBuff->getFirstRow() + fromRowPos * rowLength ;
HashRow * row = (HashRow *) fromDataPointer;
ULng32 bucketId = row->hashValue() % clusterDb_->bucketCount_;
if ( clusterDb_->buckets_[bucketId].getInnerCluster() == this )
fromRow = fromDataPointer;
else
--fromRowPos; // prepare for the next one
// in case the "from" just crossed the "to", in the same buffer
if ( toBuff == fromBuff && toRowPos >= fromRowPos ) {
// mark this buffer as last squeezed and return
if ( numConfirmedRowsInBuff ) { // there are rows in this buffer
lastSqueezedBuffer_ = toBuff ; // toBuff is all squeezed
lastSqueezedBuffer_->castToSerial()->setRowCount(numConfirmedRowsInBuff);
}
else { // another rare case: just crossed over to an empty bufferPool
delete bufferPool_ ;
}
ex_assert ( bufferPool_ == lastSqueezedBuffer_ ,
"bufferPool is not the last squeezed buffer");
return FALSE;
}
// mark this row to make the "to" stop if needed
// (this row is either skipped or would be moved)
row->setBit(TRUE);
} // while ( ! fromRow )
// ***** Move the row into the slot **********
memcpy(toSlot, fromRow, rowLength );
// in case the toBuff is the last squeezed, or incomplete
// - then update its row count
if ( toBuff == lastSqueezedBuffer_ ||
toRowPos == toBuff->getRowCount() )
toBuff->castToSerial()->setRowCount(toBuff->getRowCount()+1);
HashRow * row = (HashRow *) toSlot;
row->setBit(FALSE); // erase the mark in the moved row
toSlot = fromRow = NULL ; // both were used; reset them
++toRowPos; // prepare for the next one
--fromRowPos; // prepare for the next one
++numConfirmedRowsInBuff; // one more row of this cluster in this buffer
} // while ( TRUE )
}
} // Cluster::squeezeOutRowsOfOtherClusters()
/////////////////////////////////////////////////////////////////////////////
// Read a cluster. If it is an inner Cluster, read as many buffers as
// possible. If it is an inner and we couldn't read all the buffers, set
// hashLoop_ == TRUE (this is for inner Clusters during PHASE_3).
// If it is an outer Cluster, read a batch of buffers (this is for outer
// clusters during PHASE_3).
/////////////////////////////////////////////////////////////////////////////
NABoolean Cluster::read(ExeErrorCode * rc) {
// assume success
*rc = EXE_OK;
RESULT readStatus = SCRATCH_SUCCESS;
char * dataPtr = NULL;
// FIRST -- handle end of (batch) read
if ( completeCurrentRead_ ) {
// Synchronize with previously issued READs - ensure that all I/Os are done
if ( IONotComplete(rc) ) return FALSE; // return to scheduler (or error)
completeCurrentRead_ = FALSE ; // prepare for next time, if needed
// this is a regular flushed cluster (i.e., not
// -- an (inner) cluster that was split
// or -- an (outer) cluster from HGB Distinct
if ( ! maxSeqIDIndex_ ) {
if ( readHandle_->endOfSequence() )
state_ = IN_MEMORY; // mark the cluster as IN_MEMORY
return TRUE; // this read is done !!
}
// Handle an (outer) cluster from HGB Distinct
if ( ! isInner_ ) {
// once we finished the first sequence, return all the new groups
if ( seqIDIndex_ < maxSeqIDIndex_ )
returnOverflowRows_ = TRUE;
if ( readHandle_->endOfSequence() ) {
if ( seqIDIndex_ ) {
--seqIDIndex_ ; // end of first sequence; go to 2nd/last
readHandle_->initCPB(); // start reading from the beginning of the sequence
}
else state_ = IN_MEMORY; // 2nd sequence: mark the cluster as IN_MEMORY
}
return TRUE;
}
// code below: only for handling an inner cluster that had a split
// squeeze out rows of other clusters (not needed for the first sequence)
if ( seqIDIndex_ < maxSeqIDIndex_ ) // if beyond the first sequence
if ( squeezeOutRowsOfOtherClusters(rc) ) return FALSE; // return if error
// if reached end of sequence (and there was a split)
if ( readHandle_->endOfSequence() ) {
if ( ! seqIDIndex_ ) { // was that the last sequence ?
state_ = IN_MEMORY; // mark the cluster as IN_MEMORY
return TRUE;
}
if ( seqIDIndex_ == maxSeqIDIndex_ ) // if this was the first sequence
lastSqueezedBuffer_ = bufferPool_ ; // initialize
ex_assert( lastSqueezedBuffer_ == bufferPool_,
"Last squeezed buffer must be the first in the buffer pool");
// prepare for next sequence
--seqIDIndex_;
readHandle_->initCPB(); // start reading from the beginning of the sequence
} // if ( readHandle_->endOfSequence() ) and there was a split
} // if ( completeCurrentRead_ )
// **** read several buffers ********
while ( TRUE ) { // there are more buffers to read
HashBuffer * newBuffer = NULL;
// are all previous I/Os still in flight, so we cannot issue a new I/O ?
if ( IONotReady(rc, (UInt32) seqID_[seqIDIndex_]) )
return FALSE; // return to scheduler (or error)
if ( isInner_ ) { // get a buffer, or if can't then switch to hash-loop
// If this is not the first buffer, check if we're out of memory and
// need to switch to a Hash-Loop. For the first buffer the check is
// skipped, and we try to allocate that buffer.
if ( buffersRead_ ) {
// We need enough space for another buffer (we might need memory for
// two buffers, because we need one buffer for the outer cluster)
ULng32 requiredMem = clusterDb_->bufferSize_;
if ( getOuterCluster() && // avoid case of right join without an outer
! getOuterCluster()->bufferPool_ ) requiredMem *= 2 ;
// we also need memory to build the hash table for the rows already
// read plus the potential rows in the next buffer
// (note: parentheses are required -- '+' binds tighter than '?:')
requiredMem += getMemorySizeForHashTable(readCount_ +
(bufferPool_ ? bufferPool_->getRowCount() : 0));
if ( !clusterDb_->enoughMemory(requiredMem)
// For debugging only !! Force HL after reading so many buffers
|| clusterDb_->forceHashLoopAfterNumBuffers_ == buffersRead_
)
// Not enough memory -- we have to use Hash-Loop for this cluster
return ( setHashLoop(rc) ); // return FALSE if no-mem error
} // if ( buffersRead_ )
newBuffer = new(collHeap(), FALSE) HashBuffer(this);
if ( !newBuffer || !newBuffer->getDataPointer() ) {
if ( ! buffersRead_ ) { // no buffers read yet -- can't even hash loop
*rc = EXE_NO_MEM_TO_EXEC;
return FALSE;
};
// reset rc in case it was set by the constructor of HashBuffer
*rc = EXE_OK;
state_ = IN_MEMORY; // mark the cluster as IN_MEMORY for the hash loop
// Inner and No memory ==> calls for a Hash Loop
return ( setHashLoop(rc) );
};
// we got a new buffer. Chain it into the bufferlist
newBuffer->setNext(bufferPool_);
bufferPool_ = newBuffer;
dataPtr = newBuffer->getDataPointer() ;
// if this cluster had a split, and reading its 2nd or 3rd sequence
// then only read a batch of buffers, not all at once
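// e.g. (hypothetical numbers): with numReadOuterBatch_ == 8, every 8th
// buffer read from an old sequence sets completeCurrentRead_, so the
// rows read so far can be processed before the next batch is issued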
if ( seqIDIndex_ < maxSeqIDIndex_ ) {
if ( 0 == --batchCountDown_ ) { // only one more left in this batch
batchCountDown_ = clusterDb_->numReadOuterBatch_ ; // for next batch
// the next read would be the last for this batch
completeCurrentRead_ = TRUE ;
}
}
} // if ( isInner_ )
else { // *** outer ***
// if no more read buffers are available, then complete this batch
if ( nextBufferToRead_ == afterLastBufferToRead_ ) {
completeCurrentRead_ = TRUE;
return read(rc); // to also check if IO happened to complete
}
// only for the case of OLAP with "bounded following" -- need to
// continue cyclically to the first buffer in the list.
if ( NULL == nextBufferToRead_ ) {
ex_assert(firstBufferInList_,"first buffer not set for OLAP");
nextBufferToRead_ = firstBufferInList_;
}
dataPtr = nextBufferToRead_->getDataPointer() ;
nextBufferToRead_ = nextBufferToRead_->getNext();
}
// ****** ISSUE THE READ ********
readStatus = tempFile_->readThru( dataPtr ,
seqID_[seqIDIndex_],
readHandle_ );
if ( readStatus == SCRATCH_SUCCESS || readStatus == IO_NOT_COMPLETE ) {
// update counters; likely ahead of the actual read but that's OK
buffersRead_++;
readIOCnt_++;
clusterDb_->totalIOCnt_++;
if ( clusterDb_->hashOperStats_ ) clusterDb_->updateIOStats();
}
else { // probably SCRATCH_FAILURE
NABoolean realFailure = TRUE;
// check for a special case -- nothing was written to that sequence
// (e.g., when no more input came immediately after a cluster split)
// this case is benign -- just complete the read below.
SortError *sortError = tempFile_->getSortError();
if ( sortError && readHandle_->endOfSequence() )
realFailure = EInvScrBlockNum != - sortError->getSortError();
if ( realFailure ) {
*rc = EXE_SORT_ERROR;
return FALSE;
}
}
// At end of a sequence
if ( readHandle_->endOfSequence() ) {
if ( isInner_ ) {
ex_assert( maxSeqIDIndex_ /* split */ || readIOCnt_ == writeIOCnt_ ,
"Inner cluster did not read as many buffers as written!");
// in case we were in a hash-loop (and if split, the last sequence)
if ( ! seqIDIndex_ ) clusterDb_->hashLoop_ = FALSE ;
}
completeCurrentRead_ = TRUE;
}
if ( completeCurrentRead_ )
return read(rc); // to also check if IO happened to complete
} // WHILE ( TRUE )
};
/////////////////////////////////////////////////////////////////////////////
// Return: FALSE - No split needed, or split was done successfully
// TRUE -- there was an error
NABoolean Cluster::checkAndSplit(ExeErrorCode * rc)
{
* rc = EXE_OK;
if ( ! isInner_ ||
bucketCount_ <= 1 ||
dontSplit_ ) return FALSE; // then do not split
// Check if a Cluster-Split is needed
// total memory needed == inner cluster + outer cluster buffer + hash table
Int64 requiredMem = totalClusterSize_ + clusterDb_->bufferSize_ +
getMemorySizeForHashTable(rowCount_);
if ( ( requiredMem <= clusterDb_->maxClusterSize_ // need more than max ?
|| clusterDb_->earlyOverflowStarted_ ) // can't tell max cluster size
&& ( ! clusterDb_->forceClusterSplitAfterMB_ // testing CQD was set ?
|| requiredMem <= clusterDb_->forceClusterSplitAfterMB_* ONE_MEG ))
return FALSE ; // split is not needed
// do not split if data is significantly skewed (i.e., mostly in 1 bucket)
for ( UInt32 ib = 0; ib < bucketCount_; ib++)
// does some single bucket hold 80% or more of the cluster's rows ?
if ( buckets_[ib].getRowCount() * 5 >= 4 * getRowCount() ) {
if ( clusterDb_->doLog_ ) { // report that skew
char msg[256];
sprintf(msg,
"Skewed data; no cluster split (%lu %lu). Cluster size " PF64 " ",
(ULong)this & 0x0FFF,
(ULong)clusterDb_ & 0x0FFF,
totalClusterSize_ );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL,0,msg,clusterDb_->explainNodeId_);
}
dontSplit_ = TRUE;
return FALSE;
} // if 80% or more
// Cannot split if the most recent (not full) buffer was kept in memory !
if ( bufferPool_->getRowCount() > 0 ) { // that was keepRecentBuffer_
// set the next buffer to be flushed to be that remaining kept buffer
nextBufferToFlush_ = bufferPool_ ;
// flush that last buffer (note: this is a recursive call!) and return
return ! flush(rc);
}
// the cluster has only half as many buckets left
bucketCount_ /= 2;
// now create a new cluster and chain it into the chain after the
// current cluster
next_ = new(collHeap(), FALSE) Cluster(Cluster::FLUSHED,
clusterDb_,
&buckets_[bucketCount_],
bucketCount_,
rowLength_,
useVariableLength_,
considerBufferDefrag_,
insertAtpIndex_,
isInner_,
bitMapEnable_,
next_,
rc);
if ( !next_ || *rc ) {
if ( ! *rc ) *rc = EXE_NO_MEM_TO_EXEC; // set a code if the ctor did not
return TRUE;
}
// Update the number of rows in the current cluster: Subtract the rows
// taken by the new cluster.
rowCount_ -= next_->rowCount_ ;
// Calculate the total size of the new cluster based on the ratio of
// rows in the new cluster to total rows. Assume that the ratio of
// rows will reflect the ratio of total size.
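// e.g. (hypothetical numbers): if the split moved 400 of 1000 rows to
// the new cluster, the new cluster is assigned 40% of totalClusterSize_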
// (compute the ratio in floating point; the pure integer division would
// almost always truncate the ratio to zero)
next_->totalClusterSize_ = (Int64)
(((double) next_->rowCount_ / (double) (rowCount_ + next_->rowCount_)) *
(double) totalClusterSize_);
// Update the total size of the current cluster: subtract the new's size
totalClusterSize_ -= next_->totalClusterSize_ ;
// and add the size of the current buffer (which was flushed and now is empty)
totalClusterSize_ += clusterDb_->bufferSize_;
numInMemBuffers_ = 1; // only 1 in-memory buffer now
// Allocate a buffer for the new cluster. This allocation happens
// unconditionally, i.e., we don't care if there is enough memory.
HashBuffer * newBuffer = new(collHeap(), FALSE) HashBuffer(next_);
if (!newBuffer || !newBuffer->getDataPointer() ) {
if (newBuffer) delete newBuffer;
*rc = EXE_NO_MEM_TO_EXEC;
return TRUE;
};
next_->bufferPool_ = newBuffer;
next_->numInMemBuffers_ = 1; // only 1 in-memory buffer
// and add the size of the new buffer
next_->totalClusterSize_ += clusterDb_->bufferSize_;
//
// Now set the overflow sequences, for both clusters
//
// Update the new cluster to have the same previous sequences as the current
for ( next_->seqIDIndex_ = 0 ;
next_->seqIDIndex_ <= seqIDIndex_ ;
next_->seqIDIndex_++ )
next_->seqID_[next_->seqIDIndex_] = seqID_[next_->seqIDIndex_] ;
next_->maxSeqIDIndex_ = next_->seqIDIndex_ ; // update the max idx
// A new sequence for the new cluster
next_->seqID_[next_->seqIDIndex_] = clusterDb_->generateSequenceID();
// start a new sequence for the current cluster
maxSeqIDIndex_ = ++seqIDIndex_ ; // increment, and update the max
seqID_[seqIDIndex_] = clusterDb_->generateSequenceID();
// Just in case the new cluster gets no more rows, initialize its scratch
if ( next_->initScratch(rc) ) return TRUE;
if ( clusterDb_->doLog_ ) { // report that a split happened
char msg[256];
sprintf(msg,
"Cluster-Split: New/old (%lu/%lu %lu): sizes " PF64 "/" PF64 " rowCount %u/%u seqIDindex %hu",
(ULong)next_ & 0x0FFF,
(ULong)this & 0x0FFF,
(ULong)clusterDb_ & 0x0FFF,
next_->totalClusterSize_, totalClusterSize_,
next_->rowCount_, rowCount_,
seqIDIndex_ );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL,0,msg,clusterDb_->explainNodeId_);
}
if ( clusterDb_->hashOperStats_ && clusterDb_->hashOperStats_->castToExHashJoinStats() ) // keep stats about splits (HJ only)
((ExHashJoinStats*)clusterDb_->hashOperStats_)->incrClusterSplits();
return FALSE;
};
/////////////////////////////////////////////////////////////////////////////
NABoolean Cluster::chain(NABoolean forceChaining, ExeErrorCode * rc) {
// assume success
*rc = EXE_OK;
// we can only chain clusters in memory
ex_assert((state_ == Cluster::IN_MEMORY),
"tried to chain a unchainable cluster");
// if we can't get enough space for the hash table, don't chain.
// The hash table has rowCount_ * 1.5 entries.
// If we have a readCount_, we are in a hash loop; thus, we don't
// allocate hash chain headers for all rows in the inner table
// (rowCount_). Instead, we use readCount_, which tells us how many
// rows we have read in this loop.
ULng32 hashTableEntryCount = ( readCount_ ? readCount_ : rowCount_ );
hashTable_ = createHashTable( getMemorySizeForHashTable(hashTableEntryCount),
rc,
! forceChaining ); // force? -> no mem check
if (!hashTable_) return FALSE;
HashBuffer *hb = bufferPool_;
// If performing a CROSS_PRODUCT operation, reverse the list of
// buffers to keep the rows in order.
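// (an in-place singly-linked-list reversal: since new buffers are chained
// at the head, reversing A -> B -> C into C -> B -> A restores the
// original insertion order)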
if (clusterDb_->hashOperator_ == ClusterDB::CROSS_PRODUCT && hb != NULL) {
HashBuffer *revList = hb;
hb = hb->getNext();
revList->setNext(NULL);
while(hb) {
HashBuffer *next = hb->getNext();
hb->setNext(revList);
revList = hb;
hb = next;
}
hb = revList;
}
// scan through all the rows in the bufferPool_ and chain them into the HT
tupp tupp1 =clusterDb_->workAtp_->getTupp(clusterDb_->hashTableRowAtpIndex1_);
tupp tupp2 =clusterDb_->workAtp_->getTupp(clusterDb_->hashTableRowAtpIndex2_);
while (hb) {
hb->castToSerial()->fixup();
HashRow *dataPointer = hb->castToSerial()->getFirstRow();
for ( ULng32 rowIndex = 0 ;
rowIndex < hb->getRowCount() ;
rowIndex++)
{
if (clusterDb_->hashOperator_ == ClusterDB::CROSS_PRODUCT) {
hashTable_->insertSingleChain(dataPointer);
} else {
// insert row into hash table
hashTable_->insert(clusterDb_->workAtp_,
dataPointer,
tupp1,
tupp2,
clusterDb_->searchExpr_);
}
dataPointer = hb->castToSerial()->getNextRow(dataPointer);
}
hb = hb->getNext();
} // while (hb)
// the cluster is chained now
state_ = CHAINED;
return TRUE;
};
/////////////////////////////////////////////////////////////////////////////
ExeErrorCode Cluster::setUpOuter(ULng32 rowLength,
short atpIndex,
NABoolean bitMapEnable) {
// assume success
ExeErrorCode rc = EXE_OK;
ex_assert(state_ == FLUSHED, "Only a FLUSHED inner cluster gets an outer");
ex_assert(isInner_, "can't set up outer Cluster for an outer Cluster");
ex_assert(bufferPool_->getNext() == NULL, "cluster has too many buffers");
ex_assert(!bufferPool_->getRowCount(), "empty buffer has positive rowCount");
// When the last buffer in the cluster is flushed, bufferPool_ is empty,
// but it still counts as part of totalClusterSize_; so we need to
// subtract the empty buffer.
totalClusterSize_ -= clusterDb_->bufferSize_;
// the inner cluster is flushed and contains rows (all rows on temporary
// file). We need an outer cluster for it.
Cluster * oCluster = new(collHeap(), FALSE) Cluster(FLUSHED,
clusterDb_,
buckets_,
bucketCount_,
rowLength,
useVariableLength_,
considerBufferDefrag_,
atpIndex,
FALSE, // outer cluster
bitMapEnable,
NULL, // no next ptr
&rc,
bufferPool_);
if (!oCluster) return EXE_NO_MEM_TO_EXEC;
if (rc) return rc;
bufferPool_ = NULL;
for (ULng32 i = 0; i < bucketCount_; i++)
buckets_[i].setOuterCluster(oCluster);
return rc;
};
/////////////////////////////////////////////////////////////////////////////
void Cluster::resetForHashLoop() {
numLoops_++; // finished another iteration
if ( clusterDb_->hashOperStats_ && clusterDb_->hashOperStats_->castToExHashJoinStats()) // keep stats about hash loops (HJ only)
((ExHashJoinStats*)clusterDb_->hashOperStats_)->incrHashLoops();
// delete all buffers of the inner cluster. It would be nice
// to keep one buffer, but Cluster::read() always allocates a buffer
// for inner, i.e. it is not aware of this special case.
releaseAllHashBuffers();
// reset the readCount_ of the inner cluster. This is used to
// determine the size of the hash table in a hash loop.
readCount_ = 0;
// clear lastSqueezedBuffer_ after each hash loop
lastSqueezedBuffer_ = NULL;
// remove the hash table
removeHashTable();
completeCurrentRead_ = FALSE; // Beginning of a loop is like a new cluster
state_ = FLUSHED; // now the cluster is not IN_MEMORY
buffersRead_ = 0;
// get the outer cluster
Cluster * oCluster = getOuterCluster();
if ( oCluster ) { // if not a right HJ with an empty outer
// reset the outer cluster so that it is read again
oCluster->readCount_ = 0;
// reset the file position
oCluster->readHandle_->initCPB();
}
// we don't release the buffer of the outer, since we need it
};
#endif
/////////////////////////////////////////////////////////////////////////////
ExClusterStats* Cluster::getStats(CollHeap* heap) {
ExStatsCounter hashChains;
if (hashTable_)
for (ULng32 i = 0; i < hashTable_->getHeaderCount(); i++) {
ULng32 value = hashTable_->getChainSize(i);
if (value != 0)
hashChains.addEntry(value);
};
ExClusterStats* stats = new(heap)ExClusterStats(isInner_,
bucketCount_,
rowCount_,
totalClusterSize_,
hashChains,
writeIOCnt_,
readIOCnt_);
return stats;
};
/////////////////////////////////////////////////////////////////////////////
void Cluster::releaseAllHashBuffers() {
// release all hashbuffers of this cluster
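// (note: the HashBuffer destructor unlinks the deleted buffer from the
// head of this cluster's buffer list -- see "the dtor removes the first
// buffer in list" in squeezeOutRowsOfOtherClusters() -- so this loop
// terminates)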
while (bufferPool_)
delete bufferPool_;
bufferPool_ = NULL;
};
/////////////////////////////////////////////////////////////////////////////
// get the next row in main memory
HashRow * Cluster::advance() {
ex_assert(scanPosition_, "tried to advance on a non-existent hash buffer");
HashRow * dataPointer = scanPosition_->castToSerial()->getCurrentRowAndAdvance();
// If finished this outer buffer, try the next buffer in the list
if ( !dataPointer && scanPosition_->getNext()
&& scanPosition_->getNext()->getRowCount() ) {
scanPosition_ = scanPosition_->getNext();
scanPosition_->castToSerial()->positionOnFirstRow();
dataPointer = scanPosition_->castToSerial()->getCurrentRowAndAdvance();
};
return dataPointer;
};
HashRow * Cluster::getCurrentRow() {
ex_assert(scanPosition_, "tried to get the current row on a non-existent hash buffer");
HashRow * dataPointer = scanPosition_->castToSerial()->getCurrentRow();
if ( !dataPointer && scanPosition_->getNext()
&& scanPosition_->getNext()->getRowCount() ) {
scanPosition_ = scanPosition_->getNext();
scanPosition_->castToSerial()->positionOnFirstRow();
dataPointer = scanPosition_->castToSerial()->getCurrentRow();
};
return dataPointer;
};
ULng32 Cluster::getMemorySizeForHashTable(ULng32 entryCount)
{
// see constraints in hash_table.h
entryCount = MINOF(entryCount, MAX_HEADER_COUNT);
entryCount = MAXOF(entryCount, MIN_HEADER_COUNT); // just a sanity bound
// + 50% for hash chains
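// e.g. (hypothetical numbers): entryCount == 1000 yields room for
// 1500 headers, i.e., 1500 * sizeof(HashTableHeader) bytes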
return (entryCount * sizeof(HashTableHeader) * 3 / 2);
}
#ifndef __EID
void Cluster::updateBitMap() {
ex_assert(outerBitMap_, "no bitmap to update");
outerBitMap_->setBit(readCount_);
};
NABoolean Cluster::testBitMap() {
ex_assert(outerBitMap_, "no bitmap to test");
return (outerBitMap_->testBit(readCount_));
};
#include "ComCextdecs.h"
// LCOV_EXCL_START
void IOTimer::resetTimer()
{
ioStarted_ = FALSE;
accumTime_ = 0;
};
NABoolean IOTimer::startTimer()
{
if (ioStarted_)
return FALSE;
else
{
startTime_ = NA_JulianTimestamp();
ioStarted_ = TRUE;
return TRUE;
}
};
Int64 IOTimer::endTimer()
{
if (ioStarted_)
{
ioStarted_ = FALSE;
accumTime_ += NA_JulianTimestamp() - startTime_;
}
return accumTime_;
};
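// Descriptive note: typical usage is startTimer() just before issuing an
// I/O and endTimer() once it completes; endTimer() returns the total
// accumulated time in NA_JulianTimestamp units (microseconds).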
void ClusterDB::getScratchErrorDetail(Lng32 &scratchError,
Lng32 &scratchSysError,
Lng32 &scratchSysErrorDetail,
char *errorMsg)
{
SortError *sError = NULL;
if(tempFile_)
sError = tempFile_->getSortError();
if(sError)
{
scratchError = (Lng32)sError->getSortError();
scratchSysError = (Lng32)sError->getSysError();
scratchSysErrorDetail = (Lng32)sError->getErrorDetail();
strcpy(errorMsg, sError->getSortErrorMsg());
}
}
// LCOV_EXCL_STOP
// Every time totalIOCnt_ grows, update the stats for this operator
void ClusterDB::updateIOStats()
{
Int64 ioSize = (Int64) bufferSize_ * (Int64) totalIOCnt_ ;
if (hashOperStats_->castToExHashGroupByStats())
hashOperStats_->castToExHashGroupByStats()->updIoSize(ioSize);
else
if (hashOperStats_->castToExHashJoinStats())
hashOperStats_->castToExHashJoinStats()->updIoSize(ioSize);
}
#endif
// Every time memoryUsed_ grows, update the stats for max memory used by this operator
void ClusterDB::updateMemoryStats()
{
if (hashOperStats_->castToExHashGroupByStats())
hashOperStats_->castToExHashGroupByStats()->updMemorySize(memoryUsed_);
else
if (hashOperStats_->castToExHashJoinStats())
hashOperStats_->castToExHashJoinStats()->updMemorySize(memoryUsed_);
}