/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ex_hashj.cpp
* Description: Methods for the tdb and tcb of a hash join
* with the ordered queue protocol
*
*
* Created: 5/3/94
* Language: C++
*
*
*
*
******************************************************************************
*/
//
// This file contains all the generator and executor methods associated
// with a hybrid hash join
//
// beginning of regular compilation
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_hashj.h"
#include "ex_expr.h"
#include "str.h"
#include "BaseTypes.h"
#include "ExStats.h"
#include "ex_error.h"
#include "ex_exe_stmt_globals.h"
#include "memorymonitor.h"
#include "sql_buffer_size.h"
#include "logmxevent.h"
#include "exp_function.h"
#include "CommonStructs.h"
///////////////////////////////////////////////////////////////////////////////
// hashj state transition diagram source
//
// Please maintain this as you add states or make changes to the transitions
// between states. The grammar is quite simple: "oldstate -> newstate" is all
// you need to be concerned with.
//
// To generate a nice diagram, copy this text into hj_std.dot,
// download http://www.graphviz.org/Download.php
// and run: dot -Tgif hj_std.dot >hj.gif
//
///////////////////////////////////////////////////////////////////////////////
/*
digraph hashjoin {
EMPTY -> READ_OUTER
EMPTY -> READ_INNER
EMPTY -> DONE
FLUSH_CLUSTER0 [ label="FLUSH_CLUSTER" ]
FLUSH_CLUSTER1 [ label="FLUSH_CLUSTER" ]
FLUSH_CLUSTER2 [ label="FLUSH_CLUSTER" ]
FLUSH_CLUSTER3 [ label="FLUSH_CLUSTER" ]
PROBE0 [ label="PROBE" ]
PROBE1 [ label="PROBE" ]
READ_BUFFER -> READ_OUTER_CLUSTER
READ_OUTER_CLUSTER -> RETURN_RIGHT_ROWS
READ_OUTER_CLUSTER -> READ_INNER_CLUSTER
READ_OUTER_CLUSTER -> PROBE0
READ_OUTER_CLUSTER -> READ_BUFFER
PROBE0 -> READ_OUTER_CLUSTER
RETURN_RIGHT_ROWS -> READ_INNER_CLUSTER
RETURN_RIGHT_ROWS -> END_PHASE2
READ_INNER -> END_PHASE1
READ_INNER -> FLUSH_CLUSTER0
FLUSH_CLUSTER0 -> READ_INNER
READ_OUTER -> RETURN_RIGHT_ROWS
READ_OUTER -> FLUSH_CLUSTER3
READ_OUTER -> END_PHASE2
FLUSH_CLUSTER3 -> READ_OUTER
READ_OUTER -> PROBE1
PROBE1 -> READ_OUTER
not_EMPTY [ shape=plaintext ]
not_EMPTY -> CANCELED
not_EMPTY -> HASHJ_CANCEL_AFTER_INNER
HASHJ_CANCEL_AFTER_INNER -> END_PHASE1
CANCELED -> DONE
READ_INNER_CLUSTER -> READ_BUFFER
END_PHASE1 -> CANCELED [ label="oldState() == CANCEL_AFTER_INNER" ]
END_PHASE1 -> READ_OUTER
END_PHASE1 -> FLUSH_CLUSTER1
FLUSH_CLUSTER1 -> END_PHASE1
END_PHASE2 -> DONE
END_PHASE2 -> FLUSH_CLUSTER2
END_PHASE2 -> READ_INNER_CLUSTER
FLUSH_CLUSTER2 -> END_PHASE2
}
*/
///////////////////////////////////////////////////////////////////////////////
//
// TDB procedures
//
///////////////////////////////////////////////////////////////////////////////
//
// Build a hashj tcb
//
ex_tcb * ex_hashj_tdb::build(ex_globals * glob) {
// first build the children
ex_tcb * leftChildTcb = leftChildTdb_->build(glob);
ex_tcb * rightChildTcb = rightChildTdb_->build(glob);
ex_hashj_tcb * hashJoinTcb = NULL;
// now build the hash join Tcb
// If this Hash Join is configured to be a Unique Hash Join,
// use the ExUniqueHashJoinTcb
//
if(isUniqueHashJoin()) {
hashJoinTcb = new(glob->getSpace())
ExUniqueHashJoinTcb(*this,
*leftChildTcb,
*rightChildTcb,
glob);
} else {
hashJoinTcb = new(glob->getSpace())
ex_hashj_tcb(*this,
*leftChildTcb,
*rightChildTcb,
glob);
}
// add the hashJoinTcb to the schedule
hashJoinTcb->registerSubtasks();
hashJoinTcb->registerResizeSubtasks();
return (hashJoinTcb);
}
void ex_hashj_tcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
ex_queue_pair pQueue = getParentQueue();
// down queues are handled by workDown()
// up queues are handled by workUp()
// cancellations are handled by workCancel()
sched->registerInsertSubtask(sWorkDown, this, pQueue.down, "DN");
sched->registerCancelSubtask(sCancel, this, pQueue.down, "CA");
sched->registerUnblockSubtask(sWorkUp, this, pQueue.up, "UP");
// Register the I/O event
ioEventHandler_ = sched->registerNonQueueSubtask(sWorkUp,this);
// We need to schedule workUp from workDown if we see a GET_NOMORE
// in workDown, in case nothing else schedules workUp.
workUpTask_ = sched->registerNonQueueSubtask(sWorkUp, this, "UP");
// register events for child queues
sched->registerUnblockSubtask(sWorkDown,this, leftQueue_.down);
sched->registerInsertSubtask(sWorkUp, this, leftQueue_.up);
sched->registerUnblockSubtask(sWorkDown,this, rightQueue_.down);
sched->registerInsertSubtask(sWorkUp, this, rightQueue_.up);
}
NABoolean ex_hashj_tcb::needStatsEntry()
{
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
// stats are collected for the ALL_STATS and OPERATOR_STATS options.
if (statsType == ComTdb::ALL_STATS || statsType == ComTdb::OPERATOR_STATS)
return TRUE;
else
return FALSE;
}
ExOperStats * ex_hashj_tcb::doAllocateStatsEntry(CollHeap *heap, ComTdb *tdb)
{
ExBMOStats *stat;
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
if (statsType == ComTdb::OPERATOR_STATS)
stat = new (heap) ExBMOStats(heap, this, tdb);
else
{
stat = (ExBMOStats *)new(heap) ExHashJoinStats(heap,
this,
tdb);
hashJoinStats_ = (ExHashJoinStats *)stat;
}
ex_hashj_tdb *hashjTdb = (ex_hashj_tdb *)getTdb();
bmoStats_ = stat;
return stat;
}
//////////////////////////////////////////////////////////////////////////////
//
// TCB procedures
//
//////////////////////////////////////////////////////////////////////////////
//
// Constructor for hashj_tcb
//
ex_hashj_tcb::ex_hashj_tcb(const ex_hashj_tdb & hashJoinTdb,
const ex_tcb & leftChildTcb, // left queue pair
const ex_tcb & rightChildTcb, // right queue pair
ex_globals * glob)
: ex_tcb(hashJoinTdb, 1, glob),
space_(glob->getSpace()),
leftChildTcb_(&leftChildTcb),
rightChildTcb_(&rightChildTcb),
bucketCount_(0),
buckets_(NULL),
outerMatchedInner_(FALSE),
rc_(EXE_OK),
hashValue_(0),
clusterDb_(NULL),
workAtp_(NULL),
ioEventHandler_(NULL),
currCluster_(NULL),
flushedChainedCluster_(FALSE),
totalRightRowsRead_(0),
isAllOrNothing_(FALSE),
anyRightRowsRead_(FALSE),
doNotChainDup_(FALSE),
hasFreeTupp_(FALSE),
isRightOutputNeeded_(hashJoinTdb.rightRowLength_ > 0
// in some cases, even when rightRowLength_ is zero,
// the left join expr applies to the right tupp
// see soln 10-090107-8249
|| hashJoinTdb.leftJoinExpr_
),
onlyReturnResultsWhenInnerMatched_( !hashJoinTdb.isLeftJoin() &&
!hashJoinTdb.isAntiSemiJoin() ) ,
isInnerEmpty_(TRUE),
nullPool_(NULL)
{
bmoStats_ = NULL;
hashJoinStats_ = NULL;
heap_ = new (glob->getDefaultHeap()) NAHeap("Hash Join Heap", (NAHeap *)glob->getDefaultHeap());
// set the memory monitor
memMonitor_ = getGlobals()->castToExExeStmtGlobals()->getMemoryMonitor();
// Allocate the buffer pool for result rows
// this pool contains only one buffer
Int32 numBuffers = hashJoinTdb.numBuffers_;
// The default number of buffers is 1.
// Regular Hash Join (not unique) adds more buffers on demand (it
// does not return POOL_BLOCKED).
// Unique Hash Join returns POOL_BLOCKED, so if this is a unique
// hash join make sure there are at least 2 buffers to avoid dead
// lock on a pinned buffer.
if(hashJoinTdb.isUniqueHashJoin() && (numBuffers < 2)) {
numBuffers = 2;
}
resultPool_ = new(space_) sql_buffer_pool(numBuffers,
(Lng32)hashJoinTdb.bufferSize_,
space_);
if (hashJoinTdb.considerBufferDefrag())
{
assert(hashJoinTdb.useVariableLength());
ULng32 defragLength = MAXOF(hashJoinTdb.leftRowLength_, hashJoinTdb.rightRowLength_);
resultPool_->addDefragTuppDescriptor(defragLength);
}
// Copy all expression pointers
minMaxExpr_ = hashJoinTdb.minMaxExpr_;
rightHashExpr_ = hashJoinTdb.rightHashExpr_;
rightMoveInExpr_ = hashJoinTdb.rightMoveInExpr_;
rightMoveOutExpr_ = hashJoinTdb.rightMoveOutExpr_;
rightSearchExpr_ = hashJoinTdb.rightSearchExpr_;
leftHashExpr_ = hashJoinTdb.leftHashExpr_;
leftMoveExpr_ = hashJoinTdb.leftMoveExpr_;
leftMoveInExpr_ = hashJoinTdb.leftMoveInExpr_;
leftMoveOutExpr_ = hashJoinTdb.leftMoveOutExpr_;
probeSearchExpr1_ = hashJoinTdb.probeSearchExpr1_;
probeSearchExpr2_ = hashJoinTdb.probeSearchExpr2_;
leftJoinExpr_ = hashJoinTdb.leftJoinExpr_;
nullInstForLeftJoinExpr_ = hashJoinTdb.nullInstForLeftJoinExpr_;
rightJoinExpr_ = hashJoinTdb.rightJoinExpr_;
nullInstForRightJoinExpr_ = hashJoinTdb.nullInstForRightJoinExpr_;
beforeJoinPred1_ = hashJoinTdb.beforeJoinPred1_;
beforeJoinPred2_ = hashJoinTdb.beforeJoinPred2_;
afterJoinPred1_ = hashJoinTdb.afterJoinPred1_;
afterJoinPred2_ = hashJoinTdb.afterJoinPred2_;
afterJoinPred3_ = hashJoinTdb.afterJoinPred3_;
afterJoinPred4_ = hashJoinTdb.afterJoinPred4_;
afterJoinPred5_ = hashJoinTdb.afterJoinPred5_;
checkInputPred_ = hashJoinTdb.checkInputPred_;
moveInputExpr_ = hashJoinTdb.moveInputExpr_;
checkInnerNullExpr_ = hashJoinTdb.checkInnerNullExpr_;
checkOuterNullExpr_ = hashJoinTdb.checkOuterNullExpr_;
// Set up a flag for the HashTable::position method calls,
// for cases where it is valid to return only one match
// from the inner table. Keep this logic in sync with the
// multiple-match loop in ex_hashj_tcb::workProbe.
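// (With no before/after join predicates, a single matching inner row
// decides a semi join's outcome, so duplicate keys need not be chained;
// an anti semi join only needs the before predicates to be absent.)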
if (isSemiJoin() &&
beforeJoinPred1_ == NULL &&
beforeJoinPred2_ == NULL &&
afterJoinPred1_ == NULL &&
afterJoinPred2_ == NULL)
doNotChainDup_ = TRUE;
else if (isAntiSemiJoin() &&
beforeJoinPred1_ == NULL &&
beforeJoinPred2_ == NULL)
doNotChainDup_ = TRUE;
else
; // doNotChainDup_ was already initialized to FALSE in the
// ctor initializer list above
// get the queues that left and right use to communicate with me
leftQueue_ = leftChildTcb_->getParentQueue();
rightQueue_ = rightChildTcb_->getParentQueue();
// We don't need any state in these up queues
ex_cri_desc * fromParentCri = hashJoinTdb.criDescDown_;
ex_cri_desc * toParentCri = hashJoinTdb.criDescUp_;
// Allocate the queue to communicate with parent (no PSTATE)
allocateParentQueues(parentQueue_,FALSE);
// Allocate the private state in each entry of the down queue
ex_hashj_private_state *p = new(space_) ex_hashj_private_state();
parentQueue_.down->allocatePstate(p, this);
delete p;
// fixup expressions
if (minMaxExpr_)
(void) minMaxExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (rightHashExpr_)
(void) rightHashExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (rightMoveInExpr_)
(void) rightMoveInExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (rightMoveOutExpr_)
(void) rightMoveOutExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, FALSE, glob);
if (rightSearchExpr_)
(void) rightSearchExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, FALSE, glob);
else {
// ASJ, and no join search expr and no before predicate -- then either
// return all left rows, or none, based on having ANY right rows
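// (For example, an uncorrelated NOT EXISTS: one inner row disqualifies
// every outer row, while an empty inner table qualifies them all.)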
isAllOrNothing_ = isAntiSemiJoin() && ! beforeJoinPred1_ ;
}
if (leftHashExpr_)
(void) leftHashExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (leftMoveExpr_)
(void) leftMoveExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (leftMoveInExpr_)
(void) leftMoveInExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (leftMoveOutExpr_)
(void) leftMoveOutExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (probeSearchExpr1_)
(void) probeSearchExpr1_->fixup(0, getExpressionMode(), this,
space_, heap_, FALSE, glob);
if (probeSearchExpr2_)
(void) probeSearchExpr2_->fixup(0, getExpressionMode(), this,space_, heap_, FALSE, glob);
if (leftJoinExpr_)
(void) leftJoinExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (nullInstForLeftJoinExpr_)
(void) nullInstForLeftJoinExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, FALSE, glob);
if (rightJoinExpr_)
(void) rightJoinExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (nullInstForRightJoinExpr_)
(void) nullInstForRightJoinExpr_->fixup(0, getExpressionMode(), this,
space_, heap_, FALSE, glob);
if (beforeJoinPred1_)
(void) beforeJoinPred1_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (beforeJoinPred2_)
(void) beforeJoinPred2_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (afterJoinPred1_)
(void) afterJoinPred1_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (afterJoinPred2_)
(void) afterJoinPred2_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (afterJoinPred3_)
(void) afterJoinPred3_->fixup(0, getExpressionMode(), this,space_, heap_, FALSE, glob);
if (afterJoinPred4_)
(void) afterJoinPred4_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if (afterJoinPred5_)
(void) afterJoinPred5_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
if ( isReuse() ) {
if (checkInputPred_)
(void) checkInputPred_->fixup(0, getExpressionMode(),this,space_, heap_, FALSE, glob);
if (moveInputExpr_)
(void) moveInputExpr_->fixup(0, getExpressionMode(), this, space_, heap_, FALSE, glob);
// allocate space to keep the current input values for next time
Lng32 neededBufferSize =
(Lng32) SqlBufferNeededSize( 1, hashJoinTdb.inputValuesLen_ );
inputPool_ = new(space_) sql_buffer_pool(1, neededBufferSize, space_);
inputPool_->get_free_tuple(prevInputValues_, hashJoinTdb.inputValuesLen_);
} // if isReuse
if (checkInnerNullExpr_)
(void) checkInnerNullExpr_->fixup(0, getExpressionMode(), this,space_, heap_, FALSE, glob);
if (checkOuterNullExpr_)
(void) checkOuterNullExpr_->fixup(0, getExpressionMode(), this,space_, heap_, FALSE, glob);
// get initial head index
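// nextRequest_ tracks the next parent down-queue entry that workDown()
// will process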
nextRequest_ = parentQueue_.down->getHeadIndex();
haveAllocatedClusters_ = FALSE; // no clusters yet (nor their hash tables)
havePrevInput_ = FALSE; // first time there is no previous input
workAtp_ = allocateAtp(hashJoinTdb.workCriDesc_, space_);
// initialize tupp descriptor for the hash value
hashValueTupp_.init(sizeof(SimpleHashValue), NULL, (char *) (&hashValue_));
// allocate tupp descriptors for tupps in the workAtp. tupps 0 and 1
// are used for constants and temps. Don't allocate them here. Also
// left row, right row and instantiated row are allocated in the
// result buffer. Only allocate tupp descriptors for the extended rows
workAtp_->getTupp(hashJoinTdb.extLeftRowAtpIndex_) =
new(space_) tupp_descriptor;
workAtp_->getTupp(hashJoinTdb.extRightRowAtpIndex1_) =
new(space_) tupp_descriptor;
workAtp_->getTupp(hashJoinTdb.extRightRowAtpIndex2_) =
new(space_) tupp_descriptor;
// assign the hash value tupp
workAtp_->getTupp(hashJoinTdb.hashValueAtpIndex_) = &hashValueTupp_;
// Use one null tuple for both Left and Right joins;
ULng32 nullLength = 0;
if(leftJoinExpr_ && nullInstForLeftJoinExpr_ &&
(hashJoinTdb.instRowForLeftJoinLength_ > 0)) {
nullLength = hashJoinTdb.instRowForLeftJoinLength_;
}
if (rightJoinExpr_ && nullInstForRightJoinExpr_ &&
(hashJoinTdb.instRowForRightJoinLength_ > 0)) {
nullLength = MAXOF(nullLength, hashJoinTdb.instRowForRightJoinLength_);
}
// Allocate a NULL tuple for use in null instantiation.
// Allocate the MinMax tuple if the min/max optimization is being used.
if((nullLength > 0) || (hashJoinTdb.minMaxRowLength_ > 0)) {
ULng32 tupleSize = MAXOF(nullLength, hashJoinTdb.minMaxRowLength_);
ULng32 numTuples =
((nullLength > 0) && (hashJoinTdb.minMaxRowLength_ > 0)) ? 2 : 1;
Lng32 neededBufferSize =
(Lng32) SqlBufferNeededSize( numTuples, tupleSize);
nullPool_ = new(space_) sql_buffer_pool(1, neededBufferSize, space_);
if (nullLength > 0) {
if (nullPool_->get_free_tuple(nullData_, nullLength)) {
ex_assert(0, "ex_hashj_tcb No space for null tuple");
}
// Fill tuple with NULL values.
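// (every byte is set to 0xFF, which presumably also turns on the null
// indicators of all columns in the instantiated row)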
str_pad(nullData_.getDataPointer(), nullLength, '\377');
}
if (hashJoinTdb.minMaxRowLength_ > 0) {
// Allocate ATP used to pass request (including min/max value)
// to left child.
toLeftChildAtp_ = allocateAtp(hashJoinTdb.leftDownCriDesc_, space_);
// Allocate tuple used to compute min/max values.
if (nullPool_->get_free_tuple(workAtp_->getTupp(hashJoinTdb.minMaxValsAtpIndex_),
(Lng32) hashJoinTdb.minMaxRowLength_)) {
ex_assert(0, "ex_hashj_tcb No space for minmax tuple");
}
// Initialize the min/max tuple to all null values.
str_pad(workAtp_->getTupp(hashJoinTdb.minMaxValsAtpIndex_).getDataPointer(),
(Lng32) hashJoinTdb.minMaxRowLength_,
'\377');
}
}
};
///////////////////////////////////////////////////////////////////////////////
// Destructor for hashj_tcb
///////////////////////////////////////////////////////////////////////////////
ex_hashj_tcb::~ex_hashj_tcb() {
freeResources();
if (workAtp_)
deallocateAtp(workAtp_, space_);
workAtp_ = NULL;
if (clusterDb_) {
delete clusterDb_;
clusterDb_ = NULL;
};
// delete the buckets
if (buckets_) {
heap_->deallocateMemory(buckets_);
buckets_ = NULL;
};
delete heap_;
};
///////////////////////////////////////////////////////////////////////////////
// Free Resources
///////////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::freeResources() {
if ( isReuse() ) {
delete inputPool_;
inputPool_ = 0;
}
if (nullPool_) {
delete nullPool_;
nullPool_ = NULL;
}
delete parentQueue_.up;
delete parentQueue_.down;
};
///////////////////////////////////////////////////////////////////////////////
// release all result tupps in the workAtp_
///////////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::releaseResultTupps() {
if (hashJoinTdb().leftRowLength_)
workAtp_->getTupp(hashJoinTdb().leftRowAtpIndex_).release();
if (hashJoinTdb().rightRowLength_)
workAtp_->getTupp(hashJoinTdb().rightRowAtpIndex_).release();
if (isLeftJoin() && leftJoinExpr_)
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_).release();
if (isRightJoin() && rightJoinExpr_)
workAtp_->getTupp(hashJoinTdb().instRowForRightJoinAtpIndex_).release();
};
///////////////////////////////////////////////////////////////////////////////
// insert a result row into the parent's queue
// if hashRow == NULL, we handle a null-instantiated left join or an AntiSemiJoin
///////////////////////////////////////////////////////////////////////////////
short ex_hashj_tcb::insertResult(HashRow * hashRow) {
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
atp_struct * downParentEntryAtp = downParentEntry->getAtp() ;
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
//ex_expr::exp_return_type retCode;
atp_struct * leftRowAtp =
((pstate.getPhase() == PHASE_2) ?
leftQueue_.up->getHeadEntry()->getAtp() : workAtp_);
// first, we have to copy the input to this node
upParentEntry->copyAtp(downParentEntryAtp);
atp_struct * atp = upParentEntry->getAtp();
// Allocate the tuples lazily, so that they can be resized if needed.
// if we are in phase 2, we get the left row directly from the queue,
// otherwise we get it from a hash buffer
if (hashJoinTdb().leftRowLength_) {
if (pstate.getPhase() == PHASE_2) {
SqlBufferHeader::moveStatus ms =
resultPool_->moveIn(leftRowAtp,
workAtp_,
hashJoinTdb().leftRowAtpIndex_,
hashJoinTdb().leftRowLength_,
leftMoveExpr_,
true,
hashJoinTdb().bufferSize_);
if (ms == SqlBufferHeader::MOVE_ERROR)
{
processError(leftRowAtp);
return 1;
}
}
else {
char *rowPointer =
leftRowAtp->getTupp(hashJoinTdb().extLeftRowAtpIndex_).getDataPointer();
UInt32 leftRowLength = hashJoinTdb().leftRowLength_;
if(hashJoinTdb().useVariableLength()) {
// For variable length rows, the row length is stored in the
// first 4 bytes of the row
leftRowLength = *((UInt32 *)rowPointer);
}
resultPool_->moveIn(workAtp_,
hashJoinTdb().leftRowAtpIndex_,
leftRowLength,
rowPointer,
true,
hashJoinTdb().bufferSize_);
}
// and assign the tupp to the parent's queue entry
atp->getTupp(hashJoinTdb().returnedLeftRowAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().leftRowAtpIndex_);
} // (hashJoinTdb().leftRowLength_);
// Make the left row available at
// returnedInstRowForRightJoinAtpIndex_, so that the parent
// node can get it. Also handle the rightJoinExpr for Full Outer Joins.
if (isRightJoin() && rightJoinExpr_)
{
SqlBufferHeader::moveStatus ms =
resultPool_->moveIn(downParentEntryAtp,
workAtp_,
hashJoinTdb().instRowForRightJoinAtpIndex_,
hashJoinTdb().instRowForRightJoinLength_,
rightJoinExpr_,
true,
hashJoinTdb().bufferSize_);
if (ms == SqlBufferHeader::MOVE_ERROR)
{
processError(downParentEntryAtp);
return 1;
}
// and add the row to the result.
atp->getTupp(hashJoinTdb().returnedInstRowForRightJoinAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().instRowForRightJoinAtpIndex_);
} // isRightJoin() && rightJoinExpr_
// add the right row
if (hashJoinTdb().rightRowLength_ && hashRow) {
UInt32 rightRowLength = hashJoinTdb().rightRowLength_;
if(hashRow && hashJoinTdb().useVariableLength()) {
// get the actual length from the HashRow.
rightRowLength = hashRow->getRowLength();
}
resultPool_->moveIn(workAtp_,
hashJoinTdb().rightRowAtpIndex_,
rightRowLength,
hashRow->getData(),
true,
hashJoinTdb().bufferSize_);
atp->getTupp(hashJoinTdb().returnedRightRowAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().rightRowAtpIndex_);
};
// handle left joins
if (isLeftJoin()) {
if (hashRow) {
// left join and we have a match, move instantiated row
if (leftJoinExpr_) {
SqlBufferHeader::moveStatus ms =
resultPool_->moveIn(downParentEntryAtp,
workAtp_,
hashJoinTdb().instRowForLeftJoinAtpIndex_,
hashJoinTdb().instRowForLeftJoinLength_,
leftJoinExpr_,
true,
hashJoinTdb().bufferSize_);
if (ms == SqlBufferHeader::MOVE_ERROR)
{
processError(downParentEntryAtp);
return 1;
}
// add the row
atp->getTupp(hashJoinTdb().returnedInstRowForLeftJoinAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_);
}
}
else {
if (nullInstForLeftJoinExpr_) {
// No matching inner row: don't allocate an instantiated row for
// the left join; use the pre-allocated nullData_ instead
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_) = nullData_;
// add the row
atp->getTupp(hashJoinTdb().returnedInstRowForLeftJoinAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_);
}
}
}
upParentEntry->upState.status = ex_queue::Q_OK_MMORE;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
// we got another result row
pstate.matchCount_++;
upParentEntry->upState.setMatchNo(pstate.matchCount_);
// move left and/or right warnings to parent
if (pstate.accumDiags_)
{
ComDiagsArea *accumulatedDiagsArea =
upParentEntry->getDiagsArea();
if (accumulatedDiagsArea)
accumulatedDiagsArea->mergeAfter(*pstate.accumDiags_);
else
{
upParentEntry->setDiagsArea(pstate.accumDiags_);
// incr ref count after set so it doesn't get deallocated.
pstate.accumDiags_->incrRefCount();
}
pstate.accumDiags_ = NULL;
} // left or right child returned a warning
// insert into parent up queue
parentQueue_.up->insert();
// update operator stats
if (bmoStats_)
bmoStats_-> incActualRowsReturned();
// we can forget about the row in the workAtp
releaseResultTupps();
if (bmoStats_)
bmoStats_->setSpaceBufferCount(resultPool_->get_number_of_buffers());
return 0;
};
///////////////////////////////////////////////////////////////////////////////
// the join is canceled. Consume the rows from a queue
///////////////////////////////////////////////////////////////////////////////
NABoolean ex_hashj_tcb::consumeForCancel(ex_queue_pair q) {
// loop forever. exit via return
while (TRUE) {
if (q.up->isEmpty())
// queue is empty, but we are not done yet. Come back later
return FALSE;
else {
ex_queue_entry * entry = q.up->getHeadEntry();
switch (entry->upState.status) {
case ex_queue::Q_SQLERROR:
case ex_queue::Q_OK_MMORE: {
// consume the row
q.up->removeHead();
}; break;
case ex_queue::Q_NO_DATA: {
// we are done with this queue
return TRUE;
}; break;
case ex_queue::Q_INVALID: {
ex_assert(0, "ex_hashj_tcb::consumeForCancel() invalid state returned by child");
}; break;
}; // switch
}; // else
}; // while
};
short ex_hashj_tcb::processError(atp_struct* atp)
{
ex_assert( ! parentQueue_.down->isEmpty(),
"ex_hashj_tcb::processError() Unexpected empty parent down queue" );
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_queue_entry *upEntry = parentQueue_.up->getTailEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
upEntry->copyAtp(atp);
upEntry->upState.status = ex_queue::Q_SQLERROR;
upEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
upEntry->upState.parentIndex = pEntryDown->downState.parentIndex;
upEntry->upState.setMatchNo(pstate.matchCount_);
// insert into parent
parentQueue_.up->insert();
leftQueue_.down->cancelRequestWithParentIndex(parentQueue_.down->getHeadIndex());
rightQueue_.down->cancelRequestWithParentIndex(parentQueue_.down->getHeadIndex());
pstate.setState(HASHJ_CANCELED);
return 0;
};
ExWorkProcRetcode ex_hashj_tcb::work()
{
ex_assert(0,"Should never call ex_hashj_tcb::work()");
return WORK_OK;
}
///////////////////////////////////////////////////////////////////////////////
// workUp()
///////////////////////////////////////////////////////////////////////////////
ExWorkProcRetcode ex_hashj_tcb::workUp() {
// Check if there is still work to do
if (parentQueue_.down->isEmpty())
return WORK_OK;
// A hybrid hash join never works on more than one parent request at a time
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
const ex_queue::down_request & request = downParentEntry->downState.request;
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
ULng32 xProductPreemptMax =
hashJoinTdb().getXproductPreemptMax() ? hashJoinTdb().getXproductPreemptMax() :
UINT_MAX;
numJoinedRowsRejected_ = 0;
// loop forever, exit via return
while (TRUE) {
// if we have already given to the parent all the rows needed then cancel
// the parent's request.
if ((request == ex_queue::GET_NOMORE ||
(request == ex_queue::GET_N &&
downParentEntry->downState.requestValue <= (Lng32) pstate.matchCount_))
&& // and not canceled or done yet
pstate.getState() != HASHJ_CANCELED &&
pstate.getOldState() != HASHJ_CANCEL_AFTER_INNER &&
pstate.getState() != HASHJ_DONE )
{
propagateCancel(parentQueue_.down->getHeadIndex(), pstate);
}
switch (pstate.getState()) {
case HASHJ_READ_INNER:
case HASHJ_CANCEL_AFTER_INNER: {
if ( ! pstate.usingPreviousHT() && ! pstate.getHaveClusters() ) {
if ( allocateClusters() ) {
pstate.setState(HASHJ_ERROR);
break;
}
pstate.setHaveClusters(TRUE); // now the clusters are allocated
}
if (rightQueue_.up->isEmpty() ||
parentQueue_.up->isFull()) {
return WORK_OK;
};
if (workReadInner())
return WORK_OK;
} break;
case HASHJ_END_PHASE_1: {
workEndPhase1();
if ( pstate.getState() == HASHJ_END_PHASE_1 )
return WORK_CALL_AGAIN; // more work, but let others run too
} break;
case HASHJ_READ_OUTER: {
if (leftQueue_.up->isEmpty() ||
parentQueue_.up->isFull()) {
return WORK_OK;
};
// Query limits & suspend
if (numJoinedRowsRejected_ > xProductPreemptMax)
return WORK_HASHJ_PREEMPT;
if (workReadOuter())
return WORK_OK;
} break;
case HASHJ_END_PHASE_2: {
workEndPhase2();
} break;
case HASHJ_FLUSH_CLUSTER: {
workFlushCluster();
if (pstate.getState() == HASHJ_FLUSH_CLUSTER) {
// the cluster is not completely flushed. An I/O is pending
return WORK_CALL_AGAIN; // $$$$ add I/O event, change to WORK_OK
};
} break;
case HASHJ_READ_INNER_CLUSTER: {
workReadInnerCluster();
if (pstate.getState() == HASHJ_READ_INNER_CLUSTER) {
// the cluster is not completely read. An I/O is pending
return WORK_CALL_AGAIN; // $$$$ add I/O event, change to WORK_OK
};
} break;
case HASHJ_READ_OUTER_CLUSTER: {
// Query limits & suspend
if (numJoinedRowsRejected_ > xProductPreemptMax)
return WORK_HASHJ_PREEMPT;
workReadOuterCluster();
} break;
case HASHJ_READ_BUFFER: {
workReadBuffer();
if (pstate.getState() == HASHJ_READ_BUFFER) {
// the buffer is not read yet; the I/O is still pending
return WORK_CALL_AGAIN;// $$$$ add I/O event, change to WORK_OK
};
} break;
case HASHJ_RETURN_RIGHT_ROWS: {
// Make sure that we have a free entry in the parent's up queue.
if (parentQueue_.up->isFull()) {
// no free entry, come back later
return WORK_OK;
};
if (workReturnRightRows())
return WORK_OK;
} break;
case HASHJ_PROBE: {
// this routine could potentially produce a result row. Make sure
// that we have a free entry in the parent's up queue.
if (parentQueue_.up->isFull()) {
// no free entry, come back later
return WORK_OK;
};
if (workProbe())
return WORK_OK;
} break;
case HASHJ_DONE: {
// make sure that we have a free slot in the parent's up queue
if (parentQueue_.up->isFull()) {
return WORK_OK;
};
workDone();
// That's it; we need to be called again if there are more requests.
// We need to schedule workDown from workUp when workUp has processed
// the last request it can, but there are still entries workDown can use.
if (parentQueue_.down->isEmpty())
return WORK_OK;
else
return WORK_CALL_AGAIN;
} break;
case HASHJ_CANCELED: {
// the request was canceled. Both children were sent cancel requests.
// Consume all up rows from the children and wait for Q_NO_DATA.
// (Not needed if the left request was never sent).
NABoolean leftDone =
pstate.delayedLeftRequest() ? TRUE : consumeForCancel(leftQueue_);
// only check the right queue if this request built its own hash table
// (not reusing)
NABoolean rightDone =
pstate.usingPreviousHT() ? TRUE : consumeForCancel(rightQueue_) ;
if (!(leftDone && rightDone))
// we are not done yet, come back again after queue inserts
return WORK_OK;
// all rows are consumed, we are done
pstate.setState(HASHJ_DONE);
} break;
case HASHJ_ERROR: {
// make sure that we have a free slot in the parent's up queue
if (parentQueue_.up->isFull()) {
return WORK_OK;
};
ex_assert( rc_ , "Missing error code");
// we ran into a serious runtime error. Create Condition and
// pass it to parent. rc_ has the error code and
// oldState_ tells us in which state the error occurred
ComDiagsArea *da = downParentEntry->getDiagsArea();
if (!da || !da->contains((Lng32) -rc_))
{
ComDiagsArea * diags = NULL;
#ifndef __EID
if(rc_ == EXE_SORT_ERROR)
{
char msg[512];
char errorMsg[100];
Lng32 scratchError = 0;
Lng32 scratchSysError = 0;
Lng32 scratchSysErrorDetail = 0;
if(clusterDb_)
clusterDb_->getScratchErrorDetail(scratchError,
scratchSysError,
scratchSysErrorDetail,
errorMsg);
str_sprintf(msg, "Hash Join Scratch IO Error occurred. Scratch Error: %d, System Error: %d, System Error Detail: %d, Details: %s",
scratchError, scratchSysError, scratchSysErrorDetail, errorMsg);
diags = ExRaiseSqlError(heap_, downParentEntry,
(ExeErrorCode) -rc_,
NULL,
msg);
}
else
#endif
diags = ExRaiseSqlError(heap_, downParentEntry,
(ExeErrorCode) -rc_);
downParentEntry->setDiagsArea(diags);
}
processError(downParentEntry->getAtp());
} break;
}
}
};
///////////////////////////////////////////////////////////////////////////
// isSameInputAgain()
// Check if the old hash table (from the previous input) can be reused
// That is, match the current input with the previous input
///////////////////////////////////////////////////////////////////////////
NABoolean ex_hashj_tcb::isSameInputAgain( ex_queue_entry * downParentEntry ) {
if ( havePrevInput_ == FALSE ) return FALSE;
// insert the previous input into the appropriate tupp in the workAtp
workAtp_->getTupp(hashJoinTdb().prevInputTuppIndex_) = prevInputValues_;
NABoolean haveMark = FALSE;
Lng32 oldDiagsAreaMark = 0;
ComDiagsArea * da = downParentEntry->getAtp()->getDiagsArea();
if ( da != NULL ) {
oldDiagsAreaMark = da->mark();
haveMark = TRUE;
}
// match the current input with the previous
switch ( checkInputPred_->eval(downParentEntry->getAtp(), workAtp_ ) ) {
case ex_expr::EXPR_TRUE :
// the new input matches the previous --> reuse existing hash table
return TRUE; // this HT is a reuse of a previous one
break;
case ex_expr::EXPR_ERROR :
// expression evaluation failed; treat this like EXPR_FALSE -> no reuse
// clean diags area (so this error won't show)
if ( haveMark ) {
downParentEntry->getAtp()->getDiagsArea()->rewind(oldDiagsAreaMark, TRUE);
} else {
downParentEntry->getAtp()->getDiagsArea()->clear();
}
break;
case ex_expr::EXPR_FALSE :
break;
default:
ex_assert(0, "ex_hashj_tcb::isSameInputAgain() Invalid expr evaluation.");
}
return FALSE;
}
///////////////////////////////////////////////////////////////////////////
// workDown()
// Send the GET_ALL request to the left and possibly right child queue.
///////////////////////////////////////////////////////////////////////////
ExWorkProcRetcode ex_hashj_tcb::workDown() {
queue_index queue_tail = parentQueue_.down->getTailIndex();
while (nextRequest_ != queue_tail ) {
// we just received a new entry from our parent, start working
ex_queue_entry * downParentEntry =
parentQueue_.down->getQueueEntry(nextRequest_);
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
NABoolean will_reuse_old_HT = FALSE;
ex_queue_entry * leftEntry = NULL;
switch ( downParentEntry->downState.request ) {
case ex_queue::GET_N:
case ex_queue::GET_ALL:
if (rightQueue_.down->isFull() || leftQueue_.down->isFull() ) {
// we can count on being scheduled again on queues unblocking
return WORK_OK;
}
if ( isReuse() ) {
// If new input matches the previous input, then reuse the old hash-table
if ( will_reuse_old_HT = isSameInputAgain(downParentEntry) ) {
// the hash table is already prepared. Go on with phase 2 of the join
pstate.setPhase(PHASE_2);
pstate.usePreviousHT();
pstate.reliesOnIdx = prevReqIdx_;
pstate.setState(HASHJ_READ_OUTER); // all set to start reading the outer
}
else {
// can't reuse old hash table; need to build a new one
// remember the current input for next check.
if ( moveInputExpr_ ) {
havePrevInput_ = TRUE;
prevReqIdx_ = nextRequest_;
// insert the previous input into the appropriate tupp in this atp
workAtp_->getTupp(hashJoinTdb().prevInputTuppIndex_) = prevInputValues_;
// copy input into prevInputValues_
if ( moveInputExpr_->eval(downParentEntry->getAtp(), workAtp_ ) !=
ex_expr::EXPR_OK ) {
// a move expr should never fail; but if it does -- blow up this query
ex_assert(0, "ex_hashj_tcb::workDown() moveInputExpr failed");
}
}
}
} // if (isReuse())
// Either reuse is off, or the input didn't match the previous one -- start phase 1
if ( ! will_reuse_old_HT ) {
pstate.setState(HASHJ_READ_INNER); // phase 1
pstate.reliesOnIdx = nextRequest_;
// pass GET ALL request to right child as well. This is done
// because all rows are needed (in worst case) to find even
// one matching row.
ex_queue_entry * rightEntry = rightQueue_.down->getTailEntry();
rightEntry->downState.request = ex_queue::GET_ALL;
rightEntry->downState.requestValue =
downParentEntry->downState.requestValue;
rightEntry->downState.parentIndex = nextRequest_;
rightEntry->passAtp(downParentEntry);
rightQueue_.down->insert();
// Build the hash table only if we "caught up" and
// processed the head entry (without reuse)
if (nextRequest_ == parentQueue_.down->getHeadIndex()) {
if(hashJoinTdb().isUniqueHashJoin()) {
// If this is a Unique Hash Join, allocate the hash table
// structures, rather than the clusters.
//
ExUniqueHashJoinTcb *uHashJoinTcb = (ExUniqueHashJoinTcb *)this;
if (uHashJoinTcb->allocateBufferHeap())
pstate.setState(HASHJ_ERROR);
} else {
if (allocateClusters()) pstate.setState(HASHJ_ERROR);
}
pstate.setHaveClusters(TRUE); // now the clusters are allocated
}
} // if (!will_reuse_old_HT)
if ( isAllOrNothing_ ) { // Special case: AntiSemi HJ, no search expr
if ( will_reuse_old_HT ) {
// Even though we reuse, we still need to finish up a few things before
// phase two; e.g., handle the delayed or non-delayed left request
pstate.setState(HASHJ_END_PHASE_1);
pstate.setPhase(PHASE_1); // needed ? just in case ...
}
else { // no reuse
// Change the request: only one row from the right child is needed
ex_queue_entry * rightEntry = rightQueue_.down->getTailEntry();
rightEntry->downState.request = ex_queue::GET_N;
rightEntry->downState.requestValue = 1;
}
// Wait to find out about right rows; don't send a request to the left yet !!
if ( hashJoinTdb().delayLeftRequest() ) {
pstate.setDelayedLeftRequest();
// No requests were sent to either child, so need to reschedule
// to allow workUp to finish this request.
workUpTask_->schedule();
break;
}
} // if (isAllOrNothing_)
// If this HashJoin is doing min/max optimization, then we must
// delay the left request. The request will be sent during
// workEndPhase1() after reading all rows from the right child
// and after having computed the min and max values which will
// be sent with the request to the left child.
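// (Illustrative example: for "fact JOIN dim ON fact.a = dim.a", once the
// dim side is fully read we know the min and max of dim.a, and the scan
// of fact can skip rows with fact.a outside [min, max].)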
if (minMaxExpr_ && hashJoinTdb().delayLeftRequest() ) {
pstate.setDelayedLeftRequest();
} else {
// pass GET_ALL request to the left child
leftEntry = leftQueue_.down->getTailEntry();
leftEntry->downState.request = ex_queue::GET_ALL;
leftEntry->downState.requestValue = downParentEntry->downState.requestValue;
leftEntry->downState.parentIndex = nextRequest_;
leftEntry->passAtp(downParentEntry);
leftQueue_.down->insert();
}
// set the space buffer size and initial count whenever ALL request is sent
if (bmoStats_)
{
ex_hashj_tdb *hashjTdb = (ex_hashj_tdb *)getTdb();
bmoStats_->setSpaceBufferSize(hashjTdb->bufferSize_);
bmoStats_->setSpaceBufferCount(resultPool_->get_number_of_buffers());
}
break;
case ex_queue::GET_NOMORE:
pstate.setState(HASHJ_DONE); // We are done before we started.
workUpTask_->schedule(); // Make sure workUp completes the request
break;
default:
ex_assert(0, "ex_hashj_tcb::workDown() Invalid request.");
break;
} // switch (downParentEntry->downState.request)
nextRequest_++;
} // while (nextRequest_ != queue_tail)
return WORK_OK ;
}
///////////////////////////////////////////////////////////////////////////
// allocateClusters()
// Initialize the data structures for the hybrid hash join
///////////////////////////////////////////////////////////////////////////
NABoolean ex_hashj_tcb::allocateClusters() {
if ( haveAllocatedClusters_ ) {
// We can only get here when reusing (a previous HT), but the old hash
// table data is stale, so we need to clean up that old memory
if (clusterDb_) {
// Yield all surplus memory quota (i.e., above the minimum quota) back to
// the global count of unused memory, so that other BMOs may use this quota
clusterDb_->yieldAllMemoryQuota();
delete clusterDb_;
clusterDb_ = NULL;
};
// delete the buckets
if (buckets_) {
heap_->deallocateMemory(buckets_);
buckets_ = NULL;
};
}
// We use memUsagePercent of the physical memory for the hash join.
ULng32 availableMemory = memMonitor_->getPhysMemInBytes() / 100
* hashJoinTdb().memUsagePercent_;
// if quota, and it's less than avail memory, then use that lower figure
if ( hashJoinTdb().memoryQuotaMB() > 0 &&
hashJoinTdb().memoryQuotaMB() * ONE_MEG < availableMemory )
availableMemory = hashJoinTdb().memoryQuotaMB() * ONE_MEG ;
// size of inner table (including row headers and hash chains) in bytes
// This may be a very large number, max out at 8 GB and take at
// least 100 KB. Don't completely trust the optimizer ;-)
// (With available memory usually at 100MB, that's a max of 80 clusters).
NAFloat innerTableSizeF = hashJoinTdb().innerExpectedRows() *
(NAFloat)(hashJoinTdb().extRightRowLength_ + sizeof(HashTableHeader));
Int64 innerTableSize;
if (innerTableSizeF > MAX_INPUT_SIZE ) innerTableSize = MAX_INPUT_SIZE ;
else
innerTableSize =
MAXOF( MIN_INPUT_SIZE, (Int64)innerTableSizeF);
// we start with 4 buckets per cluster
ULng32 bucketsPerCluster = 4;
ULng32 noOfClusters = 0;
// Cross Product needs only one cluster / one bucket
if ( rightSearchExpr_ == NULL ) {
bucketsPerCluster = 1;
noOfClusters = 1;
}
else if ( hashJoinTdb().isUniqueHashJoin() ) {} // UHJ uses zero clusters
else {
// required number of buffers for inner table
ULng32 totalInnerBuffers =
(ULng32)(innerTableSize/hashJoinTdb().hashBufferSize_);
if (innerTableSize % hashJoinTdb().hashBufferSize_)
totalInnerBuffers++;
// total number of buffers available
ULng32 totalBuffers = availableMemory/hashJoinTdb().hashBufferSize_;
noOfClusters = totalInnerBuffers/totalBuffers;
if (totalInnerBuffers % totalBuffers)
noOfClusters++;
// Aim for an average cluster size of about a quarter of the available
// memory, to leave some slack for underestimates or data skew
noOfClusters *= 4;
// Let the compiler force a higher number of clusters
noOfClusters = MAXOF(noOfClusters, hashJoinTdb().numClusters() );
// round it up to the next prime number (2,3,5,7,11,...)
noOfClusters = ClusterDB::roundUpToPrime(noOfClusters);
// the extreme case: each cluster gets only bucketsPerCluster buffers,
// i.e., one buffer per bucket
ULng32 maxNoOfClusters = totalBuffers/bucketsPerCluster;
noOfClusters = MINOF(noOfClusters, maxNoOfClusters);
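// A sketch with assumed numbers: a 200 MB inner table, 1 MB hash buffers
// and 100 MB of available memory give totalInnerBuffers = 200 and
// totalBuffers = 100, so noOfClusters = 2; scaled by 4 that is 8, rounded
// up to the prime 11, well under maxNoOfClusters = 100/4 = 25.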
}
// total number of buckets
bucketCount_ = bucketsPerCluster * noOfClusters;
// allocate the buckets
ULng32 bucketIdx = 0;
if ( bucketCount_ )
buckets_ = (Bucket*)heap_->allocateMemory(bucketCount_ * sizeof(Bucket));
for(bucketIdx = 0; bucketIdx < bucketCount_; bucketIdx++)
buckets_[bucketIdx].init();
ULng32 minMemQuotaMB = hashJoinTdb().isPossibleMultipleCalls() ?
hashJoinTdb().memoryQuotaMB() : 0 ;
ULng32 minB4Chk = hashJoinTdb().getBmoMinMemBeforePressureCheck() * ONE_MEG;
// estimate memory needed in phase 1 (not incl. hash tables)
Float32 memEstInMbPerCpu = (Float32)(innerTableSize / ONE_MEG) ;
// Only set cross product optimizations on when there is no
// right search expression
ClusterDB::HashOperator hashOperator =
rightSearchExpr_ == NULL ? ClusterDB::CROSS_PRODUCT :
hashJoinTdb().isUniqueHashJoin() ? ClusterDB::UNIQUE_HASH_JOIN :
hashJoinTdb().isNoOverflow() ? ClusterDB::ORDERED_HASH_JOIN :
ClusterDB::HYBRID_HASH_JOIN ;
clusterDb_ = new(heap_) ClusterDB(hashOperator,
hashJoinTdb().hashBufferSize_,
workAtp_,
tdb.getExplainNodeId(),
hashJoinTdb().extRightRowAtpIndex1_,
hashJoinTdb().extRightRowAtpIndex2_,
rightSearchExpr_,
buckets_,
bucketCount_,
availableMemory,
memMonitor_,
hashJoinTdb().pressureThreshold_,
getGlobals()->castToExExeStmtGlobals(),
&rc_,
hashJoinTdb().isNoOverflow(),
FALSE, /*isPartialGroupBy*/
hashJoinTdb().minBuffersToFlush_,
hashJoinTdb().numInBatch_,
hashJoinTdb().forceOverflowEvery(),
hashJoinTdb().forceHashLoopAfterNumBuffers()
,hashJoinTdb().forceClusterSplitAfterMB(),
ioEventHandler_,
this, // the calling tcb
hashJoinTdb().scratchThresholdPct_,
hashJoinTdb().logDiagnostics(),
hashJoinTdb().bufferedWrites(),
hashJoinTdb().disableCmpHintsOverflow(),
hashJoinTdb().memoryQuotaMB(),
minMemQuotaMB,
minB4Chk, // BmoMinMemBeforePressureCheck
hashJoinTdb().getBmoCitizenshipFactor(),
hashJoinTdb().getMemoryContingencyMB(),
// to estimate the error penalty
hashJoinTdb().hjGrowthPercent(),
memEstInMbPerCpu, // estimate mem needed
0, // Hash-Table not resizable
getStatsEntry()
);
if ( !clusterDb_ || rc_ != EXE_OK ) {
if ( !clusterDb_ ) rc_ = EXE_NO_MEM_TO_EXEC; // allocate ClusterDB failed
else delete clusterDb_ ;
return(TRUE);
};
clusterDb_->setScratchIOVectorSize(hashJoinTdb().scratchIOVectorSize());
switch(hashJoinTdb().getOverFlowMode())
{
case SQLCLI_OFM_SSD_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_SSD);
break;
case SQLCLI_OFM_MMAP_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_MMAP);
break;
default:
case SQLCLI_OFM_DISK_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_DISK);
break;
}
#ifndef __EID
clusterDb_->setBMOMaxMemThresholdMB(hashJoinTdb().getBMOMaxMemThresholdMB());
#endif
bucketIdx = 0;
Cluster * cluster = NULL;
ULng32 i;
// allocate the (inner) clusters
for (i = 0; i < noOfClusters; i++) {
cluster = new(heap_) Cluster(Cluster::IN_MEMORY,
clusterDb_,
&buckets_[bucketIdx],
bucketsPerCluster,
hashJoinTdb().extRightRowLength_,
hashJoinTdb().useVariableLength(),
hashJoinTdb().considerBufferDefrag(),
hashJoinTdb().extRightRowAtpIndex1_,
TRUE,
FALSE, // for now only left joins
cluster,
&rc_);
if ( !cluster || rc_ != EXE_OK ) {
if ( !cluster ) rc_ = EXE_NO_MEM_TO_EXEC; // could not allocate Cluster
else delete cluster;
return(TRUE);
};
bucketIdx += bucketsPerCluster;
};
// store head of the cluster list in the clusterDb_
clusterDb_->setClusterList(cluster);
haveAllocatedClusters_ = TRUE;
return(FALSE);
};
//////////////////////////////////////////////////////////////
//
// isFirst
//
// Determines if a request is the first in a series whose hash
// table is to be reused by subsequent requests.
//
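// A request is "first" when its reliesOnIdx is its own queue index
// (i.e., it built the hash table itself) and the next entry's
// reliesOnIdx points back at it (i.e., that entry reuses the table).
//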
// KBC: should we also look to see that the next request
// isn't canceled too? (change to a loop, stop
// on the first non-cancelled with our reliesOnIdx -> true
// or first reliesOnIdx != ix -> false)
//
//////////////////////////////////////////////////////////////
NABoolean ex_hashj_tcb::isFirst(queue_index ix,
ex_hashj_private_state &pstate) {
return ((pstate.reliesOnIdx == ix) &&
(parentQueue_.down->entryExists(ix+1)) &&
(((ex_hashj_private_state*)parentQueue_.down->
getQueueEntry(ix+1)->pstate)->reliesOnIdx == ix));
}
//////////////////////////////////////////////////////////////
//
// isLast
//
// Determines if a request is the last in a series (or a
// singleton)
//
//////////////////////////////////////////////////////////////
NABoolean ex_hashj_tcb::isLast(queue_index ix,
ex_hashj_private_state &pstate) {
return ( (! parentQueue_.down->entryExists(ix+1)) ||
(((ex_hashj_private_state*)parentQueue_.down->
getQueueEntry(ix+1)->pstate)->reliesOnIdx != pstate.reliesOnIdx));
}
//////////////////////////////////////////////////////////////
//
// propagateCancel
//
// propagate the cancellation according to the state of the
// request and reuse situation
//
//////////////////////////////////////////////////////////////
void ex_hashj_tcb::propagateCancel(queue_index ix,
ex_hashj_private_state &pstate) {
// This wasn't started... Just end it
if (pstate.getState() == HASHJ_EMPTY)
pstate.setState(HASHJ_DONE);
else {
// Propagate cancel down to the left
leftQueue_.down->cancelRequestWithParentIndex(ix);
pstate.setState(HASHJ_CANCELED); // Middle, last or not reuse
if(pstate.getPhase() == PHASE_1 ) {
if ( isReuse() ) {
if ( isFirst(ix, pstate)) {
// If first, then we can't cancel the right request, since
// subsequent requests might expect it.
pstate.setState(HASHJ_CANCEL_AFTER_INNER);
} else if ( isLast(ix,pstate) ) {
// Singletons and last entries are done, and can
// finish the cancel by propagating to the right
if(pstate.usingPreviousHT()) {
if(pstate.delayedLeftRequest()) {
// In the case when we are reusing the previous HT and we
// have delayed the left request and are still in PHASE1,
// then we have not sent a request to either child. So we
// do not expect any activity on our child queues and
// therefore we would not be rescheduled. Force a
// reschedule so that the HJ can complete the cancel and
// send the Q_NO_DATA to the parent.
pstate.setState(HASHJ_DONE);
workUpTask_->schedule();
}
} else {
// If we are still in PHASE_1 and we have an outstanding
// request to the right child, then send cancel to right.
rightQueue_.down->cancelRequestWithParentIndex(ix);
// Since we cancelled the build of the HT, it can no longer
// be used by subsequent requests.
havePrevInput_ = FALSE;
}
}
} else {
rightQueue_.down->cancelRequestWithParentIndex(ix);
}
}
}
}
//////////////////////////////////////////////////////////////
//
// Work routine for cancel processing for class ex_hashj_tcb
//
//////////////////////////////////////////////////////////////
ExWorkProcRetcode ex_hashj_tcb::workCancel() {
queue_index ix = parentQueue_.down->getHeadIndex();
// Loop over all requests that have been sent down.
while (ix != nextRequest_ )
{
ex_queue_entry* downParentEntry = parentQueue_.down->getQueueEntry(ix);
const ex_queue::down_request & request = downParentEntry->downState.request;
if (request == ex_queue::GET_NOMORE) {
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
if ( pstate.state_ != HASHJ_CANCELED && pstate.state_ != HASHJ_DONE )
propagateCancel(ix, pstate);
}
ix++;
}
return WORK_OK;
}
///////////////////////////////////////////////////////////////////////////
// read a row from the right child and store it in the appropriate
// bucket/cluster
///////////////////////////////////////////////////////////////////////////
short ex_hashj_tcb::workReadInner() {
ex_queue_entry * rightEntry = rightQueue_.up->getHeadEntry();
ex_expr::exp_return_type retCode;
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
switch (rightEntry->upState.status) {
case ex_queue::Q_OK_MMORE: {
if ( isAllOrNothing_ ) { // Special case: AntiSemi HJ, no search expr
pstate.setReadRightRow(); // this request actually read a row from the right
anyRightRowsRead_ = TRUE ; // Got a right row --> left rows are not needed !!
rightQueue_.up->removeHead(); // remove that row from the right upqueue
break;
}
isInnerEmpty_ = FALSE;
if (pstate.getOldState() != HASHJ_FLUSH_CLUSTER) {
// Calculate the hash value for the right row. If the oldstate_ is
// HASHJ_FLUSH_CLUSTER, we tried to insert a row and had to flush a
// cluster. The cluster is now flushed, but the row is not inserted
// yet. Thus, in this case the hash value is still available and we
// don't have to recalculate it.
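// No rightHashExpr_ presumably means a cross product, which uses a
// single bucket (see allocateClusters()), so any constant hash value
// will do.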
if (! rightHashExpr_)
hashValue_ = 666;
else
{
// If we are doing MIN/MAX optimization, then accumulate the
// Min and Max values from the inner row. Results are
// accumulated in the min/max tuple of the workAtp_ (Can we
// combine this expression with the rightHashExpr_ to avoid
// some common overhead?)
if (minMaxExpr_) {
retCode = minMaxExpr_->eval(rightEntry->getAtp(), workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
processError(rightEntry->getAtp());
return 0;
}
}
retCode = rightHashExpr_->eval(rightEntry->getAtp(), workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
processError(rightEntry->getAtp());
return 0;
}
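// Keep only the low 31 bits of the hash; the high-order bit is
// presumably reserved as a flag elsewhere.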
hashValue_ = hashValue_ & MASK31;
// checkInnerNullExpr_ is used in the case of a hash anti semi join (the
// transformation of a NOT IN subquery). The checkInnerNullExpr_ expression
// tests whether the value from the right child is null; the first null
// encountered while reading from the inner table causes the operation to
// be canceled. The value 666654765 is the hash value of NULL and is used
// here as an optimization, so that we do not execute the
// checkInnerNullExpr_ expression if we are sure that the inner value is
// not a NULL value.
if (checkInnerNullExpr_ &&
hashValue_ == ExHDPHash::nullHashValue /*666654765*/)
{
retCode = checkInnerNullExpr_->eval(rightEntry->getAtp(), workAtp_);
switch ( retCode)
{
case ex_expr::EXPR_ERROR :
{
return processError(rightEntry->getAtp()) ;
}
case ex_expr::EXPR_TRUE :
{
propagateCancel(parentQueue_.down->getHeadIndex(), pstate);
return 0;
}
default:
break;
}
}
}
}
// set oldState_; just in case we came in from HASHJ_FLUSH_CLUSTER
pstate.setOldState(HASHJ_READ_INNER);
// determine the bucket where the row is stored
ULng32 bucketId = hashValue_ % bucketCount_;
ComDiagsArea * da = rightEntry->getDiagsArea();
if (da)
{
if (pstate.accumDiags_)
pstate.accumDiags_->mergeAfter(*da);
else
{
pstate.accumDiags_ = da;
da->incrRefCount();
}
}
// insert the row in this bucket
if (buckets_[bucketId].insert(rightEntry->getAtp(),
rightMoveInExpr_,
hashValue_,
&rc_,
hashJoinTdb().isNoOverflow() )) {
// row is inserted, remove it from queue
rightQueue_.up->removeHead();
}
else {
if (rc_) {
// the insert caused an error
// copy the diags from right entry to parent down entry from which
// the diags will be copied later during HASHJ_ERROR state to
// parent up entry.
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
pEntryDown->copyAtp(rightEntry);
pstate.setState(HASHJ_ERROR);
return 0;
};
// Do not remove the row but set the oldstate_ and state_ accordingly
pstate.setState(HASHJ_FLUSH_CLUSTER);
}
} break;
case ex_queue::Q_NO_DATA: {
// if inner table is empty then set checkOuterNullExpr_ to NULL so that
// it does not get executed in workReadOuter() and outer NULL values don't
// get discarded.
// if (isInnerEmpty_)
// {
// checkOuterNullExpr_ = NULL;
// }
currCluster_ = clusterDb_->getClusterList(); // first cluster to process
pstate.setState(HASHJ_END_PHASE_1);
if ( isAllOrNothing_ ) { // Special case: AntiSemi HJ, no search expr
if ( ! pstate.readRightRow() ) // if this request did not read rows then
anyRightRowsRead_ = FALSE ; // clear (a possibly previously set) flag !!
}
} break;
case ex_queue::Q_SQLERROR:
{
processError(rightEntry->getAtp());
} break;
case ex_queue::Q_INVALID: {
ex_assert(0,
"ex_hashj_tcb::workReadInner() Invalid state returned by right child");
} break;
};
return 0;
};
///////////////////////////////////////////////////////////////////////////
// flush a cluster
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workFlushCluster() {
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
// maybe the state was changed to HASHJ_FLUSH_CLUSTER because of
// a cluster split. In this case, there is no cluster to flush.
if (clusterDb_->getClusterToFlush()) {
if (clusterDb_->getClusterToFlush()->flush(&rc_)) {
// all I/Os are done, go back to the state where we came from
pstate.setState(pstate.getOldState());
clusterDb_->setClusterToFlush(NULL);
}
else {
if (rc_) {
// flush caused an error
pstate.setState(HASHJ_ERROR);
return;
}
}
}
else {
// no cluster to flush, return to the state where we came from
pstate.setState(pstate.getOldState());
}
};
///////////////////////////////////////////////////////////////////////////
// prepare clusters for phase 2 of the join
//
// whenever this method returns without changing the state, it means that
// we return to the scheduler to let other operators do some work.
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workEndPhase1() {
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
queue_index parent_index = parentQueue_.down->getHeadIndex();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
if ( isAllOrNothing_ ) { // Special case: AntiSemi HJ, no search expr
if ( anyRightRowsRead_ ) { // got a right row -- all done, return nothing !
if ( ! hashJoinTdb().delayLeftRequest() ) { // cancel left request
leftQueue_.down->cancelRequestWithParentIndex(parent_index);
pstate.setState(HASHJ_CANCELED); // cleanup left replies
return;
}
pstate.setState(HASHJ_DONE); // this request returns nothing
return;
}
else // no right rows -- need to return all left rows !!
if ( hashJoinTdb().delayLeftRequest() ) {
// need to pass (a belated) GET_ALL request to the left child
ex_queue_entry * leftEntry = leftQueue_.down->getTailEntry();
leftEntry->downState.request = ex_queue::GET_ALL;
leftEntry->downState.requestValue = downParentEntry->downState.requestValue;
leftEntry->downState.parentIndex = parentQueue_.down->getHeadIndex();
leftEntry->passAtp(downParentEntry);
leftQueue_.down->insert();
pstate.resetDelayedLeftRequest(); // delayed no more
// From here on -- no special processing for "isAllOrNothing"
}
} // if allOrNothing_
// Is this HashJoin doing MIN/MAX optimization?
// If so, the left request was delayed until here.
else if (minMaxExpr_ && pstate.delayedLeftRequest() ) {
// We are doing min/max optimization and are done reading the rows
// from the right child and have computed the min and max values.
// Time to construct the request (including min max values) and
// send it to the left child.
// need to pass (a belated) GET_ALL request to the left child
ex_queue_entry * leftEntry = leftQueue_.down->getTailEntry();
leftEntry->downState.request = ex_queue::GET_ALL;
leftEntry->downState.requestValue = downParentEntry->downState.requestValue;
leftEntry->downState.parentIndex = parentQueue_.down->getHeadIndex();
// Fill in the toLeftAtp with the tuples coming down from the
// parent and with the computed min/max values.
toLeftChildAtp_->copyAtp(downParentEntry->getAtp());
toLeftChildAtp_->getTupp(hashJoinTdb().leftDownCriDesc_->noTuples()-1) =
workAtp_->getTupp(hashJoinTdb().minMaxValsAtpIndex_);
// Pass this ATP to the child.
leftEntry->passAtp(toLeftChildAtp_);
leftQueue_.down->insert();
// delayed no more. From here on -- no special processing
pstate.resetDelayedLeftRequest();
}
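// Note on the min/max optimization (summarizing the block above): while
// reading the inner rows, minMaxExpr_ accumulated the minimum and maximum
// join-key values into the min/max tuple of the workAtp_; those values are
// now shipped as an extra input tuple to the left child, so the left scan
// can filter rows with them before they ever reach the join.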
while (currCluster_) {
switch (currCluster_->getState()) {
case Cluster::FLUSHED: {
if (currCluster_->getFirstBuffer() && currCluster_->getRowsInBuffer()) {
// the cluster is in state FLUSHED but the buffer still holds
// some rows. Flush it now.
clusterDb_->setClusterToFlush(currCluster_);
pstate.setState(HASHJ_FLUSH_CLUSTER);
return;
};
// this inner cluster is all flushed; set up a matching outer cluster
currCluster_->setUpOuter(hashJoinTdb().extLeftRowLength_,
hashJoinTdb().extLeftRowAtpIndex_,
// bitMapEnable : in the following 3 cases
isLeftJoin() ||isSemiJoin() ||isAntiSemiJoin());
} break;
case Cluster::IN_MEMORY: {
if (currCluster_->getRowCount()) { // chain the cluster only if has rows
// In the no-overflow case chaining is forced (i.e., the memory pressure
// check is skipped, and rc_ == EXE_NO_MEM_TO_EXEC is returned if the
// actual allocation failed).
if ( ! currCluster_->chain( hashJoinTdb().isNoOverflow(), &rc_)) {
if (rc_) {
// chain caused an error
pstate.setState(HASHJ_ERROR);
return;
};
// the cluster is in memory, but there is not enough space
// for the hash table. Flush the cluster.
clusterDb_->setClusterToFlush(currCluster_);
pstate.setState(HASHJ_FLUSH_CLUSTER);
return;
};
// if this was a big cluster, take a break and let other operators run
if ( currCluster_->getRowCount() > 10000
&& currCluster_->getNext() ) // skip breaking for the last cluster
{
return; // go back to the scheduler with WORK_CALL_AGAIN
}
}
} break;
case Cluster::CHAINED: {
// the cluster is in memory and it is chained. nothing to do
} break;
};
// Special case: a cluster that was flushed after it was already chained.
// We only entered the loop to set up an outer cluster for that cluster.
if ( flushedChainedCluster_ ) {
flushedChainedCluster_ = FALSE;
currCluster_ = NULL ;
break;
}
// add this inner cluster's number of rows to the total
totalRightRowsRead_ += currCluster_->getRowCount();
currCluster_ = currCluster_->getNext();
}; // while ( currCluster_ )
// Efficiently handle the rare case of an empty inner table.
// Don't cancel if the left side has an Insert/Update/Delete operation to complete.
if ( 0 == totalRightRowsRead_ && // was the inner table empty ?
onlyReturnResultsWhenInnerMatched_ &&
(!leftSideIUD()) )
{
// cancel request to outer; no rows would be returned
leftQueue_.down->cancelRequestWithParentIndex(parent_index);
pstate.setState(HASHJ_CANCELED); // this request returns nothing
return;
}
#if !defined(__EID)
// Yield memory quota (or, if needed, flush another in-memory cluster
// to free more memory for phase 2).
// first get some information about the clusters (number, sizes, etc.)
ULng32 numClusters = 0;
ULng32 numFlushed = 0;
ULng32 totalSize = 0;
ULng32 maxSize = 0;
ULng32 minSize = UINT_MAX;
if ( clusterDb_->sawPressure() ) { // only if an overflow happened
Cluster * currCl, * inMemoryCluster = NULL;
for (numClusters = 0, numFlushed = 0 , totalSize = 0,
maxSize = 0, minSize = UINT_MAX,
currCl = clusterDb_->getClusterList();
currCl ;
currCl = currCl->getNext(), ++numClusters )
if ( currCl->isInner() ) {
if ( currCl->getState() == Cluster::FLUSHED ) ++numFlushed ;
// pick any in-memory cluster
if ( currCl->getState() == Cluster::CHAINED ) inMemoryCluster = currCl;
ULng32 clusterSizeMB =
(ULng32) (currCl->clusterSize() / ONE_MEG ) ;
totalSize += clusterSizeMB ;
if ( maxSize < clusterSizeMB ) maxSize = clusterSizeMB ;
if ( minSize > clusterSizeMB ) minSize = clusterSizeMB ;
}
// End of phase 1 - Calculate max memory quota needed, yield excess
// if not enough comfortable spare memory is left for phase 2,
// then we flush an in-memory cluster to have more memory
if ( clusterDb_->checkQuotaOrYield(numFlushed,maxSize) &&
inMemoryCluster ) { // if none, we'll make do with whatever memory we have
// Flush this chained cluster, and when we return to this method handle
// this inner cluster as if it had been flushed
// (i.e., it gets an outer cluster)
inMemoryCluster->removeHashTable();
clusterDb_->setClusterToFlush(inMemoryCluster);
flushedChainedCluster_ = TRUE; // remember that special case
currCluster_ = inMemoryCluster ; // make it get an outer cluster later
pstate.setState(HASHJ_FLUSH_CLUSTER);
return; // go flush this cluster; return here for possible more
}
} // if saw pressure
else
// End of phase 1 - Calculate max memory quota needed, and yield the rest
clusterDb_->yieldUnusedMemoryQuota();
// Log -- end of phase 1
if ( hashJoinTdb().logDiagnostics() ) {
// Report memory quota grabbing during phase 1
if ( clusterDb_->memoryQuotaMB() > hashJoinTdb().memoryQuotaMB() ) {
char msg[512];
str_sprintf(msg, "HJ End Phase 1 (%d). GRABBED additional quota %d MB",
0, // NA_64BIT, use instance id later
clusterDb_->memoryQuotaMB() - hashJoinTdb().memoryQuotaMB());
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
if ( clusterDb_->sawPressure() ) { // only if an overflow happened
char msg[1024];
str_sprintf(msg,
"HJ End Phase 1 (%d). #inner rows: %Ld, #buckets: %d, #clusters: %d, #flushed: %d, total size %d MB, cluster size max- %d MB min- %d, variable size records: %s",
0, // NA_64BIT, use instance id later
totalRightRowsRead_,
bucketCount_ , numClusters, numFlushed,
totalSize, maxSize, minSize,
hashJoinTdb().useVariableLength() ? "y" : "n");
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
} // if logDiagnostics
#endif
// all clusters are prepared. Go on with phase 2 of the join
pstate.setPhase(PHASE_2);
// from now on, we want to charge the next phase with time
if (hashJoinStats_)
hashJoinStats_->incPhase();
if (pstate.getOldState() == HASHJ_CANCEL_AFTER_INNER) {
rightQueue_.down->cancelRequestWithParentIndex(parent_index);
pstate.setState(HASHJ_CANCELED);
} else
pstate.setState(HASHJ_READ_OUTER);
};
///////////////////////////////////////////////////////////////////////////
// read a row from the left child.
///////////////////////////////////////////////////////////////////////////
short ex_hashj_tcb::workReadOuter() {
ex_queue_entry * leftEntry = leftQueue_.up->getHeadEntry();
ex_expr::exp_return_type retCode;
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
switch (leftEntry->upState.status) {
case ex_queue::Q_OK_MMORE: {
if (pstate.getOldState() != HASHJ_FLUSH_CLUSTER) {
// Calculate the hash value for the left row. If the oldstate_ is
// HASHJ_FLUSH_CLUSTER, we tried to insert a row and had to flush a
// cluster. The cluster is now flushed, but the row is not inserted
// yet. Thus, in this case the hash value is still available and
// we don't have to recalculate it.
if (! leftHashExpr_)
hashValue_ = 666;
else
{
retCode = leftHashExpr_->eval(leftEntry->getAtp(), workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
processError(leftEntry->getAtp());
return 0;
}
hashValue_ = hashValue_ & MASK31;
// checkOuterNullExpr_ is used in the case of a hash anti semi join
// (transformation of a NOT IN subquery). The checkOuterNullExpr_
// expression tests whether the value from the left child is null.
// checkOuterNullExpr_ is set to NULL in workReadInner if the inner table is empty.
// When checkOuterNullExpr_ is not NULL, outer NULL values are discarded.
if (!isInnerEmpty_ &&
checkOuterNullExpr_ &&
hashValue_ == ExHDPHash::nullHashValue /*666654765*/)
{
retCode = checkOuterNullExpr_->eval(leftEntry->getAtp(), workAtp_);
switch (retCode)
{
case ex_expr::EXPR_TRUE:
{
leftQueue_.up->removeHead();
return 0;
}
break;
case ex_expr::EXPR_ERROR :
{
return processError(leftEntry->getAtp()) ;
}
default:
break;
}
}
}
}
// determine the bucket where the row is stored
ULng32 bucketId = hashValue_ % bucketCount_;
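// (illustration) the bucket mapping is deterministic: with, e.g.,
// bucketCount_ == 4, a row with hashValue_ == 10 always lands in bucket 2,
// so matching left and right rows meet in the same bucket.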
// determine the corresponding outer Cluster
Cluster * oCluster = buckets_[bucketId].getOuterCluster();
// set oldState_; just in case we came in from HASHJ_FLUSH_CLUSTER
pstate.setOldState(HASHJ_READ_OUTER);
ComDiagsArea * da = leftEntry->getDiagsArea();
if (da)
{
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
if (pstate.accumDiags_)
pstate.accumDiags_->mergeAfter(*da);
else
{
pstate.accumDiags_ = da;
da->incrRefCount();
}
}
if (oCluster) {
// the outer cluster exists. Thus, the corresponding inner cluster
// is FLUSHED. Store row in the outer cluster, we probe it during
// phase 3 of the join
if (oCluster->insert(leftEntry->getAtp(),
leftMoveInExpr_,
hashValue_,
&rc_)) {
leftQueue_.up->removeHead();
}
else {
if (rc_) {
// the insert caused an error
// copy the diags from left entry to parent down entry from which
// the diags will be copied later during HASHJ_ERROR state to
// parent up entry.
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
pEntryDown->copyAtp(leftEntry);
pstate.setState(HASHJ_ERROR);
return 0;
};
// row is not inserted. This can only mean, that we have to flush
// the cluster. Do not remove the row and set state_ accordingly
pstate.setState(HASHJ_FLUSH_CLUSTER);
return 0;
};
}
else {
// no outer cluster. If the corresponding inner bucket contains rows or
// if it is a left join or an AntiSemiJoin, then probe this row. Note:
// workProbe() will take care of removing the row from the up queue
if (buckets_[bucketId].getRowCount() ||
! onlyReturnResultsWhenInnerMatched_ ) {
// set the cursor on the first matching row
Cluster * iCluster = buckets_[bucketId].getInnerCluster();
HashTable * ht = iCluster->getHashTable();
if (ht) {
if (rightSearchExpr_ == NULL) {
ht->positionSingleChain(&cursor_);
} else {
ht->position(&cursor_,
leftEntry->getAtp(),
workAtp_,
hashJoinTdb().extRightRowAtpIndex1_,
probeSearchExpr1_,
hashValue_,
doNotChainDup_,
hashJoinTdb().isReturnRightOrdered() );
}
if ((cursor_.getBeginRow() == NULL) && hashJoinStats_) {
hashJoinStats_->incEmptyChains();
}
}
else
cursor_.init();
clusterDb_->setClusterToProbe(iCluster);
pstate.setState(HASHJ_PROBE);
return 0;
};
// This row has no possible match because the inner bucket is
// empty and this is not a left join. Thus, we can forget about
// this row.
leftQueue_.up->removeHead();
};
} break;
case ex_queue::Q_NO_DATA: {
// set the isInnerEmpty_ flag to TRUE
isInnerEmpty_ = TRUE;
//checkOuterNullExpr_ = hashJoinTdb().checkOuterNullExpr_;
if (isRightJoin())
pstate.setState(HASHJ_RETURN_RIGHT_ROWS);
else
pstate.setState(HASHJ_END_PHASE_2);
} break;
case ex_queue::Q_SQLERROR:
{
processError(leftEntry->getAtp());
} break;
case ex_queue::Q_INVALID: {
ex_assert(0,
"ex_hashj_tcb::workReadOuter() Invalid state returned by left child");
} break;
};
return 0;
};
///////////////////////////////////////////////////////////////////////////
// prepare clusters for phase 3 of the join
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workEndPhase2() {
Cluster * iCluster = clusterDb_->getClusterList();
Cluster * prevICluster = iCluster;
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
// When Reuse is applied, there is no overflow (i.e., no outer clusters
// are kept), and the inner clusters must be kept, not released!
if ( isReuse() ) {
// still need to collect stats (first time only, not when reusing)
for ( ; ! pstate.usingPreviousHT() && iCluster ; iCluster = iCluster->getNext() )
if (hashJoinStats_)
hashJoinStats_->
addClusterStats(iCluster->getStats(hashJoinStats_->getHeap()));
pstate.setState(HASHJ_DONE);
return;
}
while (iCluster) {
Cluster * originalInnerCluster = iCluster;
Cluster * oCluster = iCluster->getOuterCluster();
if ( oCluster && (oCluster->getRowCount() || isRightJoin()) ) {
// the inner cluster is FLUSHED. If we have rows in the corresponding
// outer cluster or it is a right join, keep the clusters. If the
// outer has rows in the last buffer, flush the last buffer of the
// outer to the temporary file. Release all buffers.
if (oCluster->getFirstBuffer() &&
oCluster->getRowsInBuffer()) {
// outer cluster exists and contains rows. If there are still a
// few rows in the buffer, flush them.
clusterDb_->setClusterToFlush(oCluster);
pstate.setState(HASHJ_FLUSH_CLUSTER);
return;
};
// Then release the hashbuffers of both clusters
oCluster->releaseAllHashBuffers();
iCluster->releaseAllHashBuffers();
// switch to the next pair of clusters
prevICluster = iCluster;
iCluster = iCluster->getNext();
}
else if (isRightJoin() && !oCluster && (iCluster->getState() == Cluster::FLUSHED)) {
// This is an inner Cluster that used to have an outer cluster,
// but it was removed on a previous call to workEndPhase2()
// since it was empty. This method was called again because a
// subsequent iCluster had an oCluster that needed to be
// flushed. After the flush, it returns here, but this time,
// this iCluster does not have an oCluster. However, it still
// needs to be read in and chained and we need to return any
// rows that were not matched on the outer since this is a right
// join (presumably all of them since the original outer was
// empty) (Bugzilla #2603)
// No need to release the hashbuffers of the inner cluster since that
// was done on a previous call to workEndPhase2().
// switch to the next pair of clusters
prevICluster = iCluster;
iCluster = iCluster->getNext();
}
else {
// Basically no outer -- no PHASE_3 -- remove this inner cluster
if (iCluster == clusterDb_->getClusterList()) {
// the cluster to remove is the first one in the list
clusterDb_->setClusterList(iCluster->getNext());
prevICluster = iCluster->getNext();
iCluster = prevICluster;
}
else {
prevICluster->setNext(iCluster->getNext());
iCluster = prevICluster->getNext();
};
// before we release the cluster, we collect some stats about it
if (hashJoinStats_)
hashJoinStats_->
addClusterStats(originalInnerCluster->getStats(hashJoinStats_->
getHeap()));
delete originalInnerCluster;
};
// Remove an empty outer -- this cluster pair does not need PHASE_3,
// except for a right join, which has special code to handle a NULL outer.
if ( oCluster && ! oCluster->getRowCount() ) {
// before we release the cluster, we collect some stats about it
if (hashJoinStats_)
hashJoinStats_->
addClusterStats(oCluster->getStats(hashJoinStats_->getHeap()));
delete oCluster;
if ( isRightJoin() ) originalInnerCluster->setOuterCluster(NULL);
};
}; // while (iCluster)
if (!clusterDb_->getClusterList()) {
// no clusters left. We are done with the join
pstate.setState(HASHJ_DONE);
return;
}
// End of phase 2 - Calculate max memory needed, and yield the rest of quota
clusterDb_->yieldUnusedMemoryQuota();
// Log -- end of phase 2
#if !defined(__EID)
if ( hashJoinTdb().logDiagnostics() /* && clusterDb_->sawPressure() */ ) {
// L O G
// count the number of clusters
ULng32 totalSize, maxSize, minSize;
Cluster * iCluster = clusterDb_->getClusterList();
for ( iCluster = clusterDb_->getClusterList(), totalSize = 0,
maxSize = 0, minSize = UINT_MAX ;
iCluster;
iCluster = iCluster->getNext()
)
{
Cluster * oCluster = iCluster->getOuterCluster();
ULng32 clusterSizeMB = NULL == oCluster ? 0 :
(ULng32) (oCluster->clusterSize() / (1024*1024)) ;
totalSize += clusterSizeMB ;
if ( maxSize < clusterSizeMB ) maxSize = clusterSizeMB ;
if ( minSize > clusterSizeMB ) minSize = clusterSizeMB ;
}
char msg[1024];
str_sprintf(msg,
"HJ End of Phase 2 (%d). Total outer size %d MB, cluster size max- %d MB min- %d MB, variable size records: %s",
0, // NA_64BIT, use instance id later
totalSize, maxSize, minSize,
hashJoinTdb().useVariableLength() ? "y" : "n");
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
#endif
// all clusters are prepared. Go on with phase 3 of the join
// set the first inner cluster to read
clusterDb_->setClusterToRead(clusterDb_->getClusterList());
pstate.setPhase(PHASE_3);
// from now on, we want to charge the next phase with time
if (hashJoinStats_)
hashJoinStats_->incPhase();
pstate.setState(HASHJ_READ_INNER_CLUSTER);
};
///////////////////////////////////////////////////////////////////////////
// A helper routine to reset the cluster for the case of hash loop.
// This method is called from workReadOuterCluster() and
// workReturnRightRows() (during PHASE_3).
///////////////////////////////////////////////////////////////////////////
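// Background (hash loop): when an inner cluster is too large to fit in
// memory at once, only part of it is read and chained; the entire outer
// cluster is probed against that part, the cluster is reset here, and the
// next part is read -- hence the "loop". The outer cluster's bitmap
// remembers which outer rows already matched across iterations.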
void ex_hashj_tcb::resetClusterForHashLoop(Cluster *iCluster)
{
// reset for hash loop.
iCluster->resetForHashLoop();
// Read the next buffers from the inner cluster.
clusterDb_->setClusterToRead(iCluster);
return;
}
///////////////////////////////////////////////////////////////////////////
// A helper routine which deletes the existing cluster pair and
// gathers statistics, thus preparing for the next pair of clusters.
// Called during workReturnRightRows() (during PHASE_3) and
// workReadOuterCluster().
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::prepareForNextPairOfClusters(Cluster *iCluster)
{
#if !defined(__EID)
if ( hashJoinTdb().logDiagnostics() /* && clusterDb_->sawPressure() */ ) {
// L O G
Int64 currTime = NA_JulianTimestamp();
char msg[1024];
#ifdef TEMP_DISABLE
Int64 timeTook = currTime - iCluster->startTimePhase3() ;
if ( iCluster->numLoops() ) {
str_sprintf(msg,
"HJ Finished cluster (%d) in Phase 3 (%d) with %d Hash-Loop iterations in %d seconds.",
(ULng32)iCluster & 0xFFF,
(ULng32)clusterDb_ & 0xFFF,
iCluster->numLoops() + 1, // add uncounted last iter
(ULng32) ( timeTook / 1000000 ) );
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
else
{
clusterDb_->updatePhase3Time(timeTook);
}
#else
if ( iCluster->numLoops() )
{
str_sprintf(msg,
"HJ Finished cluster (%d) in Phase 3 (%d) with %d Hash-Loop iterations.",
0, // NA_64BIT, use some id later (ULng32)iCluster & 0xFFF,
0, // NA_64BIT, use some id later (ULng32)clusterDb_ & 0xFFF,
iCluster->numLoops() + 1); // add uncounted last iter
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
#endif
}
#endif
// we are done with this pair of clusters; delete them and process
// the next pair;
clusterDb_->setClusterList(iCluster->getNext());
Cluster * oCluster = iCluster->getOuterCluster();
// before we release the clusters, we collect some stats about them
if (hashJoinStats_) {
hashJoinStats_->
addClusterStats(iCluster->getStats(hashJoinStats_->getHeap()));
if ( oCluster )
hashJoinStats_->
addClusterStats(oCluster->getStats(hashJoinStats_->getHeap()));
};
delete iCluster;
if ( oCluster ) delete oCluster;
oCluster = NULL;
iCluster = clusterDb_->getClusterList();
clusterDb_->setClusterToRead(iCluster);
return;
}
///////////////////////////////////////////////////////////////////////////
// A utility routine called only by workProbe() when the left row is no
// longer needed and we should continue to the next outer/left row
// (only used there in the form "return cleanupLeftRow()" )
///////////////////////////////////////////////////////////////////////////
short ex_hashj_tcb::cleanupLeftRow() {
Cluster * iCluster = clusterDb_->getClusterToProbe();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
outerMatchedInner_ = FALSE; // next left row yet unmatched
// if we are in PHASE_2 remove the entry from the left child
if (pstate.getPhase() == PHASE_2)
leftQueue_.up->removeHead();
else
// it is PHASE_3, we have processed another row from the outer
iCluster->getOuterCluster()->incReadCount();
// we are done with this probe
clusterDb_->setClusterToProbe(NULL);
// return to the previous state in order to read the next outer row
// (the prev state is either READ_OUTER or READ_OUTER_CLUSTER)
pstate.setState(pstate.getOldState());
return 0;
}
///////////////////////////////////////////////////////////////////////////
// probe a hash table. This can happen during PHASE_2; in this case the
// hash table is probed with a row from the left child. If the probe
// happens during PHASE_3, the hash table is probed with an "extended"
// left row (in case of cluster exchange with an "extended" right row).
// workProbe() assumes, that the cursor of the tcb points to potential
// matches in the hash table.
// If the probe happens during PHASE_2, workProbe() removes the row from
// left up queue after the matching hash table rows are examined.
// If the probe happens during PHASE_3, workProbe() expects that the
// appropriate tupp in the workAtp_ points already to the "extended" left
// row.
///////////////////////////////////////////////////////////////////////////
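//
// Outline of the probe loop below (one outer row, possibly many inner
// matches): evaluate the before-join predicate; on a match, mark the inner
// row's bit (right join) and the outer bitmap (hash loop), bail out early
// for an anti-semi join, evaluate the after-join predicate, insert the
// result row, and bail out early for a semi join or a satisfied GET_N.
// After the loop, an unmatched outer row may still be null-instantiated
// and returned (left join).
//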
short ex_hashj_tcb::workProbe() {
Cluster * iCluster = clusterDb_->getClusterToProbe();
ex_assert(iCluster, "ex_hashj_tcb::workProbe() No cluster to probe");
ex_expr * beforeJoinPred, * afterJoinPred ;
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
NABoolean phase2 = pstate.getPhase() == PHASE_2 ; // else - it's phase 3
// make sure that we don't have a right tupp in the workAtp
workAtp_->getTupp(hashJoinTdb().extRightRowAtpIndex1_).setDataPointer(NULL);
// The first Atp parameter for evaluating Before and After Join predicates
// should always contain the input. In PHASE_2, it is the Atp from the left
// up-queue, and also the left row is taken from that Atp. In PHASE_3, it is
// the Atp from the parent's down queue (the left row would be taken from
// the second Atp parameter -- the workAtp).
atp_struct * inputAndPhase2LeftRowAtp = phase2 ?
leftQueue_.up->getHeadEntry()->getAtp() : downParentEntry->getAtp() ;
Cluster * oCluster = iCluster->getOuterCluster();
HashTable * hashTable = iCluster->getHashTable();
// LOOP: For this outer row, loop and probe the inner hash table, where
// multiple matches may be found
for ( HashRow * hashRow = hashTable ? hashTable->getNext(&cursor_) : NULL ;
hashRow ; // found a match ?
hashRow = hashTable->getNext(&cursor_) ) { // get next match in HT
// if parent canceled while we loop then return
if ( downParentEntry->downState.request == ex_queue::GET_NOMORE ) return 0;
// we found a potential match; move the hash table row to the workAtp_
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hashJoinTdb().extRightRowAtpIndex1_).
setDataPointer(hashRow->getData());
// evaluate beforeJoinPredicate
beforeJoinPred = phase2 ? beforeJoinPred1_ : beforeJoinPred2_ ;
if ( beforeJoinPred ) {
switch ( beforeJoinPred->eval(inputAndPhase2LeftRowAtp, workAtp_) ) {
case ex_expr::EXPR_ERROR : return processError(inputAndPhase2LeftRowAtp);
case ex_expr::EXPR_TRUE : break; // Before Predicate Passed !
default:
// The BeforePredicate failed, thus no match found,
// try the next match for this left row
continue ;
};
};
// the join has produced a row, the row may yet disappear via the
// afterJoinPredicate, but we should no longer null instantiate
outerMatchedInner_ = TRUE;
// if right-outer join, mark this right row
if (isRightJoin())
hashRow->setBit(TRUE);
// if this is a left outer join or semi join and if we are in a hash loop,
// the outer cluster has a bitmap. Update this bitmap now
if( oCluster && oCluster->hasBitMap() ) oCluster->updateBitMap();
// At this point an ASJ has "failed" for this left row (a match was found); try the next left row
if ( isAntiSemiJoin() ) return cleanupLeftRow();
// evaluate the afterJoinPredicate
afterJoinPred = phase2 ? afterJoinPred1_ : afterJoinPred2_;
if ( afterJoinPred ) {
switch ( afterJoinPred->eval(inputAndPhase2LeftRowAtp, workAtp_) ) {
case ex_expr::EXPR_ERROR : return processError(inputAndPhase2LeftRowAtp);
case ex_expr::EXPR_TRUE : break; // After Predicate passed !
default:
// After Predicate failed, try the next matching inner row
numJoinedRowsRejected_++;
continue;
};
};
// we have a match, insert the row into the parent's up queue
if ( insertResult(hashRow) ) {
// expression evaluation error
return 0;
}
// Semi join need not match another inner row; go to next outer
if ( isSemiJoin() ) return cleanupLeftRow();
// If it is a GET_N request and the number was satisfied then return
if ( downParentEntry->downState.request == ex_queue::GET_N &&
downParentEntry->downState.requestValue <=
(Lng32)pstate.matchCount_ )
return 0;
// if no room for the next result in the up queue, then come back later
if (parentQueue_.up->isFull()) return 1;
} // ------ END OF LOOP: go over matching inner rows for a given outer row
// at this point: no (more) matches in the inner table. Go to the next
// left row unless it's a failed (i.e., no matches) ASJ or LJ
if ( outerMatchedInner_ || onlyReturnResultsWhenInnerMatched_ )
return cleanupLeftRow();
// Evaluate the after join predicate
afterJoinPred = phase2 ? afterJoinPred3_ : afterJoinPred4_ ;
if ( afterJoinPred ) {
// Don't allocate the row for left join
// Will use the pre-allocated nullData instead
if ( !isAntiSemiJoin() && nullInstForLeftJoinExpr_) {
// Need to null-instantiate because afterJoinPred can reference a
// NULL-instantiated column; use the pre-allocated nullData.
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_) = nullData_;
}
switch ( afterJoinPred->eval(inputAndPhase2LeftRowAtp, workAtp_) )
{
case ex_expr::EXPR_ERROR :
return processError(inputAndPhase2LeftRowAtp) ;
case ex_expr::EXPR_TRUE :
break ; // After Predicate passed !
default: // failed : on to the next left row
if (leftJoinExpr_)
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_).release();
return cleanupLeftRow();
}
}
// afterJoinPredicate passed (or wasn't defined) -- return result unless
// it's the MIDDLE of a Hash Loop, or the bitmap was set for this left row
if ( phase2 || // in phase 2, or
! oCluster->hasBitMap() || // phase 3, and not in a hash loop, or
// (at the last iteration of the Hash Loop, isHashLoop() becomes FALSE)
! clusterDb_->isHashLoop() && // end of Hash Loop, and
! oCluster->testBitMap() ) // the bit in bitmap was not set
{
if (insertResult(NULL)) {
// expression evaluation error
return 0;
}
}
// Now we're done with this left row - prepare for the next left row
return cleanupLeftRow();
};
///////////////////////////////////////////////////////////////////////////
// Find the unmarked right rows, null extend them
// and return to the parent.
///////////////////////////////////////////////////////////////////////////
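//
// Marking scheme (as used below): during the probes, every inner row that
// matched some outer row had its bit set (hashRow->setBit(TRUE) in
// workProbe()); here we scan each chained inner cluster and, for rows
// whose bit is still unset, null-instantiate the left side and return
// the row.
//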
short ex_hashj_tcb::workReturnRightRows() {
Cluster * iCluster = NULL;
// start from where we left off.
if (clusterDb_->getClusterReturnRightRows()) // either PHASE_3, or returning
// to pick up from the cluster
// we left off during PHASE_2.
iCluster = clusterDb_->getClusterReturnRightRows();
else
{
iCluster = clusterDb_->getClusterList(); // start from the beginning.
ex_assert(iCluster,
"ex_hashj_tcb::workReturnRightRows No cluster to return rows");
}
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
atp_struct * downParentEntryAtp = downParentEntry->getAtp() ;
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
while (iCluster) {
if (iCluster->getRowCount() && iCluster->isInner() &&
(iCluster->getState() != Cluster::FLUSHED) )
{
clusterDb_->setClusterReturnRightRows(iCluster);
// Found a cluster with rows
// Get the hash table to access the rows
HashTable * hashTable = iCluster->getHashTable();
ex_assert (hashTable,
"ex_hashj_tcb::workReturnRightRows - hashtable must exist");
hashTable->position(&cursor_);
// For each unmatched row (one for which no match was found while
// probing with a left/outer row in workProbe()), null-extend the
// left side and return the row.
// if parent canceled while we loop then return
if ( downParentEntry->downState.request == ex_queue::GET_NOMORE )
return 0;
for ( HashRow * hashRow = hashTable->getNext(&cursor_);
hashRow ; // all rows in the inner (right) table
hashRow = hashTable->getNext(&cursor_) ) // get next HT match
{
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
//ex_expr::exp_return_type retCode;
atp_struct * atp = NULL;
ULng32 bucketId = hashRow->hashValue() % bucketCount_;
if ( buckets_[bucketId].getInnerCluster() != iCluster) {
// recomputing the same value is a no-op; kept, presumably as a debug hook
bucketId = hashRow->hashValue() % bucketCount_;
}
if ( buckets_[bucketId].getInnerCluster() == iCluster &&
!hashRow->bitSet()) // if row was unmatched
{
// first, we have to copy the input to this node
upParentEntry->copyAtp(downParentEntryAtp);
atp = upParentEntry->getAtp();
// The max length of the right row.
UInt32 rightRowLength = hashJoinTdb().rightRowLength_;
if(rightRowLength) {
if(hashJoinTdb().useVariableLength()) {
// get the actual length from the HashRow.
rightRowLength = hashRow->getRowLength();
}
resultPool_->moveIn(workAtp_,
hashJoinTdb().rightRowAtpIndex_,
rightRowLength,
hashRow->getData(),
true,
hashJoinTdb().bufferSize_);
atp->getTupp(hashJoinTdb().returnedRightRowAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().rightRowAtpIndex_);
}
if (leftJoinExpr_)
{
SqlBufferHeader::moveStatus ms =
resultPool_->moveIn(downParentEntryAtp,
workAtp_,
hashJoinTdb().instRowForLeftJoinAtpIndex_,
hashJoinTdb().instRowForLeftJoinLength_,
leftJoinExpr_,
true,
hashJoinTdb().bufferSize_);
if (ms == SqlBufferHeader::MOVE_ERROR)
{
processError(downParentEntryAtp);
return 1;
}
}
// Null instantiate the left row.
if (nullInstForRightJoinExpr_) {
// Use the pre-allocated nullData.
workAtp_->getTupp(hashJoinTdb().instRowForRightJoinAtpIndex_) = nullData_;
}
// Now that we have processed the row (it will either be sent to
// the parent or discarded because it did not satisfy
// afterJoinPred5_), set the bit.
hashRow->setBit(TRUE);
NABoolean afterJoinPredPassed = TRUE;
if (afterJoinPred5_)
{
afterJoinPredPassed = FALSE;
ex_expr::exp_return_type retCode;
retCode = afterJoinPred5_->eval(downParentEntryAtp,
workAtp_);
switch (retCode)
{
case ex_expr::EXPR_ERROR :
return processError(downParentEntryAtp) ;
case ex_expr::EXPR_TRUE: // predicate passed
afterJoinPredPassed = TRUE;
break;
default: // predicate failed
break;
}
} // (afterJoinPred5_)
if (afterJoinPredPassed == TRUE)
{
// add the right row to the result ONLY if the
// afterJoinPredicate5 - for now is satisfied
if (leftJoinExpr_ )
atp->getTupp(hashJoinTdb().returnedInstRowForLeftJoinAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().instRowForLeftJoinAtpIndex_);
// and add the left row (null instantiated) to the result.
if (nullInstForRightJoinExpr_)
atp->getTupp(hashJoinTdb().returnedInstRowForRightJoinAtpIndex_) =
workAtp_->getTupp(hashJoinTdb().instRowForRightJoinAtpIndex_);
upParentEntry->upState.status = ex_queue::Q_OK_MMORE;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
// we got another result row
pstate.matchCount_++;
upParentEntry->upState.setMatchNo(pstate.matchCount_);
// insert into parent up queue
parentQueue_.up->insert();
if (bmoStats_)
bmoStats_-> incActualRowsReturned();
// we can forget about the row in the workAtp
releaseResultTupps();
// if no room for the next result in the up queue,
// then come back later
if (parentQueue_.up->isFull())
return 1;
}
} // (!hashRow->bitSet()) // if the row was unmatched
} // for ( HashRow * hashRow..
} // (iCluster->getRowCount() && iCluster->isInner())
// Unlike PHASE_2, PHASE_3 works on only one cluster at a time.
// Hence, move on to the next cluster only in PHASE_2;
// otherwise break.
if (pstate.getPhase() == PHASE_2)
iCluster =iCluster->getNext();
else // PHASE_3
break;
} // while (iCluster)
if (pstate.getPhase() == PHASE_2)
pstate.setState(HASHJ_END_PHASE_2);
else
{
// pstate.getPhase() == PHASE_3
if (clusterDb_->isHashLoop())
{
resetClusterForHashLoop(iCluster);
pstate.setState(HASHJ_READ_INNER_CLUSTER);
}
else
{
// we are done with this pair of clusters; delete them and process
// the next pair;
prepareForNextPairOfClusters(iCluster);
pstate.setState(HASHJ_READ_INNER_CLUSTER);
} // else - clusterDb_->isHashLoop()
} // else - Phase_ == PHASE_2
// do not leave allocated tupps for the next cluster
releaseResultTupps();
clusterDb_->setClusterReturnRightRows(NULL);
if (bmoStats_)
bmoStats_->setSpaceBufferCount(resultPool_->get_number_of_buffers());
return 0; // success
};
///////////////////////////////////////////////////////////////////////////
// read an inner cluster
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workReadInnerCluster() {
Cluster * iCluster = clusterDb_->getClusterToRead();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
if (iCluster) {
if (iCluster->read(&rc_)) {
// all I/Os are done, chain the cluster
if (!iCluster->chain(TRUE, &rc_)) {
// chaining the cluster failed.
pstate.setState(HASHJ_ERROR);
return;
};
if ( iCluster->getOuterCluster() ) {
// now read a buffer from outer
pstate.setState(HASHJ_READ_BUFFER);
clusterDb_->setOuterClusterToRead(iCluster->getOuterCluster(), &rc_);
if (rc_) { // memory allocation error
pstate.setState(HASHJ_ERROR);
return;
};
}
else {
// Special case: Right join with no outer cluster
// ( right join PHASE_3 is handled by workReadOuterCluster() )
pstate.setState(HASHJ_READ_OUTER_CLUSTER);
clusterDb_->setClusterToRead(NULL);
}
}
else {
if (rc_) {
// the read caused an error
pstate.setState(HASHJ_ERROR);
return;
};
}
}
else {
// no inner Cluster to read, we are done with the join
pstate.setState(HASHJ_DONE);
};
};
///////////////////////////////////////////////////////////////////////////
// read ONE buffer of an outer cluster
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workReadBuffer() {
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
Cluster * oCluster = clusterDb_->getClusterToRead();
if (oCluster->read(&rc_)) {
// the I/O is complete, position on the first row of the buffer
oCluster->positionOnFirstRowInFirstOuterBuffer();
pstate.setState(HASHJ_READ_OUTER_CLUSTER);
clusterDb_->setClusterToRead(NULL);
}
else {
if (rc_) {
// the read caused an error
pstate.setState(HASHJ_ERROR);
return;
}
}
};
///////////////////////////////////////////////////////////////////////////
// read one buffer from an outer cluster and probe for PHASE_3.
///////////////////////////////////////////////////////////////////////////
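//
// Flow (summary of the code below): advance over the outer rows of the
// current buffer, positioning the hash-table cursor for each one and
// switching to HASHJ_PROBE; when the buffer is exhausted, either read the
// next outer buffer, return unmatched right rows (right join), reset for
// another hash loop iteration, or move on to the next pair of clusters.
//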
void ex_hashj_tcb::workReadOuterCluster() {
Cluster * iCluster = clusterDb_->getClusterList();
Cluster * oCluster = iCluster->getOuterCluster();
HashRow * row = (HashRow *)( oCluster ? oCluster->advance() : NULL );
NABoolean moreBuffersToRead = oCluster ? ! oCluster->endOfCluster() : FALSE;
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
while (row) {
if (isSemiJoin() &&
oCluster->hasBitMap() &&
oCluster->testBitMap()) {
// Skip all matching rows that have been returned in a
// prior hash loop, as indicated by a bit set in the bitMap.
// Note that testBitMap() is always FALSE during the first
// round of the outer cluster loop.
row = (HashRow *)(oCluster->advance());
oCluster->incReadCount();
continue;
}
// we found another outer row to probe, put it in the correct tupp
// of the workAtp_
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workAtp_->getTupp(hashJoinTdb().extLeftRowAtpIndex_).
setDataPointer(row->getData());
// get the inner clusters hash table and position on the first
// potential match
HashTable * ht = iCluster->getHashTable();
if (rightSearchExpr_ == NULL) {
ht->positionSingleChain(&cursor_);
} else {
ht->position(&cursor_,
workAtp_,
workAtp_,
hashJoinTdb().extRightRowAtpIndex1_,
probeSearchExpr2_,
row->hashValue(),
doNotChainDup_);
}
if (cursor_.getBeginRow() == NULL && onlyReturnResultsWhenInnerMatched_ ) {
// Not matched. Get the next row.
row = (HashRow *)(oCluster->advance());
oCluster->incReadCount();
continue;
}
// Prepare to probe and eval before/after join predicates.
// Exit the while loop.
clusterDb_->setClusterToProbe(iCluster);
pstate.setState(HASHJ_PROBE);
return;
}; // while more rows in the current buffer.
// All rows in the current buffer have been read.
// Switch to the next buffer or cluster pair.
if ( moreBuffersToRead ) {
// Read the next buffer(s) in the outer cluster.
clusterDb_->setOuterClusterToRead(oCluster, &rc_); // ignore rc_
pstate.setState(HASHJ_READ_BUFFER);
return;
};
// No more buffers from the outer cluster.
// If it is a right join, return the unmatched right rows: for this
// buffer in the case of a hash loop, and for the whole cluster otherwise.
if(isRightJoin())
{
clusterDb_->setClusterReturnRightRows(iCluster);
pstate.setState(HASHJ_RETURN_RIGHT_ROWS);
return;
}
// if we are in a hash loop, we have to prepare the clusters for the
// next loop.
if (clusterDb_->isHashLoop()) {
resetClusterForHashLoop(iCluster);
pstate.setState(HASHJ_READ_INNER_CLUSTER);
return;
};
// we are done with this pair of clusters; delete them and process
// the next pair;
prepareForNextPairOfClusters(iCluster);
pstate.setState(HASHJ_READ_INNER_CLUSTER);
return;
};
///////////////////////////////////////////////////////////////////////////
// we are done with the request. Do some cleanup
///////////////////////////////////////////////////////////////////////////
void ex_hashj_tcb::workDone() {
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
queue_index indexOfParent = parentQueue_.down->getHeadIndex();
// all rows returned, tell parent that we don't have data left
upParentEntry->upState.status = ex_queue::Q_NO_DATA;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = indexOfParent ;
upParentEntry->upState.setMatchNo(pstate.matchCount_);
parentQueue_.up->insert();
// remove the last rows (Q_NO_DATA) from the children's queues
if (!leftQueue_.up->isEmpty() &&
leftQueue_.up->getHeadEntry()->upState.parentIndex == indexOfParent )
leftQueue_.up->removeHead();
if ( ! pstate.usingPreviousHT() ) { // so we may have rows in the right queue
if ( ! rightQueue_.up->isEmpty() &&
rightQueue_.up->getHeadEntry()->upState.parentIndex == indexOfParent )
rightQueue_.up->removeHead();
}
// initialize back to HASHJ_EMPTY, PHASE_1,
// accumDiags_=NULL, matchCount_=0, usingPreviousHT=FALSE, readRightRow=FALSE
pstate.initState();
ex_assert(nextRequest_ != indexOfParent ,
"ex_hashj_tcb::workDone() expected nextRequest++ already");
parentQueue_.down->removeHead();
if(!hashJoinTdb().isUniqueHashJoin())
releaseResultTupps();
#if !defined(__EID)
if ( hashJoinTdb().logDiagnostics() &&
!hashJoinTdb().isUniqueHashJoin()
/* && clusterDb_->sawPressure() */ ) {
// L O G
char msg[1024];
#ifdef TEMP_DISABLE
if ( clusterDb_->numClustersNoHashLoop() ) {
str_sprintf(msg,
"HJ Finished Phase 3 (%d). Time (sec): total %d (w/o HL) - max %d min %d avg %d (%d clusters)",
(ULng32)clusterDb_ & 0xFFF,
(ULng32)(clusterDb_->totalPhase3TimeNoHL() / 1000000),
(ULng32)(clusterDb_->maxPhase3Time()/ 1000000),
(ULng32)(clusterDb_->minPhase3Time()/ 1000000),
(ULng32)(clusterDb_->totalPhase3TimeNoHL() / 1000000)
/ clusterDb_->numClustersNoHashLoop() ,
clusterDb_->numClustersNoHashLoop()
);
}
else {
str_sprintf(msg,
"HJ Finished Phase 3 (%d) - All clusters used Hash-Loop!",
(ULng32)clusterDb_ & 0xFFF );
}
#else
str_sprintf(msg,
"HJ Finished Phase 3 (%d)",
0 // NA_64BIT, use instance id later
);
#endif
// log an EMS event and continue
SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg, tdb.getExplainNodeId());
}
#endif
// When Reuse is applied the inner clusters/buckets should be kept!
if ( ! isReuse() ) {
if(hashJoinTdb().isUniqueHashJoin())
{
((ExUniqueHashJoinTcb *)this)->freeResources();
}
haveAllocatedClusters_ = FALSE;
// In the normal case, nothing else needs to be cleaned up. The
// clusters were deleted right after they were processed. This also
// deleted all buffers associated with a cluster
// But in case of a cancel or a GET_N, there might be still
// some clusters around. Delete them now!
if (clusterDb_) {
// Yield all memory quota back to global count of unused memory, so that
// other BMOs may use this quota
clusterDb_->yieldAllMemoryQuota(); // yield all of the alloc memory
delete clusterDb_;
clusterDb_ = NULL;
};
// delete the buckets
if (buckets_) {
heap_->deallocateMemory(buckets_);
buckets_ = NULL;
};
totalRightRowsRead_ = 0;
}
// reset data members, in case we are called again for a new request.
outerMatchedInner_ = FALSE;
hashValue_ = 0;
};
ex_hashj_private_state::ex_hashj_private_state()
{
initState();
}
ex_hashj_private_state::~ex_hashj_private_state() // destructor
{
}
ex_tcb_private_state * ex_hashj_private_state::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace()) ex_hashj_private_state();
};
//////////////////////////////////////////////////////////////////////////////
//
// TCB procedures for the Unique Hash Join.
//
//////////////////////////////////////////////////////////////////////////////
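//
// Note: the Unique Hash Join is a simplified, in-memory-only variant:
// each outer row is expected to match at most one inner row (see the
// positionUniq() call in workReadOuter() below), there is no overflow,
// and running out of memory raises EXE_NO_MEM_FOR_IN_MEM_JOIN instead
// of spilling clusters to disk.
//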
// Register the Unique Hash Join subtasks with the scheduler
//
void ExUniqueHashJoinTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
ex_queue_pair pQueue = getParentQueue();
// down queues are handled by workDown()
// up queues are handled by workUp()
// cancellations are handled by workCancel()
sched->registerInsertSubtask(sWorkDown, this, pQueue.down, "DN");
sched->registerCancelSubtask(sCancel, this, pQueue.down, "CA");
sched->registerUnblockSubtask(sWorkUp, this, pQueue.up, "UP");
// We need to schedule workUp from workDown if we see a GET_NOMORE
// in workDown, in case nothing else schedules workUp.
workUpTask_ = sched->registerNonQueueSubtask(sWorkUp, this, "UP");
// register events for child queues
sched->registerUnblockSubtask(sWorkDown,this, leftQueue_.down);
sched->registerInsertSubtask(sWorkUp, this, leftQueue_.up);
sched->registerUnblockSubtask(sWorkDown,this, rightQueue_.down);
sched->registerInsertSubtask(sWorkUp, this, rightQueue_.up);
}
// Free resources allocated by the TCB
//
void ExUniqueHashJoinTcb::freeResources()
{
if(bufferHeap_)
{
delete bufferHeap_;
bufferHeap_ = NULL;
hashTable_ = NULL;
bufferPool_ = NULL;
chainedBufferPool_ = NULL;
availRows_ = 0;
totalRightRowsRead_ = 0;
}
}
//
// Constructor for ExUniqueHashJoinTcb
//
ExUniqueHashJoinTcb::ExUniqueHashJoinTcb(const ex_hashj_tdb & hashJoinTdb,
const ex_tcb & leftChildTcb,
const ex_tcb & rightChildTcb,
ex_globals * glob)
: ex_hashj_tcb(hashJoinTdb, leftChildTcb, rightChildTcb, glob),
hashTable_(NULL),
bufferSize_(hashJoinTdb.hashBufferSize_),
extRowSize_(hashJoinTdb.extRightRowLength_),
rowSize_((Lng32)hashJoinTdb.rightRowLength_),
returnedRightRowAtpIndex_(hashJoinTdb.returnedRightRowAtpIndex_),
availRows_(0),
bufferPool_(NULL),
chainedBufferPool_(NULL),
bufferHeap_(NULL)
{
}
///////////////////////////////////////////////////////////////////////////////
// Destructor for ExUniqueHashJoinTcb
///////////////////////////////////////////////////////////////////////////////
ExUniqueHashJoinTcb::~ExUniqueHashJoinTcb()
{
freeResources();
}
///////////////////////////////////////////////////////////////////////////////
// workUp()
///////////////////////////////////////////////////////////////////////////////
ExWorkProcRetcode ExUniqueHashJoinTcb::workUp()
{
// Check if there is still work to do
if (parentQueue_.down->isEmpty())
return WORK_OK;
// A hash join workUp never works on more than one parent request at a time
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
const ex_queue::down_request & request = downParentEntry->downState.request;
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
// loop forever, exit via return
while (TRUE) {
// if we have already given the parent all the rows it needs, then
// cancel the parent's request.
if ( (request == ex_queue::GET_N) &&
(downParentEntry->downState.requestValue <= (Lng32)pstate.matchCount_) &&
(pstate.getState() != HASHJ_CANCELED) &&
(pstate.getState() != HASHJ_DONE) )
{
propagateCancel(parentQueue_.down->getHeadIndex(), pstate);
}
switch (pstate.getState())
{
case HASHJ_READ_OUTER:
{
if (leftQueue_.up->isEmpty() || parentQueue_.up->isFull())
{
// no work to do or no free entry, come back later
return WORK_OK;
}
// Read from the outer, probe the hash table and potentially
// return a row.
// Can return WORK_OK or WORK_POOL_BLOCKED, in which case we
// must return
//
short rc = workReadOuter();
if (rc != WORK_OK)
return rc;
break;
}
case HASHJ_READ_INNER:
case HASHJ_CANCEL_AFTER_INNER:
{
if (rightQueue_.up->isEmpty())
{
return WORK_OK;
}
// If we haven't already allocated a hash table for this
// request, then do so now
//
if ( ! pstate.usingPreviousHT() && ! pstate.getHaveClusters() )
{
if ( allocateBufferHeap() )
{
pstate.setState(HASHJ_ERROR);
break;
}
pstate.setHaveClusters(TRUE); // now the clusters are allocated
}
// If there are no more available rows, check again in case
// we can add more due to variable size rows.
if(availRows_ == 0 && bufferPool_) {
availRows_ = bufferPool_->castToSerial()->getRemainingNumFullRows();
}
UInt32 defragLength = 0;
// If we do not have any rows available in the current HashBuffer
// allocate a new one (or the first one).
//
if(availRows_ == 0)
{
if (hashJoinTdb().considerBufferDefrag() &&
bufferPool_ &&
(defragLength=computeDefragLength()) >0)
{
// availRows_ is 0, but we can still fit at least one row after computing
// the actual row size. Set availRows_ to one so we can process it in the
// workReadInner() function.
availRows_ = 1;
}
else
{
HashBuffer * newBuffer = NULL;
if ( clusterDb_->enoughMemory( bufferSize_ ) ) // got mem ?
newBuffer =
new(bufferHeap_, FALSE) HashBuffer(bufferSize_,
extRowSize_,
hashJoinTdb().useVariableLength(),
bufferHeap_,
clusterDb_,
&rc_);
if (!newBuffer || rc_)
{
// If we couldn't allocate, or there wasn't enough memory, we give up.
// If newBuffer was allocated, it will be freed when the whole
// bufferHeap_ is deleted in DONE.
rc_ = EXE_NO_MEM_FOR_IN_MEM_JOIN;
pstate.setState(HASHJ_ERROR);
freeResources(); // free memory to allocate DiagsCondition
break;
}
else
{
// we got a new buffer. Chain it into the list of buffers
newBuffer->setNext(bufferPool_);
bufferPool_ = newBuffer;
availRows_ = newBuffer->castToSerial()->getMaxNumFullRows();
if (hashJoinStats_)
{
hashJoinStats_
->updMemorySize(bufferHeap_->getAllocSize());
}
}
}
}
// Read rows from the inner side and put them into the bufferPool.
//
if(workReadInner(defragLength))
return WORK_OK;
break;
}
case HASHJ_END_PHASE_1:
{
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
queue_index parent_index = parentQueue_.down->getHeadIndex();
// Is this HashJoin doing the MIN/MAX optimization? If so, the
// left request was delayed until here.
if (minMaxExpr_ && pstate.delayedLeftRequest() ) {
// We are doing min/max optimization and are done reading
// the rows from the right child and have computed the min
// and max values. Time to construct the request
// (including min max values) and send it to the left
// child.
// need to pass (a belated) GET_ALL request to the left child
ex_queue_entry * leftEntry = leftQueue_.down->getTailEntry();
leftEntry->downState.request = ex_queue::GET_ALL;
leftEntry->downState.requestValue = downParentEntry->downState.requestValue;
leftEntry->downState.parentIndex = parentQueue_.down->getHeadIndex();
// Fill in the toLeftAtp with the tuples coming down from
// the parent and with the computed min/max values.
toLeftChildAtp_->copyAtp(downParentEntry->getAtp());
toLeftChildAtp_->getTupp(hashJoinTdb().leftDownCriDesc_->noTuples()-1) =
workAtp_->getTupp(hashJoinTdb().minMaxValsAtpIndex_);
leftEntry->passAtp(toLeftChildAtp_);
leftQueue_.down->insert();
// delayed no more. From here on -- no special processing
pstate.resetDelayedLeftRequest();
}
if(workChainInner())
{
// Failed to allocate the hash table.
pstate.setState(HASHJ_ERROR);
break;
}
if(bufferPool_)
{
// Still rows to chain
// Return to scheduler with CALL AGAIN
//
return WORK_CALL_AGAIN;
}
// Efficiently handle the rare case of an empty inner table.
// Don't cancel if the left side has an Insert/Update/Delete operation to complete.
if ( 0 == totalRightRowsRead_ && // was the inner table empty ?
!leftSideIUD()
// Uncomment below if UHJ is changed to handle Left-Join or ASJ
// && onlyReturnResultsWhenInnerMatched_
)
{
// cancel request to outer; no rows would be returned
leftQueue_.down->cancelRequestWithParentIndex(parent_index);
pstate.setState(HASHJ_CANCELED); // this request returns nothing
break;
}
if (pstate.getOldState() == HASHJ_CANCEL_AFTER_INNER)
{
rightQueue_.down->cancelRequestWithParentIndex(parent_index);
pstate.setState(HASHJ_CANCELED);
}
else
{
pstate.setState(HASHJ_READ_OUTER);
pstate.setPhase(PHASE_2);
// from now on, we want to charge the next phase with time
if (hashJoinStats_)
hashJoinStats_->incPhase();
// Leave Q_NO_DATA in queue. Will be removed in DONE
}
break;
}
case HASHJ_DONE:
{
// Need to return Q_NODATA, so make sure that we have a free
// slot in the parent's up queue
//
if (parentQueue_.up->isFull())
{
return WORK_OK;
}
workDone();
// That's it, need to be called again if there are more requests
//
if (parentQueue_.down->isEmpty())
return WORK_OK;
else
return WORK_CALL_AGAIN;
break;
}
case HASHJ_CANCELED:
{
// the request was canceled. Both children were sent cancel requests.
// Consume all up rows from the children and wait for Q_NO_DATA.
//
NABoolean leftDone = pstate.delayedLeftRequest() ? TRUE :
consumeForCancel(leftQueue_);
// only check the right queue if this request built its own
// hash table (not reusing)
NABoolean rightDone =
pstate.usingPreviousHT() ? TRUE : consumeForCancel(rightQueue_) ;
if (!(leftDone && rightDone))
// we are not done yet, come back again after queue inserts
return WORK_OK;
// all rows are consumed, we are done
pstate.setState(HASHJ_DONE);
break;
}
case HASHJ_ERROR:
{
// make sure that we have a free slot in the parent's up queue
if (parentQueue_.up->isFull())
{
return WORK_OK;
}
ex_assert( rc_ , "Missing error code");
// we ran into a serious runtime error. Create Condition and
// pass it to parent. rc_ has the error code.
ComDiagsArea *da = downParentEntry->getDiagsArea();
if(!da) {
da = ComDiagsArea::allocate(heap_);
downParentEntry->setDiagsArea(da);
}
if (!da->contains((Lng32) -rc_))
{
*da << DgSqlCode(-rc_);
}
processError(downParentEntry->getAtp());
break;
}
}
}
}
///////////////////////////////////////////////////////////////////////////
// allocateBufferHeap()
// Initialize the data structures for the hash join
///////////////////////////////////////////////////////////////////////////
NABoolean ExUniqueHashJoinTcb::allocateBufferHeap()
{
freeResources();
// allocate a clusterDB, to be used for memory pressure checks, and quota
if ( allocateClusters() ) return TRUE; // use original rc_ error
// Set up our own bufferHeap. We want at least 10 buffers in each
// block of this heap. Also add a few bytes to the buffer size to
// account for some memory management overhead.
bufferHeap_ = new(heap_, FALSE) NAHeap("Buffer Heap",
(NAHeap *)heap_,
10 * ((Lng32)bufferSize_));
if (!bufferHeap_)
{
rc_ = EXE_NO_MEM_FOR_IN_MEM_JOIN;
return TRUE;
}
return FALSE;
}
///////////////////////////////////////////////////////////////////////////
// read a row from the right child and store it in the hashtable
///////////////////////////////////////////////////////////////////////////
short ExUniqueHashJoinTcb::workReadInner(UInt32 defragLength)
{
ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
ex_queue_entry * rightEntry = NULL;
// A reference to the tupp to hold the extended right row.
//
tupp &workTupp = workAtp_->getTupp(hashJoinTdb().extRightRowAtpIndex1_);
ex_queue *upQueue = rightQueue_.up;
// A local copy of the number of available rows in the current Hash Buffer
//
Lng32 availRows = availRows_;
// For CIF (with variable-size rows), when there is no room to hold a
// max-size row, we try to compute the actual size of the row and determine
// whether there is room for that actual size.
// computeDefragLength() computes the actual row size.
while(!upQueue->isEmpty() &&
(availRows > 0 ||
(defragLength > 0 &&
(defragLength= computeDefragLength())>0)))
{
rightEntry = upQueue->getHeadEntry();
// If it is some other type of entry besides Q_OK_MMORE, break
// We will handle it outside the loop.
//
if(rightEntry->upState.status != ex_queue::Q_OK_MMORE)
break;
HashRow *dataPointer;
if (defragLength == 0){
// Get an available row from the buffer.
dataPointer = bufferPool_->castToSerial()->getFreeRow();
}
else
{
dataPointer = bufferPool_->castToSerial()->getFreeRow(ROUND4(defragLength) + sizeof(HashRow));
}
// Initialize the workAtp extended right row tupp.
// The expression does not expect the HashRow to be part of the row.
// Adjust the datapointer in the work atp to point beyond the HashRow.
workTupp.setDataPointer(dataPointer->getData());
// Hash the right row, hash value goes into datamember hashValue_
//
retCode = rightHashExpr_->eval(rightEntry->getAtp(), workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
break;
// If we are doing MIN/MAX optimization, then accumulate the Min
// and Max values from the inner row. Results are accumulated in
// the min/max tuple of the workAtp_
// (Can we combine this expression with the rightHashExpr_ to
// avoid some common overhead?)
if (minMaxExpr_)
{
retCode = minMaxExpr_->eval(rightEntry->getAtp(), workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
break;
}
// Move the right row into the current row of the Hash Buffer.
//
if(rightMoveInExpr_)
{
if (defragLength == 0 )
{
UInt32 maxDataLen = bufferPool_->getMaxRowLength() - sizeof(HashRow);
UInt32 rowLen = extRowSize_ - sizeof(HashRow);
UInt32 *rowLenPtr = &rowLen;
retCode = rightMoveInExpr_->eval(rightEntry->getAtp(), workAtp_, 0, -1, rowLenPtr);
if (retCode == ex_expr::EXPR_ERROR)
break;
// Resize the row if the actual size is different from the max size (maxDataLen)
if(hashJoinTdb().useVariableLength()) {
bufferPool_->castToSerial()->setRowLength(dataPointer, rowLen);
if(rowLen != maxDataLen) {
bufferPool_->castToSerial()->resizeLastRow(rowLen, dataPointer);
}
}
}
else
{
str_cpy_all(dataPointer->getData(),
resultPool_->getDefragTd()->getTupleAddress(),
defragLength);
if(hashJoinTdb().useVariableLength())
{
bufferPool_->castToSerial()->setRowLength(dataPointer, defragLength);
}
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
char txt[] = "hashjU";
sql_buffer_pool::logDefragInfo(txt,bufferPool_->getMaxRowLength(),
ROUND4(defragLength) + sizeof(HashRow),
bufferPool_->getFreeSpace(),
bufferPool_,
bufferPool_->getRowCount());
#endif
}
}
dataPointer->setHashValueRaw(hashValue_);
// One less row available in the current Hash Buffer
//
availRows--;
// row is inserted, remove it from queue
//
upQueue->removeHead();
}
totalRightRowsRead_ += (availRows_ - availRows);
if (availRows < 0)
{
availRows = 0;
}
availRows_ = availRows ;
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
// If there was an error from the expression, process the error.
//
if (retCode == ex_expr::EXPR_ERROR)
{
rc_ = (ExeErrorCode)(-rightEntry->getDiagsArea()->mainSQLCODE());
pEntryDown->copyAtp(rightEntry);
pstate.setState(HASHJ_ERROR);
return 0;
}
// If the queue is not empty, then it might be something other than
// Q_OK_MMORE. Check for this case here.
//
if(!rightQueue_.up->isEmpty())
{
ex_queue_entry * rightEntry = rightQueue_.up->getHeadEntry();
switch(rightEntry->upState.status)
{
case ex_queue::Q_NO_DATA:
{
pstate.setState(HASHJ_END_PHASE_1);
break;
}
case ex_queue::Q_SQLERROR:
{
rc_ = (ExeErrorCode)(-rightEntry->getDiagsArea()->mainSQLCODE());
pEntryDown->copyAtp(rightEntry);
pstate.setState(HASHJ_ERROR);
break;
}
case ex_queue::Q_INVALID:
{
ex_assert(0,
"ExUniqueHashJoinTcb::workReadInner() "
"Invalid state returned by right child");
break;
}
// If it was none of these then simply fall through.
// We may have run out of HashBuffer space (availRows_ == 0)
}
}
else
{
// Right up queue is empty; return 1 to indicate a return to the scheduler.
return 1;
}
return 0;
}
NABoolean ExUniqueHashJoinTcb::workChainInner()
{
if(!hashTable_)
{
ULng32 hashEntries = (ULng32) totalRightRowsRead_;
    // Allow for 50% more entries in the hash table to avoid long chains.
//
hashEntries += (hashEntries >> 1);
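    // Worked example (hypothetical count): for totalRightRowsRead_ ==
    // 1,000,000, hashEntries becomes 1,000,000 + 500,000 = 1,500,000
    // (the right shift by one is an integer divide by two), trading
    // about 50% more header space for shorter collision chains.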
if ( clusterDb_->enoughMemory( hashEntries * sizeof(HashTableHeader) ) )
hashTable_ = new(bufferHeap_, FALSE) HashTable(hashEntries,
FALSE, INT_MAX);
if (!hashTable_) // not enough memory for a hash table, or new() failed
{
rc_ = EXE_NO_MEM_FOR_IN_MEM_JOIN;
freeResources(); // free memory (needed to allocate DiagsCondition)
return TRUE;
}
}
// Keep track of how many rows have been chained.
//
ULng32 numRows = 0;
HashBuffer *buffer = bufferPool_;
while (buffer)
{
HashRow *dataPointer = buffer->castToSerial()->getFirstRow();
for ( ULng32 rowIndex = 0 ;
rowIndex < buffer->getRowCount() ;
rowIndex++)
{
// Insert the right row into the hash table.
//
hashTable_->insertUniq(dataPointer);
dataPointer = buffer->castToSerial()->getNextRow(dataPointer);
}
numRows += buffer->getRowCount();
    // Finished with this buffer: move it to the front of the
    // chainedBufferPool_ list and get the next buffer from bufferPool_.
//
HashBuffer *chainedBuffer = buffer;
buffer = buffer->getNext();
chainedBuffer->setNext(chainedBufferPool_);
chainedBufferPool_ = chainedBuffer;
bufferPool_ = buffer;
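    // Illustrative trace (hypothetical buffers): if bufferPool_ starts
    // as A -> B -> C, then after this iteration chainedBufferPool_ is A
    // and bufferPool_ is B -> C; when the loop finishes, the chained
    // list reads C -> B -> A, i.e. the buffers end up in reverse order.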
    // Only do a bounded amount of work before returning to the scheduler.
//
if(numRows > 10000)
return FALSE;
}
return FALSE;
}
///////////////////////////////////////////////////////////////////////////
// read a row from the left child, probe the hash table and
// potentially return a row
///////////////////////////////////////////////////////////////////////////
short ExUniqueHashJoinTcb::workReadOuter()
{
ex_queue_entry * leftEntry = NULL;
atp_struct *leftRowAtp = NULL;
ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
// The index to the tupp that holds the extended right row.
//
short atpIndex = hashJoinTdb().extRightRowAtpIndex1_;
while(!leftQueue_.up->isEmpty() && !parentQueue_.up->isFull()) {
leftEntry = leftQueue_.up->getHeadEntry();
// If it is some other type of entry besides Q_OK_MMORE, break
// We will handle it outside the loop.
//
if(leftEntry->upState.status != ex_queue::Q_OK_MMORE)
break;
leftRowAtp = leftEntry->getAtp();
// Hash the left row, hash value goes into datamember hashValue_
//
retCode = leftHashExpr_->eval(leftRowAtp, workAtp_);
if(retCode == ex_expr::EXPR_ERROR)
break;
// Look for a matching row in the hash table.
// Expect either one row or no rows.
//
HashRow *dataPointer;
retCode = hashTable_->positionUniq(&dataPointer,
leftRowAtp,
workAtp_,
atpIndex,
probeSearchExpr1_,
hashValue_);
if(retCode == ex_expr::EXPR_TRUE)
{
      // If we found a match, return the composite row.
      // workReturnRow() can return WORK_OK, WORK_POOL_BLOCKED, or
      // WORK_CALL_AGAIN. For WORK_CALL_AGAIN (and WORK_OK), we need
      // to return to workUp, but not to the scheduler. For
      // WORK_POOL_BLOCKED, we need to return all the way to the
      // scheduler.
      //
short rc = workReturnRow(dataPointer, leftRowAtp);
if(rc != WORK_OK)
return ((rc == WORK_CALL_AGAIN) ? WORK_OK : rc);
}
else if(retCode == ex_expr::EXPR_ERROR)
break;
leftQueue_.up->removeHead();
}
// If we broke out of the loop due to an expression error,
// handle it here.
if(retCode == ex_expr::EXPR_ERROR)
{
// Since we know the parent up queue is not full, we can call
// processError() directly.
//
processError(leftRowAtp);
return WORK_OK;
}
// If we are not blocked on either of the queues, then it might be
// something other than Q_OK_MMORE. Check for this case here.
//
if(!leftQueue_.up->isEmpty() && !parentQueue_.up->isFull())
{
switch (leftEntry->upState.status)
{
case ex_queue::Q_NO_DATA:
{
ex_queue_entry *pEntryDown = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) pEntryDown->pstate);
// Leave the NO_DATA in the queue. Will be removed in DONE.
//
pstate.setState(HASHJ_DONE);
break;
}
case ex_queue::Q_SQLERROR:
{
// Since we know the parent up queue is not full, we can call
// processError() directly.
//
processError(leftEntry->getAtp());
break;
}
default:
{ // Do not expect a Q_INVALID or Q_OK_MMORE
ex_assert(0,
"ExUniqueHashJoinTcb::workReadOuter() "
"Invalid state returned by left child");
break;
}
}
}
return WORK_OK;
}
///////////////////////////////////////////////////////////////////////////
// Return a row to the parent
///////////////////////////////////////////////////////////////////////////
short ExUniqueHashJoinTcb::workReturnRow(HashRow *dataPointer,
atp_struct *leftRowAtp)
{
ex_queue_entry * upParentEntry = parentQueue_.up->getTailEntry();
ex_queue_entry * downParentEntry = parentQueue_.down->getHeadEntry();
ex_hashj_private_state & pstate =
*((ex_hashj_private_state*) downParentEntry->pstate);
// Start by passing the left row to the parent up queue entry.
//
upParentEntry->copyAtp(leftRowAtp);
// If a right row is needed.
//
if (isRightOutputNeeded_)
{
// The max length of the right row.
UInt32 size = rowSize_;
if(hashJoinTdb().useVariableLength()) {
// get the actual length from the HashRow.
size = dataPointer->getRowLength();
}
// Allocate a right row from the resultPool
// and put it in the parent up queue entry.
//
tupp &rightTupp =
upParentEntry->getAtp()->getTupp(returnedRightRowAtpIndex_);
    // Allocate the tuple based on the actual length of the row.
if (resultPool_->get_free_tuple(rightTupp, size))
{
upParentEntry->getAtp()->release();
return WORK_POOL_BLOCKED;
}
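    // Usage note: the partially built up-entry was released above and
    // the left queue head was not removed, so the same left row will
    // be re-probed and re-built on a later invocation, once the result
    // pool has free space again.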
// Copy the right row to the new tuple
//
str_cpy_all(rightTupp.getDataPointer(),
dataPointer->getData(),
size);
}
// Prepare the up entry.
//
upParentEntry->upState.status = ex_queue::Q_OK_MMORE;
upParentEntry->upState.parentIndex = downParentEntry->downState.parentIndex;
upParentEntry->upState.downIndex = parentQueue_.down->getHeadIndex();
// we got another result row
pstate.matchCount_++;
upParentEntry->upState.setMatchNo(pstate.matchCount_);
// insert into parent up queue
parentQueue_.up->insert();
if (bmoStats_)
    bmoStats_->incActualRowsReturned();
  // If it is a GET_N request and the requested number of rows has been
  // satisfied, then return.
NABoolean getN = (downParentEntry->downState.request == ex_queue::GET_N);
if (getN &&
downParentEntry->downState.requestValue <= (Lng32)pstate.matchCount_ )
return WORK_CALL_AGAIN;
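  // Example (hypothetical request): for GET_N with requestValue == 5,
  // the fifth returned row makes matchCount_ == 5, the test above
  // fires, and WORK_CALL_AGAIN tells workReadOuter() to go back to
  // workUp() instead of continuing to probe (see the return-code
  // notes there).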
return WORK_OK;
}
// For CIF (with variable-size rows), when there is no room to hold a
// max-size row, we try to compute the actual size of the row and
// determine whether there is room for that actual size.
// computeDefragLength() computes the actual row size by applying the
// move expression. The result of the move is saved in a side
// (temporary) buffer and can later be copied into the final buffer.
UInt32 ExUniqueHashJoinTcb::computeDefragLength()
{
#if defined(_DEBUG)
  assert((availRows_ == 0 || availRows_ == 1) && hashJoinTdb().considerBufferDefrag());
#endif
if (!hashJoinTdb().considerBufferDefrag() ||
rightQueue_.up->isEmpty())
{
return 0;
}
ex_queue_entry * rightEntry = rightQueue_.up->getHeadEntry();
if(rightEntry->upState.status != ex_queue::Q_OK_MMORE)
{
return 0;
}
UInt32 defragLength = 0;
// A reference to the tupp to hold the extended right row.
//
tupp &workTupp = workAtp_->getTupp(hashJoinTdb().extRightRowAtpIndex1_);
// need to verify the allocated size???
workTupp.setDataPointer(resultPool_->getDefragTd()->getTupleAddress());
UInt32 rowLen = extRowSize_ - sizeof(HashRow);
UInt32 *rowLenPtr = &rowLen;
if(rightMoveInExpr_)
{
ex_expr::exp_return_type retCode =
rightMoveInExpr_->eval(rightEntry->getAtp(), workAtp_, 0, -1, rowLenPtr);
if (retCode != ex_expr::EXPR_ERROR &&
rowLen != 0 &&
        (ROUND4(rowLen) + sizeof(HashRow) <= bufferPool_->getFreeSpace()))
{
defragLength = rowLen;
}
}
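  // Worked example (hypothetical sizes): for an actual rowLen of 57,
  // ROUND4(57) == 60; with sizeof(HashRow) added, the row fits only if
  // that total is at most the buffer's remaining free space, in which
  // case the max-size allocation is avoided and 57 is returned.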
return defragLength;
}