/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_root.cpp
* Description: Root TCB for master executor. Interface with CLI.
* The root node also drives the scheduler and the
* fragment directory. It has execute() and fetch() methods.
* Created: 7/10/95
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "Platform.h"
#include <stdio.h>
#include "cli_stdh.h"
#include "ex_stdh.h"
#include "ex_exe_stmt_globals.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_root.h"
#include "ex_expr.h"
#include "ex_frag_rt.h"
#include "ComDiags.h"
#include "ExStats.h"
#include "NAError.h"
#include "LateBindInfo.h"
#include "ExecuteIdTrig.h"
#include "TriggerEnable.h"
#include "ComRtUtils.h"
#include "PortProcessCalls.h"
#include "ExpLOB.h"
#include "ex_transaction.h"
#include "ComSqlId.h"
#include "ExCextdecs.h"
#include "ExSMTrace.h"
#include "ExSMGlobals.h"
#include "ExSMCommon.h"
#include "ExpHbaseInterface.h"
////////////////////////////////////////////////////////////////////////
// TDB procedures
//
// Build a root tcb
//
ex_tcb * ex_root_tdb::build(CliGlobals *cliGlobals, ex_globals * glob)
{
ExExeStmtGlobals * exe_glob = glob->castToExExeStmtGlobals();
ExMasterStmtGlobals *master_glob = exe_glob->castToExMasterStmtGlobals();
if (getQueryUsesSM() && cliGlobals->getEnvironment()->smEnabled())
{
// For SeaMonster: if the reader thread has terminated, we cannot
// continue with the execution of the query. Raise an error
// explaining why the reader thread terminated and add it to the
// statement globals.
ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();
if (smGlobals && smGlobals->getReaderThreadState() ==
ExSMGlobals::TERMINATED_DUE_TO_ERROR)
{
smGlobals->addReaderThreadError(exe_glob);
return NULL;
}
}
// set this plan version in the statement globals.
glob->setPlanVersion(planVersion_);
// set the fragment directory in glob. This will be passed
// to the build of all tdb's and used by them, if needed.
master_glob->setFragDir(fragDir_);
glob->setNumOfTemps(tempTableCount_);
// ---------------------------------------------------------------------
// build the run time fragment instance table
// ---------------------------------------------------------------------
ExRtFragTable *rtFragTable = NULL;
// make a run time fragment directory identical to compiled one
rtFragTable = new(master_glob->getSpace())
ExRtFragTable(master_glob, fragDir_, (char *) this);
// if this tdb is displayed in the GUI then enable GUI display for ESPs
rtFragTable->displayGuiForESPs(displayExecution());
// remember the fragment table in the globals
master_glob->setRtFragTable(rtFragTable);
// Acquire ESPs (do this before child is built, as some decisions in
// children depend on the location of fragment instances)
// If measure stats are being collected, find out the number of new
// esps started and the time it took to start them.
UInt32 numOfNewEspsStarted = 0; // num of new esps started by this operator.
UInt32 numOfTotalEspsUsed = 0; // num of total esps used by this operator.
Int64 newprocessTime = 0;
if ((getCollectStatsType() == ComTdb::ACCUMULATED_STATS) ||
(getCollectStatsType() == ComTdb::OPERATOR_STATS) ||
(getCollectStatsType() == ComTdb::PERTABLE_STATS))
{
newprocessTime = JULIANTIMESTAMP(OMIT,OMIT,OMIT,OMIT);
}
rtFragTable->assignEsps(TRUE, numOfTotalEspsUsed, numOfNewEspsStarted);
if (((getCollectStatsType() == ComTdb::ACCUMULATED_STATS) ||
(getCollectStatsType() == ComTdb::OPERATOR_STATS) ||
(getCollectStatsType() == ComTdb::PERTABLE_STATS)) &&
(numOfNewEspsStarted > 0))
{
newprocessTime = JULIANTIMESTAMP(OMIT,OMIT,OMIT,OMIT) - newprocessTime;
}
else
newprocessTime = 0;
// return if we couldn't allocate ESPs
if (rtFragTable->getState() == ExRtFragTable::ERROR)
{
return NULL;
}
if (getQueryUsesSM() && cliGlobals->getEnvironment()->smEnabled())
{
// Assign a SeaMonster ID to the query
Int64 smQueryID = ExSMGlobals::reserveSMID();
master_glob->setSMQueryID(smQueryID);
// Initialize SeaMonster. Return early if errors were encountered.
exe_glob->initSMGlobals();
ComDiagsArea *diags = exe_glob->getDiagsArea();
if (diags && diags->getNumber(DgSqlCode::ERROR_) > 0)
return NULL;
}
else
{
// Turn off SeaMonster tracing if it was enabled earlier but the
// SEAMONSTER CQD has since been turned off, which disables the trace.
ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();
if (smGlobals && smGlobals->getTraceEnabled())
smGlobals->setTraceEnabled(false);
}
// build the child
ex_tcb * child_tcb = childTdb->build(exe_glob);
ex_root_tcb * root_tcb =
new(exe_glob->getSpace()) ex_root_tcb(*this,
*child_tcb,
exe_glob);
root_tcb->registerSubtasks();
// if we collect stats set up the stats area.
ExStatisticsArea* statsArea = NULL;
if (getCollectStats())
{
StmtStats *stmtStats = master_glob->getStatement()->getStmtStats();
if (getCollectStatsType() == ALL_STATS ||
stmtStats == NULL || stmtStats->getQueryId() == NULL)
{
// These ALL stats are too numerous to store in the
// shared segment.
statsArea = new(master_glob->getDefaultHeap())
ExStatisticsArea(master_glob->getDefaultHeap(), 0,
getCollectStatsType());
master_glob->setStatsArea(statsArea);
root_tcb->allocateStatsEntry();
}
else
{
StatsGlobals *statsGlobals = cliGlobals->getStatsGlobals();
if (statsGlobals == NULL)
{
statsArea = new(master_glob->getDefaultHeap())
ExStatisticsArea(master_glob->getDefaultHeap(), 0,
getCollectStatsType());
master_glob->setStatsArea(statsArea);
root_tcb->allocateStatsEntry();
if (stmtStats != NULL)
stmtStats->setStatsArea(statsArea);
}
else
{
int error =
statsGlobals->getStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
statsArea = new(cliGlobals->getStatsHeap())
ExStatisticsArea(cliGlobals->getStatsHeap(), 0,
getCollectStatsType());
if (stmtStats != NULL)
{
// If the context's stats area points to this statement's statsArea,
// mark the context's stats as a copy so that it gets deleted when
// ContextCli::setStatsArea is called again. StmtStats::setStatsArea will not
// delete this statistics area since the context has a reference to it.
ExStatisticsArea *ctxStats =
master_glob->getStatement()->getContext()->getStats();
ExStatisticsArea *prevStats = stmtStats->getStatsArea();
if (ctxStats == prevStats)
cliGlobals->currContext()->setStatsArea(NULL,FALSE, FALSE,FALSE);
stmtStats->setStatsArea(statsArea);
}
master_glob->setStatsArea(statsArea);
statsArea->setRtsStatsCollectEnabled(getCollectRtsStats());
root_tcb->allocateStatsEntry();
statsGlobals->releaseStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
}
}
statsArea->setExplainPlanId(explainPlanId_);
ExMasterStats *masterStats = (stmtStats ? stmtStats->getMasterStats() :
NULL);
if (masterStats)
{
masterStats->numOfNewEspsStarted() = numOfNewEspsStarted;
masterStats->setNumCpus(rtFragTable->
countSQLNodes(cliGlobals->myCpu()));
}
}
// set the EMS Experience level
if (isEMSEventExperienceLevelBeginner())
{
cliGlobals->setEMSBeginnerExperienceLevel();
}
if (getUncProcess())
cliGlobals->setUncProcess();
// nobody can claim me as a dependent, so set my parent TDB id
// to -1.
if (root_tcb->getStatsEntry())
{
root_tcb->getStatsEntry()->setParentTdbId(-1);
if (root_tcb->getStatsEntry()->castToExFragRootOperStats())
root_tcb->getStatsEntry()->castToExFragRootOperStats()->
setStmtIndex(master_glob->getStatement()->getStatementIndex());
}
if (root_tcb->getStatsEntry())
{
if (root_tcb->getStatsEntry()->castToExMeasStats())
{
root_tcb->getStatsEntry()->castToExMeasStats()->incNewprocess(numOfNewEspsStarted);
root_tcb->getStatsEntry()->castToExMeasStats()->incNewprocessTime(newprocessTime);
}
else
if (root_tcb->getStatsEntry()->castToExFragRootOperStats())
{
root_tcb->getStatsEntry()->castToExFragRootOperStats()->incNewprocess(numOfNewEspsStarted);
root_tcb->getStatsEntry()->castToExFragRootOperStats()->incNewprocessTime(newprocessTime);
}
}
// download plans and build them in the ESPs
if (NOT noEspsFixup())
rtFragTable->downloadAndFixup();
// assign initial partition ranges to ESPs
rtFragTable->assignPartRangesAndTA(TRUE);
if (cpuLimit_ > 0)
{
glob->getScheduler()->setMaxCpuTime(cpuLimit_);
glob->getScheduler()->setCpuLimitCheckFreq(cpuLimitCheckFreq_);
}
if (processLOB())
{
glob->initLOBglobal(cliGlobals->currContext());
}
return (root_tcb);
} // ex_root_tdb::build
///////////////////////////////////////////////////////////////////////////
//
// TCB procedures
///////////////////////////////////////////////////////////////////////////
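// Constructor for the root TCB. It wires up the queue pair to the child,
// fixes up the input/output/pkey/pred expressions, allocates the work ATP
// (plus tuples for input variables, the current-of pkey row, and query
// cache parameters, as applicable), and allocates the rowwise rowset
// buffer if the plan uses one.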
ex_root_tcb::ex_root_tcb(
const ex_root_tdb & root_tdb,
const ex_tcb & child_tcb,
ExExeStmtGlobals *glob) :
ex_tcb( root_tdb, 1, glob),
workAtp_(0),
asyncCancelSubtask_(NULL),
cancelStarted_(FALSE),
fatalError_(FALSE),
cpuLimitExceeded_(FALSE),
pkeyAtp_(NULL),
lastQueueSize_(0), // used by rowsets
triggerStatusVector_(NULL),
queryStartedStream_(NULL),
queryFinishedStream_(NULL),
cbServer_(NULL),
cbCommStatus_(0),
mayPinAudit_(false),
mayLock_(false)
{
tcbChild_ = &child_tcb;
// get the queues that my child uses to communicate with me
qchild = child_tcb.getParentQueue();
// unlike most other nodes, the root allocates ATPs for its child
// down queue, since it creates the down queue entries rather
// than passing them
qchild.down->allocateAtps(glob->getSpace());
if (root_tdb.inputExpr_)
{
(root_tdb.inputExpr_)->fixup(0, getExpressionMode(), this,
glob->getSpace(), glob->getDefaultHeap(), FALSE, glob);
}
if (root_tdb.outputExpr_)
{
(root_tdb.outputExpr_)->fixup(0, getExpressionMode(), this,
glob->getSpace(), glob->getDefaultHeap(),
FALSE, glob);
}
if (root_tdb.pkeyExpr_)
{
(root_tdb.pkeyExpr_)->fixup(0, getExpressionMode(), this,
glob->getSpace(), glob->getDefaultHeap(), FALSE, glob);
}
if (root_tdb.predExpr_)
{
(root_tdb.predExpr_)->fixup(0, getExpressionMode(), this,
glob->getSpace(), glob->getDefaultHeap(), FALSE, glob);
}
workAtp_ = allocateAtp(root_tdb.criDesc_, glob->getSpace());
Int32 numTuples = 2; // temps and consts
if (root_tdb.inputVarsSize_ > 0)
{
// allocate space to hold input params/hostvars
tupp_descriptor *tp = new(glob->getSpace()) tupp_descriptor;
char * dataPtr =
(char *)glob->getSpace()->allocateMemory(root_tdb.inputVarsSize_);
tp->init(root_tdb.inputVarsSize_,0,dataPtr);
workAtp_->getTupp(numTuples++) = tp;
}
if (root_tdb.updateCurrentOfQuery())
{
// allocate space to hold input pkey row
tupp_descriptor *tp = new(glob->getSpace()) tupp_descriptor;
char * dataPtr =
(char *)glob->getSpace()->allocateMemory(root_tdb.pkeyLen_);
tp->init((short) root_tdb.pkeyLen_,0,dataPtr);
workAtp_->getTupp(numTuples++) = tp;
}
else if (pkeyExpr())
{
pkeyAtp_ = allocateAtp(root_tdb.workCriDesc_, glob->getSpace());
// allocate space to hold the row of primary keys
tupp_descriptor *tp = new(glob->getSpace()) tupp_descriptor;
char * dataPtr =
(char *)glob->getSpace()->allocateMemory(root_tdb.pkeyLen_);
tp->init((short) root_tdb.pkeyLen_,0,dataPtr);
pkeyAtp_->getTupp(2) = tp;
}
// set the stream timeout value to be used by this TCB
if ( ! glob->getStreamTimeout( streamTimeout_ ) ) // dynamic not found ?
streamTimeout_ = root_tdb.streamTimeout_ ; // then use the static value
// Used by Query Caching.
if (root_tdb.cacheVarsSize_ > 0)
{
tupp_descriptor *tp = new(glob->getSpace()) tupp_descriptor;
tp->init(root_tdb.cacheVarsSize_,0, root_tdb.getParameterBuffer());
workAtp_->getTupp(numTuples++) = tp;
}
ex_assert(numTuples==root_tdb.criDesc_->noTuples(),
"conflicting tuple numbers in root_tdb");
getGlobals()->getScheduler()->setRootTcb(this);
#ifdef NA_DEBUG_GUI
//-------------------------------------------------------------
// GSH: If the user has requested use of the MS Windows based GUI
// display, then load the tdm_sqlexedbg dll and SetRootTcb.
// Note: displayExecution() returns 2 if the MS Windows based GUI
// was requested.
//-----------------------------------------------------------
if (root_tdb.displayExecution() == 2)
getGlobals()->getScheduler()->startGui();
#endif
rwrsBuffer_ = NULL;
if (root_tdb.getRWRSInfo())
{
rwrsBuffer_ =
new(glob->getDefaultHeap()) char[
root_tdb.getRWRSInfo()->rwrsMaxSize() *
root_tdb.getRWRSInfo()->rwrsMaxInternalRowlen()];
root_tdb.getRWRSInfo()->setRWRSInternalBufferAddr(rwrsBuffer_);
}
}
ex_root_tcb::~ex_root_tcb()
{
freeResources();
};
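// Release resources owned by the root TCB: the work ATP, the run time
// fragment table, and the query started/finished event streams. Also
// logs retried IPC errors if the plan asked for that.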
void ex_root_tcb::freeResources()
{
if (workAtp_)
{
workAtp_->release();
deallocateAtp(workAtp_, getGlobals()->getSpace());
workAtp_ = NULL;
}
ExMasterStmtGlobals *glob =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
ExRtFragTable *rtFragTable = glob->getRtFragTable();
if (rtFragTable)
{
delete rtFragTable;
glob->setRtFragTable(NULL);
}
if (root_tdb().logRetriedIpcErrors())
{
// The IpcEnvironment object is global to the process, and shared
// by all of its statements. Although it is possible these errors
// (if any) were accumulated on some other concurrent Statement,
// we will log and clear the counter now. As a result, we will not
// attempt to correlate retried errors with any particular Statement.
glob->getIpcEnvironment()->logRetriedMessages();
}
if (queryStartedStream_)
{
queryStartedStream_->removeRootTcb();
queryStartedStream_->addToCompletedList();
queryStartedStream_ = NULL;
}
if (queryFinishedStream_)
{
queryFinishedStream_->removeRootTcb();
queryFinishedStream_->addToCompletedList();
queryFinishedStream_ = NULL;
}
};
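// Populate the diags area for a canceled query: clear existing error
// conditions, add -EXE_CANCELED, and attach the cancel comment from the
// master stats, if one is available.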
void ex_root_tcb::populateCancelDiags(ComDiagsArea &diags)
{
diags.clearErrorConditionsOnly();
diags << DgSqlCode(-EXE_CANCELED);
ExMasterStmtGlobals *master_glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
ExStatisticsArea *statsArea = master_glob->getStatsArea();
CliGlobals *cliGlobals = master_glob->getCliGlobals();
if (statsArea)
{
ExMasterStats *masterStats = statsArea->getMasterStats();
if (masterStats && masterStats->getCancelComment())
diags << DgString0(masterStats->getCancelComment());
}
}
void ex_root_tcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
// there is only one queue pair to the child and no parent queue;
// the work procedure does essentially no work except interrupting
// the scheduler immediately when a row can be returned
sched->registerUnblockSubtask(sWork,this,qchild.down);
sched->registerInsertSubtask(sWork,this,qchild.up);
asyncCancelSubtask_ = sched->registerNonQueueSubtask(sWork,this,"WK");
// the frag table has its own event and work procedure, but its
// work procedure is called through a method of the root TCB
getGlobals()->
castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->getRtFragTable()->setSchedulerEvent(
getGlobals()->getScheduler()->registerNonQueueSubtask(sWorkOnFragDir,
this,"FD"));
}
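// Return a pointer to the primary key row built by pkeyExpr(), or NULL
// if the cursor is not updatable or the pkey tuple was not set up.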
char * ex_root_tcb::getPkeyRow()
{
if ((pkeyExpr()) &&
(pkeyAtp_))
return pkeyAtp_->getTupp(2).getDataPointer();
else
{
// either cursor is not updatable or the pkeyExpr was
// not set up correctly.
return NULL; // error case.
}
}
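// The root operator always allocates a stats entry, so that query
// statistics can be anchored at the root fragment.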
NABoolean ex_root_tcb::needStatsEntry()
{
return TRUE;
}
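// Allocate the root's stats entry: an ExFragRootOperStats for ALL_STATS,
// OPERATOR_STATS or PERTABLE_STATS, or a single shared ExMeasStats entry
// for ACCUMULATED_STATS. The entry is also registered as the root stats
// of the statement's statistics area.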
ExOperStats * ex_root_tcb::doAllocateStatsEntry(CollHeap *heap, ComTdb *tdb)
{
ExOperStats * stat = NULL;
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
if ((statsType == ComTdb::ALL_STATS) ||
(statsType == ComTdb::OPERATOR_STATS) ||
(statsType == ComTdb::PERTABLE_STATS))
{
stat = new(heap) ExFragRootOperStats(heap,
this,
tdb);
StmtStats *ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats();
if (ss != NULL)
((ExFragRootOperStats *)stat)->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
}
else if (statsType == ComTdb::ACCUMULATED_STATS)
{
// if accumulated statistics are to be collected, allocate
// one stats entry and insert it into the queue.
// All executor operators that collect stats will use this
// entry to update stats.
// These stats are not associated with any particular
// TDB or TCB. We pass tdb as argument just to propagate
// overflow_mode settings to stats.
stat = new(heap) ExMeasStats(heap,
NULL,
tdb);
StmtStats *ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats();
if (ss != NULL)
((ExMeasStats *)stat)->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
}
if (stat)
getGlobals()->getStatsArea()->setRootStats(stat);
return stat;
}
// this method receives the row of primary key values for an update
// where current of query. The row is returned from the select cursor
// thru the getPkeyRow() method.
void ex_root_tcb::inputPkeyRow(char * pkey_row)
{
char * dataPtr =
workAtp_->getTupp(root_tdb().criDesc_->noTuples()-1).getDataPointer();
str_cpy_all(dataPtr, pkey_row, root_tdb().pkeyLen_);
}
///////////////////////////////////////////////////////////////////////////
// this method returns the inputRow. If there are no input values (hostvars,
// params, etc), it returns NULL. Used for recompilation purposes. At recomp
// time, the input descriptor is not available to the fetch methods. So
// before recompilation, the 'original' inputRow is retrieved and then
// passed back again after recompilation.
// Caller should copy the input data into its own space.
///////////////////////////////////////////////////////////////////////////
void ex_root_tcb::getInputData(char* &inputData, ULng32 &inputDatalen)
{
inputData = NULL;
inputDatalen = 0;
if (root_tdb().inputVarsSize_ > 0)
{
inputData = workAtp_->getTupp(2).getDataPointer();
inputDatalen = root_tdb().inputVarsSize_;
}
}
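// Restore a previously saved input row (see getInputData above) into the
// work ATP, e.g. when this statement is re-executed after an automatic
// recompilation.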
void ex_root_tcb::setInputData(char* inputData)
{
if (root_tdb().inputVarsSize_ > 0)
{
str_cpy_all(workAtp_->getTupp(2).getDataPointer(), inputData,
root_tdb().inputVarsSize_);
}
}
///////////////////////////////////
// fixup.
//////////////////////////////////
Int32 ex_root_tcb::fixup()
{
return ex_tcb::fixup();
}
///////////////////////////
// execute.
///////////////////////////
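// Build the down-queue request (GET_ALL, GET_N, or GET_NEXT_N for
// embedded update/delete), evaluate the input expression into the
// request ATP, insert the request into the child's down queue, and
// drive the scheduler once. fetch() then drains the returned rows.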
Int32 ex_root_tcb::execute(CliGlobals *cliGlobals,
ExExeStmtGlobals * glob,
Descriptor * input_desc,
ComDiagsArea*& diagsArea,
NABoolean reExecute)
{
Int32 jmpRc = 0;
ExMasterStmtGlobals *master_glob = glob->castToExMasterStmtGlobals();
if (fatalError_)
{
// Can't fetch. I'm dead.
diagsArea = ComDiagsArea::allocate(getHeap());
*diagsArea << DgSqlCode(-EXE_CANNOT_CONTINUE);
return -EXE_CANNOT_CONTINUE;
}
#ifdef _DEBUG
// Do not need to cover debug code since customers and QA do not receive it.
char *testCancelFreq = getenv("TEST_ERROR_AT_QUEUE");
if (testCancelFreq)
{
Int32 freq = atoi(testCancelFreq);
if (freq < 0)
freq = 0;
if (freq != 0)
{
Int32 i = 1;
while ( i <= freq)
i = i << 1;
freq = i >> 1;
}
getGlobals()->setInjectErrorAtQueue(freq);
}
else
getGlobals()->setInjectErrorAtQueue(0);
#endif
ex_queue_entry *entry = qchild.down->getTailEntry();
// Initialize rownum.
getGlobals()->rowNum() = 1;
if (root_tdb().isEmbeddedUpdateOrDelete())
{
// This is a destructive select, so we need the GET_NEXT_N protocol.
// The program must be using a cursor for "delete access" that probably
// will remain open across transactions. In order to "bunch"
// destructively read rows, we need the GET_NEXT_N protocol where N is
// the number of rows the program intends to fetch (destructively) in
// a transaction.
entry->downState.request = (root_tdb().isStreamScan() ? ex_queue::GET_NEXT_N_MAYBE_WAIT : ex_queue::GET_NEXT_N);
// nothing to do yet (fetch will start this)
entry->downState.requestValue = 0;
// this is the first GET_NEXT_N request
entry->downState.numGetNextsIssued = 0;
}
else
if (root_tdb().getFirstNRows() < 0)
{
entry->downState.request = ex_queue::GET_ALL;
entry->downState.requestValue = 11;
}
else
{
// Coverage note - this is probably dead code.
entry->downState.request = ex_queue::GET_N;
entry->downState.requestValue = (Lng32)root_tdb().getFirstNRows();
}
entry->downState.parentIndex = 20;
entry->copyAtp(workAtp_);
// In case we are dealing with compound statements
if (input_desc) {
input_desc->setCompoundStmtsInfo(((ComTdbRoot *) &tdb)->getCompoundStmtsInfo());
}
//
// This entry may not have a diags area yet.
// If diags area is not available, allocate
// one for the entry.
//
if (root_tdb().isNonFatalErrorTolerated())
{
if (!(entry->getDiagsArea()))
{
ComDiagsArea * da = ComDiagsArea::allocate(getHeap());
entry->setDiagsArea(da);
}
entry->getDiagsArea()->setLengthLimit(root_tdb().getNotAtomicFailureLimit());
}
// do not evaluate the inputExpr, if this execute is being done
// after automatic recompilation. The original input tupp will
// be used at this time.
if ((! reExecute) && (inputExpr()))
{
UInt32 flags = 0;
inputExpr()->setIsODBC(flags, root_tdb().odbcQuery());
inputExpr()->setIsRWRS(flags, root_tdb().rowwiseRowsetInput());
inputExpr()->setInternalFormatIO
(flags,
master_glob->getStatement()->getContext()->getSessionDefaults()->
getInternalFormatIO());
inputExpr()->setIsDBTR
(flags,
master_glob->getStatement()->getContext()->getSessionDefaults()->
getDbtrProcess());
// move the values from user area to internal area
ex_expr::exp_return_type rc;
if ((input_desc) &&
(root_tdb().singleRowInput()) &&
(NOT input_desc->isDescTypeWide()) &&
(NOT input_desc->thereIsACompoundStatement()))
{
// fast single row input
rc = inputExpr()->inputSingleRowValue
(entry->getAtp(),
input_desc,
NULL,
glob->getDefaultHeap(),
flags);
}
else if (root_tdb().rowwiseRowsetInput())
{
// rowwise rowset input
rc = inputExpr()->inputRowwiseRowsetValues
(entry->getAtp(),
input_desc,
root_tdb().getRWRSInfo(),
root_tdb().isNonFatalErrorTolerated(),
glob->getDefaultHeap(),
flags);
}
else
{
// columnwise rowset input, or fast single row input is disabled.
rc = inputExpr()->inputValues(entry->getAtp(),
input_desc,
root_tdb().isNonFatalErrorTolerated(),
glob->getDefaultHeap(),
flags);
}
if (rc == ex_expr::EXPR_ERROR)
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry(entry);
if (root_tdb().isNonFatalErrorTolerated() && !(diagsArea->canAcceptMoreErrors()))
{
diagsArea->removeLastErrorCondition();
*diagsArea << DgSqlCode(-EXE_NONATOMIC_FAILURE_LIMIT_EXCEEDED)
<< DgInt0(root_tdb().getNotAtomicFailureLimit());
}
return -1;
};
// and evaluate any expression on those input values
if ((NOT inputExpr()->noEval()) &&
(inputExpr()->eval(entry->getAtp(), 0) == ex_expr::EXPR_ERROR))
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry(entry);
return -1;
}
}
else
if (!reExecute && !inputExpr() && input_desc &&
input_desc->getUsedEntryCount() &&
glob->getStmtType() != ExExeStmtGlobals::STATIC) {
NABoolean byPassCheck = input_desc->isDescTypeWide();
if (!byPassCheck && (input_desc->getCompoundStmtsInfo() == 0)
&& (diagsArea == NULL)) {
diagsArea = ComDiagsArea::allocate(getHeap());
*diagsArea << DgSqlCode(-CLI_STMT_DESC_COUNT_MISMATCH);
return -1;
}
}
ExFragRootOperStats *rootStats = NULL;
if (getStatsEntry())
{
rootStats = getStatsEntry()->castToExFragRootOperStats();
if (rootStats)
rootStats->setTimestamp(NA_JulianTimestamp());
}
ExMasterStats *mStats = NULL;
StmtStats *sStats = master_glob->getStatement()->getStmtStats();
if(sStats)
mStats = sStats->getMasterStats();
if (checkTransBeforeExecute(
master_glob->getStatement()->getContext()->getTransaction(),
master_glob,
mStats, diagsArea))
return -1;
// Set cancelState to indicate ex_root_tcb is valid
// for the cancel thread to reference.
// Also check if there is an old cancel request.
// This code block must be placed right before insert().
// ------------------------------------------------------------
CancelState old = master_glob->setCancelState(CLI_CANCEL_TCB_READY);
if (old == CLI_CANCEL_REQUESTED)
{
// Don't bother to continue if async cancel has been requested.
master_glob->clearCancelState();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getHeap());
else
diagsArea->clearErrorConditionsOnly();
populateCancelDiags(*diagsArea);
return -1;
}
// ++Trigger,
//
// UniqueExecuteId should be able to distinguish between any two query trees
// that are executed in a cluster simultaneously.
// UniqueExecuteId is an evaluate-once function in the compiler, but its value
// is calculated once here and propagated like an external parameter to every
// node in the query tree.
// The unique execute id is a combination of the CPU number in the cluster,
// the process id and the root tcb address.
//
if (root_tdb().getUniqueExecuteIdOffset() >= 0
// noTuples() > 2 is checked temporarily to prevent
// root_tdb version related problems during INIT_SQL
&& entry->getAtp()->getCriDesc()->noTuples() > 2)
{
ExecuteId *uniqueExecuteId = (ExecuteId *)
(entry->getAtp()->getTupp(2).getDataPointer() + root_tdb().getUniqueExecuteIdOffset());
uniqueExecuteId->cpuNum = glob->castToExExeStmtGlobals()->getIpcEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).getCpuNum();
uniqueExecuteId->pid = getpid();
uniqueExecuteId->rootTcbAddress = this;
}
if ((root_tdb().getTriggersCount() > 0)
// noTuples() > 2 is checked to prevent
// root_tdb version related problems during INIT_SQL
&& (entry->getAtp()->getCriDesc()->noTuples() > 2)
&& (getTriggerStatusVector() != NULL))
// If one or more triggers are dropped during execution, precisely
// after fixup and immediately before populating the root
// tcb trigger status vector array (before calling getTriggersStatus
// in Statement::execute), a blown away open error will be raised.
// getTriggerStatusVector() returns NULL when there are no triggers
// in this statement
{
unsigned char *triggersStatus =
(unsigned char *)(entry->getAtp()->getTupp(2).getDataPointer() +
root_tdb().getTriggersStatusOffset());
// copy the TriggerStatusVector to its atp
memcpy(triggersStatus, getTriggerStatusVector(), TRIGGERS_STATUS_VECTOR_SIZE);
}
// --Trigger,
qchild.down->insert();
// indicate to fragment table that we have a new request
// (ignore return code for now, since we never process multiple requests)
getGlobals()->
castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->getRtFragTable()->setActiveState();
// Let cancel broker know about this query,
registerCB(diagsArea);
cpuLimitExceeded_ = FALSE;
master_glob->incExecutionCount();
if (root_tdb().getQueryUsesSM() && cliGlobals->getEnvironment()->smEnabled())
{
EXSM_TRACE(EXSM_TRACE_MAIN_THR, "query [%s]",
master_glob->getStatement()->getSrcStr());
EXSM_TRACE(EXSM_TRACE_MAIN_THR, "root tcb %p exec count %d", this,
(int) master_glob->getExecutionCount());
}
// do some work, but not necessarily all of it
if (glob->getScheduler()->work() == WORK_BAD_ERROR)
{
completeOutstandingCancelMsgs();
return fatal_error(glob, diagsArea);
}
// The following code is a test for soln 10-081104-7061. The CQD
// COMP_INT_38 can be used to force various kinds of abends
// in the master.
#if 0
// re-enable this to test abend creation.
switch (root_tdb().getAbendType())
{
case ComTdbRoot::NO_ABEND:
{
break;
}
case ComTdbRoot::SIGNED_OVERFLOW:
{
// Don't expect this to work, because this code is compiled
// with overflow trapping set to off.
signed short willOverflow = 0x7ffc;
ULng32 loopCnt = (ULng32) this;
while (loopCnt--)
willOverflow++;
break;
}
case ComTdbRoot::ASSERT:
{
ex_assert( root_tdb().getAbendType() !=
ComTdbRoot::ASSERT,
"Abend/assertTest passed.");
break;
}
case ComTdbRoot::INVALID_MEMORY:
{
return *(Int32 *) WORK_OK;
}
default:
{
ex_assert( 0, "invalid value for getAbendType (COMP_INT_38).");
break;
}
}
#endif
return 0;
}
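// Convert an HBase error code into warning 8448 and merge it into the
// caller's diags area; return codes outside the HBase error range trip
// the assert below.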
void ex_root_tcb::setupWarning(Lng32 retcode, const char * str,
const char * str2, ComDiagsArea* & diagsArea)
{
ContextCli *currContext = GetCliGlobals()->currContext();
// Make sure retcode is positive.
if (retcode < 0)
retcode = -retcode;
if ((ABS(retcode) >= HBASE_MIN_ERROR_NUM)
&& (ABS(retcode) <= HBASE_MAX_ERROR_NUM))
{
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getHeap());
Lng32 cliError = 0;
Lng32 intParam1 = retcode;
ComDiagsArea * newDiags = NULL;
ExRaiseSqlWarning(getHeap(), &newDiags, (ExeErrorCode) (8448), NULL,
&intParam1, &cliError, NULL, (str ? (char*) str : (char*) " "),
getHbaseErrStr(retcode),
(str2 ? (char*) str2 : (char *) currContext->getJniErrorStr().data()));
diagsArea->mergeAfter(*newDiags);
}
else
ex_assert( 0, "invalid return code value");
}
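// Clean up after a snapshot scan: remove the temporary snapshot scan
// location and reset archive permissions on each table in the snapshot
// scan list. Failures are reported as warnings only.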
void ex_root_tcb::snapshotScanCleanup(ComDiagsArea* & diagsArea)
{
const char * tmpLoc = root_tdb().getSnapshotScanTempLocation();
if (tmpLoc == NULL)
return;
ExpHbaseInterface* ehi = ExpHbaseInterface::newInstance
(STMTHEAP, "", "");
ex_assert(ehi != NULL, "cannot connect to HBase");
Int32 retcode = ehi->init(NULL);
if (retcode != 0)
{
setupWarning(retcode, "ExpHbaseInterface::init", "", diagsArea);
}
else
{
retcode = ehi->cleanSnpTmpLocation(tmpLoc);
if (retcode != 0)
setupWarning(retcode, "ExpHbaseInterface::cleanSnpTmpLocation", "", diagsArea);
if (root_tdb().getListOfSnapshotScanTables())
root_tdb().getListOfSnapshotScanTables()->position();
for (int i = 0; i < root_tdb().getListOfSnapshotScanTables()->entries(); i++)
{
char * tbl = (char*) root_tdb().getListOfSnapshotScanTables()->getCurr();
retcode = ehi->setArchivePermissions((const char *) tbl);
if (retcode != 0)
{
setupWarning(retcode, "ExpHbaseInterface::setArchivePermissions", "", diagsArea);
}
root_tdb().getListOfSnapshotScanTables()->advance();
}
}
delete ehi;
return;
}
////////////////////////////////////////////////////////
// RETURNS: 0, success. 100, EOF. -1, error. 1, warning
////////////////////////////////////////////////////////
Int32 ex_root_tcb::fetch(CliGlobals *cliGlobals,
ExExeStmtGlobals * glob,
Descriptor * output_desc,
ComDiagsArea* & diagsArea,
Lng32 timeLimit,
NABoolean newOperation,
NABoolean &closeCursorOnError)
{
// see details of this param in ex_root.h::fetch() declaration.
closeCursorOnError = TRUE;
//
// This procedure retrieves the row returned from the child's up
// queue, evaluates the output expression, and returns the result
// into the output host variables. All the entries from the child's
// up queue are returned first. The scheduler is invoked to do more
// processing once the queue becomes empty.
//
// For the GET_NEXT_N protocol, we should only return when a Q_GET_DONE is
// received. In addition, due to the incomplete implementation of the
// GET_NEXT_N protocol, it is possible to receive a Q_GET_DONE without
// receiving any rows. To work around that, we re-issue a GET_NEXT_N if
// that happens.
// For the non-streaming destructive cursor (which also uses the GET_NEXT_N
// protocol), a Q_NO_DATA is returned instead of a Q_GET_DONE if there
// were fewer than 'N' rows to be returned.
Int32 rowsReturned = 0;
NABoolean nextIsQNoData = FALSE;
if (newOperation)
time_of_fetch_call_usec_ = NA_JulianTimestamp();
ExMasterStmtGlobals *master_glob = glob->castToExMasterStmtGlobals();
// start off by calling the scheduler (again)
ExWorkProcRetcode schedRetcode = WORK_CALL_AGAIN;
// mjh -- I'd like to move this test down to after the scheduler
// has returned (consolidate it w/ the other new stuff), and
// only do this code if WORK_BAD_ERROR has been returned. After I
// have built browse info, check and see if places that set globals
// da could also return WORK_BAD_ERROR??? (Build-time is okay exception).
if (glob->getDiagsArea() && glob->getDiagsArea()->mainSQLCODE() < 0)
{
// fatal error coming back in the globals, don't fetch
// $$$$ need to do any cleanup, like close cursor?
diagsArea = glob->getDiagsArea();
diagsArea->incrRefCount();
glob->setGlobDiagsArea(NULL);
return -1;
}
if (fatalError_)
{
// Can't fetch. I'm dead.
diagsArea = ComDiagsArea::allocate(getHeap());
*diagsArea << DgSqlCode(-EXE_CANNOT_CONTINUE);
return -1;
}
Lng32 numLoopFetches = 1;
Lng32 outputRowsetSize = 0;
Lng32 compileRowsetSize = -1;
NABoolean doneWithRowsets = TRUE;
if (output_desc != NULL) {
if ((newOperation) &&
(NOT output_desc->rowwiseRowsetEnabled()))
{
output_desc->setDescItem(0, SQLDESC_ROWSET_NUM_PROCESSED, 0, 0);
output_desc->setDescItem(0, SQLDESC_ROWSET_HANDLE, 0, 0);
}
output_desc->getDescItem(0, SQLDESC_ROWSET_SIZE, &outputRowsetSize,
0, 0, 0, 0);
doneWithRowsets = (outputRowsetSize == 0);
// In case we are dealing with compound statements
output_desc->setCompoundStmtsInfo(((ComTdbRoot *) &tdb)->getCompoundStmtsInfo());
}
// For an active destructive select, a new GET_NEXT_N-type command needs
// to be issued for every new fetch.
if (newOperation && root_tdb().isEmbeddedUpdateOrDelete() &&
!qchild.down->isEmpty())
{
ex_queue_entry * dentry = qchild.down->getHeadEntry();
if (dentry->downState.numGetNextsIssued >5)
{
if (root_tdb().isStreamScan())
// either 1 or output rowset size
qchild.down->getNextNMaybeWaitRequestInit((outputRowsetSize?outputRowsetSize:1));
else
// either 1 or output rowset size
qchild.down->getNextNRequestInit((outputRowsetSize?outputRowsetSize:1));
}
else
{
if (root_tdb().isStreamScan())
// either 1 or output rowset size
qchild.down->getNextNMaybeWaitRequest((outputRowsetSize?outputRowsetSize:1));
else
// either 1 or output rowset size
qchild.down->getNextNRequest((outputRowsetSize?outputRowsetSize:1));
}
}
ContextCli & currContext = *(master_glob->getStatement()->getContext());
ComDiagsArea & diags = currContext.diags();
Lng32 numOfCliCalls = currContext.getNumOfCliCalls();
Lng32 queueSize = 0;
// Indicates if Cli has converted a rowset from executor format to user
// space format
NABoolean rowsetConvertedInCli = FALSE;
ExMasterStats* mStats = NULL;
StmtStats* sStats = master_glob->getStatement()->getStmtStats();
if(sStats)
mStats = sStats ->getMasterStats();
ExOperStats *statsEntry = getStatsEntry();
IpcEnvironment * ipcEnv =
glob->castToExExeStmtGlobals()->getIpcEnvironment();
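// Main fetch loop: drain rows from the child's up queue into the output
// descriptor; when the up queue has nothing usable, wait for I/O to
// complete and redrive the scheduler.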
while (1) // return
{
queueSize = (Lng32)qchild.up->getLength();
Lng32 retcode = 0;
// The first condition in this if statement means that there are
// more rows in the queue than fit in the rowset, so
// we retrieve only those that fit. The second condition means that we
// have no more rows to retrieve than those in the up queue. Note that
// if none of these conditions are met, the else part of this statement
// will cause us to call the scheduler to retrieve more rows
if ((queueSize > outputRowsetSize) ||
((outputRowsetSize > 0) && (queueSize > 0) && (lastQueueSize_ == queueSize))) {
lastQueueSize_ = queueSize;
ex_queue_entry * centry = qchild.up->getHeadEntry();
// if getFirstNRows() is set to -2, then don't output rows
// and don't return after each fetch. Only return at EOD.
// Do this only for user query invocation at the top cli level.
// Don't do this for internal queries.
NABoolean dontReturn =
((root_tdb().getFirstNRows() == -2) &&
(numOfCliCalls == 1) &&
NOT (centry->upState.status == ex_queue::Q_NO_DATA));
if (compileRowsetSize == -1) {
// Find out if the SQL statement was compiled with output
// Rowsets
// ex_queue_entry * centry = qchild.up->getHeadEntry();
if (centry->upState.status == ex_queue::Q_OK_MMORE) {
if (outputExpr()) {
compileRowsetSize =
outputExpr()->getCompiledOutputRowsetSize(centry->getAtp());
}
if (compileRowsetSize < 0)
compileRowsetSize = 0;
if (compileRowsetSize == 0 && outputRowsetSize > 0) {
// The SQL statement is likely a cursor that was compiled
// without rowset output manipulation. In this case we need to fetch
// one row at a time and put it in the output descriptor.
// This code is also exercised for dynamic rowset selects where
// we fetch multiple rows at a time and put it in the output descriptor.
numLoopFetches = outputRowsetSize;
}
}
}
if (queueSize > numLoopFetches) {
queueSize = numLoopFetches;
}
for(Lng32 i = 0; i < queueSize && (retcode == 0 || retcode ==1); i++) {
if (outputRowsetSize > 0 && doneWithRowsets) {
break;
}
centry = qchild.up->getHeadEntry();
if ((centry->upState.status != ex_queue::Q_OK_MMORE) &&
(centry->upState.status != ex_queue::Q_GET_DONE)) {
doneWithRowsets = TRUE;
// if we encounter a Q_NO_DATA
// and we already have accumulated rows to be returned, then
// return the Q_NO_DATA on the next fetch.
nextIsQNoData
= (centry->upState.status == ex_queue::Q_NO_DATA);
if (rowsReturned && nextIsQNoData) {
break;
}
// If we get here then status = Q_SQLERROR. With the
// new behaviour of fetch all we want to do is return the
// error info along with data. We do not want to break here.
}
switch (centry->upState.status)
{
case ex_queue::Q_OK_MMORE:
{
if (predExpr())
{
ex_expr::exp_return_type exprRetCode =
predExpr()->eval(centry->getAtp(), NULL) ;
if (exprRetCode == ex_expr::EXPR_ERROR)
{
retcode = -1;
break;
}
else if (exprRetCode == ex_expr::EXPR_FALSE)
{
dontReturn = TRUE;
break;
}
}
// Remember how many rows have been returned before a
// Q_GET_DONE or Q_NO_DATA tuple is encountered. Needed for
// destructive cursors and dynamic rowset selects .
rowsReturned++;
// update operator stats
if (statsEntry)
statsEntry-> incActualRowsReturned();
// update master stats also
if(mStats)
mStats->incRowsReturned();
// row was returned from child. Evaluate output expression.
if (outputExpr())
{
if ((NOT outputExpr()->noEval()) &&
(outputExpr()->eval(centry->getAtp(), 0)
== ex_expr::EXPR_ERROR)) {
retcode = -1;
break;
}
if (output_desc)
{
UInt32 flags = 0;
outputExpr()->setIsODBC(flags, root_tdb().odbcQuery());
outputExpr()->setInternalFormatIO
(flags,
currContext.getSessionDefaults()->
getInternalFormatIO());
outputExpr()->setIsDBTR
(flags,
master_glob->getStatement()->getContext()->getSessionDefaults()->
getDbtrProcess());
if (outputExpr()->outputValues(centry->getAtp(),
output_desc,
getHeap(),
flags)
== ex_expr::EXPR_ERROR)
{
closeCursorOnError = FALSE;
retcode = -1;
break;
}
}
// if primary key is to be returned, create a tuple
// containing the pkey row.
if (pkeyExpr())
{
if (pkeyExpr()->eval(centry->getAtp(), pkeyAtp_)
== ex_expr::EXPR_ERROR)
{
retcode = -1;
break;
}
}
}
if (output_desc != NULL) {
if (outputRowsetSize > 0)
{
Lng32 numProcessed;
output_desc->getDescItem(0,
SQLDESC_ROWSET_NUM_PROCESSED,
&numProcessed, 0, 0, 0, 0);
if (numLoopFetches > 1 && (numProcessed < outputRowsetSize)) {
// This section gets executed if we are in a FETCH
// stmt. for rowsets. It counts the number of rows fetched
output_desc->setDescItem(0,
SQLDESC_ROWSET_ADD_NUM_PROCESSED,
0, 0);
}
// Special case when rowset size is 1
if ((numLoopFetches == 1) && (numProcessed == 0) &&
(outputRowsetSize == 1)) {
output_desc->setDescItem(0,
SQLDESC_ROWSET_ADD_NUM_PROCESSED,
0, 0);
}
// Update the total number of rows processed in the rowset.
output_desc->getDescItem(0,
SQLDESC_ROWSET_NUM_PROCESSED,
&numProcessed, 0, 0, 0, 0);
diags.setRowCount(numProcessed);
//master_glob->setRowsAffected(numProcessed);
// Set row number in row's diags if any
ComDiagsArea *rowDiags = centry->getAtp()->getDiagsArea();
if (rowDiags != NULL) {
Int32 index;
for (index=1;
index <= rowDiags->getNumber(DgSqlCode::WARNING_);
index++) {
rowDiags->getWarningEntry(index)->setRowNumber(numProcessed);
}
for (index=1;
index <= rowDiags->getNumber(DgSqlCode::ERROR_);
index++) {
rowDiags->getErrorEntry(index)->setRowNumber(numProcessed);
}
}
// Do not return if the rowset is not full yet and there is
// data to be processed
// ------
// the condition on dontReturn ensures that when processing under the LAST0_MODE cqd
// we do not return from fetch even when the number of rowsProcessed exceeds
// the rowset size. Since no rows are returned when LAST0_MODE is set, we do not return
// from fetch until we see Q_NO_DATA. This if block allows one to return from fetch
// after an OK_MMORE, so we are disallowing it when LAST0_MODE is set.
if (((numProcessed >= outputRowsetSize) ||
(numLoopFetches <= 1)) &&
(!dontReturn)) {
doneWithRowsets = TRUE;
}
}
}
break;
}
case ex_queue::Q_NO_DATA:
{
dontReturn = FALSE; // i.e., do return
retcode = 100;
// done with this request, indicate this to frag dir
getGlobals()->
castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->
getRtFragTable()->setInactiveState();
// mark the stats area to indicate if stats
// were collected. This method will get this
// information from globals and set it in the
// statsArea that is pointed to by stmt globals.
ExStatisticsArea *stats = getGlobals()->getOrigStatsArea();
if (stats)
stats->setStatsEnabled(getGlobals()->statsEnabled());
#ifdef NA_DEBUG_GUI
ex_tcb::objectCount = 0;
#endif
// Make the rowset handle available
if (output_desc != NULL)
output_desc->setDescItem(0,
SQLDESC_ROWSET_HANDLE,
0, 0);
}
break;
case ex_queue::Q_SQLERROR:
{
// Converts error -EXE_CS_EOD to warning EXE_CS_EOD and sets
// retcode to 0 if such a warning is found and no other error
// is present, else retcode is -1. Setting retcode to 0 allows
// processing to continue until we get to Q_NO_DATA
ComDiagsArea * da = centry->getAtp()->getDiagsArea();
if ( da && da->contains(-EXE_CS_EOD)) {
da->negateCondition(da->returnIndex(-EXE_CS_EOD));
CollIndex index;
while ((index = da->returnIndex(-EXE_CS_EOD)) != NULL_COLL_INDEX ) {
da->deleteError(index);
}
if (da->getNumber(DgSqlCode::ERROR_) == 0) {
retcode = 0;
}
else {
retcode = -1;
}
}
else {
retcode = -1;
}
if (retcode == -1)
{
// mark the stats area to indicate if stats
// were collected. This method will get this
// information from globals and set it in the
// statsArea that is pointed to by stmt globals.
ExStatisticsArea *stats = getGlobals()->getOrigStatsArea();
if (stats)
stats->setStatsEnabled(getGlobals()->statsEnabled());
}
}
break;
case ex_queue::Q_GET_DONE:
{
// mark the stats area to indicate if stats
// were collected. This method will get this
// information from globals and set it in the
// statsArea that is pointed to by stmt globals.
ExStatisticsArea *stats = getGlobals()->getOrigStatsArea();
if (stats)
stats->setStatsEnabled(getGlobals()->statsEnabled());
// Only the destructive select will get Q_GET_DONE signals.
// This is in response to GET_NEXT_N and
// GET_NEXT_N_MAYBE_WAIT commands. This Q_GET_DONE signals
// that there are no more rows in this "bunch".
// Note that this for-loop only loops for the number of rows
// expected and we expect here the number of rows PLUS a
// Q_GET_DONE. We will fall out of the loop and get back
// in it via the while loop. This is a little inefficient
// but the good thing is that I don't need to touch the
// fragile for loop.
if (rowsReturned == 0)
{
// This Q_GET_DONE finishes the current bunch, this means
// that we have not received a row to return to the
// fetch.
// This is a destructive select. No rows were returned
// because the full GET_NEXT protocol is not yet
// implemented. Work around it by issuing a new command.
if ((streamTimeout_ >= 0) &&
((time_of_fetch_call_usec_ + streamTimeout_*10000) < NA_JulianTimestamp()))
{
// We have spent enough time in fetch().
// It is time to give back control to the program.
// Note that this cursor is a read-only cursor, so
// there is no problem with outstanding requests
// (which there probably are) to DP2 damaging
// things while the program is given the chance to
// commit.
// Note that streamTimeout_ is in .01 seconds and
// NA_JulianTimestamp is in micro seconds
if (diagsArea == NULL)
{
diagsArea = ComDiagsArea::allocate(getHeap());
}
*diagsArea << DgSqlCode(-EXE_STREAM_TIMEOUT);
// This is not a tuple that needs to be returned.
qchild.up->removeHead();
return 1; // warning -- we return 1 (a warning) here
// , because we want Statement::fetch to behave a little differently
// than when other errors are raised: usually, Statement::fetch
// would rollback the transaction and cancel the statement (i.e., close
// the cursor). Instead, when this Condition is raised, we
// want it to leave the trans active and let the cursor be used
// for more fetches if the client desires.
}
#ifdef TRACE_PAPA_DEQUEUE
cout << "ROOT empty Q_GET_DONE, no timeout" << endl;
#endif
if (root_tdb().isStreamScan())
qchild.down->getNextNMaybeWaitRequest((outputRowsetSize?outputRowsetSize:1));
else
qchild.down->getNextNRequest((outputRowsetSize?outputRowsetSize:1));
}
else
{
// This Q_GET_DONE finishes the current bunch. We have
// moved at least one row to the caller; return now.
// This is not a tuple that needs to be returned.
qchild.up->removeHead();
// Note that the retcode is already set since we have
// already moved rows.
return retcode;
}
}
break;
case ex_queue::Q_INVALID:
ex_assert(0, "invalid state returned from child");
break;
} // end of switch
// Set output parameter diagsArea to the diagnostics area
// associated with the current entry, if there is one
// associated with it. Be careful not to leak diags areas
// by merging into diagsArea if it has already been init'd,
// as can happen with rowsets.
/* if (root_tdb().isNonFatalErrorTolerated())
{
// remove dup NF errors if any
if (centry->getDiagsArea())
(centry->getDiagsArea())->removeDupNFConditions();
}*/
if (diagsArea == NULL)
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
}
else
{
if (centry->getDiagsArea())
{
diagsArea->mergeAfter(*centry->getDiagsArea());
}
}
if (diagsArea != NULL)
{
if (retcode == 0 && diagsArea->mainSQLCODE() > 0)
// It's a warning. So return 1. That's what the cli expects.
retcode = 1;
else if (diagsArea->mainSQLCODE() < 0)
{
// It's an error. Return the negative value.
retcode = -1;
}
else
; // It's a Diags Area w/o any Conditions.
}
qchild.up->removeHead();
// this may have woken up the scheduler, make sure we call it
schedRetcode = WORK_CALL_AGAIN;
}//loop for(Lng32 i = 0; i < queueSize && (retcode == 0 || retcode ==1); i++)
if (retcode == 100)
{
completeOutstandingCancelMsgs();
glob->testAllQueues();
}
if (dontReturn)
rowsReturned = 0;
else if (retcode)
{
if ((retcode < 0) || (retcode == 100) || ((retcode > 0) && doneWithRowsets))
{
ipcEnv->deleteCompletedMessages(); // cleanup messages
if (root_tdb().getSnapshotScanTempLocation())
{
snapshotScanCleanup(diagsArea);
}
return retcode;
}
}
else if (root_tdb().isEmbeddedUpdateOrDelete())
{
// We get here in the following scenarios:
// - we have not received any tuples yet => go wait for more
// - we have received data tuples => go wait for a Q_GET_DONE
// - we have received data tuples and the next tuple is a
// Q_NO_DATA => no Q_GET_DONE will be coming, return the
// data tuples and return the Q_NO_DATA on the next fetch.
// In the case of a destructive cursor, the GET_NEXT_N protocol
// is used and we can only return when a complete 'bunch' of rows
// is received, and that is identified by a Q_GET_DONE tuple.
// Consequently, only after reception of a Q_GET_DONE can we
// return (which the Q_GET_DONE code above does).
// Note that a non-streaming destructive cursor returns
// Q_NO_DATA when no more rows are available (just like any
// other cursor).
if (nextIsQNoData)
{
// We have encountered a Q_NO_DATA. Earlier, if we encountered a non-Q_OK_MMORE, we
// set doneWithRowsets to TRUE. We assert that no other developer has changed this
// logic.
//
ex_assert(doneWithRowsets, "Invalid logic in ex_root_tcb::fetch");
ipcEnv->deleteCompletedMessages(); // cleanup messages
// This non-streaming destructive cursor encountered
// Q_NO_DATA but there are rows to be returned. Return them
// now.
return retcode;
}
else
{
// This destructive cursor is still waiting for a
// Q_GET_DONE or a Q_NO_DATA.
// Don't let the rowset code make us return before processing
// the Q_GET_DONE (genesis CR 10-000517-0131). It is safe to
// reset doneWithRowsets to FALSE, because the embeddedUpdateOrDelete
// statements use the GET_NEXT_N protocol, limiting the N
// (# rows requested) to the size of the rowset.
doneWithRowsets = FALSE;
;
}
}
else if (doneWithRowsets)
{
ipcEnv->deleteCompletedMessages(); // cleanup messages
// row is being returned now.
// Increment rownum.
getGlobals()->rowNum()++;
return retcode;
}
}
else
{
lastQueueSize_ = queueSize;
// make sure we wake our ESPs if this is the first fetch
// after quiescing the executor
master_glob->getRtFragTable()->continueWithTransaction();
NABoolean earliestTimeToCancel = FALSE;
// wait for external events if the scheduler indicated
// last time that it had nothing to do
NABoolean timedout;
Int64 prevWaitTime = 0;
if (schedRetcode == WORK_OK || schedRetcode == WORK_POOL_BLOCKED)
{
IpcTimeout wait_timeout = -1; // default is wait forever
// We are about to block. If this is a streaming non-destructive
// cursor, then make sure we don't get stuck for too long.
if (root_tdb().isStreamScan() && !root_tdb().isEmbeddedUpdateOrDelete())
{
// If we are not waiting forever, then check if we waited
// long enough.
if (streamTimeout_ >= 0)
{
wait_timeout = streamTimeout_;
// Note that NA_JulianTimestamp is in micro seconds and that
// streamTimeout_ is in .01 seconds
Int64 wait64 = streamTimeout_; // to avoid timeout overflow
// Extra time may be needed
IpcTimeout extraTime = 0;
if (getGlobals()->statsEnabled())
extraTime = 100;
if ((time_of_fetch_call_usec_ + (wait64+extraTime)*10000) < NA_JulianTimestamp())
{
// We have spent enough time in fetch().
// It is time to give back control to the program.
// Note that this cursor is a read-only cursor, so there
// is no problem with outstanding requests (which
// there probably are) to DP2 damaging things while
// the program is given the chance to commit.
if (diagsArea == NULL)
{
diagsArea = ComDiagsArea::allocate(getHeap());
}
*diagsArea << DgSqlCode(-EXE_STREAM_TIMEOUT);
return 1; // warning -- we return 1 (a warning) here
// , because we want Statement::fetch to behave a little differently
// than when other errors are raised: usually, Statement::fetch
// would rollback the transaction and cancel the statement (i.e., close
// the cursor). Instead, when this Condition is raised, we
// want it to leave the trans active and let the cursor be used
// for more fetches if the client desires.
}
}
}
// $$$ no-wait CLI prototype VV
// Obsolete in SQ.
if (timeLimit == 0)
{
// the fetch is nowaited and time has expired, so return to caller now
// note that no diagnostic is generated
// $$$ need to check that we don't lose any outstanding
// diagnostics in this way...
return NOT_FINISHED;
}
// $$$ no-wait CLI prototype ^^
// Wait for any I/O to complete, this includes both
// ARKFS and ESP connections. Use an infinite timeout.
ipcEnv->getAllConnections()->waitOnAll((Lng32) wait_timeout, FALSE, &timedout, &prevWaitTime);
// clean up the completed MasterEspMessages
ipcEnv->deleteCompletedMessages();
earliestTimeToCancel = TRUE;
}
// redrive the scheduler
schedRetcode = glob->getScheduler()->work(prevWaitTime);
#ifdef _DEBUG
// We do not need coverage for DEBUG builds since QA and customers do not receive them.
if (earliestTimeToCancel && getenv("TEST_CANCEL_ABORT"))
{
if (diagsArea == NULL)
{
diagsArea = ComDiagsArea::allocate(getHeap());
}
*diagsArea << DgSqlCode(-EXE_CANCEL_INJECTED);
return -1;
}
#endif
if (schedRetcode == WORK_BAD_ERROR)
{
return fatal_error(glob, diagsArea);
}
else if (schedRetcode == WORK_NOBODY_WORKED)
{
// for now, map this back to WORK_OK.
// I think we'll be able to detect infinite loops
// in the master after we add code to test that there
// are no IPC connections waiting. It may also be
// necessary to handle cases where all the work was
// done in root::execute's call to scheduler::work
// as can happen, perhaps, with waited I/O (or
// statements w/ no I/O).
schedRetcode = WORK_OK;
}
}
} // while
}
////////////////////////////////////////////////////////
// RETURNS: 0, success. 100, EOF. -1, error. 1, warning
////////////////////////////////////////////////////////
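// Fetch repeatedly into a rowwise rowset output descriptor until the
// rowset is full, EOD (retcode 100) is seen, a stream timeout occurs,
// or an error is returned. Sets eodSeen so that a redriven fetch
// returns EOD right away.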
Int32 ex_root_tcb::fetchMultiple(CliGlobals *cliGlobals,
ExExeStmtGlobals * glob,
Descriptor * output_desc,
ComDiagsArea* & diagsArea,
Lng32 timeLimit,
NABoolean newOperation,
NABoolean &closeCursorOnError,
NABoolean &eodSeen)
{
Int32 retcode = 0;
NABoolean keepFetching = TRUE;
Lng32 numTotalRows = 0;
Lng32 numRowsProcessed = 0;
output_desc->getDescItem(0, SQLDESC_ROWWISE_ROWSET_SIZE,
&numTotalRows, NULL, 0, NULL, 0);
output_desc->setDescItem(0, SQLDESC_ROWSET_NUM_PROCESSED,
numRowsProcessed, NULL);
while ((output_desc->rowwiseRowsetEnabled()) &&
keepFetching &&
(numRowsProcessed < numTotalRows))
{
retcode = fetch(cliGlobals, glob, output_desc, diagsArea, timeLimit,
newOperation, closeCursorOnError);
if (retcode < 0)
keepFetching = FALSE;
if( retcode == 100)
{
// If there are warnings accompanying this EOD, then change
//the retcode to WARNING. MXCS will retrieve diagnostic warnings
// only if the retcode from this method is a warning. If we return
// EOD then they ignore the warnings.
if (diagsArea && diagsArea->getNumber(DgSqlCode::WARNING_)>0)
retcode =1;
// we will set the statement state to EOF_ after this call so
// on a redrive of a fetch, we will return EOD right away.
keepFetching = FALSE;
eodSeen = TRUE;
}
else if (retcode == 1 && diagsArea &&
diagsArea->containsError(-EXE_STREAM_TIMEOUT))
{
if (numRowsProcessed > 0)
{
// Here we have a stream timeout (-8006) ComCondition in the
// diagsArea, but some rows have already been placed into
// the rowset. To make sure that the ODBC and JDBC clients
// get these rows, we need to convert the error into a
// warning. There is probably only one such ComCondition,
// but to be safe we'll convert any that we find.
CollIndex ste = diagsArea->returnIndex(-EXE_STREAM_TIMEOUT);
do {
diagsArea->negateCondition(ste);
ste = diagsArea->returnIndex(-EXE_STREAM_TIMEOUT);
} while (ste != NULL_COLL_INDEX);
}
else
{
// There weren't any rows in the rowset when the stream
// timed out. So leave the ComCondition as an error, so
// that the caller will stop fetching.
}
// Regardless of whether the rowset was partially populated,
// this method will return 1 (warning), not -1 (error) so that
// the Statement will not rollback the transaction and thereby
// also close the cursor. At a higher level of the CLI, the
// DiagsArea::mainSQLCode is used to return the -8006 or 8006
// to the caller.
keepFetching = FALSE;
}
else
if (eodSeen != TRUE)
numRowsProcessed++;
output_desc->setDescItem(0, SQLDESC_ROWSET_NUM_PROCESSED,
numRowsProcessed, NULL);
}
return retcode;
}
//////////////////////////////////////////////////////
// OLT execute. Does execute/fetch in one shot. Used
// for OLT queries that do OLT optimization.
//////////////////////////////////////////////////////
Int32 ex_root_tcb::oltExecute(ExExeStmtGlobals * glob,
Descriptor * input_desc,
Descriptor * output_desc,
ComDiagsArea*& diagsArea)
{
ExMasterStmtGlobals *master_glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
ex_queue_entry *entry = qchild.down->getTailEntry();
entry->downState.request = ex_queue::GET_ALL;
entry->downState.requestValue = 11;
entry->downState.parentIndex = 20;
entry->copyAtp(workAtp_);
if (inputExpr())
{
// In case we are dealing with compound statements
if (input_desc)
{
input_desc->setCompoundStmtsInfo(((ComTdbRoot *) &tdb)->getCompoundStmtsInfo());
}
UInt32 flags = 0;
inputExpr()->setIsODBC(flags, root_tdb().odbcQuery());
inputExpr()->setInternalFormatIO
(flags,
master_glob->getStatement()->getContext()->getSessionDefaults()->
getInternalFormatIO());
inputExpr()->setIsDBTR
(flags,
master_glob->getStatement()->getContext()->getSessionDefaults()->
getDbtrProcess());
// move the values from user area to internal area
if (inputExpr()->inputValues(entry->getAtp(),
input_desc,
FALSE, // do not tolerate nonfatal errors
glob->getDefaultHeap(),
flags) ==
ex_expr::EXPR_ERROR)
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry(entry);
return -1;
};
// and evaluate any expression on those input values
if ((NOT inputExpr()->noEval()) &&
(inputExpr()->eval(entry->getAtp(), 0) == ex_expr::EXPR_ERROR))
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry(entry);
return -1;
}
} // input expr
ExMasterStats *mStats = NULL;
StmtStats *sStats = master_glob->getStatement()->getStmtStats();
if(sStats)
mStats = sStats ->getMasterStats();
if ( checkTransBeforeExecute(
master_glob->getStatement()->getContext()->getTransaction(),
master_glob,
mStats, diagsArea) )
return -1;
qchild.down->insert();
registerCB(diagsArea);
cpuLimitExceeded_ = FALSE;
// now check for returned rows from child
// indicate to fragment table that we have a new request. This
// activates the ESPs to work, otherwise the query will hang. If no ESPs
// are used, this call is a no-op.
// (ignore return code for now, since we never process multiple requests)
master_glob->getRtFragTable()->setActiveState();
while (1)
{
// do some work, but not necessarily all of it
if (glob->getScheduler()->work() == WORK_BAD_ERROR)
{
completeOutstandingCancelMsgs();
return fatal_error(glob, diagsArea);
}
Lng32 retcode = -1;
NABoolean setRetcode = TRUE;
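// retcode carries the value returned to the CLI. setRetcode stays TRUE
// until a definitive status (first row, Q_NO_DATA, or an error) has been
// recorded; it prevents later queue entries from overwriting that result.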
while (!qchild.up->isEmpty())
{
ex_queue_entry * centry = qchild.up->getHeadEntry();
switch (centry->upState.status)
{
case ex_queue::Q_OK_MMORE:
{
// A row was returned from the child. Evaluate the output expression.
if (outputExpr())
{
if ((NOT outputExpr()->noEval()) &&
(outputExpr()->eval(centry->getAtp(), 0)
== ex_expr::EXPR_ERROR))
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
retcode = -1;
setRetcode = FALSE;
break;
}
if (output_desc)
{
UInt32 flags = 0;
outputExpr()->setIsODBC(flags, root_tdb().odbcQuery());
outputExpr()->setInternalFormatIO
(flags,
master_glob->getStatement()->getContext()->
getSessionDefaults()->
getInternalFormatIO());
outputExpr()->setIsDBTR
(flags,
master_glob->getStatement()->getContext()->
getSessionDefaults()->
getDbtrProcess());
if (outputExpr()->outputValues(centry->getAtp(),
output_desc,
getHeap(),
flags)
== ex_expr::EXPR_ERROR)
{
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
retcode = -1;
setRetcode = FALSE;
break;
}
}
}
if (setRetcode)
{
retcode = 0;
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
setRetcode = FALSE;
}
// update operator stats
if (getStatsEntry())
getStatsEntry() -> incActualRowsReturned();
// update master stats rows returned
if(mStats)
mStats->incRowsReturned();
}
break;
case ex_queue::Q_NO_DATA:
{
if (setRetcode)
{
retcode = 100;
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
setRetcode = FALSE;
}
}
break;
case ex_queue::Q_SQLERROR:
{
// Convert error -EXE_CS_EOD to warning EXE_CS_EOD and set
// retcode to 0 if such a warning is found and no other error
// is present; otherwise retcode is -1. Setting retcode to 0 allows
// processing to continue until we get to Q_NO_DATA.
ComDiagsArea * da = centry->getAtp()->getDiagsArea();
if ( da && da->contains(-EXE_CS_EOD)) {
da->negateCondition(da->returnIndex(-EXE_CS_EOD));
CollIndex index ;
while ((index = da->returnIndex(-EXE_CS_EOD)) != NULL_COLL_INDEX ) {
da->deleteError(index); }
if (da->getNumber(DgSqlCode::ERROR_) == 0) {
if (setRetcode) {
retcode = 0;
}
}
else {
retcode = -1;
setRetcode = FALSE;
}
}
else {
retcode = -1;
setRetcode = FALSE;
}
diagsArea = ex_root_tcb::moveDiagsAreaFromEntry (centry);
break;
}
case ex_queue::Q_INVALID:
ex_assert(0, "invalid state returned from child");
setRetcode = FALSE;
break;
} // end of switch
qchild.up->removeHead();
} // while child is not empty
if (NOT setRetcode)
{
if (diagsArea != NULL)
{
// It's a warning, so return 1; that's what the CLI expects.
if (retcode == 0)
retcode = 1;
else if (diagsArea->mainSQLCODE() < 0)
{
// It's an error, so return a negative value.
// diagsArea->mainSQLCODE() must be negative here.
retcode = (Int32) diagsArea->mainSQLCODE();
}
}
return retcode;
}
IpcEnvironment * ipcEnv =
glob->castToExExeStmtGlobals()->getIpcEnvironment();
// Wait for any I/O to complete, this includes both
// ARKFS and ESP connections. Use an infinite timeout.
ipcEnv->getAllConnections()->waitOnAll();
} // while (1)
}
/////////////////////////////////////////////////////////////////////
// Called by the QueryStartedMsgStream::actOnReceive or
// ex_root_tcb::cpuLimitExceeded. Also, on windows, old prototype
// code called this method from a special cancel thread.
/////////////////////////////////////////////////////////////////////
Int32 ex_root_tcb::requestCancel()
{
if (qchild.down->isEmpty() && qchild.up->isEmpty())
; // Query is already finished.
else
{
ExMasterStmtGlobals *glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
// To guarantee correct concurrent execution, the order of
// the following three statements must not change.
glob->castToExMasterStmtGlobals()->setCancelState(CLI_CANCEL_REQUESTED);
asyncCancelSubtask_->schedule();
// Call ex_root_tcb::work to start the cancel.
work();
// Break the IPC wait loop.
glob->getIpcEnvironment()->getAllConnections()->cancelWait(TRUE);
}
return 0;
}
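// Process-wide timeout used while draining queues during cancel, in
// hundredths of a second (IpcTimeout units). Initialized once from the
// SQLMX_CANCEL_TIMEOUT environment variable; -1 means not yet
// initialized, and the default is 30 seconds.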
static IpcTimeout CancelTimeout = -1;
///////////////////////////////////////////
// RETURNS: 0, success. 1, EOF. -1, error.
///////////////////////////////////////////
Int32 ex_root_tcb::cancel(ExExeStmtGlobals * glob, ComDiagsArea *&diagsArea,
NABoolean getQueueDiags)
{
if (fatalError_)
{
// After fatal error, just ignore cancel, because
// queues may be unstable and a hang can occur.
}
else
{
// start off by calling the scheduler (again)
ExWorkProcRetcode schedRetcode = WORK_CALL_AGAIN;
qchild.down->cancelRequest();
// make sure we wake our ESPs if this is the first fetch
// after quiescing the executor
glob->castToExMasterStmtGlobals()->getRtFragTable()->
continueWithTransaction();
// IPC errors (e.g., SQLCODE 2034, error 201 or 14)
// accumulate on the statement globals. Get rid of
// them or any other new errors created during cancel.
ComDiagsArea *globDiagsAreaAtEntry = glob->getDiagsArea();
Lng32 oldDiagsAreaMark = -1;
if (globDiagsAreaAtEntry && !getQueueDiags)
oldDiagsAreaMark = globDiagsAreaAtEntry->mark();
// while there is anything in the up or down queue
// between the root node and its child, work to
// remove whatever is in it.
while ((!qchild.up->isEmpty() || !qchild.down->isEmpty())
&& (schedRetcode != WORK_BAD_ERROR))
{
// remove all up entries that have accumulated
// in the up queue from the child.
// Update diagsArea before removing up queue entries from child.
while (!qchild.up->isEmpty())
{
if (getQueueDiags &&
qchild.up->getHeadEntry()->getDiagsArea())
{
if (!diagsArea)
diagsArea = ComDiagsArea::allocate(glob->getDefaultHeap());
diagsArea->mergeAfter((const ComDiagsArea&)*qchild.up->getHeadEntry()->getDiagsArea());
}
qchild.up->removeHead();
}
NABoolean timedOut = FALSE;
// wait for external events if the scheduler indicated
// last time that it had nothing to do
if (schedRetcode == WORK_OK || schedRetcode == WORK_POOL_BLOCKED
|| schedRetcode == WORK_NOBODY_WORKED)
{
if (CancelTimeout < 0)
{
// call getenv once per process
char *sct = getenv("SQLMX_CANCEL_TIMEOUT");
if (sct)
{
CancelTimeout = (IpcTimeout)
str_atoi(sct, str_len(sct)) * 100;
if (CancelTimeout < 100)
CancelTimeout = 30 * 100;
}
else
CancelTimeout = 30 * 100;
}
// Wait for any I/O to complete; this includes both
// ARKFS and ESP connections. Use the cancel timeout computed above.
glob->castToExExeStmtGlobals()->getIpcEnvironment()->
getAllConnections()->waitOnAll(CancelTimeout, FALSE,
&timedOut);
}
if (timedOut)
{
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__,
"IPC timeout in ex_root_tcb::cancel.", 0);
schedRetcode = WORK_BAD_ERROR;
}
else
{
/*
// redrive the scheduler.
// Fix for CR 6701 - some ExExeUtil operators call back
// in to the CLI and explicitly clear the curr context
// diags area. It would be nice to have a more general
// fix, but meanwhile, we store off the curr context diags
// area before calling scheduler and restore afterwards.
Statement *statement =
glob->castToExMasterStmtGlobals()->getStatement();
ContextCli *context = statement->getContext();
ComDiagsArea *savedContextDiags = context->diags().copy();
context->diags().clear();
*/
schedRetcode = glob->getScheduler()->work();
/*
savedContextDiags->mergeAfter(context->diags());
context->diags().clear();
context->diags().mergeAfter(*savedContextDiags);
savedContextDiags->decrRefCount();
*/
}
}
if (!getQueueDiags)
{
if (globDiagsAreaAtEntry)
globDiagsAreaAtEntry->rewind(oldDiagsAreaMark);
else if (glob->getDiagsArea())
{
// newly created diagnostic area in the statement globals.
// Clear it, since the caller doesn't want new diags.
glob->getDiagsArea()->clear();
}
}
if (schedRetcode == WORK_BAD_ERROR)
{
// deregister to avoid a leak in broker process MXSSMP.
deregisterCB();
if (root_tdb().getSnapshotScanTempLocation())
{
snapshotScanCleanup(diagsArea);
}
return fatal_error(glob, diagsArea, TRUE);
}
if (!fatalError_)
{
glob->testAllQueues();
}
}
completeOutstandingCancelMsgs();
// Set cancelState to ensure no more references of
// ex_root_tcb by the cancel thread.
glob->castToExMasterStmtGlobals()->clearCancelState();
deregisterCB();
cbMessageWait(0);
if (root_tdb().getSnapshotScanTempLocation())
{
snapshotScanCleanup(diagsArea);
}
return 0;
}
Int32 ex_root_tcb::deallocAndDelete(ExExeStmtGlobals *glob,
ExRtFragTable *fragTable)
{
// Reset cancelState to ensure no more references of
// ex_root_tcb by the cancel thread.
glob->castToExMasterStmtGlobals()->resetCancelState();
// Warning: deleteMe() will delete this tcb!!!!
glob->deleteMe(fatalError_);
return 0;
}
Int32 ex_root_tcb::closeTables(ExExeStmtGlobals * glob,
ExRtFragTable * fragTable)
{
glob->closeTables();
return 0;
}
Int32 ex_root_tcb::reOpenTables(ExExeStmtGlobals * glob,
ExRtFragTable * fragTable)
{
glob->reOpenTables();
return 0;
}
short ex_root_tcb::work()
{
// The execute and fetch methods handle all the work, but the
// scheduler calls this method when an event occurs on the queues
// to the child.
ExMasterStmtGlobals *master_glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
// Pick up an asynchronous cancel request if any.
if ((master_glob->getCancelState() == CLI_CANCEL_REQUESTED) &&
!cancelStarted_)
{
// Process the cancel request and push cancel request
// down the tcb tree.
// It is possible that there is a concurrent ordinary cancel().
// If so, the ordinary cancel will be superseded by
// this async cancel, and get -EXE_CANCELED error from
// up child queue in cancel().
// For now, cancel() does not pass back the -EXE_CANCELED.
cancelStarted_ = TRUE;
if (!qchild.down->isEmpty())
qchild.down->cancelRequest();
}
if (!qchild.up->isEmpty())
{
// A new row is available to the root node. Exit the scheduler
// immediately and let the user see the row. Remember that the
// scheduler exits for all positive error codes (NSK error codes
// when used in DP2). Positive error codes are used to communicate
// a certain exit status from the work procedure to the root.
if (!cancelStarted_)
return 1;
// For async cancel.
// Remove all entries except Q_NO_DATA for the cancel request.
while (qchild.up->getLength() > 1)
qchild.up->removeHead();
if (!qchild.down->isEmpty())
{ // Cancel request has not been removed by child.
qchild.up->removeHead();
// WORK_OK will continue scheduler loop.
return WORK_OK;
}
else
{ // Async cancel completes.
// Attach -EXE_CANCELED error to Q_NO_DATA entry.
// Return 1 to exit the scheduler loop.
ex_queue_entry *centry = qchild.up->getHeadEntry();
ComDiagsArea *diagsArea =
ex_root_tcb::moveDiagsAreaFromEntry(centry);
if (diagsArea == NULL)
{
diagsArea = ComDiagsArea::allocate(getHeap());
centry->setDiagsArea(diagsArea);
}
if (cpuLimitExceeded_)
{
*diagsArea << DgSqlCode(-EXE_QUERY_LIMITS_CPU);
*diagsArea << DgInt0((Lng32) root_tdb().cpuLimit_);
*diagsArea << DgInt1((Int32) master_glob->getMyFragId());
if (root_tdb().getQueryLimitDebug())
{
*diagsArea << DgSqlCode(EXE_QUERY_LIMITS_CPU_DEBUG);
*diagsArea << DgInt0((Lng32) root_tdb().cpuLimit_);
*diagsArea << DgInt1((Int32) master_glob->getMyFragId());
Int64 localCpuTime = 0;
ExFragRootOperStats *fragRootStats;
ExMeasStats *measRootStats;
ExOperStats *rootStats = master_glob->getStatsArea()->getRootStats();
if ((fragRootStats = rootStats->castToExFragRootOperStats()) != NULL)
localCpuTime = fragRootStats->getLocalCpuTime();
else if ((measRootStats = rootStats->castToExMeasStats()) != NULL)
localCpuTime = measRootStats->getLocalCpuTime();
else
ex_assert(0, "Cpu limit evaluated without runtime stats.");
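// DgInt2 reports the local CPU time scaled down by 1000 (presumably
// converting microseconds to milliseconds) for the debug condition.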
*diagsArea << DgInt2((Lng32) localCpuTime / 1000);
}
}
else
populateCancelDiags(*diagsArea);
// Reset these for the next execution.
master_glob->clearCancelState();
cancelStarted_ = FALSE;
cpuLimitExceeded_ = FALSE;
return 1;
}
}
else
{
// some other reason for reaching here (e.g. unblock of
// down queue to child), just ignore it
return WORK_OK;
}
}
ExWorkProcRetcode ex_root_tcb::workOnFragDir()
{
//called by scheduler.
return getGlobals()->castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->
getRtFragTable()->workOnRequests();
}
Int32 ex_root_tcb::fatal_error( ExExeStmtGlobals * glob,
ComDiagsArea*& diagsArea,
NABoolean noFatalDiags)
{
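// Move any diagnostics from the statement globals into the caller's
// diagsArea (or, if permitted, add a generic EXE_CANNOT_CONTINUE
// condition), mark this TCB as having hit a fatal error, and return the
// main SQLCODE (0 if there are no diagnostics).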
if (diagsArea)
glob->takeGlobalDiagsArea(*diagsArea);
else if ((diagsArea = glob->getDiagsArea()) != NULL)
{
diagsArea->incrRefCount();
glob->setGlobDiagsArea(NULL);
}
else if (FALSE == noFatalDiags)
{
diagsArea = ComDiagsArea::allocate(getHeap());
*diagsArea << DgSqlCode(-EXE_CANNOT_CONTINUE);
}
fatalError_ = TRUE;
if (noFatalDiags)
; //
else if (glob->castToExMasterStmtGlobals()->getCancelState()
== CLI_CANCEL_REQUESTED)
populateCancelDiags(*diagsArea);
Int32 retCode = 0;
if (diagsArea)
retCode = diagsArea->mainSQLCODE();
return retCode;
}
void ex_root_tcb::completeOutstandingCancelMsgs()
{
ExMasterStmtGlobals *glob = getGlobals()->castToExExeStmtGlobals()->
castToExMasterStmtGlobals();
while (!fatalError_ &&
(glob->getRtFragTable()->getState() != ExRtFragTable::ERROR) &&
glob->anyCancelMsgesOut())
{
if (root_tdb().getQueryUsesSM() && glob->getIpcEnvironment()->smEnabled())
EXSM_TRACE(EXSM_TRACE_MAIN_THR | EXSM_TRACE_CANCEL,
"root tcb %p completeOutstandingCancelMsgs out %d",
this, (int) glob->numCancelMsgesOut());
// work may have finished before a cancel request
// was answered by some ESP.
glob->getIpcEnvironment()->getAllConnections()->waitOnAll();
}
// Note about cleanup after fatalError: When a fatalError happens,
// we can no longer use the data connections and ex_queues. The
// query must be deallocated; it cannot be simply reexecuted. So
// it is unreliable to depend on ESPs to reply to data messages.
// Therefore we skip it. What happens to the connections to ESPs?
// Any pending messages are subjected to BCANCELREQ when they
// timeout (from Statement::releaseTransaction or ex_root_tcb::cancel).
// The data connections are closed as part of the destructor of
// the send top tcbs. The control connections are closed as
// part of ExEspManager::releaseEsp and this will cause any
// functioning ESP to exit when it gets the system close message.
// Any hanging ESP will stick around. If it ever comes out of the
// hang, it will check for the system close message and exit.
}
NABoolean ex_root_tcb::externalEventCompleted(void)
{
return getGlobals()->getScheduler()->externalEventCompleted();
}
Int32 ex_root_tcb::checkTransBeforeExecute(ExTransaction *myTrans,
ExMasterStmtGlobals *masterGlob,
ExMasterStats *masterStats,
ComDiagsArea *& diagsArea)
{
if (myTrans && myTrans->xnInProgress())
{
// If the statement uses DP2 locks to track already-inserted
// rows to protect against the Halloween problem, check that
// it is not executed with autocommit off. External transactions
// must also not be allowed, since these aren't committed
// at the end of a SQL statement. By "external transaction",
// we mean one that is started without using BEGIN WORK in this
// master executor. For example, application calls TMF
// BEGINTRANSACTION directly.
if ((myTrans->autoCommit() == FALSE) ||
(myTrans->exeStartedXn() == FALSE))
{
NABoolean doAutoCommitTest = TRUE;
#ifdef _DEBUG
if (getenv("SKIP_AUTOCOMMIT_TEST"))
doAutoCommitTest = FALSE;
#endif
if (root_tdb().isCheckAutoCommit() &&
doAutoCommitTest)
{
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getHeap());
*diagsArea << DgSqlCode(-EXE_HALLOWEEN_INSERT_AUTOCOMMIT);
return -1;
}
// If there is an external trans, we cannot be sure whether audit
// or locks are held, so we will make the safest assumption.
if (myTrans->exeStartedXn() == FALSE)
{
myTrans->setMayAlterDb(TRUE);
myTrans->setMayHoldLock(TRUE);
}
}
if (root_tdb().getMayAlterDb())
myTrans->setMayAlterDb(TRUE);
if (root_tdb().getSuspendMayHoldLock())
myTrans->setMayHoldLock(TRUE);
if (masterStats)
{
// Will configure runtime statistics to disallow or warn
// query SUSPEND if encompassing transaction might alter
// database or hold locks.
if (myTrans->mayAlterDb())
masterStats->setSuspendMayHaveAuditPinned(TRUE);
if (myTrans->mayHoldLock())
masterStats->setSuspendMayHoldLock(TRUE);
}
mayPinAudit_ = myTrans->mayAlterDb() ? true : false;
mayLock_ = myTrans->mayHoldLock() ? true : false;
}
return 0;
}
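// Read once per process: setting SQL_NO_REGISTER_CANCEL to '1' in the
// environment disables registration with the cancel broker, so such
// queries cannot be canceled or suspended externally.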
static const char * const NCRenvvar =
getenv("SQL_NO_REGISTER_CANCEL");
static const bool NoRegisterCancel = (NCRenvvar && *NCRenvvar == '1');
void ex_root_tcb::registerCB(ComDiagsArea *&diagsArea)
{
if (NoRegisterCancel)
return;
ExMasterStmtGlobals *glob =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
CliGlobals *cliGlobals = glob->getCliGlobals();
if (cliGlobals->getStatsGlobals() == NULL)
return;
ContextCli *context = cliGlobals->currContext();
// Let MXSSMP know this query is ready to be suspended, since any ESPs will
// now have their root oper stats in place and we are guaranteed to go into
// ExScheduler::work.
ExMasterStats *mStats = NULL;
StmtStats *sStats = glob->getStatement()->getStmtStats();
if(sStats)
mStats = sStats ->getMasterStats();
if (mStats)
mStats->setReadyToSuspend();
Lng32 qidLen = glob->getStatement()->getUniqueStmtIdLen();
char *qid = glob->getStatement()->getUniqueStmtId();
// No query id -- no cancel or suspend.
if (qidLen == 0)
return;
// No stats entry -- no cancel or suspend.
if (NULL == getStatsEntry())
return;
// For executionCount, will need either an ExFragRootOperStats
// or else an ExMeasStats, otherwise no cancel or suspend.
Int32 executionCount;
ExFragRootOperStats *rootOperStats =
getStatsEntry()->castToExFragRootOperStats();
ExMeasStats *measStats =
getStatsEntry()->castToExMeasStats();
if (rootOperStats)
executionCount = rootOperStats->getExecutionCount();
else if (measStats)
executionCount = measStats->getExecutionCount();
else
return;
// See if query allows cancel or suspend.
if (root_tdb().mayNotCancel())
return;
SessionDefaults *sessionDefaults = context->getSessionDefaults();
if (sessionDefaults)
{
// Note that it will be required that if a session does not
// allow queries to be canceled, then it also will not be
// possible to suspend the queries.
// See if session allows cancel.
if (!sessionDefaults->getCancelQueryAllowed())
return;
// See if query is "unique" and session allows cancel of unique.
if (!sessionDefaults->getCancelUniqueQuery())
{
Lng32 qt = root_tdb().getQueryType();
if ((qt == ComTdbRoot::SQL_SELECT_UNIQUE) ||
(qt == ComTdbRoot::SQL_INSERT_UNIQUE) ||
(qt == ComTdbRoot::SQL_UPDATE_UNIQUE) ||
(qt == ComTdbRoot::SQL_DELETE_UNIQUE))
return;
}
}
Lng32 fromCond = 0;
if (diagsArea != NULL)
fromCond = diagsArea->mark();
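// Remember the current high-water mark of the diags area so that any
// conditions added while locating the cancel broker can be downgraded
// to warnings below.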
ExSsmpManager *ssmpManager = context->getSsmpManager();
cbServer_ = ssmpManager->getSsmpServer((NAHeap *)getHeap(),
cliGlobals->myNodeName(),
cliGlobals->myCpu(), diagsArea);
if (cbServer_ == NULL || cbServer_->getControlConnection() == NULL)
{
// We could not get a phandle for the cancel broker. However,
// let the query run (on the assumption that it will not need to
// be canceled) and convert any error conditions to warnings.
diagsArea->negateErrors(fromCond);
return;
}
// The stream's actOnSend method will delete this object (or call its
// decrRefCount()).
RtsQueryId *rtsQueryId = new (context->getIpcHeap())
RtsQueryId(context->getIpcHeap(), qid, qidLen);
RtsHandle rtsHandle = (RtsHandle) this;
// Make the message
QueryStarted *queryStarted =
new (context->getIpcHeap())
QueryStarted(rtsHandle, context->getIpcHeap(),
JULIANTIMESTAMP(), executionCount);
// Create the stream on the IpcHeap, since we don't dispose of it immediately.
// We just add it to the list of completed messages in the IpcEnv, and it is disposed of later.
// If we create it in the executor heap, that heap gets deallocated when the statement is
// finished, and we can corrupt some other statement's heap later on when we deallocate this stream.
if (queryStartedStream_ == NULL)
{
queryStartedStream_ = new (context->getIpcHeap())
QueryStartedMsgStream(context->getEnvironment(), this, ssmpManager);
queryStartedStream_->addRecipient(cbServer_->getControlConnection());
}
*queryStartedStream_ << *queryStarted;
queryStarted->decrRefCount();
*queryStartedStream_<< *rtsQueryId;
rtsQueryId->decrRefCount();
// send the no-wait request so cancel broker knows this query has started.
queryStartedStream_->send(FALSE);
setCbStartedMessageSent();
}
void ex_root_tcb::deregisterCB()
{
ExMasterStmtGlobals *glob =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
CliGlobals *cliGlobals = glob->getCliGlobals();
ContextCli *context = cliGlobals->currContext();
// Let MXSSMP know this query can no longer be suspended. Also, here
// is where we handle one possibility: the query can be suspended after
// the ExScheduler::work method has made its final check of the root
// oper stats isSuspended_ flag, but before we get to this code where
// we change ExMasterStats::readyToSuspend_ from READY to NOT_READY.
// To handle this, the subject master will obtain the Stats semaphore
// and test ExMasterStats::isSuspended_. If set to true, it will
// release the semaphore, wait a few seconds, and obtain the semaphore
// and test again. This will repeat until the ExMasterStats::isSuspended_
// is false. Then, before releasing the semaphore, the subject master
// makes the change from READY->NOT_READY. By waiting until the SSMP has
// cleared the ExMasterStats::isSuspended_ flag, we can be sure it has
// also sent messages to the SSCPs to set all the frag root oper stats
// isSuspended_ flag to false.
StatsGlobals *statsGlobals = cliGlobals->getStatsGlobals();
ExMasterStats *mStats = NULL;
StmtStats *sStats = glob->getStatement()->getStmtStats();
if(sStats)
mStats = sStats->getMasterStats();
if (statsGlobals && mStats && mStats->isReadyToSuspend())
{
int error = statsGlobals->getStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
while (mStats->isQuerySuspended())
{
// See comments around allowUnitTestSuspend above. This code has
// been manually unit tested, and it is too difficult and costly to
// test in an automated script.
statsGlobals->releaseStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
DELAY(300);
int error = statsGlobals->getStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
}
// Now we have the semaphore and the query is not suspended. Quickly
// set the flag in the masterStats to prevent it from being suspended.
mStats->setNotReadyToSuspend();
// Now it is safe to let MXSSMP process another SUSPEND.
statsGlobals->releaseStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
}
// No started message sent, so no finished message should be sent.
if (!isCbStartedMessageSent())
return;
// Holdable cursor is known to deregister more than once.
if (isCbFinishedMessageSent())
return;
// The stream's actOnSend method will delete this object (or call its
// decrRefCount()).
RtsQueryId *rtsQueryId = new (context->getIpcHeap())
RtsQueryId( context->getIpcHeap(),
glob->getStatement()->getUniqueStmtId(),
glob->getStatement()->getUniqueStmtIdLen()
);
RtsHandle rtsHandle = (RtsHandle) this;
// Make the message
QueryFinished *queryFinished =
new (context->getIpcHeap()) QueryFinished( rtsHandle,
context->getIpcHeap());
// Create the stream on the IpcHeap, since we don't dispose of it immediately.
// We just add it to the list of completed messages in the IpcEnv, and it is disposed of later.
// If we create it in the executor heap, that heap gets deallocated when the statement is
// finished, and we can corrupt some other statement's heap later on when we deallocate this stream.
if (queryFinishedStream_ == NULL)
{
queryFinishedStream_ = new (context->getIpcHeap())
QueryFinishedMsgStream(context->getEnvironment(), this, context->getSsmpManager());
queryFinishedStream_->addRecipient(cbServer_->getControlConnection());
}
*queryFinishedStream_ << *queryFinished;
queryFinished->decrRefCount();
*queryFinishedStream_<< *rtsQueryId;
rtsQueryId->decrRefCount();
// send the no-wait request to let the cancel broker know this query has finished.
queryFinishedStream_->send(FALSE);
setCbFinishedMessageSent();
}
void ex_root_tcb::setCbFinishedMessageReplied()
{
ExMasterStmtGlobals *glob =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
ContextCli *context = glob->getCliGlobals()->currContext();
cbCommStatus_ &= ~FINISHED_PENDING_;
}
void ex_root_tcb::cbMessageWait(Int64 waitStartTime)
{
ExMasterStmtGlobals *master_glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
NABoolean cbDidTimedOut = FALSE;
while (anyCbMessages())
{
// Allow the Control Broker 5 minutes to reply.
// The time already spent waiting (see caller
// Statement::releaseTransaction) is counted in this 5 minutes.
Int64 timeSinceCBMsgSentMicroSecs = waitStartTime ?
NA_JulianTimestamp() - waitStartTime : 0 ;
IpcTimeout timeSinceCBMsgSentCentiSecs = (IpcTimeout)
(timeSinceCBMsgSentMicroSecs / 10000);
IpcTimeout cbTimeout = (5*60*100) - timeSinceCBMsgSentCentiSecs;
if (cbTimeout <= 0)
cbTimeout = IpcImmediately;
master_glob->getIpcEnvironment()->getAllConnections()->
waitOnAll(cbTimeout, FALSE, &cbDidTimedOut);
if (cbDidTimedOut)
{
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__,
"Dumping the MXSSMP after IPC timeout.", 0);
dumpCb();
ex_assert(!cbDidTimedOut, "Timeout waiting for control broker.");
}
}
}
void ex_root_tcb::dumpCb()
{
ExMasterStmtGlobals *master_glob = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
CliGlobals *cliGlobals = master_glob->getCliGlobals();
StatsGlobals *statsGlobals = cliGlobals->getStatsGlobals();
if (statsGlobals == NULL)
return;
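// Request a dump of the cancel broker (MXSSMP), but at most once every
// 5 minutes per node; the shared stats segment timestamp is used as the
// throttle so that concurrent masters do not all trigger dumps.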
int error = statsGlobals->getStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
bool doDump = false;
Int64 timenow = NA_JulianTimestamp();
if ((timenow - statsGlobals->getSsmpDumpTimestamp()) >
5*60*1000*1000) // 5 minutes
{
doDump = true;
statsGlobals->setSsmpDumpTimestamp(timenow);
}
statsGlobals->releaseStatsSemaphore(cliGlobals->getSemId(),
cliGlobals->myPin());
if (doDump)
cbServer_->getServerId().getPhandle().dumpAndStop(true, false);
}
// -----------------------------------------------------------------------
// Methods for QueryStartedMsgStream.
// -----------------------------------------------------------------------
void QueryStartedMsgStream::actOnSend(IpcConnection *conn)
{
if (conn->getErrorInfo() != 0)
delinkConnection(conn);
}
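// Once the started message has been fully sent, clear the stream and
// post a no-wait receive so the cancel broker's reply (normal completion
// or a cancel directive) can be handled in actOnReceive.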
void QueryStartedMsgStream::actOnSendAllComplete()
{
clearAllObjects();
receive(FALSE);
}
void QueryStartedMsgStream::actOnReceive(IpcConnection *connection)
{
if (connection->getErrorInfo() == 0)
{
CollHeap *ipcHeap = connection->getEnvironment()->getHeap();
QueryStartedReply *reply = new (ipcHeap)
QueryStartedReply(INVALID_RTS_HANDLE, ipcHeap);
*this >> *reply;
ex_assert( !moreObjects(), "Unknown extra objects follow started reply");
if(reply->isNextActionComplete())
; // normal completion of a query. This query requested a reply to its
// own QueryStarted msg.
else if (rootTcb_)
{
ex_assert(reply->isNextActionCancel(), "Unknown reply to Query Started message.");
rootTcb_->requestCancel();
if (reply->cancelLogging())
{
ExMasterStmtGlobals *glob = rootTcb_->getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
char *qid = glob->getStatement()->getUniqueStmtId();
char msg[80 + // the constant text
ComSqlId::MAX_QUERY_ID_LEN];
str_sprintf(msg,
"Master executor of query %s has begun internal SQL cancel.",
qid);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
}
}
reply->decrRefCount();
}
else
delinkConnection(connection);
}
void QueryStartedMsgStream::delinkConnection(IpcConnection *conn)
{
char nodeName[MAX_SEGMENT_NAME_LEN+1];
IpcCpuNum cpu;
conn->getOtherEnd().getNodeName().getNodeNameAsString(nodeName);
cpu = conn->getOtherEnd().getCpuNum();
ssmpManager_->removeSsmpServer(nodeName, (short)cpu);
}
void QueryStartedMsgStream::actOnReceiveAllComplete()
{
if (rootTcb_)
{
// clear i/o pending flag for Statement::releaseTransaction
rootTcb_->setCbStartedMessageReplied();
// root's pointer to this stream is no longer valid.
rootTcb_->setQueryStartedMsgStreamIsInvalid();
// This stream's pointer to root is no longer valid.
rootTcb_ = NULL;
}
addToCompletedList();
}
// -----------------------------------------------------------------------
// Methods for QueryFinishedMsgStream.
// -----------------------------------------------------------------------
void QueryFinishedMsgStream::actOnSend(IpcConnection *conn)
{
if (conn->getErrorInfo() != 0)
delinkConnection(conn);
}
void QueryFinishedMsgStream::actOnSendAllComplete()
{
clearAllObjects();
receive(FALSE);
}
void QueryFinishedMsgStream::actOnReceive(IpcConnection *connection)
{
if (connection->getErrorInfo() == 0)
{
CollHeap *ipcHeap = connection->getEnvironment()->getHeap();
RmsGenericReply *reply = new (ipcHeap)
RmsGenericReply(ipcHeap);
*this >> *reply;
ex_assert( !moreObjects(), "Unknown extra objects follow finished reply");
reply->decrRefCount();
}
else
delinkConnection(connection);
}
void QueryFinishedMsgStream::delinkConnection(IpcConnection *conn)
{
char nodeName[MAX_SEGMENT_NAME_LEN+1];
IpcCpuNum cpu;
conn->getOtherEnd().getNodeName().getNodeNameAsString(nodeName);
cpu = conn->getOtherEnd().getCpuNum();
ssmpManager_->removeSsmpServer(nodeName, (short)cpu);
}
void QueryFinishedMsgStream::actOnReceiveAllComplete()
{
if (rootTcb_)
{
// clear i/o pending flag for Statement::releaseTransaction
rootTcb_->setCbFinishedMessageReplied();
// root's pointer to this stream is no longer valid.
rootTcb_->setQueryFinishMsgStreamIsInvalid();
// This stream's pointer to root is no longer valid.
rootTcb_ = NULL;
}
addToCompletedList();
}
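// Called when the query's CPU limit is exceeded (presumably from the
// scheduler's limit check): remember the condition and drive an internal
// cancel so that work() can attach the EXE_QUERY_LIMITS_CPU diagnostics.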
void ex_root_tcb::cpuLimitExceeded()
{
cpuLimitExceeded_ = TRUE;
requestCancel();
}