/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/*
******************************************************************************
*
* File: ex_hash_grby.cpp
* Description: Methods for the tdb and tcb of hash aggregate/grby operator
*
*
* Created: 5/3/94
* Language: C++
*
*
*
*
******************************************************************************
*/
//
// This file contains all the generator and executor methods associated
// with a hash aggr/grby operation.
//
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_hash_grby.h"
#include "ex_expr.h"
#include "ExSimpleSqlBuffer.h"
#include "ExTrieTable.h"
#include "ExBitMapTable.h"
#include "ExStats.h"
#include "ex_error.h"
#include "ex_exe_stmt_globals.h"
#include "memorymonitor.h"
#include "logmxevent.h"
#include "sql_buffer_size.h"
//////////////////////////////////////////////////////////////////////////////
//
// TDB procedures
//
//////////////////////////////////////////////////////////////////////////////
// In the reverse order of ex_hash_grby_tcb::HashGrbyPhase
// Each string is limited to 11 characters
const char *ex_hash_grby_tcb::HashGrbyPhaseStr[] = {
"PHASE_END",
"READ_SPILL",
"RETURN_ROWS",
"READ_ROWS"
};
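// A short illustration of how this array is indexed (a sketch, assuming the
// HashGrbyPhase enum declares HGB_READ_PHASE, HGB_RETURN_PHASE,
// HGB_READ_SPILL_PHASE and PHASE_END in that order, as the usage in work()
// suggests):
//   bmoStats_->setBmoPhase(PHASE_END - HGB_READ_PHASE);       // -> "READ_ROWS"
//   bmoStats_->setBmoPhase(PHASE_END - HGB_RETURN_PHASE);     // -> "RETURN_ROWS"
//   bmoStats_->setBmoPhase(PHASE_END - HGB_READ_SPILL_PHASE); // -> "READ_SPILL"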
NABoolean ex_hash_grby_tcb::needStatsEntry()
{
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
// stats are collected for ALL and OPERATOR options.
if (statsType == ComTdb::ALL_STATS || statsType == ComTdb::OPERATOR_STATS)
return TRUE;
else
return FALSE;
}
ExOperStats * ex_hash_grby_tcb::doAllocateStatsEntry(CollHeap *heap,
ComTdb *tdb)
{
ExBMOStats *stat;
ComTdbHashGrby *comTdbHashGrby = (ComTdbHashGrby *)tdb;
if (comTdbHashGrby->isNonBMOPartialGroupBy())
return ex_tcb::doAllocateStatsEntry(heap, tdb);
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
if (statsType == ComTdb::OPERATOR_STATS)
stat = new (heap) ExBMOStats(heap, this, tdb);
else
{
stat = (ExBMOStats *)new(heap) ExHashGroupByStats(heap,
this,
tdb);
hashGroupByStats_ = (ExHashGroupByStats *)stat;
}
bmoStats_ = stat;
return stat;
}
//
// Build a hash_grby tcb
//
ex_tcb * ex_hash_grby_tdb::build(ex_globals * glob) {
// first build the child
ex_tcb * childTcb;
ex_hash_grby_tcb * hash_grby_tcb;
childTcb = childTdb_->build(glob);
hash_grby_tcb = new(glob->getSpace())
ex_hash_grby_tcb(*this, *childTcb, glob);
// add the hash_grby_tcb to the schedule
hash_grby_tcb->registerSubtasks();
// the parent queues will be resizable, so register a resize subtask.
hash_grby_tcb->registerResizeSubtasks();
return (hash_grby_tcb);
};
//////////////////////////////////////////////////////////////////////////////
//
// TCB procedures
//
//////////////////////////////////////////////////////////////////////////////
//
// Constructor for hash_grby_tcb
//
ex_hash_grby_tcb::ex_hash_grby_tcb(const ex_hash_grby_tdb & hash_grby_tdb,
const ex_tcb & childTcb,
ex_globals * glob)
: ex_tcb( hash_grby_tdb, 1, glob),
space_(glob->getSpace()),
childTcb_(&childTcb),
bucketCount_(0),
buckets_(NULL),
rCluster_(NULL),
ofClusterList_(NULL),
lastOfCluster_(NULL),
clusterDb_(NULL),
matchCount_(0),
partialGroupbyMissCounter_(0),
state_(HASH_GRBY_EMPTY),
oldState_(HASH_GRBY_EMPTY),
readingChild_(FALSE),
hasFreeTupp_(FALSE),
hashValue_(0),
rc_(EXE_OK),
ioEventHandler_(NULL),
workAtp_(NULL),
tempUpAtp_(NULL),
resultPool_(NULL),
bitMuxTable_(NULL),
bitMuxBuffer_(NULL),
numIOChecks_(0),
defragTd_(NULL)
{
bmoStats_ = NULL;
hashGroupByStats_ = NULL;
heap_ = new (glob->getDefaultHeap()) NAHeap("Hash Groupby Heap", (NAHeap *)glob->getDefaultHeap());
// set the memory monitor
memMonitor_ = getGlobals()->castToExExeStmtGlobals()->getMemoryMonitor();
// Copy all expression pointers. This must be done first because
// some of the pointers (e.g. bitMuxExpr_) are used subsequently in
// branch conditions before fixup.
//
hashExpr_ = hash_grby_tdb.hashExpr_;
bitMuxExpr_ = hash_grby_tdb.bitMuxExpr_;
bitMuxAggrExpr_ = hash_grby_tdb.bitMuxAggrExpr_;
hbMoveInExpr_ = hash_grby_tdb.hbMoveInExpr_;
ofMoveInExpr_ = hash_grby_tdb.ofMoveInExpr_;
resMoveInExpr_ = hash_grby_tdb.resMoveInExpr_;
hbAggrExpr_ = hash_grby_tdb.hbAggrExpr();
ofAggrExpr_ = hash_grby_tdb.ofAggrExpr();
resAggrExpr_ = hash_grby_tdb.resAggrExpr();
havingExpr_ = hash_grby_tdb.havingExpr_;
moveOutExpr_ = hash_grby_tdb.moveOutExpr_;
hbSearchExpr_ = hash_grby_tdb.hbSearchExpr_;
ofSearchExpr_ = hash_grby_tdb.ofSearchExpr_;
// Ditto for Atp Indexes.
//
hbRowAtpIndex_ = hash_grby_tdb.hbRowAtpIndex_;
ofRowAtpIndex_ = hash_grby_tdb.ofRowAtpIndex_;
hashValueAtpIndex_ = hash_grby_tdb.hashValueAtpIndex_;
bitMuxAtpIndex_ = hash_grby_tdb.bitMuxAtpIndex_;
bitMuxCountOffset_ = hash_grby_tdb.bitMuxCountOffset_;
resultRowAtpIndex_ = hash_grby_tdb.resultRowAtpIndex_;
returnedAtpIndex_ = hash_grby_tdb.returnedAtpIndex_;
// allocate the buffer pool for the result rows. The number of buffers is
// controlled by the GEN_HGBY_NUM_BUFFERS CQD at compile time.
Lng32 numBuffers = ((hash_grby_tdb.numBuffers_ == 0) ? 5 : hash_grby_tdb.numBuffers_);
resultPool_ = new(space_)
sql_buffer_pool(numBuffers,
(Int32)hash_grby_tdb.bufferSize_,
space_);
if (hash_grby_tdb.considerBufferDefrag())
{
assert(hash_grby_tdb.useVariableLength());
defragTd_ = resultPool_->addDefragTuppDescriptor(hashGrbyTdb().resultRowLength_);
}
// Allocate an ExBitMapTable for rows that map directly to groups using
// bitMux fast mapping.
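// (How the table is used, in short: workReadChildBitMux() below evaluates
// bitMuxExpr_ to extract a binary grouping key for each child row into
// bitMuxBuffer_, and ExBitMapTable::findOrAdd() then locates or creates the
// group for that key directly, so such rows bypass the regular hash table.)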
if (bitMuxExpr_) {
// Try to get about 1Mb of memory for the Trie table.
Int32 memSize = 1000 * 1024;
bitMuxTable_ = new(space_)
ExBitMapTable((Int32) hashGrbyTdb().keyLength_,
(Int32) hashGrbyTdb().extGroupedRowLength_,
(Int32) bitMuxCountOffset_,
memSize,
space_);
bitMuxBuffer_ = (char *)space_->allocateMemory
((size_t) hashGrbyTdb().extGroupedRowLength_);
if (bitMuxTable_ && bitMuxBuffer_ &&
(bitMuxTable_->getMaximumNumberGroups() > 0)) {
// Initialize the bitMuxBuffer_ -- this must be done for now because
// the actual key extracted by the bitMux expression may be shorter
// than the grouped row length.
//
for (Int32 i=0; i<(Int32)hashGrbyTdb().extGroupedRowLength_; i++)
bitMuxBuffer_[i] = 0;
// Initialize the tupp descriptor for the bitMux buffer if
// bitMux'ing is on.
//
bitMuxTupp_.init((Int32)hashGrbyTdb().extGroupedRowLength_,
NULL,
bitMuxBuffer_);
}
else {
// If resource allocation fails for any reason, simply turn off
// bitMux'ing and free any of the resources that were allocated.
//
if (bitMuxTable_) {
delete bitMuxTable_;
bitMuxTable_ = 0;
}
if (bitMuxBuffer_) {
space_->deallocateMemory(bitMuxBuffer_);
bitMuxBuffer_ = 0;
}
bitMuxExpr_ = 0;
}
}
// For performance testing.
//
if(bitMuxExpr_ && !hbAggrExpr_ && !hashExpr_)
bitMuxExpr_ = NULL;
// get the queue that the child uses to communicate with this node
childQueue_ = childTcb_->getParentQueue();
// Allocate the queues to communicate with parent (no pstate needed)
allocateParentQueues(parentQueue_,FALSE);
// Now, fixup the expressions
//
if (hashExpr_)
(void) hashExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (bitMuxExpr_) {
(void) bitMuxExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (bitMuxAggrExpr_)
(void) bitMuxAggrExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
}
if (hbMoveInExpr_)
(void) hbMoveInExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (ofMoveInExpr_)
(void) ofMoveInExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (resMoveInExpr_)
(void) resMoveInExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (hbAggrExpr_)
(void) hbAggrExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (ofAggrExpr_)
(void) ofAggrExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (resAggrExpr_)
(void) resAggrExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (havingExpr_)
(void) havingExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (moveOutExpr_)
(void) moveOutExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (hbSearchExpr_)
(void) hbSearchExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
if (ofSearchExpr_)
(void) ofSearchExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, glob->computeSpace(), glob);
// Allocate the ATP's that are used internally in the work
// methods.
//
workAtp_ = allocateAtp(hash_grby_tdb.workCriDesc_, space_);
tempUpAtp_ = allocateAtp(hash_grby_tdb.criDescUp_, space_);
// Initialize the tupp descriptor for the hash value.
//
hashValueTupp_.init(sizeof(SimpleHashValue),
NULL,
(char *) (&hashValue_));
// Allocate tupp descriptors for tupps in the workAtp. tupps 0 and 1
// are used for constants and temps. Don't allocate them here.
// The hashValue and bitMux tupps are part of the TCB so don't allocate
// them either.
// I.e., just allocate the tupp descriptors for the rows in the hash
// buffer.
workAtp_->getTupp(hash_grby_tdb.hbRowAtpIndex_) =
new(space_) tupp_descriptor;
workAtp_->getTupp(hash_grby_tdb.ofRowAtpIndex_) =
new(space_) tupp_descriptor;
// Set the hashValue and bitMux tupp's
//
workAtp_->getTupp(hash_grby_tdb.hashValueAtpIndex_) = &hashValueTupp_;
workAtp_->getTupp(hash_grby_tdb.bitMuxAtpIndex_) = &bitMuxTupp_;
};
///////////////////////////////////////////////////////////////////////////////
// Destructor for hash_grby_tcb
ex_hash_grby_tcb::~ex_hash_grby_tcb() {
delete parentQueue_.up;
delete parentQueue_.down;
freeResources();
delete heap_;
};
//////////////////////////////////////////////////////////////////////////
// Free Resources
//
void ex_hash_grby_tcb::freeResources() {
if (workAtp_) {
deallocateAtp(workAtp_, space_);
workAtp_ = NULL;
};
if (tempUpAtp_) {
deallocateAtp(tempUpAtp_, space_);
tempUpAtp_ = NULL;
};
while (ofClusterList_) {
Cluster * p = ofClusterList_->getNext();
delete ofClusterList_;
ofClusterList_ = p;
};
lastOfCluster_ = NULL;
// remove all clusters
if (clusterDb_) {
delete clusterDb_;
clusterDb_ = NULL;
};
if (resultPool_)
delete resultPool_;
resultPool_ = NULL;
if(bitMuxTable_) delete bitMuxTable_;
bitMuxTable_ = NULL;
if(bitMuxBuffer_) NADELETEBASIC(bitMuxBuffer_, space_);
bitMuxBuffer_ = NULL;
};
void ex_hash_grby_tcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
ex_tcb :: registerSubtasks();
// Register the I/O event
ioEventHandler_ = sched->registerNonQueueSubtask(sWork,this);
};
///////////////////////////////////////////////////////////////////////////////
// work: this is where the action is.
///////////////////////////////////////////////////////////////////////////////
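// A rough sketch of the state machine driven by this method (derived from the
// cases below; error and cancel edges are omitted):
//
//   EMPTY --workInitialize()--> READ_CHILD --Q_NO_DATA--> EVALUATE
//   READ_CHILD --out of memory--> SPILL --I/O done--> back to previous state
//   EVALUATE --CHAINED cluster with groups--> RETURN_ROWS --cluster done--> EVALUATE
//   EVALUATE --spilled cluster--> READ_BUFFER --I/O done--> READ_OF_ROW
//   READ_OF_ROW --buffer done--> READ_BUFFER or EVALUATE
//   EVALUATE --nothing left--> RETURN_BITMUX_ROWS or DONE --workDone()--> EMPTY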
short ex_hash_grby_tcb::work() {
// if no parent request, return
if (parentQueue_.down->isEmpty())
return WORK_OK;
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
const ex_queue::down_request & request =
downParentEntry->downState.request;
// loop forever; exit via return
while (TRUE) {
// Cancel request if parent cancelled and not already cancelled.
if ( state_ != HASH_GRBY_CANCELED && state_ != HASH_GRBY_DONE &&
request == ex_queue::GET_NOMORE )
setState(HASH_GRBY_CANCELED);
switch(state_) {
case HASH_GRBY_EMPTY: {
workInitialize();
} break;
case HASH_GRBY_READ_CHILD: {
if (childQueue_.up->isEmpty())
return WORK_OK;
if (parentQueue_.up->isFull())
return WORK_OK;
// in case of a partial hash grouping we might generate results
// even while we are in the read phase. Thus, we have to make sure that
// we have a free entry in the parent queue, and we have to allocate
// space in the result buffer.
// Ditto for no-aggregates (results are returned during workReadChild()).
if (hashGrbyTdb().isPartialGroup_ || ! hbAggrExpr_ ) {
if (!hasFreeTupp_) {
// allocate space for the result tuple
if (resultPool_->get_free_tuple(workAtp_->getTupp(resultRowAtpIndex_),
(Int32) hashGrbyTdb().resultRowLength_))
// not enough space in pool. Return for now
return WORK_POOL_BLOCKED;
else
hasFreeTupp_ = TRUE;
};
};
if (bmoStats_)
bmoStats_->setBmoPhase(PHASE_END-HGB_READ_PHASE);
// now read the next row from the child queue
workReadChild();
} break;
case HASH_GRBY_READ_OF_ROW: {
if (parentQueue_.up->isFull())
return WORK_OK;
// In case of no-aggregates (results are returned during
// workReadOverFlowRow()) we may return a row up. Thus, we have to make
// sure that we have a free entry in the parent queue, and we have to
// allocate space in the result buffer.
if ( ! hbAggrExpr_ && ! hasFreeTupp_ ) {
// allocate space for the result tuple
if (resultPool_->get_free_tuple(workAtp_->getTupp(resultRowAtpIndex_),
(Int32) hashGrbyTdb().resultRowLength_))
// not enough space in pool. Return for now
return WORK_POOL_BLOCKED;
else
hasFreeTupp_ = TRUE;
};
workReadOverFlowRow();
} break;
case HASH_GRBY_SPILL: {
workSpill();
if (state_ == HASH_GRBY_SPILL)
// I/O is not done yet
return WORK_CALL_AGAIN;
} break;
case HASH_GRBY_READ_BUFFER: {
if (bmoStats_ && (bmoStats_->getBmoPhase() == (PHASE_END-HGB_READ_PHASE)))
bmoStats_->setBmoPhase(PHASE_END-HGB_READ_SPILL_PHASE);
workReadBuffer();
if (state_ == HASH_GRBY_READ_BUFFER)
// I/O is not done yet
return WORK_CALL_AGAIN;
} break;
case HASH_GRBY_EVALUATE: {
workEvaluate();
} break;
case HASH_GRBY_RETURN_PARTIAL_GROUPS:
case HASH_GRBY_RETURN_BITMUX_ROWS:
case HASH_GRBY_RETURN_ROWS: {
ex_assert ( hbAggrExpr_ , "DISTINCT rows must be returned immediately");
// if we have given to the parent all the rows needed,
// we are done
if ((request == ex_queue::GET_N) &&
(downParentEntry->downState.requestValue <= (Lng32)matchCount_))
setState(HASH_GRBY_DONE);
else {
// make sure that we have space in the up queue
if (parentQueue_.up->isFull())
return WORK_OK;
NABoolean tryToDefrag = FALSE;
if (!hasFreeTupp_)
{
// If havingExpr_ is not null we do not try to defrag: we could
// pre-allocate the actual row size, but if the having predicate then
// discards the row, the pre-allocated space cannot be reused for the next row.
if (!havingExpr_ &&
defragTd_ &&
!resultPool_->currentBufferHasEnoughSpace(hashGrbyTdb().resultRowLength_))
{
tryToDefrag = TRUE;
}
else
// allocate space for the result tuple
if (resultPool_->get_free_tuple(workAtp_->getTupp(resultRowAtpIndex_),
(Int32) hashGrbyTdb().resultRowLength_))
// not enough space in pool. return for now
return WORK_POOL_BLOCKED;
else
hasFreeTupp_ = TRUE;
};
if (bmoStats_ && (bmoStats_->getBmoPhase() == (PHASE_END-HGB_READ_PHASE)))
bmoStats_->setBmoPhase(PHASE_END-HGB_RETURN_PHASE);
// now we are ready to return a row
ULng32 retCode = workReturnRows(tryToDefrag);
// if retCode == 0: do nothing; continue in the current state.
// if retCode == 1 && state_ == HASH_GRBY_RETURN_BITMUX_ROWS
//    -- set the state_ to HASH_GRBY_DONE.
// if retCode == 1 && state_ == HASH_GRBY_RETURN_PARTIAL_GROUPS
//    -- delete the cluster (already done in workReturnRows())
//    -- and re-allocate it.
//    -- set the state_ to HASH_GRBY_READ_CHILD.
// if retCode == 1 && state_ == HASH_GRBY_RETURN_ROWS
//    -- delete the cluster (already done in workReturnRows())
//    -- and set the state_ to HASH_GRBY_EVALUATE.
// if retCode == 2: return WORK_POOL_BLOCKED.
if (retCode == 2)
return WORK_POOL_BLOCKED;
if ( (retCode == 1) && (state_ == HASH_GRBY_RETURN_BITMUX_ROWS))
setState(HASH_GRBY_DONE);
if ( (retCode == 1) && (state_ == HASH_GRBY_RETURN_PARTIAL_GROUPS))
resetClusterAndReadFromChild();
if ( (retCode == 1) && (state_ == HASH_GRBY_RETURN_ROWS))
setState(HASH_GRBY_EVALUATE);
};
} break;
case HASH_GRBY_DONE: {
// make sure that we have space in the up queue
if (parentQueue_.up->isFull())
return WORK_OK;
workDone();
if (parentQueue_.down->isEmpty())
return WORK_OK;
else
return WORK_CALL_AGAIN; // need to work on more requests
} break;
case HASH_GRBY_CANCELED: {
if (isReadingFromChild())
{
// If we are still reading from the child up queue,
// cancel the request.
queue_index parentIndex = parentQueue_.down->getHeadIndex();
childQueue_.down->cancelRequestWithParentIndex(parentIndex);
// Consume all rows from the child up queue. Note that
// we can temporarily exhaust this queue, and that we'll
// return to the scheduler and be re-dispatched when the
// child produces more rows. That means that the call above
// to cancelRequestWithParentIndex will be executed again,
// but this only wastes a little CPU because the child should
// finish its request and return Q_NO_DATA after its next
// dispatch, assuming that it handles cancel properly.
while (state_ == HASH_GRBY_CANCELED)
{
if (childQueue_.up->isEmpty())
return WORK_OK;
ex_queue_entry * childEntry = childQueue_.up->getHeadEntry();
ex_queue::up_status childStatus = childEntry->upState.status;
if (childStatus == ex_queue::Q_NO_DATA)
{
// When we are finished consuming all rows,
// change state to HASH_GRBY_DONE.
setState(HASH_GRBY_DONE);
doneReadingFromChild();
}
childQueue_.up->removeHead();
} // while
} // if
else
{
// On the other hand, the request to cancel might happen
// after we've read all child up queue rows. In that
// case, proceed immediately to HASH_GRBY_DONE.
setState(HASH_GRBY_DONE);
}
} break;
case HASH_GRBY_ERROR: {
// make sure that we have a free slot in the parent's up queue
if (parentQueue_.up->isFull()) {
return WORK_OK;
};
// we ran into a serious runtime error. Create Condition and
// pass it to parent. rc_ has the error code and
// oldState_ tells us in which state the error occurred
ComDiagsArea * diags = NULL;
if(rc_ == EXE_SORT_ERROR)
{
char msg[512];
char errorMsg[100];
Lng32 scratchError = 0;
Lng32 scratchSysError = 0;
Lng32 scratchSysErrorDetail = 0;
if(clusterDb_)
clusterDb_->getScratchErrorDetail(scratchError,
scratchSysError,
scratchSysErrorDetail,
errorMsg);
str_sprintf(msg, "Hash GroupBy Scratch IO Error occurred. Scratch Error: %d, System Error: %d, System Error Detail: %d, Details: %s",
scratchError, scratchSysError, scratchSysErrorDetail, errorMsg);
diags = ExRaiseSqlError(heap_, downParentEntry,
(ExeErrorCode) -rc_,
NULL,
msg);
}
else
diags = ExRaiseSqlError(heap_, downParentEntry, (ExeErrorCode)-rc_);
downParentEntry->setDiagsArea(diags);
workHandleError(downParentEntry->getAtp());
} break;
};
};
};
/////////////////////////////////////////////////////////////////////////////
// work: initialize the data structures for the hash grouping
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::workInitialize() {
// Initialize the BitMux table if it exists.
//
if(bitMuxTable_) bitMuxTable_->reset();
if (bmoStats_)
{
ex_hash_grby_tdb *hashGrbyTdb = (ex_hash_grby_tdb *)getTdb();
bmoStats_->setSpaceBufferSize(hashGrbyTdb->bufferSize_);
bmoStats_->setSpaceBufferCount(resultPool_->get_number_of_buffers());
}
ex_queue_entry * parentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_queue_entry * childEntry = childQueue_.down->getTailEntry();
childEntry->downState.request = ex_queue::GET_ALL;
childEntry->downState.requestValue = 11;
childEntry->downState.parentIndex = parentQueue_.down->getHeadIndex();
// copy the input atp to the tempUp atp for this request
tempUpAtp_->copyAtp(downParentEntry->getAtp());
// pass the tempUp atp as child's input atp
childEntry->passAtp(tempUpAtp_);
matchCount_ = 0;
childQueue_.down->insert();
inReadingFromChild();
// For now, we don't do cluster splits for hash grouping. Thus, we have
// just one bucket per cluster from the beginning
ULng32 bucketsPerCluster = 1;
ULng32 noOfClusters = 0;
// We use memUsagePercent_ of the physical memory for the HGB.
ULng32 availableMemory = memMonitor_->getPhysMemInBytes() / 100
* hashGrbyTdb().memUsagePercent_;
// Available memory should not exceed the quota (if set) because the initial
// hash tables allocated (one per CHAINED cluster) may consume a large
// portion of the available memory.
if ( hashGrbyTdb().memoryQuotaMB() > 10 && // mem quota set? (10 MB min, to be safe)
hashGrbyTdb().memoryQuotaMB() * ONE_MEG < availableMemory )
availableMemory = hashGrbyTdb().memoryQuotaMB() * ONE_MEG ;
haveSpilled_ = FALSE;
ioTimer_.resetTimer();
if (!hashGrbyTdb().isPartialGroup_) {
// size of inner table (including row headers and hash chains) in bytes
// This may be a very large number, max out at 8 GB and take at
// least 100 KB. Don't completely trust the optimizer ;-)
// (With available memory usually at 100MB, that's a max of 80 clusters).
NAFloat innerTableSizeF = hashGrbyTdb().getEstRowsUsed() *
(NAFloat)(hashGrbyTdb().extGroupedRowLength_);
Int64 innerTableSize;
if (innerTableSizeF > MAX_INPUT_SIZE ) innerTableSize = MAX_INPUT_SIZE ;
else
innerTableSize =
MAXOF( MIN_INPUT_SIZE , (Int64)innerTableSizeF);
// required number of buffers for table
ULng32 totalResBuffers =
(ULng32) (innerTableSize/hashGrbyTdb().bufferSize_);
if (!totalResBuffers)
totalResBuffers++;
// total number of buffers available
ULng32 totalBuffers = availableMemory/hashGrbyTdb().bufferSize_;
noOfClusters = totalResBuffers/totalBuffers;
if (totalResBuffers % totalBuffers)
noOfClusters++;
// Aim at an average cluster's size to be a quarter of the available memory
// to leave some slack for underestimating or data skews
noOfClusters *= 4;
// round it up to the nearest prime number (3,5,7,11,13,....)
noOfClusters = ClusterDB::roundUpToPrime(noOfClusters);
// the extreme case, each cluster has only one bucket and only one buffer
ULng32 maxNoOfClusters = totalBuffers/bucketsPerCluster;
noOfClusters = MINOF(noOfClusters, maxNoOfClusters);
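// A worked example of the computation above, with purely illustrative
// numbers: an estimated inner table of 800 MB, 100 MB of available memory
// and 1 MB buffers give
//   totalResBuffers = 800, totalBuffers = 100,
//   noOfClusters    = ceil(800/100) = 8, times 4 = 32,
//   rounded up to the next prime = 37, capped at maxNoOfClusters (100) = 37.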
}
else {
// partial group by. we need only one cluster. If this cluster is full
// (we can't allocate additional buffers), we return the incoming row
// to the parent.
noOfClusters = 1;
// Memory to be used for pHGB.
//
// For Partial Groupby in ESP, use a limited amount of memory
// Based on CQD EXE_MEMORY_FOR_PARTIALHGB_MB.
// Default to 100 MB, minimum setting 10 MB
//
availableMemory = hashGrbyTdb().partialGrbyMemoryMB() * ONE_MEG ;
if(availableMemory == 0) {
availableMemory = 100 * ONE_MEG;
} else if(availableMemory < 10 * ONE_MEG) {
availableMemory = 10 * ONE_MEG;
}
// reset the counter for every run.
partialGroupbyMissCounter_ = 0;
};
// total number of buckets
bucketCount_ = bucketsPerCluster * noOfClusters;
// allocate the buckets
buckets_ = (Bucket *) heap_->allocateMemory(bucketCount_ * sizeof(Bucket));
ULng32 bucketIdx = 0;
for (; bucketIdx < bucketCount_; bucketIdx++)
buckets_[bucketIdx].init();
// if multiple inputs then don't yield memory below the original quota
ULng32 minMemQuotaMB = hashGrbyTdb().isPossibleMultipleCalls() ?
hashGrbyTdb().memoryQuotaMB() : 0 ;
ULng32 minB4Chk = hashGrbyTdb().getBmoMinMemBeforePressureCheck() * ONE_MEG;
clusterDb_ = new(heap_) ClusterDB(ClusterDB::HASH_GROUP_BY,
hashGrbyTdb().bufferSize_,
workAtp_,
hashGrbyTdb().getExplainNodeId(),
0, // not used for HGB
0, // not used for HGB
NULL, // not used for HGB
buckets_,
bucketCount_,
availableMemory,
memMonitor_,
hashGrbyTdb().pressureThreshold_,
getGlobals()->castToExExeStmtGlobals(),
&rc_,
hashGrbyTdb().isPartialGroup_, // no O/F
hashGrbyTdb().isPartialGroup_,
hashGrbyTdb().minBuffersToFlush_,
hashGrbyTdb().numInBatch_,
hashGrbyTdb().forceOverflowEvery(),
FALSE, // forceHashLoop
FALSE, // forceClusterSplit
ioEventHandler_,
this, // the calling tcb
hashGrbyTdb().scratchThresholdPct_,
hashGrbyTdb().logDiagnostics(),
FALSE, // bufferedWrites
TRUE, // No early overflow based on hints
hashGrbyTdb().memoryQuotaMB() ,
minMemQuotaMB,
minB4Chk, // BmoMinMemBeforePressureCheck
// next 4 are for early overflow (not used)
0,
0,
0,
0,
hashGrbyTdb().initialHashTableSize_,
getStatsEntry()
);
if ( !clusterDb_ || rc_ != EXE_OK ) {
if ( !clusterDb_ ) rc_ = EXE_NO_MEM_TO_EXEC; // new() couldn't allocate
else delete clusterDb_;
setState(HASH_GRBY_ERROR);
return;
};
clusterDb_->setScratchIOVectorSize(hashGrbyTdb().scratchIOVectorSize());
switch(hashGrbyTdb().getOverFlowMode())
{
case SQLCLI_OFM_SSD_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_SSD);
break;
case SQLCLI_OFM_MMAP_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_MMAP);
break;
default:
case SQLCLI_OFM_DISK_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_DISK);
break;
}
clusterDb_->setBMOMaxMemThresholdMB(hashGrbyTdb().getBMOMaxMemThresholdMB());
Cluster * cluster = NULL;
ULng32 i;
bucketIdx = 0;
// allocate the clusters
for (i = 0; i < noOfClusters; i++) {
cluster = new(heap_) Cluster(Cluster::CHAINED,
clusterDb_,
&buckets_[bucketIdx],
bucketsPerCluster,
hashGrbyTdb().extGroupedRowLength_,
hashGrbyTdb().useVariableLength(),
hashGrbyTdb().considerBufferDefrag(),
hashGrbyTdb().hbRowAtpIndex_,
TRUE,
FALSE, // not used for group by
cluster,
&rc_);
if ( !cluster || rc_ != EXE_OK ) {
// we could not allocate the Cluster
if ( !cluster ) rc_ = EXE_NO_MEM_TO_EXEC;
else delete cluster;
setState(HASH_GRBY_ERROR);
return;
};
bucketIdx += bucketsPerCluster;
};
// store head of the cluster list in the clusterDb_
clusterDb_->setClusterList(cluster);
// Initialize the group by state.
//
setState(HASH_GRBY_READ_CHILD);
};
/////////////////////////////////////////////////////////////////////////////
// returnResultCurrentRow(): Return the current row (as a group) to parent
// If dataPointer not null, then get the row from there (i.e. the hash buffer)
// Sets the state to CANCELED and returns if either:
// 1. Returned enough rows to satisfy a GET_N request
// 2. An Error occurred
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::returnResultCurrentRow(HashRow * dataPointer)
{
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
// in case of GET_N : we are done if we returned enough
if ( downParentEntry->downState.request == ex_queue::GET_N &&
downParentEntry->downState.requestValue <= (Lng32)matchCount_ ) {
setState(HASH_GRBY_CANCELED);
return;
}
// move current row into the work atp
if ( NULL == dataPointer ) { // get current row from child's up-queue
// We cannot construct the result row directly using multiple
// expressions (initializeAggr, aggrExpr) if there are any varchar
// aggregates in Aligned Format (CIF). The second expression may
// need to change the size of the varchar and this would require
// rewriting and resizing the whole tuple. This issue is solved in
// the HashBuffer by forcing these varchar aggregates to be fixed
// fields. However, we do not do this for the return row to limit
// the use of 'forceFixed' to the internal buffers of HashGroupby.
// The solution here is to construct the return row as a
// HashBuffer row and then copy this HashBuffer row to the result
// using one expression. This means that we need three
// expressions (2 to construct the HashBuffer row and one to move
// it to the result).
//
// TBD - Change this to use one new expression which constructs
// the result row in one shot.
//
char buffer[hashGrbyTdb().extGroupedRowLength_];
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(buffer);
ex_queue_entry * childEntry = childQueue_.up->getHeadEntry();
// move grouping columns from child to the result buffer
if ( hbMoveInExpr_->eval(childEntry->getAtp(), workAtp_)
== ex_expr::EXPR_ERROR ) {
workHandleError(childEntry->getAtp());
return;
}
// when called from EID (with aggr) - turn curr row into a partial group
if (hbAggrExpr_) {
hbAggrExpr_->initializeAggr(workAtp_);
if ((hbAggrExpr_->perrecExpr()) &&
(hbAggrExpr_->perrecExpr()->eval(childEntry->getAtp(), workAtp_) ==
ex_expr::EXPR_ERROR)) {
workHandleError(childEntry->getAtp());
return;
}
}
// Move this row/group from the hash buffer to the result buffer
if (moveOutExpr_->eval(workAtp_, workAtp_) == ex_expr::EXPR_ERROR) {
workHandleError(workAtp_);
return;
}
} else { // take row from hash buffer
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(dataPointer->getData());
// Move this row/group from the hash buffer to the result buffer
if (moveOutExpr_->eval(workAtp_, workAtp_) == ex_expr::EXPR_ERROR) {
workHandleError(workAtp_);
return;
}
}
// copy the input to this node to the up queue
upParentEntry->copyAtp(downParentEntry->getAtp());
atp_struct * atp = upParentEntry->getAtp();
// and assign the tupp to the parents queue entry
atp->getTupp(returnedAtpIndex_) = workAtp_->getTupp(resultRowAtpIndex_);
// Before returning the row -- check HAVING clause for that row
if ( havingExpr_ ) {
// Normally there should be no Having-expression, since this is either
// a partial leaf (i.e., aggr can only be done at the root) or a distinct
// (i.e. no aggregation). But in some rare cases the compiler generates
// such an expression (it should not; it should push the predicate down!)
// ( soln 10-090706-2833 )
switch ( havingExpr_->eval(atp, 0) ) {
case ex_expr::EXPR_ERROR :
workHandleError(atp);
return;
case ex_expr::EXPR_TRUE :
// passed the check -- continue
break;
default: // the HAVING predicate failed; release the tuple
atp->release();
// need to do anything else ??
return;
}
}
upParentEntry->upState.status = ex_queue::Q_OK_MMORE;
upParentEntry->upState.parentIndex
= downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
// if stats are to be collected, collect them.
ExOperStats *statsEntry = getStatsEntry();
if (statsEntry)
statsEntry->incActualRowsReturned();
if (hashGroupByStats_)
hashGroupByStats_->incPartialGroupsReturned();
// we had a free tupp in the result buffer and use it now
hasFreeTupp_ = FALSE;
// we can forget about this tupp now
workAtp_->getTupp(resultRowAtpIndex_).release();
// we got another result row
matchCount_++;
upParentEntry->upState.setMatchNo(matchCount_);
// insert result into parent up queue
parentQueue_.up->insert();
}
/////////////////////////////////////////////////////////////////////////////
// workReadChild: read a row from the child and group it into the hash buffer
/////////////////////////////////////////////////////////////////////////////
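// In outline (after the optional bitMux fast path), for each child row with
// status Q_OK_MMORE:
//   1. compute the hash value with hashExpr_ (unless we just returned from
//      spilling, in which case hashValue_ is already set),
//   2. probe the hash table of the target bucket's cluster with hbSearchExpr_,
//   3. if the group exists, aggregate the row into it with hbAggrExpr_,
//   4. otherwise insert a new group; if the insert fails for lack of memory,
//      either return partial groups (partial group by) or spill a cluster.
// Q_NO_DATA moves the operator to the HASH_GRBY_EVALUATE state.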
void ex_hash_grby_tcb::workReadChild() {
// If the bitMux expression is set do bitMuxing. If workReadChildBitMux()
// returns a non-zero value, then bitMuxing failed so continue with the
// standard hashing scheme.
//
if( bitMuxTable_ && !workReadChildBitMux() ) return;
ex_queue_entry * childEntry = childQueue_.up->getHeadEntry();
switch (childEntry->upState.status) {
case ex_queue::Q_OK_MMORE: {
if (oldState_ != HASH_GRBY_SPILL)
{
if (hashGrbyTdb().isPartialGroup_ &&
hashGrbyTdb().isPassPartialRows()) {
returnResultCurrentRow();
// remove the row from the child's queue
childQueue_.up->removeHead();
break;
}
// calculate the hash value for this child row. After the evaluation,
// the hash value is available in hashValue_. The calculation is done
// only if we don't come back from spilling a cluster. In that case,
// the insertion of the row was not successful, and therefore we
// have calculated the hash value already.
if (hashExpr_->eval(childEntry->getAtp(), workAtp_)
== ex_expr::EXPR_ERROR)
{
workHandleError(childEntry->getAtp());
return;
}
hashValue_ = hashValue_ & MASK31;
}
else
// reset oldState_
oldState_ = HASH_GRBY_READ_CHILD;
// determine the bucket and cluster where the row belongs
ULng32 bucketId = hashValue_ % bucketCount_;
Cluster * cluster = buckets_[bucketId].getInnerCluster();
HashTable * ht = cluster->getHashTable();
ex_assert (ht, "ex_hash_grby_tcb::work() cluster without hash table");
// find out if there is already an existing group for this row
ht->position(&cursor_,
childEntry->getAtp(),
workAtp_,
hbRowAtpIndex_,
hbSearchExpr_,
hashValue_);
HashRow * hashRow = ht->getNext(&cursor_);
if (hashRow) {
// the group exists already. Aggregate the child row into the group
if (hbAggrExpr_) {
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(hashRow->getData());
if ((hbAggrExpr_->perrecExpr()) &&
(hbAggrExpr_->perrecExpr()->eval(childEntry->getAtp(), workAtp_) ==
ex_expr::EXPR_ERROR))
{
workHandleError(childEntry->getAtp());
return;
}
// Reset the "missed to find a group in the exisitng hash table",
// since we are looking for consecutive misses before we
// clear (send to parent) the hash table.
partialGroupbyMissCounter_ = 0;
};
}
else {
// the row belongs to a new group. Initialize a new group in the
// hash table
if (buckets_[bucketId].insert(childEntry->getAtp(),
hbMoveInExpr_,
hashValue_,
&rc_)) {
// row is inserted. Initialize group and aggregate row
if (hbAggrExpr_) {
hbAggrExpr_->initializeAggr(workAtp_);
if ((hbAggrExpr_->perrecExpr()) &&
(hbAggrExpr_->perrecExpr()->eval(childEntry->getAtp(), workAtp_) ==
ex_expr::EXPR_ERROR))
{
workHandleError(childEntry->getAtp());
return;
}
} else // (distinct) New group, no aggregates
if ( cluster->getState() == Cluster::CHAINED ) // not spilled yet
returnResultCurrentRow(cluster->getLastDataPointer()); // return this row now up to the parent
}
else {
// we couldn't insert the row. If we got an error, handle it.
// EXE_NO_MEM_TO_EXEC is not an error
if (rc_ && !(rc_ == EXE_NO_MEM_TO_EXEC)) {
setState(HASH_GRBY_ERROR);
return;
};
// we ran out of memory. Handle this situation. In case of a
// partial grouping we return the row as a new group to the parent.
// In case of a regular group by this means we have to spill a
// cluster
rc_ = EXE_OK;
if (hashGrbyTdb().isPartialGroup_) {
// missed to find a group in the existing hash table.
// increment the miss counter and send the row up.
partialGroupbyMissCounter_++;
returnResultCurrentRow(); // Overflow in EID, pass this row up
if (hbAggrExpr_ && (partialGroupbyMissCounter_ > hashGrbyTdb().partialGrbyFlushThreshold_))
{
// We need to do the following:
// (1) pass the rows in the current cluster
// to the parent and clear the cluster to make
// space for new groups.
// (2) clear and re-initialize the cluster, so that
// new partial groups can be created.
rCluster_ = cluster;
// position on the first row in the cluster
rCluster_->positionOnFirstRowInBuffer();
setState(HASH_GRBY_RETURN_PARTIAL_GROUPS);
// reset the counter
partialGroupbyMissCounter_ = 0;
}
}
else {
// it is a regular group by and we couldn't insert the group. We
// have to spill a cluster
setState(HASH_GRBY_SPILL);
return;
};
};
};
// remove the row from the child's queue
childQueue_.up->removeHead();
} break;
case ex_queue::Q_NO_DATA: {
// we are done with reading the child rows.
setState(HASH_GRBY_EVALUATE);
doneReadingFromChild();
// remove the row from the child's queue
childQueue_.up->removeHead();
if ( hashGrbyTdb().logDiagnostics() ) { // LOG
// Report any additional memory quota grabbed while reading child rows
if ( clusterDb_->memoryQuotaMB() > hashGrbyTdb().memoryQuotaMB() ) {
char msg[512];
sprintf(msg, "HGB End reading input (%u). GRABBED additional quota %u MB",
0,
clusterDb_->memoryQuotaMB() -
hashGrbyTdb().memoryQuotaMB());
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
// if had overflow, then provide some data about the clusters
if ( haveSpilled_ ) {
ULng32 ind, numSpilled = 0;
Int64 nonSpilledSize = 0, spilledSize = 0;
Int64 maxSpilledSize = 0, minSpilledSize = -1;
Int64 maxNonSpilledSize = 0, minNonSpilledSize = -1;
// traverse all clusters
for ( ind = 0; ind < bucketCount_; ind++ ) {
Cluster * cluster = buckets_[ind].getInnerCluster();
Int64 clusterSize = cluster->getRowCount() *
hashGrbyTdb().extGroupedRowLength_ ;
if ( cluster->getState() == Cluster::FLUSHED ) {
spilledSize += clusterSize;
numSpilled++;
if ( minSpilledSize == -1 || minSpilledSize > clusterSize )
minSpilledSize = clusterSize;
if ( maxSpilledSize < clusterSize )
maxSpilledSize = clusterSize;
} else { // non spilled
nonSpilledSize += clusterSize;
if ( minNonSpilledSize == -1 || minNonSpilledSize > clusterSize )
minNonSpilledSize = clusterSize;
if ( maxNonSpilledSize < clusterSize )
maxNonSpilledSize = clusterSize;
}
}
char msg[512];
Int64 MB = 1024 * 1024 ;
Lng32 nonSpilledSizeMB = (Lng32) (nonSpilledSize / MB) ,
spilledSizeMB = (Lng32) (spilledSize / MB),
maxSpilledSizeMB = (Lng32) (maxSpilledSize / MB),
minSpilledSizeMB = (Lng32) (minSpilledSize / MB),
maxNonSpilledSizeMB = (Lng32) (maxNonSpilledSize / MB),
minNonSpilledSizeMB = (Lng32) (minNonSpilledSize / MB);
if ( bucketCount_ == numSpilled )
sprintf(msg,
"HASH GRBY finished reading input; all %d clusters spilled; size in MB - total: %d , average: %d , max: %d , min: %d , variable length records: %s",
numSpilled, spilledSizeMB, spilledSizeMB / numSpilled ,
maxSpilledSizeMB, minSpilledSizeMB,
hashGrbyTdb().useVariableLength() ? "y" : "n");
else
sprintf(msg,
"HASH GRBY finished reading input; #Clusters total: %u , spilled: %d ; Spilled size in MB - total: %d , average: %d , max: %d , min: %d ; NonSpilled size - total: %d , average: %d , max: %d , min: %d , variable length records: %s",
bucketCount_, numSpilled, spilledSizeMB,
spilledSizeMB / numSpilled ,
maxSpilledSizeMB, minSpilledSizeMB,
nonSpilledSizeMB,
nonSpilledSizeMB / (bucketCount_ - numSpilled) ,
maxNonSpilledSizeMB, minNonSpilledSizeMB,
hashGrbyTdb().useVariableLength() ? "y" : "n");
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg,
hashGrbyTdb().getExplainNodeId());
}
} // if logDiagnostics
// Finished reading input; try and yield some memory allocated back to
// global count of unused memory, so that other BMOs may use this memory
clusterDb_->yieldUnusedMemoryQuota(ofClusterList_, bucketCount_ );
} break;
case ex_queue::Q_SQLERROR: {
// Pass the diagnostics area to the up queue and
// set the state to HASH_GRBY_CANCELED.
workHandleError(childEntry->getAtp());
} break;
case ex_queue::Q_INVALID: {
ex_assert(0, "ex_hash_grby_tcb::work() invalid state returned by child");
} break;
};
};
// workReadChildBitMux
//
Int32 ex_hash_grby_tcb::workReadChildBitMux() {
ex_queue_entry * childEntry;
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
do {
// Get a handle on the child queue's head entry
//
childEntry = childQueue_.up->getHeadEntry();
if(childEntry->upState.status != ex_queue::Q_OK_MMORE)
return 1;
// Use the bitMux expression to extract the binary key for the grouping
// into the bitMux tupp. This tupp references the bitMuxBuffer_.
//
if(bitMuxExpr_) {
if (bitMuxExpr_->eval(childEntry->getAtp(), workAtp_)
== ex_expr::EXPR_ERROR) {
// Error returned as a result of eval().
// pass the diagnostics area to the parents up queue.
// and set the state_ to HASH_GRBY_CANCELED.
workHandleError(childEntry->getAtp());
return 0;
}
}
// Attempt to find a match for the data in the bitMuxBuffer_
// in the bitMux table.
//
Int32 result = bitMuxTable_->findOrAdd(bitMuxBuffer_);
// Set the hbRowAtpIndex tupp to reference the found group
//
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(bitMuxTable_->getData());
// If the group could not be added to the table, then revert to
// standard hashing. This is indicated by returning non-zero
// to the caller.
//
if(result == 0) return 1;
// Otherwise, if the group did not exist before and was added to
// the table, the group must be initialized.
//
else if(result < 0) {
// Move the data to the new grouped row
//
if(hbMoveInExpr_) {
if(hbMoveInExpr_->eval(childEntry->getAtp(), workAtp_) ==
ex_expr::EXPR_ERROR) {
workHandleError(childEntry->getAtp());
return 0;
}
}
// Initialize the aggregate for the new grouped row.
//
if(bitMuxAggrExpr_)
((AggrExpr *)(bitMuxAggrExpr_))->initializeAggr(workAtp_);
// New group and no aggregation -- return this row now!
// ( Note: Check for "no aggregation" by testing hbAggrExpr_ and not
// bitMuxAggrExpr_ because the COUNT() aggregates are missing there )
if ( ! hbAggrExpr_ ) {
// return up this row/group now;
returnResultCurrentRow() ;
// if GET_N was fulfilled (or an error) then quit
if ( state_ == HASH_GRBY_CANCELED ) return 0 ;
// We are returning rows as they are read; so stop if up-queue is full
if (parentQueue_.up->isFull()) {
childQueue_.up->removeHead(); // this row was consumed
return 0;
}
// Free entry in parent queue was consumed; allocate a new one
if (resultPool_->get_free_tuple(workAtp_->getTupp(resultRowAtpIndex_),
(Int32) hashGrbyTdb().resultRowLength_)) {
// not enough space in pool. This would be checked again before the
// next call to workReadChild(), and then a WORK_POOL_BLOCKED would
// be returned by work()
childQueue_.up->removeHead(); // this row was consumed
return 0;
}
else
hasFreeTupp_ = TRUE;
}
}
// Apply the aggregate expression to the grouped row (which is
// now referenced by the work Atp).
//
if(bitMuxAggrExpr_) {
if(bitMuxAggrExpr_->eval(childEntry->getAtp(), workAtp_) ==
ex_expr::EXPR_ERROR) {
workHandleError(childEntry->getAtp());
return 0;
}
}
// We have consumed the row from the child. Advance to the next
// entry in the child queue.
//
childQueue_.up->removeHead();
// Repeat until the child queue is empty
//
} while (!childQueue_.up->isEmpty());
// If we were able to handle the row using the bitMux buffer, then
// we are done so return 0 to indicate to caller that no hashing needs
// to be done.
//
return 0;
}
/////////////////////////////////////////////////////////////////////////////
// read rows from the overflow buffer and aggregate them into the
// hash buffer
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::workReadOverFlowRow() {
Cluster * oCluster = clusterDb_->getClusterList()->getOuterCluster();
HashRow * ofDataPointer;
NABoolean isRowFromBuffer = FALSE;
if (oldState_ == HASH_GRBY_SPILL)
// we had to spill a cluster. Thus, the last row is still in the workAtp.
// process it now
ofDataPointer =
(HashRow *)(workAtp_->getTupp(ofRowAtpIndex_).getDataPointer() - sizeof(HashRow));
else
// advance to the next row only if we really have processed the last
// one.
ofDataPointer = oCluster->advance();
if (ofDataPointer) {
// get the hash value of this row and change it. Whenever
// we read a row more than once, we have to change the hash value.
// Otherwise, the row would always end up in the same bucket.
if (oldState_ != HASH_GRBY_SPILL)
{
// the hash-values are now re-spread over up to 2^10 new buckets
SimpleHashValue hv = ofDataPointer->hashValue() ;
SimpleHashValue delta = (hv & 0x03ff00) >> 8 ;
hashValue_ = hv + delta ;
hashValue_ = hashValue_ & MASK31;
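// In other words, delta is taken from bits 8..17 of the original hash value
// (10 bits, hence "up to 2^10" above). Rows that landed in the same bucket on
// the first pass generally have different deltas, so adding delta re-spreads
// them over new buckets. For example (illustrative value only),
// hv = 0x00012345 gives delta = 0x123 and a new hash value of 0x00012468.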
isRowFromBuffer = TRUE;
}
else
oldState_ = HASH_GRBY_READ_OF_ROW;
// determine the bucket and cluster where the row belongs
ULng32 bucketId = hashValue_ % bucketCount_;
Cluster * cluster = buckets_[bucketId].getInnerCluster();
HashTable * ht = cluster->getHashTable();
ex_assert (ht, "ex_hash_grby_tcb::work() cluster without hash table");
// find out if there is already an existing group for this row
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(ofRowAtpIndex_).setDataPointer(ofDataPointer->getData());
ht->position(&cursor_,
workAtp_,
workAtp_,
hbRowAtpIndex_,
ofSearchExpr_,
hashValue_);
HashRow * hashRow = ht->getNext(&cursor_);
if (hashRow) {
// the group exists already. Aggregate the child row into the group
if (ofAggrExpr_ && ofAggrExpr_->perrecExpr()) {
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(hashRow->getData());
if (ofAggrExpr_->perrecExpr()->eval(workAtp_, workAtp_) == ex_expr::EXPR_ERROR)
{
workHandleError(workAtp_);
return;
}
};
}
else { // there is no group for this row yet. Start a new group
// (when there are no aggregates) we can't let a secondary overflow happen
// before the tail part of the outer has been read in full (an extremely
// unlikely case; only if the available memory shrinks considerably during
// execution of this node.)
NABoolean doNotOverflow = ! hbAggrExpr_ &&
! oCluster->returnOverflowRows() ;
if (buckets_[bucketId].insert(workAtp_,
ofMoveInExpr_,
hashValue_,
&rc_,
doNotOverflow )) {
// row is inserted. Initialize group and aggregate row
if (hbAggrExpr_)
hbAggrExpr_->initializeAggr(workAtp_);
else
// no aggr: then return now the row that was just inserted, but only
// if we now read the unreturned part (i.e., the head of the list)
// and there was no secondary overflow (i.e., the inner hasn't overflowed yet)
if ( cluster->getState() == Cluster::CHAINED && // inner not spilled
oCluster->returnOverflowRows() ) // reading HEAD from outer
returnResultCurrentRow(cluster->getLastDataPointer()) ;
if ((ofAggrExpr_) &&
(ofAggrExpr_->perrecExpr()) &&
(ofAggrExpr_->perrecExpr()->eval(workAtp_, workAtp_) == ex_expr::EXPR_ERROR))
{
workHandleError(workAtp_);
return;
}
}
else {
if (rc_) {
setState(HASH_GRBY_ERROR);
return;
};
// we couldn't insert the row. We have to spill a cluster
setState(HASH_GRBY_SPILL);
// Secondary spill (spill while reading from an overflowed cluster)
if ( hashGrbyTdb().logDiagnostics() && isRowFromBuffer ) {
char msg[256];
sprintf(msg, "HASH GRBY secondary spill" );
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg,
hashGrbyTdb().getExplainNodeId());
}
};
};
}
else {
// we have processed all rows of the current overflow buffer. Advance
// to the next overflow buffer of this cluster
if (!oCluster->endOfCluster()) {
clusterDb_->setOuterClusterToRead(oCluster, &rc_);
setState(HASH_GRBY_READ_BUFFER);
return;
};
// no more rows in this cluster to process.
// before we release the cluster, we collect some stats about it
if (hashGroupByStats_)
hashGroupByStats_->
addClusterStats(oCluster->getStats(hashGroupByStats_->getHeap()));
delete oCluster;
// Soln 10-100607-0922: Nullify pointer to the deleted oCluster to avoid
// the possibility of deallocating that cluster again
clusterDb_->getClusterList()->setOuterCluster(NULL);
setState(HASH_GRBY_EVALUATE);
};
};
/////////////////////////////////////////////////////////////////////////////
// spill a cluster
/////////////////////////////////////////////////////////////////////////////
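// Note: spilling is asynchronous. Cluster::spill() starts or continues writing
// the cluster's buffers to scratch; until it reports completion, work() stays
// in the HASH_GRBY_SPILL state and returns WORK_CALL_AGAIN. Once the I/O is
// done we go back to the state we were in before the spill (oldState_).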
void ex_hash_grby_tcb::workSpill() {
Cluster * cluster = clusterDb_->getClusterToFlush();
if (!cluster) {
// we could not find a cluster to spill. The only
// reason for this situation is that we decided to
// spill even before we were able to allocate one
// hashbuffer in any of the clusters. I.e., we are
// very, very tight on memory. We have to give up.
rc_ = EXE_NO_MEM_TO_EXEC;
setState(HASH_GRBY_ERROR);
return;
};
if (cluster->spill(&rc_, ! hbAggrExpr_ )) {
// all I/Os are done, go back to the state where we came from
setState(oldState_);
clusterDb_->setClusterToFlush(NULL);
if ( hashGrbyTdb().logDiagnostics() ) {
Int64 elapsedIOTime = ioTimer_.endTimer(); // stop timing, get current total
if (!haveSpilled_) {
char msg[256];
sprintf(msg,
"HASH GRBY finished first spill, numChecks: %d., elapsed time = " PF64,
numIOChecks_, elapsedIOTime );
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg,
hashGrbyTdb().getExplainNodeId());
haveSpilled_ = TRUE;
}
}
}
else {
numIOChecks_++;
if ( hashGrbyTdb().logDiagnostics() ) {
ioTimer_.startTimer(); // start accumulating time
}
if (rc_)
setState(HASH_GRBY_ERROR);
};
};
/////////////////////////////////////////////////////////////////////////////
// read one overflow buffer
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::workReadBuffer() {
Cluster * oCluster = clusterDb_->getClusterToRead();
if (oCluster->read(&rc_)) {
// the I/O is complete, position on the first row of the buffer
oCluster->positionOnFirstRowInFirstOuterBuffer();
setState(HASH_GRBY_READ_OF_ROW);
clusterDb_->setClusterToRead(NULL);
}
else {
if (rc_)
setState(HASH_GRBY_ERROR);
};
};
/////////////////////////////////////////////////////////////////////////////
// workEvaluate: prepare clusters for next phase
/////////////////////////////////////////////////////////////////////////////
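// In outline: walk the cluster list. CHAINED clusters with groups (and an
// aggregate expression) are handed to HASH_GRBY_RETURN_ROWS; other CHAINED
// clusters are deleted; FLUSHED clusters get their last buffer written out
// (HASH_GRBY_SPILL) and are then moved to the overflow list. Once the list is
// empty, one overflow cluster at a time is re-read as an "outer" input
// (HASH_GRBY_READ_BUFFER) and re-grouped into a fresh set of clusters; when
// no overflow clusters remain, we return bitMux rows or finish.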
void ex_hash_grby_tcb::workEvaluate() {
Cluster * cluster = clusterDb_->getClusterList();
while(cluster) {
if (cluster->getState() == Cluster::CHAINED) {
if ( cluster->getRowCount() && hbAggrExpr_ ) {
// This cluster is CHAINED. remove it from the clusterList and
// return all groups in this cluster to the parent
clusterDb_->setClusterList(cluster->getNext());
cluster->setNext(NULL);
rCluster_ = cluster;
// position on the first row in the cluster
rCluster_->positionOnFirstRowInBuffer();
// and return the rows
setState(HASH_GRBY_RETURN_ROWS);
return;
}
else {
// Either there is no aggregation (so this cluster was already returned
// to the parent) or this cluster contains no groups, so forget about it.
clusterDb_->setClusterList(cluster->getNext());
// before we release the cluster, we collect some stats about it
if (hashGroupByStats_)
hashGroupByStats_->
addClusterStats(cluster->getStats(hashGroupByStats_->getHeap()));
delete cluster;
}
}
else {
// the cluster is FLUSHED/SPILLED.
// if we haven't done it yet, write the last buffer to temporary
// file
if (cluster->getRowsInBuffer()) {
clusterDb_->setClusterToFlush(cluster);
setState(HASH_GRBY_SPILL);
return;
};
//Remove it from the clusterList and add
// it to the overFlowList.
clusterDb_->setClusterList(cluster->getNext());
if ( ! hbAggrExpr_ )
// For non-aggr: Start reading the already returned rows first
cluster->initializeReadPosition();
cluster->setNext(NULL);
if (!ofClusterList_)
ofClusterList_ = cluster;
else
lastOfCluster_->setNext(cluster);
lastOfCluster_ = cluster;
// delete the hash table and buffer for this cluster, we don't
// need it anymore
cluster->removeHashTable();
cluster->releaseAllHashBuffers();
};
cluster = clusterDb_->getClusterList();
};
// all clusters are now either processed or on the overFlowList.
// Before handling another spilled cluster, try and yield some memory
// allocated back to global count of unused memory, so that other BMOs
// may use this memory
clusterDb_->yieldUnusedMemoryQuota(ofClusterList_, bucketCount_ );
if (ofClusterList_) {
// take the first cluster from the overflow list and process
// it now, i.e., use it as input.
Cluster * inputCluster = ofClusterList_;
ofClusterList_ = ofClusterList_->getNext();
inputCluster->setNext(NULL);
ULng32 i;
// re-initialize all buckets
for (i = 0; i < bucketCount_; i++)
buckets_[i].init();
// allocate new clusters. Currently, a cluster
// always contains one bucket. This might change in the future.
ULng32 bucketIdx = 0;
ULng32 noOfClusters = bucketCount_;
ULng32 bucketsPerCluster = bucketCount_/noOfClusters;
Cluster * newCluster = NULL;
for (i = 0; i < noOfClusters; i++) {
newCluster = new(heap_) Cluster(Cluster::CHAINED,
clusterDb_,
&buckets_[bucketIdx],
bucketsPerCluster,
hashGrbyTdb().extGroupedRowLength_,
hashGrbyTdb().useVariableLength(),
hashGrbyTdb().considerBufferDefrag(),
hashGrbyTdb().hbRowAtpIndex_,
TRUE,
FALSE, // not used for group by
newCluster,
&rc_);
if ( !newCluster || rc_ != EXE_OK ) {
// we could not allocate the Cluster
if ( !newCluster ) rc_ = EXE_NO_MEM_TO_EXEC;
else delete newCluster;
setState(HASH_GRBY_ERROR);
return;
};
bucketIdx += bucketsPerCluster;
};
// store head of the cluster list in the clusterDb_
clusterDb_->setClusterList(newCluster);
// The inputCluster is to be marked as an "outer" so that its buffers
// would be read one at a time (for an inner, all buffers are read at once).
// The input rows come from the outer cluster of the first
// cluster in the cluster list. Set this outer cluster accordingly.
newCluster->setOuterCluster(inputCluster);
// read the first buffer of the input
clusterDb_->setOuterClusterToRead(inputCluster, &rc_);
if ( rc_ ) {
setState(HASH_GRBY_ERROR);
return;
}
setState(HASH_GRBY_READ_BUFFER);
return;
}
// no more clusters to process. We are done unless there are bitmux
// rows to return (and aggregation is used, so these rows weren't returned)
if ( bitMuxTable_ && hbAggrExpr_ )
setState(HASH_GRBY_RETURN_BITMUX_ROWS);
else
setState(HASH_GRBY_DONE);
};
/////////////////////////////////////////////////////////////////////////////
// workReturnRows: return rows to parent
/////////////////////////////////////////////////////////////////////////////
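// Note on the tryToDefrag path below: the group is first evaluated into the
// defragmentation tupp (defragTd_) only to learn its actual length, then a
// tuple of exactly that size is requested from the pool and the bytes are
// copied over with str_cpy_all(). This lets a variable-length result row fit
// into the remaining space of the current buffer instead of forcing a new one.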
ULng32 ex_hash_grby_tcb::workReturnRows(NABoolean tryToDefrag) {
HashRow * dataPointer = NULL;
ExOperStats *statsEntry = getStatsEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
// If we are returning rows from the bitmux buffer, then search through
// the bitmux pointers for the first return row.
//
if(state_ == HASH_GRBY_RETURN_BITMUX_ROWS) {
dataPointer = (HashRow *)(bitMuxTable_->getReturnGroup());
bitMuxTable_->advanceReturnGroup();
}
// Otherwise, we are returning rows from the cluster so advance to the
// next row.
//
else {
if (!tryToDefrag)
{
dataPointer = rCluster_->advance();
}
else
{ // don't advance in this case;
// we may exit the method without processing if we don't get a free tuple
dataPointer = rCluster_->getCurrentRow();
}
}
while (dataPointer) {
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hbRowAtpIndex_).setDataPointer(dataPointer->getData());
// this is a result group. Move it from the hash buffer to the
// result buffer
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
UInt32 resMaxRowLen = hashGrbyTdb().resultRowLength_;
UInt32 rowLen = resMaxRowLen;
UInt32 *rowLenPtr = &rowLen;
if (tryToDefrag)
{
#if defined(_DEBUG)
assert(hasFreeTupp_ == FALSE && !havingExpr_);
#endif
workAtp_->getTupp(resultRowAtpIndex_)= defragTd_;
defragTd_->setReferenceCount(1);
if (moveOutExpr_->eval(workAtp_, workAtp_, 0, -1, rowLenPtr) == ex_expr::EXPR_ERROR)
{
workHandleError(workAtp_);
return 0;
}
if (resultPool_->get_free_tuple(workAtp_->getTupp(resultRowAtpIndex_),*rowLenPtr))
return 2 ; //WORK_POOL_BLOCKED;
#if (defined(_DEBUG))
char txt[] = "hashgrpby";
SqlBuffer * buf = resultPool_->getCurrentBuffer();
sql_buffer_pool::logDefragInfo(txt,
SqlBufferGetTuppSize(hashGrbyTdb().resultRowLength_,buf->bufType()),
SqlBufferGetTuppSize(*rowLenPtr,buf->bufType()),
buf->getFreeSpace(),
buf,
buf->getTotalTuppDescs());
#endif
//advance now
rCluster_->advance();
char * defragDataPointer = defragTd_->getTupleAddress();
char * destDataPointer = workAtp_->getTupp(resultRowAtpIndex_).getDataPointer();
str_cpy_all(destDataPointer,
defragDataPointer,
*rowLenPtr);
}
else
if (moveOutExpr_->eval(workAtp_, workAtp_, 0, -1, rowLenPtr) == ex_expr::EXPR_ERROR)
{
workHandleError(workAtp_);
return 0;
}
upParentEntry->copyAtp(downParentEntry->getAtp());
atp_struct * atp = upParentEntry->getAtp();
// and assign the tupp to the parents queue entry
atp->getTupp(returnedAtpIndex_) =
workAtp_->getTupp(resultRowAtpIndex_);
ex_expr::exp_return_type evalRetCode = ex_expr::EXPR_TRUE;
if (havingExpr_) {
evalRetCode = havingExpr_->eval(atp, 0);
}
if ( evalRetCode == ex_expr::EXPR_TRUE ) { // e.g. havingExpr_ == NULL
// Resize the row if the actual size is different from the max size (resMaxRowLen)
if(rowLen != resMaxRowLen) {
resultPool_->resizeLastTuple(rowLen,
workAtp_->getTupp(resultRowAtpIndex_).getDataPointer());
}
// if stats are to be collected, collect them.
if (statsEntry) {
statsEntry->incActualRowsReturned();
}
upParentEntry->upState.status = ex_queue::Q_OK_MMORE;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
hasFreeTupp_ = FALSE;
// we can forget about this tupp now
workAtp_->getTupp(resultRowAtpIndex_).release();
// we got another result row
matchCount_++;
upParentEntry->upState.setMatchNo(matchCount_);
if ( matchCount_ == 1 && haveSpilled_ ) {
// haveSpilled_ indicates that an overflow took place (Only for distinct
// HGB the overflow occurs after the return of the first row, but that
// happens at the very beginning, so it is not of interest for us.)
if ( hashGrbyTdb().logDiagnostics() )
SQLMXLoggingArea::logExecRtInfo(NULL, 0,
"HASH GRBY returns first row.",
hashGrbyTdb().getExplainNodeId());
}
// insert into parent up queue
parentQueue_.up->insert();
return 0;
}
else if (evalRetCode == ex_expr::EXPR_ERROR)
{
workHandleError(atp);
return 0;
}
else {
// no hit. release the returned tuple
atp->release();
// Try to advance to the next return row
//
dataPointer = NULL;
// If we are returning rows from the bitmux buffer, then
// get the next return row.
//
if(state_ == HASH_GRBY_RETURN_BITMUX_ROWS) {
dataPointer = (HashRow *)(bitMuxTable_->getReturnGroup());
bitMuxTable_->advanceReturnGroup();
}
// Otherwise, we are returning rows from the cluster so advance to the
// next row.
//
else {
dataPointer = rCluster_->advance();
}
};
} // while (dataPointer)
// If we were returning rows from the bitmux buffer, then the buffer
// must be empty so set the state to done.
//
if(state_ == HASH_GRBY_RETURN_BITMUX_ROWS) {
bitMuxTable_->resetReturnGroup();
}
// Otherwise, we are done returning rows from a cluster. Go back to the
// evaluate phase.
//
else {
// no more rows in this cluster. Delete cluster.
// before we release the cluster, we collect some stats about it
if (hashGroupByStats_ &&
state_ != HASH_GRBY_RETURN_PARTIAL_GROUPS)
hashGroupByStats_->
addClusterStats(rCluster_->getStats(hashGroupByStats_->getHeap()));
delete rCluster_;
rCluster_ = NULL;
}
return 1;
};
/////////////////////////////////////////////////////////////////////////////
// resetClusterAndReadFromChild: creates a new cluster and prepares
// it for reading from child.
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::resetClusterAndReadFromChild()
{
// There must be a better way to reuse
// the rCluster_ without deleting and re-allocating
// the cluster.
//
Cluster * cluster = NULL;
ULng32 bucketIdx = 0;
buckets_[bucketIdx].init();
// allocate the cluster
cluster = new(heap_) Cluster(Cluster::CHAINED,
clusterDb_,
&buckets_[bucketIdx],
1, // bucketsPerCluster,
hashGrbyTdb().extGroupedRowLength_,
hashGrbyTdb().useVariableLength(),
hashGrbyTdb().considerBufferDefrag(),
hashGrbyTdb().hbRowAtpIndex_,
TRUE,
FALSE, // not used for group by
cluster,
&rc_);
if ( !cluster || rc_ != EXE_OK ) {
// we could not allocate the Cluster
if ( !cluster ) rc_ = EXE_NO_MEM_TO_EXEC;
else delete cluster;
setState(HASH_GRBY_ERROR);
return;
};
// store head of the cluster list in the clusterDb_
clusterDb_->setClusterList(cluster);
// Initialize the group by state.
//
setState(HASH_GRBY_READ_CHILD);
};
/////////////////////////////////////////////////////////////////////////////
// workDone: cleanup and return
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::workDone() {
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
upParentEntry->upState.status = ex_queue::Q_NO_DATA;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.up->getHeadIndex();
upParentEntry->upState.setMatchNo(matchCount_);
parentQueue_.down->removeHead();
if ( haveSpilled_ && hashGrbyTdb().logDiagnostics() ) {
char msg[256];
Int64 elapsedIOTime = ioTimer_.endTimer();// stop timing, get current total
sprintf(msg,
"HASH GRBY returns last row; for spills, numChecks: %d , elapsed time = " PF64,
numIOChecks_, elapsedIOTime );
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg,
hashGrbyTdb().getExplainNodeId());
numIOChecks_ = 0;
}
setState(HASH_GRBY_EMPTY);
parentQueue_.up->insert();
if (rCluster_) {
delete rCluster_;
rCluster_ = NULL;
};
// Soln 10-100607-0922: Delete flushed clusters before deleting clusterDb_
// We'd have flushed clusters here only if the query was canceled while
// processing overflow
while (ofClusterList_) {
Cluster * p = ofClusterList_->getNext();
delete ofClusterList_;
ofClusterList_ = p;
};
if (clusterDb_) {
// Yield memory allocated back to global count of unused memory, so that
// other BMOs may use this memory
clusterDb_->yieldAllMemoryQuota(); // yield all of the alloc memory
delete clusterDb_;
clusterDb_ = NULL;
};
// delete the buckets
if (buckets_) {
heap_->deallocateMemory(buckets_);
buckets_ = NULL;
};
if (hasFreeTupp_) {
workAtp_->getTupp(resultRowAtpIndex_).release();
hasFreeTupp_ = FALSE;
};
// fix for CR 10-010713-3879: release tempUpAtp_ when done
// to decrease tupp descriptor reference count
tempUpAtp_->release();
};
/////////////////////////////////////////////////////////////////////////////
// workHandleError: insert Q_SQLERROR into parent's up queue.
/////////////////////////////////////////////////////////////////////////////
void ex_hash_grby_tcb::workHandleError(atp_struct* atp)
{
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
if (atp)
upParentEntry->getAtp()->copyAtp(atp);
upParentEntry->upState.status = ex_queue::Q_SQLERROR;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.up->getHeadIndex();
upParentEntry->upState.setMatchNo(matchCount_);
parentQueue_.up->insert();
// Cancel all of this parent's request, and clean up the child queue if needed
setState(HASH_GRBY_CANCELED);
};
//////////////////////////////////////////////////////////////////////////
// The hash groupby operator does not need private state per queue entry
//////////////////////////////////////////////////////////////////////////