blob: 8e5b7a1d1c5b95746dda999f24dfba14e11ad88a [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ex_ex.c
* Description: Methods for class ex_tcb and for assertions
*
*
* Created: 5/3/94
* Language: C++
* Status: Experimental
*
*
*
*
******************************************************************************
*/
#include "Platform.h"
#include <sys/time.h>
#include "str.h"
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExStats.h"
#include "NAError.h"
#include "exp_expr.h"
#include "ExCextdecs.h"
#include "logmxevent.h"
extern Int32 gettimeofday(struct timeval *, struct timezone *);
// -----------------------------------------------------------------------
// NT Port - GSH 03/18/97
// ----------------------
// Added code to include GUI related code if building with the GUI tool
// ie. NO_NA_DEBUG_GUI is not set which in turn means NA_DEBUG_GUI is
// set. The code I added is : && defined(NA_DEBUG_GUI).
// -----------------------------------------------------------------------
#ifdef NA_DEBUG_GUI
Int32 ex_tcb::objectCount = 0;
#endif
/////////////////////////////////////////////////////////////
// class ex_tcb
/////////////////////////////////////////////////////////////
// Base class constructor
ex_tcb::ex_tcb( const ComTdb & tdb, const short in_version,
ex_globals * g) :
tdb(tdb),
nodeType_((ComTdb::ex_node_type)tdb.getNodeType()),
version_(in_version),
globals_(g),
statsEntry_(NULL),
pool_(NULL)
// QSTUFF
, holdable_(FALSE)
// QSTUFF
{
str_cpy_all( (char *) &eyeCatcher_, (char *) &(tdb.eyeCatcher_), 4);
#ifdef NA_DEBUG_GUI
increaseObjectcount();
setObjectId();
#endif
// Cleanup
g->registerTcb(this);
};
ex_tcb::~ex_tcb()
{
ex_assert(!str_cmp((char *)&eyeCatcher_, (char *)(&tdb.eyeCatcher_), 4),
"TCB points to wrong TDB");
// Change the eye catcher
str_cpy_all((char *) &eyeCatcher_, eye_FREE, 4);
};
void ex_tcb::registerSubtasks()
{
// the default implementation registers the default static
// work procedure for all events that can occur for this task
ExScheduler *sched = globals_->getScheduler();
ex_queue_pair pQueue = getParentQueue();
// register events for parent queue
ex_assert(pQueue.down && pQueue.up,"Parent down queue must exist");
sched->registerInsertSubtask(ex_tcb::sWork, this, pQueue.down,"WK");
sched->registerCancelSubtask(ex_tcb::sCancel, this, pQueue.down,"CN");
sched->registerUnblockSubtask(ex_tcb::sWork, this, pQueue.up,"WK");
// BertBert VV
// subtask to handle GET_NEXT_N events downstream.
sched->registerNextSubtask(ex_tcb::sWork, this, pQueue.down,"GN");
// BertBert ^^
// register events for child queues
Int32 nc = numChildren();
for (Int32 i = 0; i < nc; i++)
{
const ex_tcb * childTcb = getChild(i);
if (childTcb)
{
ex_queue_pair cQueue = childTcb->getParentQueue();
sched->registerUnblockSubtask(ex_tcb::sWork,this, cQueue.down);
sched->registerInsertSubtask(ex_tcb::sWork, this, cQueue.up);
}
}
if (getPool())
getPool()->setStaticMode(FALSE);
}
ExWorkProcRetcode ex_tcb::workCancel()
{
// the default cancel work method calls the regular work method
return work();
}
ExWorkProcRetcode ex_tcb::workResize()
{
queue_index newSize;
queue_index maxSize;
ex_queue *whichQueue;
// go twice through this loop, once for the up queue and once
// for the down queue
for (Int32 i = 0; i < 2; i++)
{
// set some data that depends on the up/down queue
if (i == 0)
{
// point to the up queue
whichQueue = getParentQueue().up;
maxSize = getTdb()->getMaxQueueSizeUp();
}
else
{
// point to the down queue
whichQueue = getParentQueue().down;
maxSize = getTdb()->getMaxQueueSizeDown();
}
// the following works the same for both up and down queue
if (whichQueue->needsResize())
{
// calculate the new size
newSize = whichQueue->getSize() * getTdb()->getQueueResizeFactor();
// check whether the new size exceeds the max size
if (newSize > maxSize)
{
newSize = maxSize;
}
// is the new size larger than the current size?
if (newSize > whichQueue->getSize())
{
// yes, call the queue's resize method
whichQueue->resize(this, newSize);
}
if (whichQueue->getSize() >= maxSize ||
newSize > whichQueue->getSize())
{
// Can't resize anymore, either because we exceeded
// the max size or because we're out of memory.
// Set the resize subtask to NULL so we don't repeatedly
// call this method.
whichQueue->setResizeSubtask(NULL);
}
}
}
// this method never complains (but it may not always resize the queue)
return WORK_OK;
}
Int32 ex_tcb::fixup()
{
short error = 0;
Int32 nc = numChildren();
for (Int32 i=0; i<nc; i++)
{
error = ((ex_tcb *)getChild(i))->fixup();
if (error)
break;
}
return error;
}
// this method is called to rollback to a previously set savepoint.
Int32 ex_tcb::rollbackSavepoint()
{
short error = 0;
Int32 nc = numChildren();
for (Int32 i=0; i<nc; i++)
{
error = ((ex_tcb *)getChild(i))->rollbackSavepoint();
if (error)
break;
}
return error;
}
// QSTUFF
void ex_tcb::propagateHoldable(NABoolean h)
{
setHoldable(h);
for (Int32 i=0; i<numChildren(); i++)
{
((ex_tcb *)getChild(i))->propagateHoldable(h);
}
}
// QSTUFF
NABoolean ex_tcb::reOpenTables()
{
return FALSE;
}
NABoolean ex_tcb::closeTables()
{
return FALSE;
}
unsigned short ex_tcb::getExpressionMode() const
{
if (globals_->getInjectErrorAtExpr())
return tdb.expressionMode_ | ex_expr::INJECT_ERROR;
else
return tdb.expressionMode_;
};
void ex_tcb::allocateParentQueues(ex_queue_pair &parentQueues,
NABoolean allocatePstate)
{
CollHeap *space = getGlobals()->getSpace();
const ComTdb *tdb = getTdb();
queue_index downSize = tdb->initialQueueSizeDown_;
queue_index upSize = tdb->initialQueueSizeUp_;
parentQueues.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
downSize,
tdb->criDescDown_,
space);
if (allocatePstate)
{
// Allocate the private state in each entry of the down queue
parentQueues.down->allocatePstate(this);
}
parentQueues.up = new(space) ex_queue(ex_queue::UP_QUEUE,
upSize,
tdb->criDescUp_,
space);
}
void ex_tcb::registerResizeSubtasks()
{
// Register two resize tasks with the scheduler, one for the
// down queue and one for the up queue. Do this only if there
// is potential for resizing (if the initial size is less
// than the max size for the queue). Set the resize limit.
ExScheduler *sched = globals_->getScheduler();
Lng32 resizeLimit = (Lng32) getTdb()->getQueueResizeLimit();
ex_queue *whichQueue;
whichQueue = getParentQueue().up;
if (whichQueue->getSize() < getTdb()->getMaxQueueSizeUp())
{
whichQueue->setResizeLimit(resizeLimit);
sched->registerResizeSubtask(ex_tcb::sResize,this,whichQueue,"RS");
}
whichQueue = getParentQueue().down;
if (whichQueue->getSize() < getTdb()->getMaxQueueSizeDown())
{
whichQueue->setResizeLimit(resizeLimit);
sched->registerResizeSubtask(ex_tcb::sResize,this,whichQueue,"RS");
}
}
ex_tcb_private_state * ex_tcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
// This method can't be implemented in the base class, every
// derived TCB class needs to allocate an array of its own
// pstate class and return the pointer.
// The easiest way to redefine this method is to use the template
// provided in the header file:
//
// PstateAllocator<my pstate type name goes here> pa;
// return pa.allocatePstates(this, numElems, pstateLength);
// NOTE: This is a new method that isn't implemented for
// all TCBs yet. The current policy is that this method needs
// to be implemented for a TCB if it resizes its queues dynamically.
numElems = 0;
pstateLength = sizeof(ex_tcb_private_state);
return NULL;
}
// this method find the first set of children in the child tree
// that have a valid stats area and sets their parent id to the
// input tdb id.
void ex_tcb::propagateTdbIdForStats(Lng32 tdbId)
{
Int32 nc = numChildren();
ExOperStats *stat, *currentStat;
currentStat = getStatsEntry();
for (Int32 i=0; i < nc; i++)
{
if (getChild(i))
{
stat = ((ex_tcb*)getChild(i))->getStatsEntry();
if (stat != NULL)
{
// If the parent tdb id is already set in the by the doAllocateStatsEntry at the time
// of building the tcb, donot set parentTdbId, leftChildId and rightChildId
// tdb ::build methods need to set all these Ids correctly
// If the rightChildTdb is set to -2 then this function just resets the
// right child to -1 even there is a right child (conceptually) like in ex_split_top
if (stat->getParentTdbId() == -1)
{
stat->setParentTdbId(tdbId);
}
if (currentStat && currentStat->getParentTdbId() == -1)
{
if (i == 0)
currentStat->setLeftChildTdbId(getChild(i)->getTdb()->getTdbId());
if (i == 1)
{
if (currentStat->getRightChildTdbId() == -2)
currentStat->setRightChildTdbId(-1);
else
currentStat->setRightChildTdbId(getChild(i)->getTdb()->getTdbId());
}
}
}
}
}
}
// If statistics are to be collected and an entry for this
// node doesn't already exist in stats area passed in,
// allocate one and add to it. Else, make pstate's stat pointer
// point to this entry.
void ex_tcb::allocateStatsEntry(CollHeap * heap) {
// temporarily enable stats. Done so we can build stats related
// structs, if asked for by the compiler.
NABoolean statsEnabled = getGlobals()->statsEnabled();
getGlobals()->setStatsEnabled(TRUE);
if (getGlobals()->getStatsArea()) {
// allocate a new stats entry
ExOperStats* stat = NULL;
if (needStatsEntry())
stat = doAllocateStatsEntry(getGlobals()->getStatsArea()->getHeap(),
((ComTdb *)getTdb()));
if (stat)
{
// the queue sizes from the generator are just
// estimates. Set the correct values here.
if (getParentQueue().down) {
stat->setDownQueueSize(getParentQueue().down->getSize());
stat->setUpQueueSize(getParentQueue().up->getSize());
}
else {
stat->setDownQueueSize((queue_index) 0);
stat->setUpQueueSize((queue_index) 0);
};
// add this entry to the stats area
getGlobals()->getStatsArea()->insert(stat);
setStatsEntry(stat);
}
// now allocate the stats entries of my children
Int32 nc = numChildren();
for (Int32 i = 0; i < nc; i++)
{
if (getChild(i) != NULL)
{
ex_tcb * childTcb = (ex_tcb*)getChild(i);
childTcb->allocateStatsEntry(getGlobals()->getStatsArea()->getHeap());
}
}
// assign my TDB id as parent ID to my children
if (stat && nc)
propagateTdbIdForStats(getTdb()->getTdbId());
} // stats area present
getGlobals()->setStatsEnabled(statsEnabled);
}
NABoolean ex_tcb::needStatsEntry()
{
if ((getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::ALL_STATS) ||
(getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::OPERATOR_STATS))
return TRUE;
else
return FALSE;
}
ExOperStats * ex_tcb::doAllocateStatsEntry(CollHeap *heap, ComTdb *tdb)
{
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
if (statsType == ComTdb::OPERATOR_STATS || statsType == ComTdb::ALL_STATS)
return new(heap) ExOperStats(heap,
ExOperStats::EX_OPER_STATS,
this,
tdb);
else
{
ex_assert(FALSE, "doAllocateStatsEntry is not implemented by the TCB");
return NULL;
}
}
void ex_tcb::computeNeededPoolInfo(
Int32 &numBuffs,
UInt32 &staticPoolSpaceSize,
UInt32 &dynPoolSpaceSize)
{
numBuffs = getTdb()->numBuffers_;
staticPoolSpaceSize = 0;
dynPoolSpaceSize = 0;
if (getPool())
{
getPool()->getUsedMemorySize(staticPoolSpaceSize, dynPoolSpaceSize);
dynPoolSpaceSize = numBuffs * getPool()->defaultBufferSize();
}
}
void ex_tcb::mergeStats(ExStatisticsArea *otherStats)
{
ex_globals * glob = getGlobals();
StatsGlobals *statsGlobals = glob->getStatsGlobals();
Long semId;
if (statsGlobals != NULL)
{
semId = glob->getSemId();
int error = statsGlobals->getStatsSemaphore(semId, glob->getPid());
glob->getStatsArea()->merge(otherStats);
statsGlobals->releaseStatsSemaphore(semId, glob->getPid());
}
else
glob->getStatsArea()->merge(otherStats);
}
void ex_tcb::cpuLimitExceeded()
{
ex_assert(0, "ex_tcb::cpuLimitExceeded not defined for this class.");
}
void ex_tcb::cleanup()
{
Int32 nc = numChildren();
for (Int32 i = 0; i < nc; i++)
{
ex_tcb * childTcb = (ex_tcb*)getChild(i);
if (childTcb)
{
childTcb->cleanup();
}
}
}
short ex_tcb::moveRowToUpQueue(ex_queue_pair *qparent,
UInt16 tuppIndex,
const char * row, Lng32 len,
short * rc, NABoolean isVarchar)
{
if (qparent->up->isFull())
{
if (rc)
*rc = WORK_CALL_AGAIN;
return -1;
}
Lng32 length;
if (len <= 0)
length = strlen(row);
else
length = len;
tupp p;
if (pool_->get_free_tuple(p, (Lng32)
((isVarchar ? SQL_VARCHAR_HDR_SIZE : 0)
+ length)))
{
if (rc)
*rc = WORK_POOL_BLOCKED;
return -1;
}
char * dp = p.getDataPointer();
if (isVarchar)
{
*(short*)dp = (short)length;
str_cpy_all(&dp[SQL_VARCHAR_HDR_SIZE], row, length);
}
else
{
str_cpy_all(dp, row, length);
}
ex_queue_entry * pentry_down = qparent->down->getHeadEntry();
ex_queue_entry * up_entry = qparent->up->getTailEntry();
up_entry->copyAtp(pentry_down);
up_entry->getAtp()->getTupp((Lng32)tuppIndex) = p;
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_OK_MMORE;
// insert into parent
qparent->up->insert();
return 0;
}
short ex_tcb::handleError(ex_queue_pair *qparent, ComDiagsArea *inDiagsArea)
{
if (qparent->up->isFull())
return 1;
// Return EOF.
ex_queue_entry * up_entry = qparent->up->getTailEntry();
ex_queue_entry * pentry_down = qparent->down->getHeadEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea =
ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
else
diagsArea->incrRefCount (); // the setDiagsArea below will decr the ref count
if (inDiagsArea)
diagsArea->mergeAfter(*inDiagsArea);
up_entry->setDiagsArea (diagsArea);
// insert into parent
qparent->up->insert();
return 0;
}
short ex_tcb::handleDone(ex_queue_pair *qparent, ComDiagsArea *inDiagsArea)
{
if (qparent->up->isFull())
return 1;
// Return EOF.
ex_queue_entry * up_entry = qparent->up->getTailEntry();
ex_queue_entry * pentry_down = qparent->down->getHeadEntry();
if (inDiagsArea && inDiagsArea->getNumber(DgSqlCode::WARNING_) > 0)
{
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea =
ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
else
diagsArea->incrRefCount (); // the setDiagsArea below will decr the ref count
if (inDiagsArea)
diagsArea->mergeAfter(*inDiagsArea);
up_entry->setDiagsArea (diagsArea);
}
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent->up->insert();
// pstate.matches_ = 0;
qparent->down->removeHead();
return 0;
}
__declspec(dllexport)
NABoolean ExExprComputeSpace(ex_tcb * tcb)
{
return tcb->getGlobals()->computeSpace();
}