blob: 81ee824c2613c357b7fafebf8d8fd768969d258b [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_esp_frag_dir.cpp
* Description: Fragment instance directory in the ESP
*
* Created: 1/22/96
* Language: C++
*
*
*
*
*****************************************************************************
*/
// -----------------------------------------------------------------------
#include "Platform.h"
#include <stdlib.h>
#include <sys/syscall.h>
#include "ex_stdh.h"
#include "ex_exe_stmt_globals.h"
#include "ex_esp_frag_dir.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_split_bottom.h"
#include "ex_send_bottom.h"
#include "ComSpace.h"
#include "ComDiags.h"
#include "LateBindInfo.h"
#include "NAHeap.h"
#include "NAMemory.h"
#include "ExpError.h"
#include "Globals.h"
#include "SqlStats.h"
#include "ComRtUtils.h"
#include "PortProcessCalls.h"
#include "ExStats.h"
#include "ExSMTrace.h"
#include <errno.h>
#include "Context.h"
#include <semaphore.h>
#include "ExSMCommon.h"
#include "ExSMEvent.h"
#include "ExSMGlobals.h"
#include "ExSMShortMessage.h"
#include "ComExeTrace.h"
#include "Context.h"
// -----------------------------------------------------------------------
// Methods for class ExEspFragInstanceDir
// -----------------------------------------------------------------------
// Constructs the fragment instance directory of this ESP process.
//
// cliGlobals   - CLI globals of this process; the stats heap, stats
//                globals, semaphore id and process stats chosen here are
//                stored into it
// heap         - heap used for the instance array and for locally
//                created heaps
// statsGlobals - statistics globals, may be NULL
//
// Besides initializing the counters, the constructor
//  - records cpu, pin and thread id of the current process,
//  - sets up the process stats heap (see the three cases below),
//  - initializes the fragment instance state trace and, if an
//    ExeTraceInfo is available, registers it there.
ExEspFragInstanceDir::ExEspFragInstanceDir(CliGlobals *cliGlobals,
NAHeap *heap,
StatsGlobals *statsGlobals)
: instances_(heap),
cliGlobals_(cliGlobals),
heap_(heap),
statsGlobals_(statsGlobals),
userIDEstablished_(FALSE),
localStatsHeap_(NULL)
{
// If this is not Linux, database user ID is the same as process ID
// so our database identity is already established
numActiveInstances_ = 0;
highWaterMark_ = 0;
numMasters_ = 1;
int error;
//Phandle wrapper in porting layer
NAProcessHandle phandle;
phandle.getmine();
phandle.decompose();
cpu_ = phandle.getCpu();
pid_ = phandle.getPin();
tid_ = syscall(SYS_gettid);
// Case 1: no stats globals were passed in, or their version differs from
// CURRENT_SHARED_OBJECTS_VERSION_. Drop the stats globals and create a
// "Process Stats Heap" on top of heap_.
if (statsGlobals_ == NULL
|| (statsGlobals_ != NULL &&
statsGlobals_->getVersion() != StatsGlobals::CURRENT_SHARED_OBJECTS_VERSION_))
{
statsGlobals_ = NULL;
statsHeap_ = new (heap_)
NAHeap("Process Stats Heap", (NAHeap *)heap_,
8192,
0);
semId_ = -1;
}
else
{
error = statsGlobals_->openStatsSemaphore(semId_);
// Case 2: the stats semaphore could not be opened.
// Behave like as if stats is not available
// NOTE(review): semId_ is not reset to -1 on this path; it keeps whatever
// openStatsSemaphore() left in it - confirm that this is intended.
if (error != 0)
{
statsGlobals_ = NULL;
statsHeap_ = (NAHeap *)heap_;
}
else
{
// Case 3: stats globals are usable. Between getStatsSemaphore() and
// releaseStatsSemaphore() below, allocate the process stats heap from
// the stats globals' heap and register this process with them.
cliGlobals_->setStatsGlobals(statsGlobals_);
cliGlobals_->setSemId(semId_);
// NOTE(review): the result of getStatsSemaphore() is stored in error but
// never checked.
error = statsGlobals_->getStatsSemaphore(semId_, pid_);
// Raw memory for the NAHeap object is taken from the stats heap first,
// then the NAHeap is constructed in that memory.
statsHeap_ = (NAHeap *)statsGlobals->getStatsHeap()->allocateHeapMemory(sizeof *statsHeap_);
statsHeap_ = new (statsHeap_, statsGlobals->getStatsHeap())
NAHeap("Process Stats Heap", statsGlobals->getStatsHeap(),
8192,
0);
// We need to set up the cliGlobals, since addProcess will call getRTSSemaphore
// and it uses these members
cliGlobals_->setStatsHeap(statsHeap_);
statsGlobals_->addProcess(pid_, statsHeap_);
ExProcessStats *processStats =
statsGlobals_->getExProcessStats(pid_);
processStats->setStartTime(cliGlobals_->myStartTime());
cliGlobals_->setExProcessStats(processStats);
statsGlobals_->releaseStatsSemaphore(semId_, pid_);
}
}
// Publish the final choice (for all three cases) to the CLI globals.
cliGlobals_->setStatsHeap(statsHeap_);
cliGlobals_->setStatsGlobals(statsGlobals_);
cliGlobals_->setSemId(semId_);
// Fragment instance trace
// Note that initFiStateTrace() uses fiTidx_ as its loop variable, so the
// value assigned here is overwritten by that call.
fiTidx_ = 0;
initFiStateTrace();
// add the instance trace to global trace repository
traceRef_ = NULL;
ExeTraceInfo *ti = cliGlobals_->getExeTraceInfo();
if (ti)
{
Int32 lineWidth = 42; // temp
void *regdTrace;
char traceDesc[80];
sprintf(traceDesc, "Trace plan fragment instance state changes");
Int32 ret = ti->addTrace("FragmentInstance", this, NumFiTraceElements, 3,
this, getALine,
&fiTidx_,
lineWidth, traceDesc, &regdTrace);
if (ret == 0)
{
// trace info added successfully, now add entry fields
ti->addTraceField(regdTrace, "FragInst#", 0,
ExeTrace::TR_INT32);
ti->addTraceField(regdTrace, "State ", 1, ExeTrace::TR_STRING);
ti->addTraceField(regdTrace, "Line#", 2, ExeTrace::TR_INT32);
// remember the registered trace; traceRef_ stays NULL if addTrace failed
traceRef_ = (ExeTrace*) regdTrace;
}
}
}
// Destructor. If this process was registered with the stats globals,
// remove it again (while holding the stats semaphore) and close the
// semaphore. Nothing is done when no stats globals are in use.
ExEspFragInstanceDir::~ExEspFragInstanceDir()
{
  // if we come here we are exiting or abending from the process,
  // no point in making error checks
  if (statsGlobals_ == NULL)
    return;

  // the result is deliberately ignored, see the comment above
  (void) statsGlobals_->getStatsSemaphore(semId_, pid_);
  statsGlobals_->removeProcess(pid_);
  statsGlobals_->releaseStatsSemaphore(semId_, pid_);
  sem_close((sem_t *)semId_);
}
// Looks up the handle of the fragment instance with the given key.
// Scans all slots below the high water mark and returns the handle
// stored in the first used slot whose key matches; returns
// NullFragInstanceHandle if there is no such slot.
ExFragInstanceHandle ExEspFragInstanceDir::findHandle(
     const ExFragKey &key) const
{
  CollIndex slot = 0;

  while (slot < highWaterMark_)
    {
      if (instances_.used(slot))
        {
          ExEspFragInstance *candidate = instances_[slot];

          if (candidate->key_ == key)
            return candidate->handle_;
        }
      ++slot;
    }

  // no used slot carries this key
  return NullFragInstanceHandle;
}
// Creates a directory entry for a fragment that arrived in msgFragment
// over the given control connection and returns its handle.
//
// The new entry is placed into an unused slot of instances_, filled from
// the message fragment, put into state DOWNLOADED, and the high water
// mark is raised if the slot lies at or above it. For fragments of type
// ExFragDir::ESP a private heap, space and statement globals are
// created; other fragments get no globals.
ExFragInstanceHandle ExEspFragInstanceDir::addEntry(ExMsgFragment *msgFragment,
                                                    IpcConnection *connection)
{
  const ExFragInstanceHandle slot = instances_.unusedIndex();
  ExEspFragInstance *newInst = new(heap_) ExEspFragInstance;
  const ExFragKey &fragKey = msgFragment->getKey();

  // identity of the instance and of its parent
  newInst->key_ = fragKey;
  newInst->handle_ = slot;
  newInst->fragType_ = msgFragment->getFragType();
  newInst->parentKey_ = ExFragKey(fragKey.getProcessId(),
                                  fragKey.getStatementHandle(),
                                  msgFragment->getParentId());
  newInst->controlConn_ = connection;

  // the downloaded fragment and the root TDB inside of it
  newInst->topNodeOffset_ = msgFragment->getTopNodeOffset();
  newInst->msgFragment_ = msgFragment;
  newInst->localRootTdb_ = (ex_split_bottom_tdb *)
    (msgFragment->getFragment() + newInst->topNodeOffset_);
  newInst->mxvOfOriginator_ = msgFragment->getMxvOfOriginator();
  newInst->planVersion_ = msgFragment->getPlanVersion();
  newInst->queryId_ = (char *) msgFragment->getQueryId();
  newInst->queryIdLen_ = msgFragment->getQueryIdLen();

  // nothing has been built or requested yet
  newInst->localRootTcb_ = NULL;
  newInst->numSendBottomRequests_ = 0;
  newInst->numSendBottomCancels_ = 0;
  newInst->numLateCancelRequests_ = 0;

  // no globals needed for DP2 fragments
  newInst->globals_ = NULL;
  if (msgFragment->getFragType() == ExFragDir::ESP)
    {
      // allocate the globals in their own heap, like the master
      // executor does it
      NAHeap *privateHeap = new(heap_) NAHeap("ESP Fragment Heap",
                                              (NAHeap *) heap_,32768);
      Space *privateSpace = new(privateHeap) Space(Space::EXECUTOR_SPACE);

      privateSpace->setParent(privateHeap);
      newInst->globals_ = new(privateHeap) ExEspStmtGlobals(
           (short) msgFragment->getNumTemps(),
           cliGlobals_,
           msgFragment->getDisplayInGui(),
           privateSpace,
           privateHeap,
           this,
           newInst->handle_,
           msgFragment->getInjectErrorAtExpr(),
           newInst->queryId_,
           newInst->queryIdLen_);
    }

  newInst->displayInGui_ = msgFragment->getDisplayInGui();

  // make the entry visible in the directory
  instances_.insertAt(slot,newInst);
  setFiState(slot, DOWNLOADED, __LINE__);
  if (highWaterMark_ <= slot)
    highWaterMark_ = slot + 1;

  return slot;
}
// Brings a fragment instance forward through the states
// DOWNLOADED -> UNPACKED -> BUILT -> FIXED_UP, starting at whatever
// state the entry currently has.
//
// handle               - handle of the fragment instance to fix up
// numOfParentInstances - passed on to buildESPTcbTree()
// da                   - diagnostics area handed to checkPlanVersion() and
//                        to the statement globals' takeGlobalDiagsArea();
//                        its mainSQLCODE() decides whether processing
//                        continues
//
// The switch below relies on intentional fall-through from one case
// to the next; a "break" means that processing stops in that state.
// An unknown handle is treated like state UNUSED and ends up in the
// default case.
void ExEspFragInstanceDir::fixupEntry(ExFragInstanceHandle handle,
Lng32 numOfParentInstances,
ComDiagsArea &da)
{
ExEspFragInstance *entry = NULL;
FragmentInstanceState entryState = UNUSED;
if (handle != NullFragInstanceHandle AND instances_.used(handle))
{
entry = instances_[handle];
entryState = entry->fiState_;
}
switch (entryState)
{
case DOWNLOADED:
{
if (checkPlanVersion(entry, da))
{
// ERROR: there was a plan versioning error
break;
}
ComTdb *rootTdb = entry->localRootTdb_;
// Set up reallocation space for unpacking. Use the space managed
// by the globals of the entry.
//
//rootTdb->setReallocator(entry->globals_->getSpace());
// NOTE(review): entry->globals_ is dereferenced below, but addEntry()
// leaves it NULL for fragments that are not of type ExFragDir::ESP -
// presumably fixupEntry() is only called for ESP fragments; confirm.
void *base = (void *)entry->msgFragment_->getFragment();
ComTdb dummyTdb;
if ( (rootTdb = (ex_split_bottom_tdb *)
rootTdb->driveUnpack(base,&dummyTdb,
entry->globals_->getSpace())) == NULL )
{
// ERROR during unpacking. Most likely case is version-unsupported.
//
entryState = setFiState(handle, BROKEN, __LINE__);
break;
}
else
{
// The root tdb might have been relocated after unpacking due to a
// version upgrade.
//
entry->localRootTdb_ = (ex_split_bottom_tdb *)(rootTdb);
entryState = setFiState(handle, UNPACKED, __LINE__);
// continue to next case
}
}
case UNPACKED:
// build the TCB tree from the unpacked TDB tree
entry->localRootTcb_ =
entry->localRootTdb_->buildESPTcbTree(entry->globals_,
this,
entry->key_,
entry->parentKey_,
handle,
numOfParentInstances);
entry->globals_->takeGlobalDiagsArea(da);
if (da.mainSQLCODE() >= 0)
{
entryState = setFiState(handle, BUILT, __LINE__);
}
else
// if there is any error, do not change the state to BROKEN but
// break out so that the executor master will wait and receive
// replies from all ESPs. (CR # 10-020919-5026)
break;
// continue to next case
case BUILT:
case FIXED_UP:
case READY_INACTIVE:
// (re-)fixup the TCB tree; this is also reached for instances that
// were already fixed up before
entry->localRootTcb_->fixup();
entry->globals_->takeGlobalDiagsArea(da);
if (da.mainSQLCODE() < 0)
// Fixup failed, let master executor redrive when it receives
// the diags area
entryState = setFiState(handle, BUILT, __LINE__);
else
// Statement is now definitely fixed up, it stays ready
// for work if it was ready before the fixup.
if (entryState == BUILT)
entryState = setFiState(handle, FIXED_UP, __LINE__);
break; // leave the switch here, rest is error handling
case ACTIVE:
case BROKEN:
case UNUSED:
default:
ex_assert(FALSE, "Wrong state for downloaded frag");
// this is a serious error, don't use this statement any more
// (entry is NULL if the handle did not identify a used slot)
if (entry)
entryState = setFiState(handle, BROKEN, __LINE__);
// $$$$ set Diagnostics area
}
}
// Called when a send bottom node of the given fragment instance has
// been opened. Moves the instance from FIXED_UP to READY_INACTIVE;
// instances in any other state are left alone.
void ExEspFragInstanceDir::openedSendBottom(
     ExFragInstanceHandle handle)
{
  if (instances_[handle]->fiState_ != FIXED_UP)
    return;

  // now that we are linked up (at least partially)
  // with our clients, we are ready to do work
  setFiState(handle, READY_INACTIVE, __LINE__);
}
// The following "started" and "finished" methods are partially
// responsible for handling the normal ESP fragment instance
// state transitions of
// READY_INACTIVE -> ACTIVE -> RELEASING_WORK -> READY_INACTIVE
// Specifically, these methods handle the first two transitions.
// See ExEspFragInstanceDir::work for the code handling the final
// transition: RELEASING_WORK -> READY_INACTIVE.
// Counts one more outstanding send bottom request for the fragment
// instance. If the instance was READY_INACTIVE it becomes ACTIVE and
// the number of active instances is incremented.
void ExEspFragInstanceDir::startedSendBottomRequest(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::startedSendBottomRequest");

  ExEspFragInstance *inst = instances_[handle];

  inst->numSendBottomRequests_++;
  if (inst->fiState_ != READY_INACTIVE)
    return;

  // an inactive instance cannot have had requests outstanding before
  ex_assert(instances_[handle]->numSendBottomRequests_ == 1,
            "count: ExEspFragInstanceDir::startedSendBottomRequest");
  setFiState(handle, ACTIVE, __LINE__);
  numActiveInstances_++;
}
// Counts one outstanding send bottom request less for the fragment
// instance and lets finishedRequest() decide (with testAllQueues set)
// whether the instance has run out of work.
void ExEspFragInstanceDir::finishedSendBottomRequest(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::finishedSendBottomRequest");
  ex_assert(instances_[handle]->numSendBottomRequests_ > 0,
            "requests: ExEspFragInstanceDir::finishedSendBottomRequest");

  ExEspFragInstance *inst = instances_[handle];

  inst->numSendBottomRequests_ -= 1;
  finishedRequest(handle, TRUE);
}
// Counts one more outstanding send bottom cancel for the fragment
// instance. If the instance was READY_INACTIVE it becomes ACTIVE and
// the number of active instances is incremented.
void ExEspFragInstanceDir::startedSendBottomCancel(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::startedSendBottomCancel");

  ExEspFragInstance *inst = instances_[handle];

  inst->numSendBottomCancels_++;
  if (inst->fiState_ != READY_INACTIVE)
    return;

  // an inactive instance cannot have had cancels outstanding before
  ex_assert(instances_[handle]->numSendBottomCancels_ == 1,
            "count: ExEspFragInstanceDir::startedSendBottomCancel");
  setFiState(handle, ACTIVE, __LINE__);
  numActiveInstances_++;
}
// Counts one outstanding send bottom cancel less for the fragment
// instance and lets finishedRequest() decide whether the instance has
// run out of work.
void ExEspFragInstanceDir::finishedSendBottomCancel(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::finishedSendBottomCancel");
  ex_assert(instances_[handle]->numSendBottomCancels_ > 0,
            "requests: ExEspFragInstanceDir::finishedSendBottomCancel");

  ExEspFragInstance *inst = instances_[handle];

  inst->numSendBottomCancels_ -= 1;
  finishedRequest(handle);
}
// Counts one more outstanding late cancel request for the fragment
// instance. Unlike the send bottom variants this does not change the
// state of the instance.
void ExEspFragInstanceDir::startedLateCancelRequest(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::startedLateCancelRequest");

  ExEspFragInstance *inst = instances_[handle];

  inst->numLateCancelRequests_ += 1;
}
// Counts one outstanding late cancel request less for the fragment
// instance and lets finishedRequest() decide whether the instance has
// run out of work.
void ExEspFragInstanceDir::finishedLateCancelRequest(
     ExFragInstanceHandle handle)
{
  ex_assert(instances_.used(handle),
            "handle: ExEspFragInstanceDir::finishedLateCancelRequest");
  ex_assert(instances_[handle]->numLateCancelRequests_ > 0,
            "requests: ExEspFragInstanceDir::finishedLateCancelRequest");

  ExEspFragInstance *inst = instances_[handle];

  inst->numLateCancelRequests_ -= 1;
  finishedRequest(handle);
}
// This method handles the ACTIVE->RELEASING_WORK transition that
// happens when the ESP runs out of work to do. The RELEASING_WORK
// state might be only temporary, e.g., if this ESP is on the right
// side of a flow node and will get more probes later.
// The other way to transition ACTIVE->RELEASING_WORK is
// for split_bottom to get a ESP_RELEASE_TRANSACTION_HDR message.
// Even in this case, the RELEASING_WORK state might be temporary,
// for example it might be to allow a temporary suspension of the
// transaction.
// Checks whether an ACTIVE fragment instance has run out of work and,
// if so, moves it to RELEASING_WORK.
//
// handle        - the fragment instance to check
// testAllQueues - if TRUE, testAllQueues() is called on the root TCB
//                 after the state change
void ExEspFragInstanceDir::finishedRequest(
     ExFragInstanceHandle handle,
     NABoolean testAllQueues)
{
  ExEspFragInstance *inst = instances_[handle];

  // still something outstanding from the clients, or not active at all
  if (inst->numSendBottomRequests_ > 0 ||
      inst->numLateCancelRequests_ > 0 ||
      inst->numSendBottomCancels_ > 0 ||
      inst->fiState_ != ACTIVE)
    return;

  // pending messages to servers (these can be eager send top continue
  // messages that haven't got the empty reply yet) also keep us busy
  if (inst->globals_->anySendTopMsgesOut() != FALSE)
    return;

  // All requests from clients are finished or canceled, nothing is
  // pending with servers and the entry is still active: signal to the
  // ESP fragment directory to deactivate this fragment and to reply
  // to its work request.
  setFiState(handle, RELEASING_WORK, __LINE__);
  if (testAllQueues)
    inst->localRootTcb_->testAllQueues();
}
// Returns the number of late cancel requests currently counted for
// the fragment instance with the given handle.
Lng32 ExEspFragInstanceDir::numLateCancelRequests(ExFragInstanceHandle handle)
{
  ExEspFragInstance *inst = instances_[handle];

  return inst->numLateCancelRequests_;
}
// Marks a fragment instance, and all DP2 fragments whose parent key is
// the key of that instance, as RELEASING. The entries are not destroyed
// here; that happens when work() encounters an entry in state RELEASING.
//
// handle - the fragment instance to release; asserts and returns
//          without doing anything if it does not identify a used slot
void ExEspFragInstanceDir::releaseEntry(ExFragInstanceHandle handle)
{
  if (handle == NullFragInstanceHandle || !instances_.used(handle))
    {
      // error
      ex_assert(FALSE,"Fragment instance not found");
      return;
    }

  ExEspFragInstance *entry = instances_[handle];

  // loop over all entries to release the DP2 fragments that were
  // used by this one
  for (CollIndex i = 0; i < highWaterMark_; i++)
    {
      if (instances_.used(i) AND
          instances_[i]->fragType_ == ExFragDir::DP2 AND
          instances_[i]->parentKey_ == entry->key_)
        {
          setFiState(i, RELEASING, __LINE__);
        }
    }

  // now release the entry itself
  setFiState(handle, RELEASING, __LINE__);
}
void ExEspFragInstanceDir::releaseOrphanEntries()
{
for (CollIndex i = 0; i < highWaterMark_; i++)
{
if (instances_.used(i) AND
NOT instances_[i]->controlConn_->isConnected())
{
setFiState(i, RELEASING, __LINE__);
}
}
}
// Handles the release work request for a fragment instance: releases
// the pending work request on the root TCB and moves an instance that
// was in WAIT_TO_RELEASE to RELEASING_WORK.
//
// handle - the fragment instance; asserts and returns without doing
//          anything if it does not identify a used slot
void ExEspFragInstanceDir::hasTransidReleaseRequest(
     ExFragInstanceHandle handle)
{
  const bool knownInstance =
    (handle != NullFragInstanceHandle) && instances_.used(handle);

  if (!knownInstance)
    {
      // error
      ex_assert(FALSE,"Fragment instance not found");
      return;
    }

  ExEspFragInstance *entry = instances_[handle];

  // Background on the work / release work protocol with the master:
  //
  // At the beginning of query execution the master sends a work
  // request to every esp. The esp leaves that request pending instead
  // of replying. At the end of the query the master sends a release
  // work message, and the esp then replies to both the release work
  // message and the saved work request.
  //
  // In the past an esp that found all of its queues empty was allowed
  // to reply early to the work request, before the release work
  // message had arrived. Depending on timing in the master, either
  //   - the master sent another work request, which the esp saved
  //     until the release work message arrived, or
  //   - the master saw EOD from all top level esps and sent release
  //     work messages to all esps EXCEPT the one that had already
  //     replied, so that esp never received a release work message.
  //
  // Early replies are no longer allowed. An esp without work sits
  // idle until the release work message arrives and only then replies
  // to both messages, so all esps are guaranteed to receive a release
  // work message (except in error situations).
  assert(entry->localRootTcb_->hasTransaction());
  entry->localRootTcb_->releaseWorkRequest();

  // The reply to the master's work request is out now. There may
  // still be work with other esps (such as a cancel request), but not
  // with the master, so the entry goes to RELEASING_WORK.
  if (entry->fiState_ != WAIT_TO_RELEASE)
    {
      // Either
      // 1. finishedRequest() was called before the esp received the
      //    release work msg, or
      // 2. no consumer esp opened this esp as server.
      assert(entry->fiState_ == RELEASING_WORK ||
             entry->fiState_ == READY_INACTIVE ||
             entry->fiState_ == FIXED_UP);
    }
  else
    {
      setFiState(handle, RELEASING_WORK, __LINE__);
    }

  // If the esp inactive timeout is turned on, start the inactive
  // timestamp counter for this esp. Since esps are not shared, each
  // esp is used by only one statement and therefore becomes inactive
  // once it has received the release work msg.
  if (getEnvironment()->getInactiveTimeout() > 0)
    getEnvironment()->setInactiveTimestamp();
}
// Records that a release request has arrived for the fragment instance
// by putting it into state WAIT_TO_RELEASE. work() later completes the
// release once no cancel or send top messages are outstanding.
//
// handle - the fragment instance; must identify a used slot
void ExEspFragInstanceDir::hasReleaseRequest(
     ExFragInstanceHandle handle)
{
  ex_assert((handle != NullFragInstanceHandle) && instances_.used(handle),
            "Fragment instance not found");
  setFiState(handle, WAIT_TO_RELEASE, __LINE__);
}
// Main work method of the fragment instance directory.
//
// prevWaitTime - passed through unchanged to the scheduler's work()
//                method of each fragment instance that gets to work
//
// One pass visits every used slot below the high water mark and acts
// according to the state of the instance:
//   RELEASING_WORK  -> becomes ACTIVE (and works) if requests or cancels
//                      are outstanding, else becomes READY_INACTIVE
//   ACTIVE          -> restores the transaction and calls the scheduler
//   WAIT_TO_RELEASE -> calls hasTransidReleaseRequest() once no cancel
//                      or send top messages are outstanding
//   RELEASING       -> releases the work request and destroys the entry
//   GOING_FATAL     -> becomes BROKEN once the error has been reported
//   BROKEN          -> nothing to do
// Instances in any other state are skipped.
// The pass is repeated while loopAgain was set or the completion
// sequence number of the connections changed during the pass.
void ExEspFragInstanceDir::work(Int64 prevWaitTime)
{
ULng32 startSeqNo;
NABoolean transactionRestored;
// The following DO loop can be forced to execute again by setting
// loopAgain to TRUE
NABoolean loopAgain = FALSE;
do
{
loopAgain = FALSE;
// -----------------------------------------------------------------
// remember the I/O sequence number when we start
// -----------------------------------------------------------------
startSeqNo =
getEnvironment()->getAllConnections()->getCompletionSeqenceNo();
// -----------------------------------------------------------------
// Call the work() method once for each active instance.
// NOTE: if the instances can wake each other up, then they must
// artificially bump the external I/O completion count
// (see, for example, the local send top/bottom nodes).
// -----------------------------------------------------------------
// Note that highWaterMark_ can shrink during this loop, since the
// RELEASING case calls destroyEntry().
for (CollIndex currInst = 0; currInst < highWaterMark_; currInst++)
{
if (instances_.used(currInst))
{
switch (instances_[currInst]->fiState_)
{
case RELEASING_WORK:
// - an esp instance can become RELEASING_WORK state from
// two paths:
//
// 1. finishedRequest(): esp has no work left to do. all queues
// are empty. but master has not sent the release work msg
// yet. note that in the past esp will do an early reply to
// master's work request. then depending on the timings
// master may send another work request or master may not
// do anything further with this esp (not even a release work
// msg). but that is not the case any more. esp no longer
// does early reply to the work request. instead, even if
// esp has no work to do, it will just sit idle until master
// sends release work msg. so esp will always receives an
// release work msg from master.
//
// 2. hasTransidReleaseRequest(): esp receives release work
// msg from master.
//
// dependent on whether the TCB tree has some requests from
// send bottom nodes or not, set its state to ACTIVE or
// READY_INACTIVE (restoreTransaction() will prevent
// active fragment instances from working when they don't
// have a work message)
if ((instances_[currInst]->numSendBottomRequests_ > 0) ||
(instances_[currInst]->numSendBottomCancels_ > 0) ||
(instances_[currInst]->numLateCancelRequests_ > 0)
)
{
setFiState(currInst, ACTIVE, __LINE__);
}
else
{
setFiState(currInst, READY_INACTIVE, __LINE__);
// decrement the global number of active fragment
// instances per ESP
numActiveInstances_--;
// we shouldn't need to work on this fragment instance
// anymore
break;
}
// fall through to the next case
case ACTIVE:
#ifdef NA_DEBUG_GUI
if (instances_[currInst]->displayInGui_ == 2)
instances_[currInst]->globals_->getScheduler()->startGui();
#endif
// To help debugging (dumps): Put current SB TCB in cli globals
cliGlobals_->setRootTcb(instances_[currInst]->localRootTcb_);
// -------------------------------------------------------
// Call the scheduler work procedure for this fragment
// instance, but not before restoring its transid.
// -------------------------------------------------------
transactionRestored =
instances_[currInst]->globals_->restoreTransaction();
if ( transactionRestored ||
// Allow working without a transaction, otherwise
// "continue" messages might never get their replies.
(instances_[currInst]->numSendBottomRequests_ > 0) ||
// The test below covers the case where the work
// request finished and released the transaction
// before the cancel request was received.
(instances_[currInst]->numSendBottomCancels_ > 0) ||
(instances_[currInst]->numLateCancelRequests_ > 0)
)
{
if (!transactionRestored)
{
// transactionRestored is FALSE if a transid is required
// for this request to work, but the transid is currently
// -1, i.e., no work request has been sent to this ESP
// yet. In this case, no new requests should be posted to
// DP2 (solution #10-060628-7424).
instances_[currInst]->globals_->setNoNewRequest(TRUE);
}
// -----------------------------------------------------
// This is where we do the actual work
// -----------------------------------------------------
ExWorkProcRetcode retcode =
instances_[currInst]->globals_->getScheduler()->work(prevWaitTime);
if (retcode == WORK_BAD_ERROR)
{
setFiState(currInst, GOING_FATAL, __LINE__);
// Force the loop to execute again so that error
// processing can continue
loopAgain = TRUE;
}
}
break;
case WAIT_TO_RELEASE:
{
ExEspStmtGlobals *espGlobals = instances_[currInst]->globals_;
if (espGlobals->anyCancelMsgesOut() ||
espGlobals->anySendTopMsgesOut())
{
// trace only if the statement has an SM query id
if (espGlobals->getSMQueryID() > 0)
{
EXSM_TRACE(EXSM_TRACE_IO_ALL,
"WAIT_TO_RELEASE inst %d",
(int) currInst);
EXSM_TRACE(EXSM_TRACE_IO_ALL,
" sndt msgs %d cancel msgs %d",
(int) espGlobals->numSendTopMsgesOut(),
(int) espGlobals->numCancelMsgesOut());
}
// Must stay in WAIT_TO_RELEASE state. Must let other
// frag instances (which might be hosted in this ESP
// process) work, so wait for I/O completion outside
// the scope of this frag instance (i.e., in the caller
// of this ExEspFragInstanceDir::work method.)
}
else
{
// Ready now to reply to release work request and
// change state to RELEASING_WORK.
hasTransidReleaseRequest(currInst);
}
break;
}
case RELEASING:
{
// release any outstanding messages before destroying the
// instance (to avoid hangs in the master Executor)
ExEspFragInstance * fi = instances_[currInst];
if (fi->localRootTcb_) // if root tcb is still around
fi->localRootTcb_->releaseWorkRequest();
destroyEntry(currInst);
break;
}
case GOING_FATAL:
// stay in GOING_FATAL until reportErrorToMaster() returns TRUE
if (instances_[currInst]->localRootTcb_->reportErrorToMaster())
setFiState(currInst, BROKEN, __LINE__);
break;
case BROKEN:
// waiting for message from master to release.
break;
} // switch
} // if instance is used and not suspended
} // loop over fragment instances
} while (loopAgain || getEnvironment()->getAllConnections()->
getCompletionSeqenceNo() != startSeqNo);
// clean up the completed MasterEspMessages
getEnvironment()->deleteCompletedMessages();
// When we exit here, we have called all active schedulers once,
// they all have worked until they blocked, and no I/Os have
// completed through the cycle. This must mean that we're stuck
// until something external happens. Note that if fragment instances
// send each other data they must bump the completion sequence number
// or this method may cause a deadlock.
}
// Looks up the fragment instance with the given key.
// Returns the entry of the first used slot whose key matches, or NULL
// if this key doesn't exist in the fragment instance directory.
//
// The scan covers all slots below the high water mark, like
// findHandle() does. The number of entries is not a valid bound here:
// destroyEntry() removes individual slots, so a used slot can have an
// index that is not smaller than the number of entries.
ExEspFragInstanceDir::ExEspFragInstance * ExEspFragInstanceDir::findEntry(
     const ExFragKey &key)
{
  // compare keys
  for (CollIndex i = 0; i < highWaterMark_; i++)
    if (instances_.used(i) AND instances_[i]->key_ == key)
      // found it
      return instances_[i];

  // this key doesn't exist in the fragment instance directory
  return NULL;
}
// Destroys the fragment instance in the given slot: deletes its
// statement globals together with their space and heap, drops the
// reference to the message fragment, frees the entry and removes it
// from the directory. If the directory becomes empty the idle timestamp
// is set and idle memory usage is traced. Finally the high water mark
// is lowered past any unused slots at the top.
void ExEspFragInstanceDir::destroyEntry(ExFragInstanceHandle handle)
{
  ExEspFragInstance *doomed = instances_[handle];

  // tcb tree will be deleted by glob->deleteMe().
  doomed->localRootTcb_ = NULL;
  // deleted by MsgFragment destructor
  doomed->queryId_ = NULL;

  ExEspStmtGlobals *stmtGlobals = doomed->globals_;

  if (stmtGlobals)
    {
      // fetch space and heap first, they are needed after the globals
      // are gone
      Space *fragSpace = stmtGlobals->getSpace();
      NAHeap *fragHeap = (NAHeap *) stmtGlobals->getDefaultHeap();

      stmtGlobals->deleteMe(FALSE);
      doomed->globals_ = NULL;
      NADELETE(fragSpace, Space, fragHeap);
      NADELETE(fragHeap, NAHeap, heap_);
    }

  doomed->msgFragment_->decrRefCount();
  doomed->msgFragment_ = NULL;

  // delete the entry itself and remove it from the collection
  // (sorry, no destructors are called)
  heap_->deallocateMemory(doomed);
  instances_.remove(handle);

  if (instances_.entries() == 0)
    {
      // this esp has been released by all statements and is now available
      getEnvironment()->setIdleTimestamp();
      this->traceIdleMemoryUsage();
    }

  // adjust the high water mark, if needed
  while (highWaterMark_ > 0 AND NOT instances_.used(highWaterMark_-1))
    {
      --highWaterMark_;
    }
}
// Appends one line with the memory usage of this process to the file
// espmemuse.log. Tracing is active only if the environment variable
// ESP_IDLE_MEMORY_TRACE starts with '1'; the variable is read once, on
// the first call.
//
// Each line holds the time of day, the pid, a call counter, four
// figures of heap_ (alloc size, alloc count, total size, high water
// mark) and the VmSize / VmPeak values of /proc/<pid>/status converted
// from kB to bytes. A value that cannot be read from /proc is reported
// as 0. If one of the two files cannot be opened, a message is written
// to stderr and tracing is switched off for the rest of the process.
void ExEspFragInstanceDir::traceIdleMemoryUsage()
{
  static bool first_time = true, mem_trace = false;
  static FILE *traceFile = NULL, *procStatusFile = NULL;
  static UInt32 count = 0;
  static pid_t myPid = 0;
  char fileName[32], buffer[1024];
  ULng32 memPeak = 0, memSize = 0; // VMSize in KB

  if (first_time)
    {
      const char *env_var = getenv("ESP_IDLE_MEMORY_TRACE");
      if (env_var && *env_var == '1')
        mem_trace = true;
      first_time = false;
    }
  if (!mem_trace)
    return;

  count += 1;
  if (count == 1)
    {
      // needed by the error messages below, so get it first
      myPid = getpid();

      // "r+" fails if the file does not exist yet; in that case create
      // it and start it with the column header
      traceFile = fopen("espmemuse.log", "r+");
      if (traceFile == NULL)
        {
          traceFile = fopen("espmemuse.log", "a");
          if (traceFile == NULL)
            {
              fprintf(stderr, "ESP_IDLE_MEMORY_TRACE errno %d opening espmemuse.log by pid %d\n", errno, myPid);
              mem_trace = false;
              return;
            }
          else
            fprintf(traceFile, "TIME,PID,INST,EXAllSize,Cnt,TotSize,AllHW,IPCAllSz,Cnt,CONTEXTAllSize,VmSz,VmPk\n");
        }

      sprintf(fileName, "/proc/%d/status", myPid);
      procStatusFile = fopen(fileName, "r");
      if (procStatusFile == NULL)
        {
          fprintf(stderr, "ESP_IDLE_MEMORY_TRACE errno %d opening /proc/%d/status\n", errno, myPid);
          // tracing is given up, don't keep the log file open
          fclose(traceFile);
          traceFile = NULL;
          mem_trace = false;
          return;
        }
    }
  else
    fseek(procStatusFile, 0, SEEK_SET);

  // fread() does not terminate the buffer, but strstr() needs a string:
  // leave room for the terminator and add it
  const size_t bytesRead = fread(buffer, 1, sizeof(buffer) - 1, procStatusFile);
  buffer[bytesRead] = '\0';

  // a field that is missing in the data read leaves its value at 0
  const char *field = strstr(buffer, "VmPeak");
  if (field != NULL)
    sscanf(field, "%*s %u kB", &memPeak);
  field = strstr(buffer, "VmSize");
  if (field != NULL)
    sscanf(field, "%*s %u kB", &memSize);

  int hours = 0, minutes = 0, seconds = 0;
  const time_t timeInSecs = time(0);
  const tm *localTime = localtime(&timeInSecs);
  if (localTime != NULL)
    {
      hours = localTime->tm_hour;
      minutes = localTime->tm_min;
      seconds = localTime->tm_sec;
    }

  fseek(traceFile, 0, SEEK_END); // append to end of file
  fprintf(traceFile, "%d:%02d:%02d,%d,%d,", hours,
          minutes, seconds, myPid, (int) count);
  fprintf(traceFile, PFSZ "," PFSZ "," PFSZ "," PFSZ ",%ld,%ld\n",
          heap_->getAllocSize(),
          heap_->getAllocCnt(),
          heap_->getTotalSize(),
          heap_->getHighWaterMark(),
          (Long)memSize * 1024,
          (Long)memPeak * 1024);
  fflush(traceFile);
}
// Plan version check for a fragment instance. This implementation does
// not look at its arguments and always reports VERSION_NO_ERROR.
Lng32 ExEspFragInstanceDir::checkPlanVersion(const ExEspFragInstance * entry, ComDiagsArea& da)
{
  return static_cast<Lng32>(VERSION_NO_ERROR);
}
// Returns the local stats heap, setting it up on the first call.
// Without stats globals the process stats heap is used as the local
// stats heap as well; otherwise a separate "Process Local Stats Heap"
// is created on top of heap_.
NAHeap *ExEspFragInstanceDir::getLocalStatsHeap()
{
  // already initialized by an earlier call
  if (localStatsHeap_ != NULL)
    return localStatsHeap_;

  if (statsGlobals_ != NULL)
    {
      localStatsHeap_ = new (heap_)
        NAHeap("Process Local Stats Heap", (NAHeap *)heap_,
               8192, 0);
    }
  else
    {
      // the other stats heap is local, just use it
      localStatsHeap_ = statsHeap_;
    }

  return localStatsHeap_;
}
// Sets the database user of the current context of this ESP.
//
// userID   - the user id to pass to the context
// userName - the user name to pass to the context
//
// The first call establishes the user; every later call is treated as
// a switch of the user and tells the context so by passing "true".
void ExEspFragInstanceDir::setDatabaseUserID(Int32 userID,
                                             const char *userName)
{
  ex_assert(cliGlobals_, "CliGlobals pointer should not be NULL");

  ContextCli *context = cliGlobals_->currContext();

  ex_assert(context, "ContextCli pointer should not be NULL");

  // close all (shared) opens if switching user by passing "true"
  const bool switchingUser = (userIDEstablished_ == TRUE);

  context->setDatabaseUserInESP(userID, userName, switchingUser);
  if (!switchingUser)
    userIDEstablished_ = TRUE;
}
void ExEspFragInstanceDir::initFiStateTrace()
{
for (fiTidx_ = 0; fiTidx_ < NumFiTraceElements; fiTidx_++)
{
fiStateTrace_[fiTidx_].fragId_ = UNUSED_COLL_ENTRY;
fiStateTrace_[fiTidx_].fiState_ = UNUSED;
fiStateTrace_[fiTidx_].lineNum_ = __LINE__;
}
}
// Sets the state of a fragment instance and records the change in the
// circular state trace.
//
// fragId   - handle of the fragment instance
// newState - the state to set
// linenum  - source line of the caller, stored in the trace
//
// Returns newState. Nothing is changed or traced if the instance is
// in that state already.
enum ExEspFragInstanceDir::FragmentInstanceState
ExEspFragInstanceDir::setFiState(ExFragInstanceHandle fragId,
                                 enum ExEspFragInstanceDir::FragmentInstanceState newState,
                                 Int32 linenum)
{
  ExEspFragInstance *inst = instances_[fragId];

  if (inst->fiState_ == newState)
    return newState;

  // advance to the next trace element, wrapping around at the end
  ++fiTidx_;
  if (fiTidx_ >= NumFiTraceElements)
    fiTidx_ = 0;

  inst->fiState_ = newState;

  FiStateTrace &slot = fiStateTrace_[fiTidx_];

  slot.fragId_ = fragId;
  slot.fiState_ = newState;
  slot.lineNum_ = linenum;

  return newState;
}
// Formats one element of the fragment instance state trace.
//
// lineno - index of the trace element
// buf    - receives the formatted line: index, fragment instance,
//          state name and source line number, followed by a newline
//
// Returns the number of characters written to buf, or 0 (without
// touching buf) if lineno is not a valid index into the trace.
// States outside of the range UNUSED .. BROKEN are shown as "UNKNOWN".
Int32
ExEspFragInstanceDir::printALiner(Int32 lineno, char *buf)
{
  // reject indexes on either side of the trace array
  if (lineno < 0 || lineno >= NumFiTraceElements)
    return 0;

  const FiStateTrace *fi = &fiStateTrace_[lineno];
  const char *stateName = "UNKNOWN";

  if ((UNUSED <= fi->fiState_) && (fi->fiState_ <= BROKEN))
    stateName = FragmentInstanceStateName[fi->fiState_];

  return sprintf(buf, "%.4d %9d %.15s %6d\n", lineno, fi->fragId_,
                 stateName,
                 fi->lineNum_);
}
// -----------------------------------------------------------------------
// Methods for class ExEspControlMessage
// -----------------------------------------------------------------------
// Constructs the message stream used to reply to control messages.
//
// fragInstanceDir - the fragment instance directory, kept in
//                   fragInstanceDir_
// ipcEnvironment  - passed on to the IpcMessageStream base class
// heap            - kept in heap_
//
// The base class is set up for messages of type
// IPC_MSG_SQLESP_CONTROL_REPLY with version CurrEspReplyMessageVersion.
ExEspControlMessage::ExEspControlMessage(
     ExEspFragInstanceDir *fragInstanceDir,
     IpcEnvironment *ipcEnvironment,
     CollHeap *heap)
  : IpcMessageStream(ipcEnvironment,
                     IPC_MSG_SQLESP_CONTROL_REPLY,
                     CurrEspReplyMessageVersion,
                     0,
                     TRUE)
{
  heap_ = heap;
  fragInstanceDir_ = fragInstanceDir;
}
// Destructor; this class has nothing of its own to clean up.
ExEspControlMessage::~ExEspControlMessage()
{
}
// Callback for a send on the given connection. Asserts if the stream
// is in the error state. Otherwise counts the reply via incReplyMsg(),
// using the length of the last message sent on the connection, or 0 if
// there is no connection or no such message.
void ExEspControlMessage::actOnSend(IpcConnection *connection)
{
  if (getState() == ERROR_STATE)
    {
      ex_assert(FALSE,"Error while replying to a control message from master");
      return;
    }

  if (!connection || !connection->getLastSentMsg())
    {
      incReplyMsg(0);
      return;
    }

  incReplyMsg(connection->getLastSentMsg()->getMessageLength());
}
// Callback for the completion of all sends: clears the objects of this
// message stream and starts the receive for the next request.
void ExEspControlMessage::actOnSendAllComplete()
{
  // the stream is reused for the next request
  clearAllObjects();

  // start another receive operation
  receive(FALSE);
}
// Handles a message received on the ESP control connection.
//
// For an IPC_MSG_SQLESP_CONTROL_REQUEST the objects in the message are
// processed one request header at a time (load, fixup, release) until the
// message is exhausted or the diagnostics area contains an error. Then a
// status reply is built (key, handle and state of the current fragment
// instance, plus the diagnostics area if it is not empty) and sent back.
//
// Partition input data, work transaction and release transaction requests
// are not answered here: the message is handed to the split bottom node,
// which sends the reply itself, and this method returns immediately.
//
//   connection  connection on which the request arrived
void ExEspControlMessage::actOnReceive(IpcConnection *connection)
{
  if (getState() == ERROR_STATE)
    {
      ex_assert(FALSE,"Error while receiving a control message from master");
    }

  // diagnostics collected while processing the request(s); released either
  // after it has been packed into the reply or before the message is handed
  // to the split bottom node
  ComDiagsArea *da = ComDiagsArea::allocate(heap_);

  if (getType() == IPC_MSG_SQLESP_CONTROL_REQUEST)
    {
      ex_assert(getVersion() == CurrEspRequestMessageVersion,
                "Invalid request msg version");

      // stop processing further requests once an error has been recorded
      while (moreObjects() && da->mainSQLCODE() >= 0)
        {
          switch (getNextObjType())
            {
            case ESP_LOAD_FRAGMENT_HDR:
              actOnLoadFragmentReq(connection,*da);
              break;

            case ESP_FIXUP_FRAGMENT_HDR:
              {
                actOnFixupFragmentReq(*da);

                // The fragment is fixed up. If the query uses
                // SeaMonster we send a SeaMonster reply back to the
                // master in addition to replying on the Guardian
                // control connection.
                //
                // One FIXUP_REPLY is sent from ESP to master each time
                // an ESP successfully fixes up a fragment. There is no
                // interesting content in these messages. But they need
                // to flow from ESP to master as an SM "go message".
                //
                // Without these replies flowing from ESP to master, if
                // the master were to send a data request to the ESP,
                // the receiving SM service might report that ESP
                // preposts are not yet available.

                // First see if the query uses SM
                bool queryUsesSM = false;
                Int64 smQueryID = 0;
                if (fragInstanceDir_ &&
                    fragInstanceDir_->instances_.used(currHandle_))
                  {
                    ExEspFragInstanceDir::ExEspFragInstance *inst =
                      fragInstanceDir_->instances_[currHandle_];
                    ex_split_bottom_tdb *splitBottomTdb = inst->localRootTdb_;
                    ex_assert(splitBottomTdb, "Invalid split bottom TDB pointer");
                    if (splitBottomTdb->getQueryUsesSM() && fragInstanceDir_->getEnvironment()->smEnabled())
                      {
                        queryUsesSM = true;
                        if (inst->globals_)
                          smQueryID = inst->globals_->getSMQueryID();
                      }
                  }

                // only send the SM reply if the fixup did not report an error
                if (queryUsesSM &&
                    da->mainSQLCODE() >= 0)
                  {
                    // Send the SM fixup reply
                    ex_assert(environment_, "Invalid IpcEnvironment pointer");
                    ex_assert(environment_->getControlConnection(),
                              "Invalid control connection pointer");
                    IpcConnection *conn =
                      environment_->getControlConnection()->getConnection();
                    ex_assert(conn, "Invalid IpcConnection pointer");

                    // the SM target is derived from the process handle of
                    // the other end of the control connection
                    const GuaProcessHandle &phandle =
                      conn->getOtherEnd().getPhandle();
                    int otherCPU, otherPID, otherNode_unused;
                    SB_Int64_Type seqNum = -1;
                    phandle.decompose(otherCPU, otherPID, otherNode_unused
                                      , seqNum
                                      );

                    sm_target_t target;
                    memset(&target, 0, sizeof(target));
                    // Note: Seaquest node number is the old CPU number
                    target.node = ExSM_GetNodeID(otherCPU);
                    target.pid = otherPID;
                    target.verifier = seqNum;
                    // fall back to the executor-internal SM id if the
                    // statement globals did not provide a query id
                    target.id = (smQueryID > 0 ? smQueryID :
                                 ExSMGlobals::getExeInternalSMID());

                    EXSM_TRACE(EXSM_TRACE_MAIN_THR,
                               "Sending FIXUP REPLY to node %d pid %d "
                               "seqNum %" PRId64 " id %" PRId64,
                               (int) target.node, (int) target.pid,
                               seqNum, target.id);

                    ExSMShortMessage m;
                    m.setTarget(target);
                    m.setNumValues(1);
                    m.setValue(0, ExSMShortMessage::FIXUP_REPLY);
                    int32_t rc = m.send();
                    if (rc != 0)
                      {
                        // report the failed send in the diagnostics area
                        // that goes back to the master
                        *da << DgSqlCode(-EXE_SM_FUNCTION_ERROR)
                            << DgString0("ExSM_SendShortMessage")
                            << DgInt0((Lng32) rc)
                            << DgInt1((Lng32) getpid())
                            << DgString1(fragInstanceDir_->getCliGlobals()->myProcessNameString())
                            << DgNskCode((Lng32) 10000 + abs(rc));
                      }
                  } // if (queryUsesSM)
              } // case ESP_FIXUP_FRAGMENT_HDR
              break;

            case ESP_RELEASE_FRAGMENT_HDR:
              actOnReleaseFragmentReq(*da);
              break;

            case ESP_PARTITION_INPUT_DATA_HDR:
            case ESP_WORK_TRANSACTION_HDR:
            case ESP_RELEASE_TRANSACTION_HDR:
              // the diags area is not needed, no reply is built here
              da->decrRefCount();
              actOnReqForSplitBottom(connection);
              return; // don't reply, split bottom node will do this

            default:
              ex_assert(FALSE, "Invalid request for an ESP control connection");
            }
        }
    }
  else
    {
      // is this an error or is this a system message?
      ex_assert(FALSE,"Internal error");
      // set diagnostics area and reply $$$$
    }

  // done with receiving, now build the reply
  clearAllObjects();
  setType(IPC_MSG_SQLESP_CONTROL_REPLY);
  setVersion(CurrEspReplyMessageVersion);

  ExEspReturnStatusReplyHeader *replyHeader =
    new(heap_) ExEspReturnStatusReplyHeader(heap_);

  replyHeader->key_ = currKey_;
  replyHeader->handle_ = currHandle_;

  // translate the internal state of the fragment instance into the
  // state reported to the master
  if (currHandle_ == NullFragInstanceHandle)
    {
      replyHeader->instanceState_ =
        ExEspReturnStatusReplyHeader::INSTANCE_RELEASED;
    }
  else
    {
      switch(fragInstanceDir_->instances_[currHandle_]->fiState_)
        {
        case ExEspFragInstanceDir::UNUSED:
          replyHeader->instanceState_ =
            ExEspReturnStatusReplyHeader::INSTANCE_RELEASED;
          break;

        case ExEspFragInstanceDir::DOWNLOADED:
        case ExEspFragInstanceDir::UNPACKED:
        case ExEspFragInstanceDir::BUILT:
          replyHeader->instanceState_ =
            ExEspReturnStatusReplyHeader::INSTANCE_DOWNLOADED;
          break;

        case ExEspFragInstanceDir::FIXED_UP:
        case ExEspFragInstanceDir::READY_INACTIVE:
          replyHeader->instanceState_ =
            ExEspReturnStatusReplyHeader::INSTANCE_READY;
          break;

        case ExEspFragInstanceDir::ACTIVE:
          replyHeader->instanceState_ =
            ExEspReturnStatusReplyHeader::INSTANCE_ACTIVE;
          break;

        case ExEspFragInstanceDir::BROKEN:
        default:
          replyHeader->instanceState_ =
            ExEspReturnStatusReplyHeader::INSTANCE_BROKEN;
          break;
        }
    }

  *this << *replyHeader;
  replyHeader->decrRefCount();

  // pack the SQL diagnostics area into the reply message
  if (da->mainSQLCODE())
    *this << *da;
  da->decrRefCount();

  // the individual cases have initialized a reply message,
  // now send the reply back to the client
  send(FALSE);
}
// Processes a load fragment request: reads the request header and all
// fragments that follow it and adds each fragment to the fragment instance
// directory. The key and handle of a downloaded ESP fragment are remembered
// in currKey_/currHandle_ so the caller can build the reply.
//
//   connection  connection on which the request arrived; recorded with each
//               new directory entry
//   da          diagnostics area (not used by this method)
void ExEspControlMessage::actOnLoadFragmentReq(
     IpcConnection *connection,
     ComDiagsArea & /*da*/)
{
  ExEspLoadFragmentReqHeader receivedRequest(heap_);

  *this >> receivedRequest;

  // set stopAfter=0 so esp will never timeout during query execution
  environment_->setStopAfter(0);
  environment_->setInactiveTimeout(0);
  environment_->clearIdleTimestamp();
  environment_->clearInactiveTimestamp();

  // make sure at least one fragment follows the header
  ex_assert(moreObjects() AND getNextObjType() == ESP_FRAGMENT,
            "Load message without following fragment");

  // 1 or more fragments may follow
  while (moreObjects() AND getNextObjType() == ESP_FRAGMENT)
    {
      ExMsgFragment *msgFragment = new(heap_) ExMsgFragment(heap_);

      // get the fragment object out of the message and add it to the
      // global fragment directory
      *this >> *msgFragment;

      // Hand the database user identity carried by the fragment to the
      // fragment instance directory. This is done for every fragment
      // received, not only for the first one.
      Int32 userID = msgFragment->getDatabaseUserID();
      const char *userName = msgFragment->getDatabaseUserName();
      fragInstanceDir_->setDatabaseUserID(userID, userName);

      ExFragInstanceHandle newHandle =
        fragInstanceDir_->addEntry(msgFragment,connection);

      if (msgFragment->getFragType() == ExFragDir::ESP)
        {
          // remember the key and handle of this download request
          // (ignore downloaded DP2 fragments, they always come as
          // dependents of ESP fragments)
          currHandle_ = newHandle;
          currKey_ = msgFragment->getKey();
        }
    } // while more objects

  // the reply is handled by the caller
}
// Processes a fixup fragment request: reads the request header and the
// optional objects that follow it (process ids of child fragments, resource
// info, timeout data, SeaMonster download info), stores them in the
// statement globals of the addressed fragment instance and fixes up that
// instance. Also applies the polling interval, persistent opens and
// priority settings carried by the request.
//
//   da  diagnostics area passed to the fixup of the fragment instance
void ExEspControlMessage::actOnFixupFragmentReq(ComDiagsArea &da)
{
  ExEspFixupFragmentReqHeader receivedRequest(fragInstanceDir_->heap_);
  ExProcessIdsOfFragList *poflist = NULL;
  ExMsgResourceInfo *ri = NULL;
  ExMsgTimeoutData * td = NULL;
  ExEspStmtGlobals *glob = NULL;
  CollHeap *instHeap = NULL;

  *this >> receivedRequest;

  Lng32 maxPollingInterval = receivedRequest.getMaxPollingInterval();
  Lng32 persistentOpens = receivedRequest.getPersistentOpens();
  environment_->setMaxPollingInterval(maxPollingInterval);
  environment_->setPersistentOpens(persistentOpens > 0);

  currKey_ = receivedRequest.key_;
  currHandle_ = fragInstanceDir_->findHandle(receivedRequest.key_);

  if (currHandle_ != NullFragInstanceHandle)
    {
      glob = fragInstanceDir_->getGlobals(currHandle_);
      ex_assert(glob, "Invalid statement globals pointer");

      // objects read from the message are allocated from the heap of
      // the fragment instance
      instHeap = glob->getDefaultHeap();

      // are process ids of child fragments following the header?
      if (moreObjects() AND getNextObjType() == ESP_PROCESS_IDS_OF_FRAG)
        {
          poflist = new(instHeap) ExProcessIdsOfFragList(instHeap);

          // 1 or more list of input fragment process ids may follow
          while (moreObjects() AND
                 getNextObjType() == ESP_PROCESS_IDS_OF_FRAG)
            {
              ExProcessIdsOfFrag *pof =
                new(instHeap) ExProcessIdsOfFrag(instHeap);

              *this >> *pof;
              poflist->insert(pof);
            }
        }

      if (moreObjects() AND getNextObjType() == ESP_RESOURCE_INFO)
        {
          ri = new(instHeap) ExMsgResourceInfo(NULL,instHeap);
          *this >> *ri;
        }

      if (moreObjects() AND getNextObjType() == ESP_TIMEOUT_DATA)
        {
          td = new(instHeap) ExMsgTimeoutData( NULL, instHeap );
          *this >> *td;
        }

      if (moreObjects() && getNextObjType() == ESP_SM_DOWNLOAD_INFO)
        {
          // Create a new object to hold SeaMonster properties for this
          // query and store a pointer to the object in statement
          // globals
          ExSMDownloadInfo *info = new (instHeap)
            ExSMDownloadInfo(instHeap);
          *this >> *info;
          glob->setSMDownloadInfo(info);
        }

      // remember the input fragment process ids in the globals for this
      // fragment entry
      glob->setPidFragList(poflist);
      glob->setResourceInfo(ri);
      if ( td ) * glob->getTimeoutData() = td->getTimeoutData() ;
      glob->setStatsEnabled(receivedRequest.statsEnabled());

      // fixup the statement associated with the handle
      fragInstanceDir_->fixupEntry(currHandle_,
                                   receivedRequest.numOfParentInstances_,
                                   da);
    }
  else
    {
      ex_assert(FALSE,"entry not found, set diags area and reply");
      // $$$$ entry not found set diagnostics area
      // $$$$ may have to remove extra message objects to be
      // able to send a reply
    }

  // if fixup priority has been sent, save that so my priority could be
  // restored to this value. There are no globals to save it in if the
  // entry was not found.
  if (glob && receivedRequest.getEspFixupPriority() > 0)
    {
      glob->setMyFixupPriority((IpcPriority)receivedRequest.getEspFixupPriority());
    }

  // if execute priority has been sent, set my priority to that value.
  if (receivedRequest.getEspExecutePriority() > 0)
    {
      // An error returned here is deliberately ignored; the priority is
      // then simply left as it is.
      (void) ComRtSetProcessPriority(receivedRequest.getEspExecutePriority(),
                                     FALSE);
    }

  // the reply is handled by the caller
}
// Processes a release fragment request. Depending on the request, either
// the single fragment instance addressed by the key is released or all
// instances that belong to the same process/statement as the key. After
// that the process priority is switched back to the saved fixup priority
// (if there is one) and the idle timeout from the request is installed.
//
//   da  diagnostics area (not used by this method)
void ExEspControlMessage::actOnReleaseFragmentReq(ComDiagsArea & /*da*/)
{
  // esp being released by master is regarded as idle but not inactive
  environment_->setInactiveTimeout(0);
  environment_->clearInactiveTimestamp();

  ExEspReleaseFragmentReqHeader receivedRequest(fragInstanceDir_->heap_);
  *this >> receivedRequest;

  currKey_ = receivedRequest.key_;
  currHandle_ = fragInstanceDir_->findHandle(receivedRequest.key_);

  // fixup priority found in the statement globals, 0 means none
  IpcPriority priorityToRestore = 0;

  if (!receivedRequest.deleteStmt())
    {
      // delete a particular fragment instance, indicated by the key and handle
      if (currHandle_ == NullFragInstanceHandle)
        {
          // $$$$ entry not found, can probably ignore this??? assert for now
          ex_assert(FALSE,"Couldn't find frag entry to release");
        }
      else
        {
          ExEspStmtGlobals *instGlobals =
            fragInstanceDir_->getGlobals(currHandle_);

          if (instGlobals)
            {
              priorityToRestore = instGlobals->getMyFixupPriority();
              instGlobals->setCloseAllOpens(receivedRequest.closeAllOpens());
            }
          fragInstanceDir_->releaseEntry(currHandle_);
        }
    }
  else
    {
      // delete all fragment instances that come from the same statement
      // as the given key
      ExEspStmtGlobals *firstGlobals = NULL;
      ExFragKey wildCardKey = currKey_;

      for (CollIndex slot = 0; slot < fragInstanceDir_->highWaterMark_; slot++)
        {
          if (!fragInstanceDir_->instances_.used(slot))
            continue;

          // The priority and the "close all opens" setting are handled
          // once, using the first used instance that has globals.
          // NOTE(review): that instance is picked before the key
          // comparison below, so it is not necessarily one of the
          // instances being released -- confirm this is intended.
          if (firstGlobals == NULL)
            {
              firstGlobals = fragInstanceDir_->getGlobals(slot);
              if (firstGlobals)
                {
                  priorityToRestore = firstGlobals->getMyFixupPriority();
                  firstGlobals->setCloseAllOpens(
                       receivedRequest.closeAllOpens());
                }
            }

          const ExFragKey &slotKey = fragInstanceDir_->instances_[slot]->key_;

          // alter the wild card key such that it uses a matching
          // fragment id, but keeps its old process id and
          // statement handle
          wildCardKey.setFragId(slotKey.getFragId());

          // now, if the wild card key is equal to the key in this entry,
          // it belongs to the process/statement to be released
          if (wildCardKey == slotKey)
            fragInstanceDir_->releaseEntry(slot);
        }
    }

  if (receivedRequest.detachFromMaster_)
    {
      fragInstanceDir_->numMasters_--;
    }

  // change my priority back to my 'fixup' priority; an error from
  // this call is ignored
  if (priorityToRestore > 0)
    {
      (void) ComRtSetProcessPriority(priorityToRestore, FALSE);
    }

  // how long to keep idle esp alive
  environment_->setStopAfter(receivedRequest.idleTimeout_);

  // the reply is handled by the caller
}
// Processes a request that is meant for a split bottom node (partition
// input data, work transaction or release transaction). The request header
// is read to find the addressed fragment instance, then the message is
// handed over to the message stream of that instance's split bottom node
// and a new receive is started on the control connection. No reply is
// sent from here.
//
//   connection  connection on which the request arrived
void ExEspControlMessage::actOnReqForSplitBottom(IpcConnection *connection)
{
  // set for release transaction requests only
  NABoolean restoreFixupPriority = FALSE;

  // read the header, all we need from it is the key of the instance
  switch (getNextObjType())
    {
    case ESP_PARTITION_INPUT_DATA_HDR:
      {
        ExEspPartInputDataReqHeader partInputReq(fragInstanceDir_->heap_);

        *this >> partInputReq;
        currKey_ = partInputReq.key_;
      }
      break;

    case ESP_WORK_TRANSACTION_HDR:
      {
        ExEspWorkReqHeader workReq(fragInstanceDir_->heap_);

        *this >> workReq;
        currKey_ = workReq.key_;
      }
      break;

    case ESP_RELEASE_TRANSACTION_HDR:
      {
        ExEspReleaseWorkReqHeader releaseWorkReq(fragInstanceDir_->heap_);

        *this >> releaseWorkReq;
        currKey_ = releaseWorkReq.key_;
        restoreFixupPriority = TRUE;

        // how long to keep inactive esp alive
        environment_->setInactiveTimeout(releaseWorkReq.inactiveTimeout_);
      }
      break;

    default:
      ex_assert(0,"Invalid object type for split bottom node received");
    }

  currHandle_ = fragInstanceDir_->findHandle(currKey_);
  if (currHandle_ == NullFragInstanceHandle)
    {
      ex_assert(FALSE,"entry to receive part. input not found");
      return;
    }

  ex_split_bottom_tcb * splitBottom =
    fragInstanceDir_->getTopTcb(currHandle_);

  if (restoreFixupPriority)
    {
      // change my priority back to my 'fixup' priority.
      ExEspStmtGlobals *stmtGlobals = fragInstanceDir_->getGlobals(currHandle_);
      IpcPriority fixupPriority = stmtGlobals->getMyFixupPriority();

      if (fixupPriority > 0)
        {
          // an error from this call is ignored
          (void) ComRtSetProcessPriority(fixupPriority, FALSE);

          // reset priority indication in global
          stmtGlobals->setMyFixupPriority(0);
        }
    }

  if (!splitBottom)
    {
      ex_assert(FALSE,"entry is in wrong state to receive part input");
      return;
    }

  giveMessageTo(*splitBottom->getMessageStream(),connection);

  // start another receive operation for the next request
  receive(FALSE);

  // the reply is handled by the split bottom node
}
// Records one reply message of msgBytes bytes in the statistics area of
// the current fragment instance. Nothing is recorded if there is no current
// instance, if the instance has no statement globals (other code in this
// file treats the result of getGlobals() as possibly NULL), or if the
// globals have no statistics area.
//
//   msgBytes  length of the reply message in bytes
void ExEspControlMessage::incReplyMsg(Int64 msgBytes)
{
  ExStatisticsArea *statsArea = NULL;

  if (currHandle_ != NullFragInstanceHandle &&
      fragInstanceDir_->getGlobals(currHandle_) != NULL)
    statsArea = fragInstanceDir_->getGlobals(currHandle_)->getStatsArea();

  if (statsArea != NULL)
    statsArea->incReplyMsg(msgBytes);
}
// Search for the split bottom tcb of an extract producer fragment whose
// security key matches the one provided.
//
// All slots up to the high water mark are examined and unused slots are
// skipped, in the same way the other scans of instances_ in this file do.
//
//   securityKey  security key to compare against the key stored in the
//                split bottom TDB
//
// Returns the first matching split bottom tcb, or NULL if there is none.
ex_split_bottom_tcb *ExEspFragInstanceDir::getExtractTop(const char *securityKey)
{
  for (CollIndex i = 0; i < highWaterMark_; i++)
    {
      // the instance array may have holes, do not touch unused slots
      if (!instances_.used(i))
        continue;

      ex_split_bottom_tcb *top = instances_[i]->localRootTcb_;

      if (top && top->splitBottomTdb().getExtractProducerFlag())
        {
          // str_cmp_c() returns 0 when the two keys are equal
          if (!str_cmp_c(top->splitBottomTdb().getExtractSecurityKey(), securityKey))
            return top;
        }
    }

  return NULL;
}