blob: ad3a54e4d6dfc81f6f5d56d372dc4aed75ea2b53 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExUdr.cpp
* Description: TDB/TCB for user-defined routines
*
* Created: 2/8/2000
* Language: C++
*
*****************************************************************************
*/
//
// ***TBD
// - Reviewers have noted that error handling following IPC errors may
// be too heavy-handed. The TCB places diagnostics into the statement
// globals area and often returns WORK_BAD_ERROR. It should be
// possible to associate the diags with an up queue entry and return
// all errors and diagnostics via the queues. One reason why this
// enhancement has not been made is that it is not clear when the
// server should be restarted following an IPC error. Currently the
// code lets the Statement object deallocate TCBs and redrive fixup in
// order to acquire a new UDR server. If work methods become
// responsible for restarting the server, I'm not sure how a work
// method would decide to initiate the restart.
//
#pragma nowarn(262) // warning elimination
#include "ex_stdh.h"
#include "ExUdr.h"
#include "ExUdrServer.h"
#include "ExUdrClientIpc.h"
#include "UdrExeIpc.h"
#include "ex_exe_stmt_globals.h"
#include "exp_attrs.h"
#include "ex_error.h"
#include "ExStats.h"
#include "ExCextdecs.h"
#include "UdrFormalParamInfo.h"
#include "udrtabledescinfo.h"
#include "Statement.h"
#include "ExRsInfo.h"
#include "Descriptor.h"
// Helpers for trace output: map a boolean-like expression to a printable
// string literal ("TRUE"/"FALSE" or "YES"/"NO").
#define TF_STRING(x) ((x) ? ("TRUE") : ("FALSE"))
#define YN_STRING(x) ((x) ? ("YES") : ("NO"))
#ifdef UDR_DEBUG
// Debug-only helper: returns a printable name for an expression
// evaluation status. Values without a dedicated name are formatted by
// ComRtGetUnknownString().
static const char *GetExpStatusString(ex_expr::exp_return_type s)
{
  const char *name = NULL;

  switch (s)
  {
    case ex_expr::EXPR_OK:
      name = "EXPR_OK";
      break;
    case ex_expr::EXPR_ERROR:
      name = "EXPR_ERROR";
      break;
    case ex_expr::EXPR_TRUE:
      name = "EXPR_TRUE";
      break;
    case ex_expr::EXPR_FALSE:
      name = "EXPR_FALSE";
      break;
    case ex_expr::EXPR_NULL:
      name = "EXPR_NULL";
      break;
    default:
      name = ComRtGetUnknownString((Int32) s);
      break;
  }

  return name;
}
// Debug-only helper: returns a printable name for a work method return
// code. Values without a dedicated name are formatted by
// ComRtGetUnknownString().
static const char *GetWorkRetcodeString(ExWorkProcRetcode r)
{
  const char *name = NULL;

  switch (r)
  {
    case WORK_OK:
      name = "WORK_OK";
      break;
    case WORK_CALL_AGAIN:
      name = "WORK_CALL_AGAIN";
      break;
    case WORK_POOL_BLOCKED:
      name = "WORK_POOL_BLOCKED";
      break;
    case WORK_RESCHEDULE_AND_RETURN:
      name = "WORK_RESCHEDULE_AND_RETURN";
      break;
    case WORK_BAD_ERROR:
      name = "WORK_BAD_ERROR";
      break;
    default:
      name = ComRtGetUnknownString((Int32) r);
      break;
  }

  return name;
}
// Debug-only helper: returns a printable name for an up queue status.
// Values without a dedicated name are formatted by
// ComRtGetUnknownString().
static const char *GetUpStatusString(ex_queue::up_status s)
{
  const char *name = NULL;

  switch (s)
  {
    case ex_queue::Q_NO_DATA:
      name = "Q_NO_DATA";
      break;
    case ex_queue::Q_OK_MMORE:
      name = "Q_OK_MMORE";
      break;
    case ex_queue::Q_SQLERROR:
      name = "Q_SQLERROR";
      break;
    case ex_queue::Q_INVALID:
      name = "Q_INVALID";
      break;
    case ex_queue::Q_GET_DONE:
      name = "Q_GET_DONE";
      break;
    default:
      name = ComRtGetUnknownString((Int32) s);
      break;
  }

  return name;
}
// Debug-only helper: writes a printable form of a transaction id into
// buf, which must be larger than 100 bytes (asserted).
//   transId  transaction id to format; -1 is written as the text "-1"
//   buf      [OUT] receives a null-terminated string
//   len      size of buf in bytes
// When TRANSIDTOTEXT reports an error, buf receives "(error N)" instead.
static void TransIdToText(Int64 transId, char *buf, Int32 len)
{
  ex_assert(len > 100, "Buffer passed to TransIdToText is too small");

  if (transId != -1)
  {
    short textLen;
    short rc = TRANSIDTOTEXT(transId, buf, (short) (len - 1), &textLen);
    if (rc == 0)
    {
      // TRANSIDTOTEXT reports the length; terminate the string ourselves
      buf[textLen] = 0;
    }
    else
    {
      str_sprintf(buf, "(error %d)", (Int32) rc);
    }
  }
  else
  {
    sprintf(buf, "-1");
  }
}
//
// Debug-build trace macros. UdrDebugN takes a printf-style format
// string plus N arguments and forwards them to UdrPrintf() with
// traceFile_ when doTrace_ is set. Both names are resolved at the point
// of use, so the macros only work where doTrace_ and traceFile_ are in
// scope.
//
// NOTE(review): each macro expands to an unbraced "if" statement, so a
// use directly before an "else" would capture that "else". Keep uses
// inside braces or as stand-alone statements.
//
#define UdrDebug0(s) \
if (doTrace_) UdrPrintf(traceFile_,(s))
#define UdrDebug1(s,a1) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1))
#define UdrDebug2(s,a1,a2) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1),(a2))
#define UdrDebug3(s,a1,a2,a3) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1),(a2),(a3))
#define UdrDebug4(s,a1,a2,a3,a4) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4))
#define UdrDebug5(s,a1,a2,a3,a4,a5) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4),(a5))
#define UdrDebug6(s,a1,a2,a3,a4,a5,a6) \
if (doTrace_) UdrPrintf(traceFile_,(s),(a1),(a2),(a3),(a4),(a5),(a6))
#else
//
// Debug macros are no-ops in the release build. Every UdrDebugN macro
// that exists in the UDR_DEBUG build (UdrDebug0 through UdrDebug6) needs
// an empty counterpart here, otherwise a call site that compiles in the
// debug build fails to compile in the release build.
//
#define UdrDebug0(s)
#define UdrDebug1(s,a1)
#define UdrDebug2(s,a1,a2)
#define UdrDebug3(s,a1,a2,a3)
#define UdrDebug4(s,a1,a2,a3,a4)
#define UdrDebug5(s,a1,a2,a3,a4,a5)
#define UdrDebug6(s,a1,a2,a3,a4,a5,a6)
#define printDataStreamState()
#endif // UDR_DEBUG
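//
// UDR TCBs use this function to acquire process-wide unique UDR
// handles. The implementation below hands out values from a static
// counter that is incremented on every call, so the first handle
// returned is 1.
//
// Historical note: this comment originally described JULIANTIMESTAMP(3)
// as the handle source. JULIANTIMESTAMP(3) returns the number of
// microseconds since cold load and is not affected by setting the
// system clock. The static counter was described as the Windows
// alternative, because JULIANTIMESTAMP is not always reliable there due
// to occasional clock synchronization performed by the NonStop Cluster
// services. The code now uses the counter unconditionally.
//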
static void InitializeUdrHandle(Int64 &handle)
{
  // Function-local counter shared by all callers in this process. The
  // pre-increment means the first handle handed out is 1.
  static Int64 handleCounter = 0;
  handle = ++handleCounter;
}
//----------------------------------------------------------------------
// ExUdrTdb methods
//----------------------------------------------------------------------
ex_tcb *ExUdrTdb::build(ex_globals *glob)
{
  // The child TCBs are built first, into an array allocated from the
  // globals' space. The array is then handed to the ExUdrTcb constructor.
  const ex_tcb **children =
    (const ex_tcb **) new (glob->getSpace()) ex_tcb *[numChildren()];

  for (short childIx = 0; childIx < numChildren(); childIx++)
    children[childIx] = childTdbs_[childIx]->build(glob);

  // Now build the TCB for this operator
  return new (glob->getSpace()) ExUdrTcb(*this, children, glob);
}
//----------------------------------------------------------------------
// Allocate the stats entry for this TCB on the given heap.
//
// With OPERATOR_STATS collection an ExUDRBaseStats entry is created and
// udrStats_ is left untouched. For every other collection type an
// ExUDRStats entry is created and remembered in udrStats_ as well. In
// both cases udrBaseStats_ points to the new entry, which is also the
// return value.
//----------------------------------------------------------------------
ExOperStats * ExUdrTcb::doAllocateStatsEntry(CollHeap *heap,
                                             ComTdb *tdb)
{
  ExUDRBaseStats *entry = NULL;
  const ComTdb::CollectStatsType collectType =
    getGlobals()->getStatsArea()->getCollectStatsType();

  if (collectType != ComTdb::OPERATOR_STATS)
  {
    // The detailed entry needs the request/reply buffer sizes from the
    // UDR TDB
    ExUdrTdb *udrTdb = (ExUdrTdb *) tdb;
    ExUDRStats *detailed =
      new(heap) ExUDRStats(heap,
                           udrTdb->getRequestSqlBufferSize(),
                           udrTdb->getReplySqlBufferSize(),
                           tdb,
                           this);
    udrStats_ = detailed;
    entry = detailed;
  }
  else
  {
    entry = new(heap) ExUDRBaseStats(heap, this, tdb);
  }

  udrBaseStats_ = entry;
  return entry;
}
//----------------------------------------------------------------------
// ExUdrTcb methods
//----------------------------------------------------------------------
// Constructs the TCB for a UDR TDB.
//
//   udrTdb     the TDB this TCB executes
//   childTcbs  array of already-built child TCBs (stored in childTcbs_)
//   glob       executor globals; must be castable to ExExeStmtGlobals
//              (asserted below)
//
// Main steps, in order:
//  - A result set proxy obtains its RS index, UDR handle, ExUdrServer,
//    server process ID and ExRsInfo from the parent statement via
//    acquireRSInfoFromParent() and generates only an RS handle. Any other
//    TCB generates its own UDR handle.
//  - Allocate the output/input buffer pools (only when the corresponding
//    expression exists), the child input buffer array, the parent
//    queues and, for a TMUDF, the child queue pair and state arrays.
//  - Allocate the work ATP, fix up all expressions and register
//    scheduler subtasks.
//  - For a non result set TCB, acquire an ExUdrServer from the statement
//    globals.
//
// NOTE(review): tmudfStates_ does not appear in the initializer list
// and is only assigned when myTdb().isTmudf() is true, so it is left
// uninitialized for other TCBs -- confirm nothing reads it in that case.
ExUdrTcb::ExUdrTcb(const ExUdrTdb &udrTdb,
const ex_tcb **childTcbs,
ex_globals *glob)
: ex_tcb(udrTdb, 1, glob),
state_(BUILD),
qParent_(),
workAtp_(NULL),
outputPool_(NULL),
inputPool_(NULL),
replyBuffer_(NULL),
requestBuffer_(NULL),
childInputBuffers_(NULL),
nextToSend_(0),
udrServer_(NULL),
dataStream_(NULL),
outstandingControlStream_(NULL),
udrHandle_(INVALID_UDR_HANDLE),
ioSubtask_(NULL),
serverProcessId_(),
rsHandle_(INVALID_RS_HANDLE),
rsIndex_(0),
rsInfo_(NULL),
childTcbs_(childTcbs),
qChild_(0),
udrBaseStats_(NULL),
udrStats_(NULL),
dataMsgsSent_(0),
continueMsgsSent_(0)
#ifdef UDR_DEBUG
, doTrace_(FALSE), doStateTrace_(FALSE), doIpcTrace_(FALSE)
, traceFile_(NULL)
, trustReplies_(FALSE)
#endif
{
#ifdef UDR_DEBUG
initializeDebugVariables();
#endif
UdrDebug1("[BEGIN TCB CONSTRUCTOR] %p", this);
ex_assert(getGlobals() && getGlobals()->castToExExeStmtGlobals(),
"Invalid statement globals pointer in ExUdrTcb constructor");
ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
NABoolean isResultSet = udrTdb.isResultSetProxy();
if (isResultSet)
{
// A result set proxy reuses the parent's UDR handle and server. Only
// the RS handle is generated here.
ExMasterStmtGlobals *masterStmtGlobals =
stmtGlobals->castToExMasterStmtGlobals();
ex_assert(masterStmtGlobals,
"Must be ExMasterStmtGlobals to support UDR result set");
masterStmtGlobals->acquireRSInfoFromParent(rsIndex_, udrHandle_,
udrServer_, serverProcessId_,
rsInfo_);
InitializeUdrHandle(rsHandle_);
ex_assert(RSHandleIsValid(rsHandle_),
"Invalid RS handle generated by ExUdrTcb constructor");
#ifdef UDR_DEBUG
char buf[300];
serverProcessId_.toAscii(buf, 300);
UdrDebug0(" *** This is a result set proxy");
UdrDebug2(" *** RS index %u, RS handle " INT64_PRINTF_SPEC,
rsIndex_, rsHandle_);
UdrDebug2(" *** ExUdrServer %p, process ID %s", udrServer_, buf);
#endif
}
else
{
InitializeUdrHandle(udrHandle_);
ex_assert(UdrHandleIsValid(udrHandle_),
"Invalid UDR handle generated by ExUdrTcb constructor");
}
UdrDebug1(" UDR handle: " INT64_PRINTF_SPEC, udrHandle_);
UdrDebug5(" %u params, %u input value%s, %u output value%s",
udrTdb.getNumParams(),
udrTdb.getNumInputValues(),
PLURAL_SUFFIX(udrTdb.getNumInputValues()),
udrTdb.getNumOutputValues(),
PLURAL_SUFFIX(udrTdb.getNumOutputValues()));
UdrDebug3(" Row length: %u request, %u reply, %u output",
udrTdb.getRequestRowLen(), udrTdb.getReplyRowLen(),
udrTdb.getOutputRowLen());
UdrDebug2(" SqlBuffer size: %u request, %u reply",
udrTdb.getRequestSqlBufferSize(), udrTdb.getReplySqlBufferSize());
UdrDebug1(" Statement globals %p", getGlobals());
UdrDebug1(" Transaction required: %s",
udrTdb.getTransactionAttrs() == COM_TRANSACTION_REQUIRED ?
"YES" : "NO");
if (!isResultSet)
{
// Trace the routine attributes. In a release build the UdrDebug calls
// expand to nothing, so only the asserts on unknown parameter style
// and language remain.
UdrDebug1(" SQL name: %s", udrTdb.getSqlName());
ComRoutineParamStyle paramStyle = udrTdb.getParamStyle();
switch (paramStyle)
{
case COM_STYLE_JAVA_CALL:
UdrDebug0(" Parameter style JAVA (call)");
break;
case COM_STYLE_JAVA_OBJ:
UdrDebug0(" Parameter style JAVA (object)");
break;
case COM_STYLE_SQL:
UdrDebug0(" Parameter style SQL");
break;
case COM_STYLE_SQLROW:
UdrDebug0(" Parameter style SQLROW");
break;
case COM_STYLE_SQLROW_TM:
UdrDebug0(" Parameter style SQLROW_TM");
break;
case COM_STYLE_CPP_OBJ:
UdrDebug0(" Parameter style C++");
break;
default:
ex_assert(0, "Invalid PARAMETER STYLE");
break;
}
ComRoutineLanguage language = udrTdb.getLanguage();
switch (language)
{
case COM_LANGUAGE_JAVA:
UdrDebug0(" Language JAVA");
UdrDebug1(" Runtime options '%s'",
udrTdb.getRuntimeOptions() ?
udrTdb.getRuntimeOptions() : "");
UdrDebug1(" Option delimiters '%s'",
udrTdb.getRuntimeOptionDelimiters() ?
udrTdb.getRuntimeOptionDelimiters() : "");
break;
case COM_LANGUAGE_C:
UdrDebug0(" Language C");
break;
case COM_LANGUAGE_CPP:
UdrDebug0(" Language C++");
break;
default:
ex_assert(0, "Invalid LANGUAGE");
break;
}
} // if (!isResultSet)
Space *globSpace = getSpace();
CollHeap *globHeap = getHeap();
// Allocate output buffer pool
if (udrTdb.getOutputExpression() != NULL)
{
UdrDebug2(" Output pool: %u buffers of size %u",
(ULng32) udrTdb.getNumOutputBuffers(),
(ULng32) udrTdb.getOutputSqlBufferSize());
outputPool_ = new (globSpace)
sql_buffer_pool((Lng32) udrTdb.getNumOutputBuffers(),
(Lng32) udrTdb.getOutputSqlBufferSize(),
globSpace,
SqlBufferBase::NORMAL_);
}
// Allocate input buffer pool
if (udrTdb.getInputExpression() != NULL)
{
UdrDebug2(" Input pool: %u buffers of size %u",
(ULng32) udrTdb.getNumInputBuffers(),
(ULng32) udrTdb.getInputSqlBufferSize());
inputPool_ = new (globSpace)
sql_buffer_pool((Lng32) udrTdb.getNumInputBuffers(),
(Lng32) udrTdb.getInputSqlBufferSize(),
globSpace,
SqlBufferBase::NORMAL_);
}
// One UdrDataBuffer pointer per child; the buffers themselves are not
// allocated here.
if (numChildren())
{
childInputBuffers_ = (UdrDataBuffer **) new (globSpace) UdrDataBuffer *[numChildren()];
// Initialize them all to NULL
for (Int32 i = 0; i < numChildren(); i++)
childInputBuffers_[i] = NULL;
}
// Allocate queues to communicate with parent
allocateParentQueues(qParent_);
// if it is a tmudf tcb then allocate the child queues too
if (myTdb().isTmudf())
{
// Allocate array of child queue pairs (i.e., queue pairs for
// communicating with children.
//
qChild_ = new (globSpace) ex_queue_pair[numChildren()];
tmudfStates_ = new (globSpace) TmudfState[numChildren()];
// Initialize array.
//
Int32 i;
for (i=0; i<numChildren(); i++)
{
qChild_[i] = childTcbs_[i]->getParentQueue();
tmudfStates_[i] = INITIAL;
}
}
// During the lifetime of this TCB when down queue entries exist
// that have been sent to the UDR Server but for which not all up
// queue entries have been generated, qParent_.down->getHeadIndex()
// will not be equal to nextToSend_. We maintain this relationship
// by incrementing nextToSend_ each time a down queue entry is
// processed. An entry is considered "processed" when it gets copied
// into a message buffer that gets sent to the UDR Server, or when
// the copy step generates errors, or when we notice the entry has
// been cancelled before it was sent.
nextToSend_ = qParent_.down->getHeadIndex();
// Allocate the work ATP
if (udrTdb.getWorkCriDesc())
workAtp_ = allocateAtp(udrTdb.getWorkCriDesc(), globSpace);
// Fixup expressions
if (udrTdb.getInputExpression())
udrTdb.getInputExpression()->fixup(0, getExpressionMode(), this,
globSpace, globHeap, FALSE, glob);
if (udrTdb.getOutputExpression())
udrTdb.getOutputExpression()->fixup(0, getExpressionMode(), this,
globSpace, globHeap, FALSE, glob);
// NOTE(review): unlike the other expressions, the child input
// expressions are fixed up without a NULL check -- presumably every
// child always has one; confirm against the generator.
Int32 i;
for (i=0; i<numChildren(); i++)
{
udrTdb.getChildInputExpr(i)->fixup(0,getExpressionMode(),this,
globSpace, globHeap, FALSE, glob);
}
if (udrTdb.getPredicate())
udrTdb.getPredicate()->fixup(0, getExpressionMode(), this,
globSpace, globHeap, FALSE, glob);
// Register subtasks with the scheduler
registerSubtasks();
registerResizeSubtasks();
if (!isResultSet)
{
// We now create an ExUdrServer instance and store a pointer to it
// in this TCB. If the TDB does not contain runtime option strings
// (which would be true for a plan compiled prior to release 2), we
// establish meaningful default values for those strings here.
const char *options = udrTdb.getRuntimeOptions();
const char *delims = udrTdb.getRuntimeOptionDelimiters();
if (options == NULL || strlen(options) == 0)
options = "OFF";
if (delims == NULL || strlen(delims) == 0)
delims = " ";
// A TMUDF requests a dedicated server from the statement globals
NABoolean dedicated = FALSE;
if (myTdb().isTmudf())
dedicated = TRUE;
udrServer_ = stmtGlobals->acquireUdrServer(options, delims, dedicated);
}
// For a result set proxy udrServer_ was supplied by the parent above
ex_assert(udrServer_, "No ExUdrServer available for this TCB");
UdrDebug2(" This TCB will use ExUdrServer %p, ref count %d",
udrServer_, udrServer_->getRefCount());
UdrDebug1("[END TCB CONSTRUCTOR] %p\n", this);
} // ExUdrTcb::ExUdrTcb
// Destructor. Releases the resources acquired during fixup first
// (freeResources()), then the objects created by the constructor, and
// finally moves the TCB to the DONE state.
//
// NOTE(review): outputPool_ is deleted here but inputPool_, which the
// constructor allocates in the same way, is not -- confirm whether that
// is intentional.
ExUdrTcb::~ExUdrTcb()
{
UdrDebug1("[BEGIN TCB DESTRUCTOR] %p", this);
//
// Release resources acquired during fixup
//
freeResources();
//
// Release resources allocated by the constructor. The queues were
// allocated by calling ex_tcb::allocateParentQueues() but there is
// no deallocateParentQueues() function so we simply call
// destructors for the queues here.
//
delete qParent_.up;
delete qParent_.down;
delete outputPool_;
// The work ATP only exists when the TDB has a work CRI descriptor
if (workAtp_)
{
workAtp_->release();
deallocateAtp(workAtp_, getSpace());
}
setUdrTcbState(DONE);
UdrDebug1("[END TCB DESTRUCTOR] %p\n", this);
} // ExUdrTcb::~ExUdrTcb()
//
// This function frees any resources acquired during fixup.
// It should only be called by the TCB destructor.
//
// The release helpers are invoked in a fixed order: control stream,
// data stream, server resources and finally the connection to the
// server.
//
void ExUdrTcb::freeResources()
{
UdrDebug1(" [BEGIN TCB FREE RESOURCES] %p", this);
releaseControlStream();
releaseDataStream();
releaseServerResources();
releaseConnectionToServer();
UdrDebug1(" [END TCB FREE RESOURCES] %p", this);
}
// Allocates the private state array for this TCB's down queue entries.
// The work is delegated to a PstateAllocator instantiated for
// ExUdrPrivateState.
ex_tcb_private_state *ExUdrTcb::allocatePstates(
     Lng32 &numElems,      // [IN/OUT] desired/actual elements
     Lng32 &pstateLength)  // [OUT] length of one element
{
  PstateAllocator<ExUdrPrivateState> allocator;
  ex_tcb_private_state *pstates =
    allocator.allocatePstates(this, numElems, pstateLength);
  return pstates;
}
void ExUdrTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
// Cancel events are seen by workCancel()
sched->registerCancelSubtask(sWorkCancel, this, qParent_.down, "Cancel");
if (myTdb().isTmudf())
{
// Up queue events are seen by tmudfWork()
sched->registerUnblockSubtask(sTmudfWork, this, qParent_.up, "Up");
// Down queue events are seen by tmudfWork()
sched->registerInsertSubtask(sTmudfWork, this, qParent_.down, "Down");
// I/O events are seen by tmudfWork()
ioSubtask_ = sched->registerNonQueueSubtask(sTmudfWork, this, "I/O Events");
Int32 i;
for (i=0; i<numChildren(); i++)
{
sched->registerUnblockSubtask(sTmudfWork, this, qChild_[i].down,"Down");
sched->registerInsertSubtask(sTmudfWork, this, qChild_[i].up, " Up");
}
}
else
{
// Up queue events are seen by work()
sched->registerUnblockSubtask(sWork, this, qParent_.up, "Up");
// Down queue events are seen by work()
sched->registerInsertSubtask(sWork, this, qParent_.down, "Down");
// I/O events are seen by work()
ioSubtask_ = sched->registerNonQueueSubtask(sWork, this, "I/O Events");
}
}
//----------------------------------------------------------------------
// TCB accessor methods
//----------------------------------------------------------------------
// Returns this TCB's globals cast to ExExeStmtGlobals. Constness is cast
// away before calling getGlobals() (presumably because that accessor is
// not declared const).
ExExeStmtGlobals *ExUdrTcb::myExeStmtGlobals() const
{
  ExUdrTcb *self = const_cast<ExUdrTcb *>(this);
  return self->getGlobals()->castToExExeStmtGlobals();
}
// Returns the IPC environment held by the statement globals.
IpcEnvironment *ExUdrTcb::myIpcEnv() const
{
  ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
  return stmtGlobals->getIpcEnvironment();
}
// Returns the heap that belongs to this TCB's IPC environment.
CollHeap *ExUdrTcb::getIpcHeap() const
{
  IpcEnvironment *ipcEnv = myIpcEnv();
  return ipcEnv->getHeap();
}
// Returns the diags area currently stored in the statement globals.
// Callers in this file treat a NULL result as "no diags area yet".
ComDiagsArea *ExUdrTcb::getStatementDiags() const
{
  ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
  return stmtGlobals->getDiagsArea();
}
// Stores the given diags area in the statement globals.
void ExUdrTcb::setStatementDiags(ComDiagsArea *d) const
{
  ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
  stmtGlobals->setGlobDiagsArea(d);
}
// Returns the statement diags area, creating one on this TCB's heap and
// storing it in the statement globals when none exists yet.
ComDiagsArea *ExUdrTcb::getOrCreateStmtDiags() const
{
  ComDiagsArea *diags = getStatementDiags();
  if (diags != NULL)
    return diags;

  diags = ComDiagsArea::allocate(const_cast<ExUdrTcb *>(this)->getHeap());
  setStatementDiags(diags);

  // Drop the reference obtained from allocate(); presumably the
  // statement globals hold their own reference after the set call.
  diags->decrRefCount();
  return diags;
}
//----------------------------------------------------------------------
// TCB fixup
// Non-zero return value indicates errors
//----------------------------------------------------------------------
// Fixup for the UDR TCB.
//
// Steps performed on the (single) pass through the loop below:
//  - start the UDR server unless this is a result set proxy
//  - make any diags area created by start() visible in the statement
//    globals
//  - record server id / start time in the stats entries, if present
//  - for a CALL with result sets, fill in the statement's ExRsInfo and
//    give the statement globals a new IPC connection
//  - send a LOAD (or RS LOAD) control message and verify that the
//    server process ID is still valid afterwards
// On success the data stream is allocated and the value of
// super::fixup() is returned.
//
// Returns 0 (FIXUP_SUCCESS) on success and non-zero on error.
Int32 ExUdrTcb::fixup()
{
#ifdef UDR_DEBUG
initializeDebugVariables();
#endif // UDR_DEBUG
UdrDebug1("[BEGIN TCB FIXUP] %p", this);
setUdrTcbState(FIXUP);
//
// Non-zero return value indicates an error
//
const Int32 FIXUP_SUCCESS = 0;
const Int32 FIXUP_ERROR = 1;
// The result variable will have the value FIXUP_ERROR until we are
// sure the following are true
// - this TCB has a running server
// - a nowait LOAD request has been sent to the server
Int32 result = FIXUP_ERROR;
NABoolean isResultSet = myTdb().isResultSetProxy();
// The code is written to retry fixup for this TCB some number of
// times. Retrying fixup would allow us to send a LOAD message to a
// server that we think is running, then detect that the server has
// died, and then start a new server and send it the LOAD
// message. This sounds like a good idea but there is a catch. The
// MX IPC code will only detect that the server died when it calls
// AWAITIOX to complete IO. We do not detect a dead server when we
// are sending data. Once we call AWAITIOX to complete IO to a dead
// server we experience transaction abort (if there was a
// transaction active) and AWAITIOX returns error 201 which is
// FEPATHDOWN. Once our transaction is gone it does us no good to
// start a new server without first starting a new transaction and
// it is not this TCB's job to start transactions. Thus we are not
// going to retry fixup in this TCB. If bad things happen during the
// fixup phase then it is probably true that a transaction was
// aborted and that fixup should not be retried.
// To prevent fixup retries we set the numRetries variable to zero
// before entering the following for loop.
UInt32 numRetries = 0;
for (UInt32 i = 0; i <= numRetries && result != FIXUP_SUCCESS; i++)
{
if (state_ != FIXUP)
{
setUdrTcbState(FIXUP);
}
//
// We call ExUdrServer::start() to start a new UDR server if one
// is not already running. start() does nothing and returns
// EX_UDR_SUCCESS if one is already running.
//
ComDiagsArea *da = myExeStmtGlobals()->getDiagsArea();
CollHeap *diagsHeap = getHeap();
// Remember the pre-start() pointer so that a diags area created
// inside start() can be detected afterwards
ComDiagsArea *originalDa = da;
// Determine whether the UDR control connection should carry
// transactional requests. Transactional requests are currently
// only needed in the master executor. UDFs running from ESPs do
// not send transactional requests.
NABoolean usesTransactions = TRUE;
if (myExeStmtGlobals()->castToExEspStmtGlobals())
usesTransactions = FALSE;
if (!isResultSet && udrStats_ != NULL)
{
Int64 serverInit = NA_JulianTimestamp();
udrStats_->setUDRServerInit(serverInit);
}
#ifdef UDR_DEBUG
if (doTrace_ && traceFile_)
udrServer_->setTraceFile(traceFile_);
#endif
// Determine the transid we should send in the message. We use the
// statement transid only if the TDB says we require a
// transaction.
Int64 &stmtTransId = myExeStmtGlobals()->getTransid();
Int64 transIdToSend = -1; // -1 is an invalid transid
if (myTdb().getTransactionAttrs() == COM_TRANSACTION_REQUIRED)
transIdToSend = stmtTransId;
// For CALL stmts, start UDR Server if it's not already started.
// For RS stmts, UDR Server must have been started by parent call.
ExUdrServer::ExUdrServerStatus status = ExUdrServer::EX_UDR_SUCCESS;
if (!isResultSet)
status = udrServer_->start(&da, diagsHeap, transIdToSend,
serverProcessId_, usesTransactions);
if (originalDa == NULL && da != NULL)
{
//
// This indicates that before the start() call our statement
// globals did not have a diags area but during the start()
// call one was created. We need a pointer to this new diags
// area in our statement globals. Note that the diags area can
// contain warnings only, so we need to do this even if start()
// was successful.
//
myExeStmtGlobals()->setGlobDiagsArea(da);
da->decrRefCount();
}
if (status == ExUdrServer::EX_UDR_SUCCESS)
{
#ifdef UDR_DEBUG
char buf[300];
serverProcessId_.toAscii(buf, 300);
UdrDebug1(" [FIXUP] This TCB will use server process %s", buf);
#endif // UDR_DEBUG
if (!isResultSet )
{
// Record the server process ID (as text) and the server start
// time in whichever stats entries exist
if (udrBaseStats_)
{
char bufForServer[300];
serverProcessId_.toAscii(bufForServer, 300);
udrBaseStats_->setUDRServerId( (char *) bufForServer, 300);
}
if (udrStats_)
{
Int64 serverStart = NA_JulianTimestamp();
udrStats_->castToExUDRStats()->setUDRServerStart(serverStart);
}
}
ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
// If this is a CALL that produces result sets, then store
// information about this TCB in the statement's ExRsInfo object.
if (myTdb().getMaxResultSets() > 0)
{
// NOTE(review): the result of castToExMasterStmtGlobals() is used
// without a NULL check here, whereas the constructor asserts on
// the same cast for result set proxies -- confirm that a CALL
// with result sets always runs with master statement globals.
ExMasterStmtGlobals *masterStmtGlobals =
stmtGlobals->castToExMasterStmtGlobals();
rsInfo_ = masterStmtGlobals->getResultSetInfo(TRUE);
ex_assert(rsInfo_, "No ExRsInfo object available");
rsInfo_->setNumEntries(myTdb().getMaxResultSets());
rsInfo_->setUdrHandle(udrHandle_);
rsInfo_->setUdrServer(udrServer_);
rsInfo_->setIpcProcessId(serverProcessId_);
}
// Set correct IpcConnection in stmt globals.
// For SPJ with RS, create a new connection. For SPJ that does
// not return RS, stmt globals do not store connection pointer, but
// stmt globals return control connection from udrServer_ on demand.
if (!isResultSet)
{
if (myTdb().getMaxResultSets() > 0)
{
IpcConnection *conn = udrServer_->getAnIpcConnection();
stmtGlobals->setUdrConnection(conn);
UdrDebug2(" TCB %p will use IPC Connection %p",
this, conn);
}
else
{
UdrDebug2(" TCB %p will use IPC Connection %p",
this, udrServer_->getUdrControlConnection());
}
}
// A result set proxy sends an RS LOAD, everything else a LOAD
if (sendControlMessage(isResultSet ? UDR_MSG_RS_LOAD : UDR_MSG_LOAD,
TRUE))
{
if (verifyUdrServerProcessId())
{
//
// Everything went as planned. We have a running server, the
// LOAD message was sent and following the send, the server
// process ID is still valid which should indicate that
// during the send the IPC layer did not detect an IPC
// error. The send() methods in our IPC stream and
// connection classes return void, therefore errors can
// occur during a send() without the caller of send() being
// notified, and that is why we do the
// verifyUdrServerProcessId() test following the send.
//
result = FIXUP_SUCCESS;
}
else
{
UdrDebug0(" ***");
UdrDebug0(" *** WARNING: LOAD message successfully sent but");
UdrDebug0(" *** an IPC error occurred or perhaps the");
UdrDebug0(" *** server is no longer running.");
UdrDebug0(" ***");
}
} // if (sendControlMessage())
} // if (status == ExUdrServer::EX_UDR_SUCCESS)
} // for i = 0 to numRetries
// Only allocate the data stream and run the base class fixup once the
// server is up and the LOAD request has been sent
if (result == FIXUP_SUCCESS)
{
allocateDataStream();
result = super::fixup();
}
UdrDebug3("[END TCB FIXUP] %p. Return value is %d (%s)\n",
this, result, (result == FIXUP_SUCCESS ? "SUCCESS" : "ERROR"));
return result;
} // ExUdrTcb::fixup()
//----------------------------------------------------------------------
// TCB work methods
//----------------------------------------------------------------------
// Work method of the UDR TCB.
//
// One invocation runs up to three steps in order -- checkReceive(),
// checkSend() and continueRequest() -- where each later step only runs
// if the previous one returned WORK_OK. When the parent down queue is
// empty afterwards, unused IPC buffers in the data stream are released.
//
// Returns the return code of the last step that was executed.
ExWorkProcRetcode ExUdrTcb::work()
{
#ifdef UDR_DEBUG
  static Lng32 workCount = 0;
  if (doTrace_)
  {
    workCount++;
    FILE *f = traceFile_;
    char text[256];
    TransIdToText(myExeStmtGlobals()->getTransid(), text, 256);
    ex_queue *up = qParent_.up;
    ex_queue *dn = qParent_.down;
    UdrPrintf(f, "[BEGIN WORK %d] %p", workCount, this);
    UdrPrintf(f, " [WORK] Stmt tx %s", text);
    UdrPrintf(f, " [WORK] TCB state %s", getUdrTcbStateString(state_));
    UdrPrintf(f, " [WORK] Down queue: head %d, tail %d, len %d, size %d",
              (Lng32) dn->getHeadIndex(), (Lng32) dn->getTailIndex(),
              (Lng32) dn->getLength(), (Lng32) dn->getSize());
    UdrPrintf(f, " [WORK] Up queue: head %d, tail %d, len %d, size %d",
              (Lng32) up->getHeadIndex(), (Lng32) up->getTailIndex(),
              (Lng32) up->getLength(), (Lng32) up->getSize());
    UdrPrintf(f, " [WORK] Next to send: %d", (Lng32) nextToSend_);
    printDataStreamState();
  }
#endif // UDR_DEBUG

  ExWorkProcRetcode result = WORK_OK;

  result = checkReceive();
  UdrDebug1(" [WORK] checkReceive() returned %s",
            GetWorkRetcodeString(result));

  if (result == WORK_OK)
  {
    result = checkSend();
    UdrDebug1(" [WORK] checkSend() returned %s",
              GetWorkRetcodeString(result));
  }

  if (result == WORK_OK)
  {
    result = continueRequest();
    UdrDebug1(" [WORK] continueRequest() returned %s",
              GetWorkRetcodeString(result));
  }

  if (qParent_.down->isEmpty())
  {
    UdrDebug0(" [WORK] Down queue is empty");

    // There are no more parent requests to process. We now want to
    // perform garbage collection of buffers in the data stream.
    // There seems to be a bug in stream message processing code
    // that moves the message from RECEIVE queue to IN USE queue only
    // when you try to get the next message from the stream and it
    // only garbage collects the IN USE queue.
    // So we ask the stream for one more buffer even though we know
    // there won't be one. By doing this we guarantee that all
    // incoming buffers have moved off the stream's RECEIVE queue and
    // on to the IN USE queue.
    // If there does happen to be another buffer in the stream,
    // that is an internal error.
    //
    // This cleanup technique is borrowed from the send top node (code
    // is in ex_send_top.cpp) and fixes the memory leak reported in
    // solution 10-050707-9503.
    UdrDebug0(" [WORK] Releasing unused IPC buffers in the data stream...");

    // The data stream may not exist. Guard both the probe for an
    // unexpected message and the buffer release, so that a missing
    // stream is never dereferenced.
    if (dataStream_)
    {
      IpcMessageObjType msgType;
      if (dataStream_->getNextReceiveMsg(msgType))
      {
        char buf[256];
        dataStream_->getNextObjType(msgType);
        str_sprintf(buf, "An unexpected message of type %d arrived",
                    (Int32) msgType);
        ex_assert(FALSE, buf);
      }
      dataStream_->releaseBuffers();
    }

    UdrDebug0(" [WORK] Done releasing IPC buffers");
  } // if (qParent_.down->isEmpty())

#ifdef UDR_DEBUG
  if (doTrace_)
  {
    FILE *f = traceFile_;
    UdrPrintf(f, " [WORK] UDR TX messages outstanding: %d",
              (Lng32) myExeStmtGlobals()->numUdrTxMsgsOut());
    UdrPrintf(f, " [WORK] UDR Non-TX messages outstanding: %d",
              (Lng32) myExeStmtGlobals()->numUdrNonTxMsgsOut());
    UdrPrintf(f, " [WORK] TCB state: %s", getUdrTcbStateString(state_));
    UdrPrintf(f, "[END WORK %d] %p. Return value is %s\n",
              workCount, this, GetWorkRetcodeString(result));
  }
#endif // UDR_DEBUG

  return result;
}
// Reports whether any down queue entries have been processed (see
// nextToSend_) but have not yet had all of their up queue entries
// generated. That is the case whenever nextToSend_ differs from the
// head index of the parent down queue.
NABoolean ExUdrTcb::anyOutstandingQueueRequests()
{
  if (nextToSend_ != qParent_.down->getHeadIndex())
    return TRUE;
  return FALSE;
}
// Work method helper that attempts to send request rows to the server.
//
// Returns
//   WORK_BAD_ERROR  if IPC is broken or the LOAD request failed
//   WORK_OK         if server resources are not loaded yet, or the
//                   parent down queue is empty
//   otherwise the result of buildAndSendRequestBuffer()
ExWorkProcRetcode ExUdrTcb::checkSend()
{
  if (getIpcBroken())
  {
    UdrDebug0(" [WORK] IPC is broken");
    return WORK_BAD_ERROR;
  }

  // We should not attempt to send data unless we are sure the server
  // has already loaded this UDR.
  if (!serverResourcesAreLoaded())
  {
    if (state_ != LOAD_FAILED)
    {
      UdrDebug0(" [WORK] Cannot continue. Server resources not loaded.");
      return WORK_OK;
    }

    UdrDebug0(" [WORK] Cannot continue. The LOAD request failed.");
    return WORK_BAD_ERROR;
  }

  // Nothing to send when the parent has no requests
  if (qParent_.down->isEmpty())
    return WORK_OK;

  return buildAndSendRequestBuffer();
} // ExUdrTcb::checkSend()
// Work method helper that attempts to produce up queue entries for the
// request at the head of the parent down queue.
//
// The loop runs while there are outstanding requests, the up queue has
// room and no error has been returned. It stops early when no reply
// buffer is available and the statement diags hold no error.
//
// Returns WORK_BAD_ERROR when IPC is broken, otherwise WORK_OK or
// whatever returnSingleRow() returned.
ExWorkProcRetcode ExUdrTcb::checkReceive()
{
  ExWorkProcRetcode rc = WORK_OK;
  NABoolean mustWaitForReply = FALSE;

  while (!mustWaitForReply && rc == WORK_OK
         && anyOutstandingQueueRequests() && !qParent_.up->isFull())
  {
    if (getIpcBroken())
    {
      UdrDebug0(" [WORK] IPC is broken");
      rc = WORK_BAD_ERROR;
      break;
    }

    ex_queue_entry *request = qParent_.down->getHeadEntry();
    ExUdrPrivateState *pstate = (ExUdrPrivateState *) request->pstate;

    switch (pstate->step_)
    {
      case CANCEL_BEFORE_SEND:
        insertUpQueueEntry(ex_queue::Q_NO_DATA);
        break;

      case PRODUCE_EOD_AFTER_ERROR:
        // finish up error handling by sending an EOD
        insertUpQueueEntry(ex_queue::Q_NO_DATA);
        break;

      case PRODUCE_ERROR_REPLY:
        // An error occurred before the entry could be sent, probably
        // during expression evaluation. If any diags were generated
        // they were placed in the down entry ATP and will be copied
        // to the up entry ATP by the following function.
        insertUpQueueEntry(ex_queue::Q_SQLERROR);
        pstate->step_ = PRODUCE_EOD_AFTER_ERROR;
        break;

      case STARTED:
      case CANCEL_AFTER_SEND:
      {
        if (getReplyBuffer() == NULL)
        {
          // check for errors from getReplyBuffer()
          ComDiagsArea *stmtDiags = getStatementDiags();
          if (stmtDiags && stmtDiags->mainSQLCODE() < 0)
          {
            insertUpQueueEntry(ex_queue::Q_SQLERROR, stmtDiags);
            pstate->step_ = PRODUCE_EOD_AFTER_ERROR;
          }
          else
          {
            mustWaitForReply = TRUE;
          }
          break;
        }

        // Now we can return a single up queue entry. If the up
        // queue entry is Q_NO_DATA then inside this call to
        // returnSingleRow() the head of the down queue will be
        // popped.
        rc = returnSingleRow();

        if (replyBufferIsEmpty())
        {
          releaseReplyBuffer();

          // It is possible that we are in the WORK_IO_ACTIVE
          // state and conditions to transition back to WORK have
          // now been met. For example we might have just finished
          // processing all down queue entries that have been sent
          // to the server. The following method checks the
          // conditions and transitions to WORK if
          // appropriate. See commentary in the method for more
          // detail.
          attemptTransitionToWorkState();
        }
        break;
      }

      case NOT_STARTED:
      default:
        ex_assert(FALSE, "Invalid step value in TCB down queue");
        break;
    } // switch (pstate->step_)
  } // while (!mustWaitForReply && rc == WORK_OK && outstanding ... )

  return rc;
} // ExUdrTcb::checkReceive()
//
// Insert a single entry into the up queue, built from the entry at the
// head of the down queue, and remove that down queue head when the
// status is Q_NO_DATA.
//
//   status  up queue status to report
//   diags   optional diags area to attach to the up queue entry
//
// Returns FALSE without doing anything when the up queue is full,
// TRUE otherwise.
//
// Right now this function does not handle data rows, only error
// and end-of-data. It could possibly be extended to handle a data
// row. I have not looked at that closely enough yet.
//
NABoolean ExUdrTcb::insertUpQueueEntry(ex_queue::up_status status,
                                       ComDiagsArea *diags)
{
  ex_queue *upQueue = qParent_.up;
  ex_queue *downQueue = qParent_.down;

  if (upQueue->isFull())
    return FALSE;

  ex_queue_entry *reply = upQueue->getTailEntry();
  ex_queue_entry *request = downQueue->getHeadEntry();
  ExUdrPrivateState *pstate = (ExUdrPrivateState *) request->pstate;

  // Initialize the up queue entry.
  //
  // copyAtp() will copy all tuple pointers and the diags area from
  // the down queue entry to the up queue entry.
  //
  // When we return Q_NO_DATA if the match count is > 0:
  // * assume down queue diags were returned with the Q_OK_MMORE entries
  // * release down queue diags before copyAtp()
  //
  if (status == ex_queue::Q_NO_DATA && pstate->matchCount_ > 0)
  {
    request->setDiagsArea(NULL);
    reply->copyAtp(request);
  }
  else
  {
    reply->copyAtp(request);
    request->setDiagsArea(NULL);
  }

  reply->upState.status = status;
  reply->upState.parentIndex = request->downState.parentIndex;
  reply->upState.downIndex = downQueue->getHeadIndex();
  reply->upState.setMatchNo(pstate->matchCount_);

  // Move any diags to the up queue entry
  if (diags != NULL)
  {
    ComDiagsArea *existing = reply->getDiagsArea();
    if (existing != NULL)
    {
      existing->mergeAfter(*diags);
      // errors have been reported, clear the passed-in diags
      diags->clear();
    }
    else
    {
      // setDiagsArea() does not increment the reference count
      reply->setDiagsArea(diags);
      diags->incrRefCount();
    }
  }

  // cancel child requests, if necessary
  if (status == ex_queue::Q_SQLERROR && myTdb().isTmudf())
    tmudfCancelChildRequests(downQueue->getHeadIndex());

  // Insert into up queue
  upQueue->insert();

  // Remove the head of the down queue if we are done
  if (status == ex_queue::Q_NO_DATA)
  {
    pstate->init();
    downQueue->removeHead();
  }

  return TRUE;
}
ExWorkProcRetcode ExUdrTcb::buildAndSendRequestBuffer()
{
  // Moves request rows from the parent down queue (starting at
  // nextToSend_) into a request buffer and sends the buffer on the data
  // stream.
  //
  // Return values:
  //   WORK_OK           - nothing more to do right now
  //   WORK_POOL_BLOCKED - no request buffer could be obtained
  //   WORK_BAD_ERROR    - no data stream, server process id verification
  //                       failed, or the TCB is in the IPC-broken state
  //   WORK_CALL_AGAIN   - input expression errors or cancelled entries
  //                       were seen and still need to be processed
  UdrDebug1(" [WORK] BEGIN ExUdrTcb::buildAndSendRequestBuffer() %p",
            this);
  ExWorkProcRetcode result = WORK_OK;
  ULng32 numErrors = 0;
  ULng32 numCancels = 0;
  NABoolean isResultSet = myTdb().isResultSetProxy();

  // Determine the transid we should send in data messages. We
  // use the statement transid if all the following are true
  // a. the TDB says we require a transaction
  // b. this is NOT a CALL that returns result sets
  // c. this is NOT a result set
  //
  // Note: (b) and (c) are both indicated by rsInfo_ == NULL
  //
  Int64 &stmtTransId = myExeStmtGlobals()->getTransid();
  Int64 transIdToSend = -1; // -1 is an invalid transid
  if (dataRequestsAreTransactional())
    transIdToSend = stmtTransId;

  // Process entries from the down queue. nextToSend_
  // will be incremented inside the loop following successful
  // insertion of a request row into the request buffer.
  // state_ will be WORK until a request message is sent and
  // then state_ will become WORK_IO_ACTIVE.
  while (result == WORK_OK && state_ == WORK
         && qParent_.down->entryExists(nextToSend_))
  {
    // Two early tests to see if we should get out of the outer while
    // loop. First make sure a server is running then attempt to
    // allocate a request buffer.
    //
    // 1. Make sure we have a data stream and a running server
    if (dataStream_ == NULL || !verifyUdrServerProcessId())
    {
      result = WORK_BAD_ERROR;
      break;
    }

    // 2. Allocate a request buffer on the message stream heap. Make
    // sure stream buffer limits have not been reached. Break out of
    // the outer while loop if the allocation fails.
    if (requestBuffer_ == NULL)
    {
      // Free up any receive buffers no longer in use
      dataStream_->cleanupBuffers();

      // Now test the stream buffer limits. If the send queue is full
      // we return WORK_OK and let the operator wake up again when I/O
      // completes. If other limits have been reached we return
      // WORK_POOL_BLOCKED which causes the operator to be scheduled
      // again right away.
      if (dataStream_->sendLimitReached())
      {
        UdrDebug0(" [WORK] Cannot allocate request buffer");
        UdrDebug0(" sendLimitReached() returned TRUE");
        printDataStreamState();
        result = WORK_OK;
      }
      else if (dataStream_->inUseLimitReached())
      {
        UdrDebug0(" [WORK] Cannot allocate request buffer");
        UdrDebug0(" inUseLimitReached() returned TRUE");
        printDataStreamState();
        result = WORK_POOL_BLOCKED;
      }
      else
      {
        requestBuffer_ = getRequestBuffer();
        if (requestBuffer_ == NULL)
        {
          result = WORK_POOL_BLOCKED;
          printDataStreamState();
        }
      }
    } // if (requestBuffer_ == NULL)

    if (requestBuffer_ == NULL)
      break;

    SqlBuffer *sqlBuf = requestBuffer_->getSqlBuffer();
    ex_assert(sqlBuf, "UDR request buffer is corrupt or contains no data");
    ULng32 numRequestsAdded = 0;

    // Move as many rows as possible into the request buffer
    //
    // *** IMPORTANT: This inner loop must not do any I/O, must not
    // cause a call to setUdrTcbState(), and must not change the value
    // of dataStream_ (the outer loop has just verified that it is not
    // NULL). If this is honored then the conditions guaranteed by the
    // outer while loop will still be valid following this inner while
    // loop.
    NABoolean sendSpaceAvailable = TRUE;
    while (sendSpaceAvailable && qParent_.down->entryExists(nextToSend_))
    {
      ex_queue_entry *downEntry = qParent_.down->getQueueEntry(nextToSend_);
      const ex_queue::down_request request = downEntry->downState.request;
      const Lng32 value = downEntry->downState.requestValue;
      ExUdrPrivateState &pstate = *((ExUdrPrivateState *) downEntry->pstate);
      NABoolean anyErrors = FALSE;

      switch (request)
      {
        case ex_queue::GET_N:
        case ex_queue::GET_ALL:
        {
          // First step is to allocate space in the SqlBuffer. After
          // that a move expression will be evaluated to copy data into
          // the buffer. If the expression results in errors then we need
          // to back-out the buffer allocation, which may have been a
          // single data tupp or possibly control + data. So that the
          // correct number are backed out, we will remember how many
          // tupps are in the buffer before the allocation takes place.
          Lng32 numTuppsBefore = sqlBuf->getTotalTuppDescs();
          tupp_descriptor *dataDesc =NULL;
          if (sqlBuf->moveInSendOrReplyData(
                TRUE, // [IN] sending? (vs. replying)
                FALSE, // [IN] force move of control info?
                TRUE, // [IN] move data?
                &(downEntry->downState), // [IN] queue state
                sizeof(ControlInfo), // [IN] length of ControlInfo area
                NULL, // [OUT] new ControlInfo area
                myTdb().getRequestRowLen(), // [IN] data row length
                &dataDesc, // [OUT] new data tupp_descriptor
                NULL, // [IN] diags area
                NULL // [OUT] new diags tupp_descriptor
                ) == SqlBuffer::BUFFER_FULL)
          {
            // No more space in request buffer. The assertion here says that
            // we have at least inserted one row into the message buffer.
            ex_assert(numRequestsAdded > 0,
                      "Request buffer not large enough to hold a single row");
            sendSpaceAvailable = FALSE;
          }
          else
          {
            ex_expr::exp_return_type expStatus = ex_expr::EXPR_OK;
            if (myTdb().getInputExpression())
            {
              workAtp_->getTupp(myTdb().getRequestTuppIndex()) = dataDesc;

              // Evaluate the input expression. If diags are generated they
              // will be left in the down entry ATP.
              expStatus =
                myTdb().getInputExpression()->eval(downEntry->getAtp(),
                                                   workAtp_);
#ifdef UDR_DEBUG
              if (expStatus != ex_expr::EXPR_OK)
                UdrDebug1(" [WORK] inputExpr.eval() returned %s",
                          GetExpStatusString(expStatus));
#endif
              workAtp_->getTupp(myTdb().getRequestTuppIndex()).release();

              if (expStatus == ex_expr::EXPR_ERROR)
              {
                // Back out everything this row added to the buffer
                while (sqlBuf->getTotalTuppDescs() > numTuppsBefore)
                {
                  sqlBuf->remove_tuple_desc();
                }
                pstate.step_ = ExUdrTcb::PRODUCE_ERROR_REPLY;
                numErrors++;
              }
            } // if (myTdb().getInputExpression())

            if (expStatus != ex_expr::EXPR_ERROR)
            {
              pstate.step_ = ExUdrTcb::STARTED;
              numRequestsAdded++;
            }

            // Move to the next down entry
            nextToSend_++;
          } // if (no space in the request buffer) else
        } // case GET_N, GET_ALL
        break;

        case ex_queue::GET_NOMORE:
        {
          // The request was cancelled before it was ever sent
          pstate.step_ = ExUdrTcb::CANCEL_BEFORE_SEND;
          numCancels++;
          nextToSend_++;
        }
        break;

        case ex_queue::GET_EMPTY:
        case ex_queue::GET_EOD:
        default:
        {
          ex_assert(FALSE, "Invalid request type in TCB down queue");
          nextToSend_++;
        }
        break;
      } // switch (request)
    } // while (sendSpaceAvailable && qParent_.down->entryExists(nextToSend_))

    UdrDebug2(" [WORK] %u request%s added to the request buffer",
              numRequestsAdded, PLURAL_SUFFIX(numRequestsAdded));
    UdrDebug2(" [WORK] %u error%s encountered",
              numErrors, PLURAL_SUFFIX(numErrors));
    UdrDebug2(" [WORK] %u %s cancelled before being sent",
              numCancels, (numCancels == 1 ? "entry" : "entries"));

    if (sqlBuf->getTotalTuppDescs() > 0)
    {
      // If we enter this block then it's time to send a data request
#ifdef UDR_DEBUG
      // Print a message if tracing is enabled or the
      // UDR_CONTINUE_DEBUG environment variable is set
      if (doTrace_ ||
          doIpcTrace_ ||
          (getenv("UDR_CONTINUE_DEBUG") != NULL))
      {
        char stmtTx[256];
        char msgTx[256];
        TransIdToText(stmtTransId, stmtTx, 256);
        TransIdToText(transIdToSend, msgTx, 256);
        UdrPrintf(traceFile_ ? traceFile_ : stdout,
                  " [WORK] Sending %s, %d tupps, msg tx %s",
                  isResultSet ? "RS INVOKE" : "INVOKE",
                  (Lng32) sqlBuf->getTotalTuppDescs(), msgTx);
      }
#endif
      sqlBuf->drivePack();
      setUdrTcbState(WORK_IO_ACTIVE);

      if (udrStats_)
      {
        udrStats_->bufferStats()->sentBuffers().addEntry(sizeof(*requestBuffer_) +
                                                         sqlBuf->get_used_size());
        udrStats_->bufferStats()->totalSentBytes() += sqlBuf->getSendBufferSize();
      }

      // If this is a CALL that produces result sets or a result set
      // (both are indicated by a non-NULL rsInfo_ pointer) then we
      // may need to bring the UDR server into the current transaction
      // before sending the data request.
      if (rsInfo_)
        rsInfo_->enterUdrTx(*myExeStmtGlobals());

      if (dataRequestsAreTransactional())
        myExeStmtGlobals()->incrementUdrTxMsgsOut();
      else
        myExeStmtGlobals()->incrementUdrNonTxMsgsOut();

      dataStream_->sendRequest(transIdToSend);
      dataMsgsSent_++;
      UdrDebug0(" [WORK] The data stream has returned control to the TCB");

      // Now that sendRequest() has completed the request buffer is in
      // packed form and owned by the stream. This TCB should not
      // access the request buffer again so we release it now.
      releaseRequestBuffer();
      UdrDebug0(" [WORK] Done sending the request buffer");
    } // if (sqlBuf->getTotalTuppDescs() > 0)
  } // while (result == WORK_OK && state_ == WORK &&
    //        qParent_.down->entryExists(nextToSend_))

  if (result == WORK_OK)
  {
    if (getIpcBroken())
    {
      result = WORK_BAD_ERROR;
    }
  }

  //
  // If errors were encountered then there may be down queue
  // entries that were not sent but still need to be processed.
  // Changing the return code to WORK_CALL_AGAIN makes sure the
  // scheduler redrives this TCB.
  //
  if (result == WORK_OK && (numErrors > 0 || numCancels > 0))
  {
    result = WORK_CALL_AGAIN;
  }

  UdrDebug1(" [WORK] END ExUdrTcb::buildAndSendRequestBuffer() %p",
            this);
  return result;
} // ExUdrTcb::buildAndSendRequestBuffer()
//
// Stream callbacks use this function to report arrival of a reply to
// a LOAD message
//
void ExUdrTcb::reportLoadReply(NABoolean loadWasSuccessful)
{
  // Called when the reply to a LOAD message arrives. The TCB must be
  // in the SENDING_LOAD state. The control stream is released, the TCB
  // moves to its next state and the scheduler is asked to run the TCB.
  UdrDebug2(" TCB %p received a LOAD reply. Handle " INT64_PRINTF_SPEC,
            this, udrHandle_);
  UdrDebug1(" The load %s successful",
            loadWasSuccessful ? "was" : "was not");
  ex_assert(state_ == SENDING_LOAD, "Unexpected arrival of a LOAD reply");

  releaseControlStream();

  if (loadWasSuccessful)
  {
    setUdrTcbState(WORK);
  }
  else
  {
    // A JVM startup failure is reported as error -11202
    // (LME_JVM_INIT_ERROR in langman/LmError.h). When the statement
    // diags contain it, the running server is marked broken right away
    // and the TCB goes to IPC_ERROR instead of LOAD_FAILED.
    ComDiagsArea *stmtDiags = getStatementDiags();
    const NABoolean jvmInitFailed =
      (stmtDiags != NULL && stmtDiags->contains(-11202));

    if (jvmInitFailed)
      udrServer_->setState(ExUdrServer::EX_UDR_BROKEN);

    setUdrTcbState(jvmInitFailed ? IPC_ERROR : LOAD_FAILED);
  }

  tickleSchedulerWork();
}
//
// ExUdrTcb::reportIpcError()
//
// Stream callbacks use this function to report IPC errors. When conn
// is not NULL then an IPC failure occurred on the connection. If conn
// is NULL then some other type of error occurred that prevents the
// TCB from continuing its work. For example, if a LOAD reply reports
// that the LOAD was not successful then the TCB cannot continue. The
// stream callback will first move diags from the stream into the TCB
// statement globals area and then call this function with conn set to
// NULL.
//
// The technique of putting diags directly into the statement globals
// area may not be a good long-term solution but works fine for
// single-row CALL statements. It may be better to associate diags
// with an up queue entry and avoid returning WORK_BAD_ERROR to the
// scheduler. However, one effect of returning WORK_BAD_ERROR is that
// the statement does not get used again until it is deallocated and
// re-fixed up. This is a desirable effect because it guarantees that
// a new server gets started. All these pros and cons will have to be
// considered if we want to change the error handling strategy for
// this TCB.
//
void ExUdrTcb::reportIpcError(IpcMessageStreamBase *stream,
                              IpcConnection *conn)
{
  UdrDebug1(" [BEGIN ExUdrTcb::reportIpcError(IpcConnection %p)]",
            conn);

  //
  // Summary of the work done here (debug-build tracing aside):
  // - Only when the TCB is not yet in the IPC-broken state:
  //   - Put diagnostics into the statement diags area
  //   - Mark the server broken if it is still the one we talked to
  //   - Transition to the IPC_ERROR state
  // - Always: release the control stream and schedule the TCB
  //
  // The stream parameter is not used by this method.
  //
  if (!getIpcBroken())
  {
    ComDiagsArea *stmtDiags = getOrCreateStmtDiags();
    CollHeap *heapForDiags = getHeap();

    // Nothing is added to the diags if they already carry the UDR
    // reply error.
    const NABoolean alreadyReported =
      stmtDiags->contains(-EXE_UDR_REPLY_ERROR) ? TRUE : FALSE;

#ifdef UDR_DEBUG
    if (conn)
    {
      UdrDebug1(" IPC error on connection %p", conn);
      if (conn->castToGuaConnectionToServer())
      {
        GuaErrorNumber guardianError =
          conn->castToGuaConnectionToServer()->getGuardianError();
        UdrDebug1(" Guardian error number is %d", (Lng32) guardianError);
      }
    }
#endif // UDR_DEBUG

    if (!alreadyReported)
    {
      // First bring the connection's own diagnostics into the main
      // diags area for this TCB ...
      if (conn)
        conn->populateDiagsArea(stmtDiags, heapForDiags);

      // ... then add a simple UDR-specific condition. Connection
      // diagnostics are generic and do not identify the failure as an
      // MXUDR failure.
      *stmtDiags << DgSqlCode(-EXE_UDR_REPLY_ERROR);
    }

    // Tell ExUdrServer about the error only when our stored
    // serverProcessId_ still matches the server's. The two differ if
    // ExUdrServer has already started a new process after an error.
    if (udrServer_->getServerProcessId() == serverProcessId_)
      udrServer_->setState(ExUdrServer::EX_UDR_BROKEN);

    setUdrTcbState(IPC_ERROR);
  } // if (!getIpcBroken())

  //
  // Releasing an outstanding control stream is fine here because it
  // does not actually delete the object. The stream goes on the
  // IpcEnvironment's completed list instead.
  //
  releaseControlStream();
  tickleSchedulerWork();

  UdrDebug0(" [END ExUdrTcb::reportIpcError()]");
} // ExUdrTcb::reportIpcError()
void ExUdrTcb::deallocateMessage(UdrMessageObj *m)
{
  // Gives up this TCB's reference to a UDR message object. A NULL
  // pointer is accepted and ignored.
  UdrDebug2(" ExUdrTcb %p deallocating message object %p",
            this, m);

  if (m == NULL)
    return;

  //
  // UDR messages are not destroyed by calling a destructor. They are
  // deallocated from a heap once the reference count drops to zero.
  //
  m->decrRefCount();
}
void ExUdrTcb::releaseControlStream()
{
  // Detach this TCB from its outstanding control stream, if there is
  // one, and forget the pointer.
  if (outstandingControlStream_ == NULL)
    return;

  outstandingControlStream_->delinkTcb(this);
  outstandingControlStream_ = NULL;
}
void ExUdrTcb::releaseDataStream()
{
  // Releases the request and reply buffers and then detaches this TCB
  // from its data stream, if one exists.
  //
  // Background on the disabled loop below: it was added to avoid a
  // memory leak for the RS Continue Reply message.
  // dataStream_->getNextReceiveMsg() puts that message on the stream's
  // inUseBufList_, so that cleanupBuffers() deletes it from
  // inUseBufList_ when the data stream destructor runs. The loop is
  // skipped in the IPC_ERROR state because it would never terminate
  // there, at the cost of a possible small leak for error messages.
  //
  // $$$$ The loop is currently compiled out because it causes a hang
  // for UDFs. This should be corrected before UDFs are released. The
  // hang was seen with two UDR operators in the same fragment when the
  // MXUDR LOAD failed for one of them. TCB destructors were called
  // before the "good" UDR operator had processed its reply data. Inside
  // the "good" data stream a reply buffer had not yet moved from the
  // receive queue to the in-use queue, and the loop kept executing
  // while that buffer stayed stuck on the receive queue.
#if 0
  if (state_ != IPC_ERROR)
  {
    IpcMessageObjType msgType;
    while (dataStream_ && dataStream_->getNextReceiveMsg(msgType))
    {
      ;
    }
  }
#endif

  releaseRequestBuffer();
  releaseReplyBuffer();

  if (!dataStream_)
    return;

  UdrDebug1("ExUdrTcb %p releasing its data stream", this);
  dataStream_->delinkTcb(this);
  dataStream_ = NULL;
}
void ExUdrTcb::releaseConnectionToServer()
{
ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
IpcConnection *conn = stmtGlobals->getUdrConnection();
if(myTdb().isTmudf())
{
conn = udrServer_->getUdrControlConnection();
}
// RSs share connection with parent CALL statement. So no need to
// release connection for RSs. Connection will be released in
// parent CALL's context.
if (conn && (myTdb().getMaxResultSets() > 0))
udrServer_->releaseConnection(conn);
}
//
// Function called during fixup to create the data stream.
// Should not be called except during fixup, and only after
// a UDR server is believed to be running.
//
void ExUdrTcb::allocateDataStream()
{
UdrDebug1(" [BEGIN ExUdrTcb::allocateDataStream()] %p", this);
ExExeStmtGlobals *stmtGlobals = myExeStmtGlobals();
IpcConnection *conn = stmtGlobals->getUdrConnection();
if(myTdb().isTmudf())
{
conn = udrServer_->getUdrControlConnection();
}
if (!dataStream_ && conn)
{
IpcMessageObjSize maxSqlBufSize = 0;
IpcMessageObjSize maxBufSize = 0;
maxSqlBufSize = MAXOF(myTdb().getRequestSqlBufferSize(),
myTdb().getReplySqlBufferSize());
NABoolean isResultSet = myTdb().isResultSetProxy();
if (isResultSet)
maxBufSize += sizeof(UdrRSDataHeader);
else if (myTdb().isTmudf())
maxBufSize +=sizeof(UdrTmudfDataHeader);
else
maxBufSize += sizeof(UdrDataHeader);
IpcMessageBuffer::alignOffset(maxBufSize);
maxBufSize += sizeof(UdrDataBuffer);
IpcMessageBuffer::alignOffset(maxBufSize);
maxBufSize += maxSqlBufSize;
IpcMessageBuffer::alignOffset(maxBufSize);
// maxBufSize now represents the number of bytes in a request or
// in a reply, whichever is larger. We want the stream to use this
// size as its default buffer size. But when diags are returned in
// a reply, the reply can become bigger. We will create the
// stream's internal buffers with room for the request or reply,
// and a pad for diags. Doing this does not increase the actual
// message size. It just allocates additional memory for the
// target buffers used by the stream for I/O. These buffers are
// where requests are buffered and where replies are written. By
// making them large enough we avoid going into chunking mode when
// diags are returned in a reply.
IpcMessageObjSize pad = 1000;
#ifdef UDR_DEBUG
//
// We can set the pad size with an environment variable in the
// debug build.
//
char *val;
if ((val = getenv("UDR_BUFFER_PAD")))
{
pad = atol(val);
}
UdrDebug3(" Stream buffer size: %d data + %d pad = %d total",
(Lng32) maxBufSize, (Lng32) pad, (Lng32) maxBufSize + pad);
#endif // UDR_DEBUG
NABoolean isTransactional = dataRequestsAreTransactional();
dataStream_ = new (getIpcHeap()) UdrClientDataStream (
myIpcEnv(),
1, // sendBufferLimit
2, // inUseBufferLimit
maxBufSize + pad, // default buffer size
this,
myExeStmtGlobals(),
isTransactional);
#ifdef UDR_DEBUG
if (doIpcTrace_ && traceFile_)
dataStream_->setTraceFile(traceFile_);
#endif
dataStream_->addRecipient(conn);
#ifdef UDR_DEBUG
char buf[300];
serverProcessId_.toAscii(buf, 300);
UdrDebug2(" Attaching buffered data stream %p to connection %d",
dataStream_, (Lng32) conn->getId());
UdrDebug1(" Server process is %s", buf);
UdrDebug1(" [END ExUdrTcb::allocateDataStream()] %p", this);
#endif // UDR_DEBUG
}
}
void ExUdrTcb::releaseServerResources()
{
if (state_ == SENDING_LOAD || state_ == LOAD_FAILED
|| serverResourcesAreLoaded())
{
UdrDebug1(" ExUdrTcb %p is releasing server resources", this);
NABoolean isResultSet = myTdb().isResultSetProxy();
if (isResultSet)
{
UdrDebug0(" An RS UNLOAD message is about to be sent");
sendControlMessage(UDR_MSG_RS_UNLOAD, FALSE);
}
else
{
UdrDebug0(" An UNLOAD message is about to be sent");
sendControlMessage(UDR_MSG_UNLOAD, FALSE);
}
}
}
// This function is used to send any control messages. Right now there are
// the following types:
// - LOAD is sent during fixup for UDR invocations
// - RS LOAD is sent during fixup for a stored procedure result set proxy
// statement
// - UNLOAD or RS UNLOAD, whichever is appropriate, is sent during teardown
// by freeResources().
// - RS CLOSE is sent by the RS operator when down queue entries are
// cancelled.
NABoolean ExUdrTcb::sendControlMessage(UdrIpcObjectType t,
                                       NABoolean callbackRequired)
{
  // Builds a control message of type t and sends it to the UDR server
  // on a newly created control stream.
  //
  //   t                - UDR_MSG_LOAD, UDR_MSG_RS_LOAD, UDR_MSG_UNLOAD,
  //                      UDR_MSG_RS_UNLOAD or UDR_MSG_RS_CLOSE. Any
  //                      other value trips an assertion.
  //   callbackRequired - when TRUE the stream is linked to this TCB and
  //                      remembered in outstandingControlStream_
  //
  // Returns FALSE if the TCB is in the IPC-broken state when the method
  // finishes, TRUE otherwise. Nothing is sent when
  // verifyUdrServerProcessId() fails.

  // Determine the transid we should send in the message. We
  // use the statement transid if all the following are true
  // a. the TDB says we require a transaction
  // b. the message type is UDR_MSG_LOAD or UDR_MSG_UNLOAD
  //
  // Note:
  // * other control messages such as RS LOAD and RS UNLOAD do not
  //   require a transaction since they travel within the scope of
  //   an ENTER/EXIT TX pair.
  // * when the statement is being deallocated after the transaction
  //   ended, UNLOAD will be sent without a transid since there is no
  //   longer a transid in stmt globals.
  Int64 &stmtTransId = myExeStmtGlobals()->getTransid();
  Int64 transIdToSend = -1; // -1 is an invalid transid
  if (myTdb().getTransactionAttrs() == COM_TRANSACTION_REQUIRED &&
      (t == UDR_MSG_LOAD || t == UDR_MSG_UNLOAD))
  {
    transIdToSend = stmtTransId;
  }

#ifdef UDR_DEBUG
  {
    char stmtTx[256];
    char msgTx[256];
    TransIdToText(stmtTransId, stmtTx, 256);
    TransIdToText(transIdToSend, msgTx, 256);
    UdrDebug1(" [BEGIN ExUdrTcb::sendControlMessage()] %p", this);
    UdrDebug2(" stmt tx %s, msg tx %s", stmtTx, msgTx);
  }
#endif

  ex_assert(outstandingControlStream_ == NULL,
            "Sending a UDR control message while one is already outstanding");

  UdrMessageObj *msg = NULL;
  UdrClientControlStream *s = NULL;
  NAMemory *h = getIpcHeap();
  const ExUdrTdb &udrTdb = myTdb();

  // This switch statement builds a control message object of
  // the correct type
  switch (t)
  {
    case UDR_MSG_LOAD:
    {
      releaseServerResources();
      setUdrTcbState(SENDING_LOAD);

      const char *parentQid;
      if (myExeStmtGlobals()->castToExMasterStmtGlobals())
        parentQid = myExeStmtGlobals()->castToExMasterStmtGlobals()->
          getStatement()->getUniqueStmtId();
      else if (myExeStmtGlobals()->castToExEspStmtGlobals())
        parentQid = myExeStmtGlobals()->castToExEspStmtGlobals()->
          getQueryId();
      else
        parentQid = "";

      // set the instance number/numInstances
      Lng32 numInstances = myExeStmtGlobals()->getNumOfInstances();
      Lng32 instanceNum = myExeStmtGlobals()->getMyInstanceNumber();
      if (numInstances == 0)
        numInstances = 1; // I compute, therefore I am.

      // Steps to create a LOAD message:
      // - First create a UdrLoadMsg instance on the IPC heap and initialize
      //   it with a UDR handle and UDR metadata
      // - Next put metadata for each formal parameter into the object
      UdrLoadMsg *loadMsg = new (h) UdrLoadMsg(
        h,
        udrTdb.getSqlName(),
        udrTdb.getRoutineName(),
        udrTdb.getSignature(),
        udrTdb.getContainerName(),
        udrTdb.getPathName(),
        udrTdb.getLibrarySqlName(),
        udrTdb.getTransactionAttrs(),
        udrTdb.getSqlAccessMode(),
        udrTdb.getLanguage(),
        udrTdb.getParamStyle(),
        udrTdb.getExternalSecurity(),
        udrTdb.getMaxResultSets(),
        udrTdb.getNumParams(),
        udrTdb.getNumInputValues(),
        udrTdb.getNumOutputValues(),
        udrTdb.getRequestSqlBufferSize(),
        udrTdb.getReplySqlBufferSize(),
        udrTdb.getRequestRowLen(),
        udrTdb.getReplyRowLen(),
        udrTdb.getUdrFlags(),
        udrTdb.getRoutineOwnerId(),
        parentQid,
        udrTdb.udrSerInvocationInfoLen_,
        udrTdb.udrSerInvocationInfo_,
        udrTdb.udrSerPlanInfoLen_,
        udrTdb.udrSerPlanInfo_,
        udrTdb.getJavaDebugPort(),
        udrTdb.getJavaDebugTimeout(),
        instanceNum,
        numInstances
        );
      loadMsg->setHandle(udrHandle_);

      // This loop will insert parameter metadata into the LOAD message.
      // The loop iterates over a single list of parameters. But inside
      // the LOAD message two lists are maintained, one for input and
      // one for output. To keep track of the current position in the
      // input and output lists the inPosition and outPosition counters
      // are used.
      ComUInt32 i;
      ComUInt32 inPosition = 0;
      ComUInt32 outPosition = 0;
      for (i = 0; i < udrTdb.getNumParams(); i++)
      {
        const UdrFormalParamInfo &formal = *udrTdb.getFormalParamInfo(i);
        const char *paramName = formal.getParamName();
        ComUInt16 paramNameLen = (ComUInt16) str_len(paramName);
        ComSInt16 fsType = formal.getType();
        SQLTYPE_CODE ansiType =
          (SQLTYPE_CODE) Descriptor::ansiTypeFromFSType(fsType);

        if (formal.isIn())
        {
          // Cannot use a const reference to the Attributes objects because
          // Attributes accessors are not const methods
          Attributes &actual = *(udrTdb.getRequestAttr(inPosition));
          UdrParameterInfo info (
            i, // Formal: position
            formal.getFlags(), // flags
            fsType, // FS type
            ansiType, // ANSI type
            paramNameLen, // length of param name
            paramName, // param name
            formal.getPrecision(), // prec
            formal.getScale(), // scale
            formal.getEncodingCharSet(), // encoding charset
            formal.getCollation(), // collation
            actual.getLength(), // Actual: dataLength
            actual.getNullIndicatorLength(), // nullIndicatorLength
            actual.getNullIndOffset(), // nullIndicatorOffset
            actual.getOffset(), // dataOffset
            actual.getVCLenIndOffset(), // varchar ind offset
            actual.getVCIndicatorLength() // varchar ind length
            );
          loadMsg->setInParam(inPosition, info);
          inPosition++;
        } // if (formal.isIn())

        if (formal.isOut())
        {
          Attributes &actual = *(udrTdb.getReplyAttr(outPosition));
          UdrParameterInfo info (
            i, // Formal: position
            formal.getFlags(), // flags
            fsType, // FS type
            ansiType, // ANSI type
            paramNameLen, // length of param name
            paramName, // param name
            formal.getPrecision(), // prec
            formal.getScale(), // scale
            formal.getEncodingCharSet(), // encoding charset
            formal.getCollation(), // collation
            actual.getLength(), // Actual: dataLength
            actual.getNullIndicatorLength(), // nullIndicatorLength
            actual.getNullIndOffset(), // nullIndicatorOffset
            actual.getOffset(), // dataOffset
            actual.getVCLenIndOffset(), // varchar ind offset
            actual.getVCIndicatorLength() // varchar ind length
            );
          loadMsg->setOutParam(outPosition, info);
          outPosition++;
        } // if (formal.isOut())
      } // for (i = 0; i < udrTdb.getNumParams(); i++)

      // Add optional data buffers to the load message
      Queue *q = udrTdb.getOptionalData();
      if (q)
      {
        Lng32 numEntries = q->numEntries();
        loadMsg->initOptionalDataBufs(numEntries, TRUE);

        // Named optData so that it does not hide the control stream
        // pointer s declared at function scope.
        char *optData = NULL;
        ComUInt32 j = 0;
        q->position();
        while ((optData = (char *) q->getNext()) != NULL)
        {
          // Each data element is prefixed by a 4-byte length field
          UInt32 dataLen = 0;
          str_cpy_all((char *)&dataLen, optData, 4);
          loadMsg->setOptionalDataBuf(j++, optData, (dataLen + 4));
        } // for each queue element
      } // if (q)

      if(udrTdb.isTmudf())
      {
        setTmUdfInfo(loadMsg,udrTdb);
      }

      msg = loadMsg;
      UdrDebug2(" TCB %p created UdrLoadMsg %p", this, msg);
    } // case UDR_MSG_LOAD
    break;

    case UDR_MSG_RS_LOAD:
    {
      if (rsInfo_)
      {
        // Need to send UdrEnterTxMsg to udrserver if the
        // txState_ is not ACTIVE.
        // This change is added as a precautionary measure.
        // Normally this should not happen.
        rsInfo_->enterUdrTx(*myExeStmtGlobals());
      }

      releaseServerResources();
      setUdrTcbState(SENDING_LOAD);

      // Steps to create an RS LOAD message:
      // - First create a message object on the IPC heap and initialize
      //   it with a UDR handle and RS handle
      // - Next put metadata for each output column into the object
      UdrRSLoadMsg *rsLoadMsg = new (h)
        UdrRSLoadMsg(rsIndex_,
                     udrTdb.getNumOutputValues(),
                     udrTdb.getReplyRowLen(),
                     udrTdb.getReplySqlBufferSize(),
                     udrTdb.getUdrFlags(),
                     h);
      rsLoadMsg->setHandle(udrHandle_);
      rsLoadMsg->setRSHandle(rsHandle_);

      // This loop will insert output column descriptions into the RS
      // LOAD message. Each column is described by a
      // UdrFormalParamInfo instance in the TDB and a UdrParameterInfo
      // instance in the outgoing message.
      ComUInt32 i;
      for (i = 0; i < udrTdb.getNumParams(); i++)
      {
        const UdrFormalParamInfo &formal = *udrTdb.getFormalParamInfo(i);
        Attributes &actual = *(udrTdb.getReplyAttr(i));
        ComUInt16 fsType = formal.getType();
        ComUInt32 dataLength = actual.getLength();
        ComUInt32 dataOffset = actual.getOffset();
        ComUInt32 lenIndOffset = actual.getVCLenIndOffset();
        ComUInt16 lenIndLen = actual.getVCIndicatorLength();
        SQLTYPE_CODE ansiType =
          (SQLTYPE_CODE) Descriptor::ansiTypeFromFSType(fsType);
        UdrParameterInfo info (
          i, // Formal: position
          formal.getFlags(), // flags
          (ComSInt16) fsType, // FS type
          ansiType, // ANSI type
          0, // length of param name
          "", // param name
          formal.getPrecision(), // prec
          formal.getScale(), // scale
          formal.getEncodingCharSet(), // encoding charset
          formal.getCollation(), // collation
          dataLength, // Actual: dataLength
          actual.getNullIndicatorLength(), // nullIndicatorLength
          actual.getNullIndOffset(), // nullIndicatorOffset
          dataOffset, // dataOffset
          (ComSInt32) lenIndOffset, // varchar ind offset
          (ComSInt16) lenIndLen // varchar ind length
          );
        rsLoadMsg->setColumnDesc(i, info);
      } // for (i = 0; i < udrTdb.getNumParams(); i++)

      msg = rsLoadMsg;
      UdrDebug2(" TCB %p created UdrRSLoadMsg %p", this, msg);
    } // case UDR_MSG_RS_LOAD
    break;

    case UDR_MSG_UNLOAD:
    {
      setUdrTcbState(SENDING_UNLOAD);
      msg = new (h) UdrUnloadMsg(h);
      msg->setHandle(udrHandle_);
      UdrDebug2(" TCB %p created UdrUnloadMsg %p", this, msg);
    } // case UDR_MSG_UNLOAD
    break;

    case UDR_MSG_RS_UNLOAD:
    {
      setUdrTcbState(SENDING_UNLOAD);
      UdrRSUnloadMsg *m = new (h) UdrRSUnloadMsg(h);
      m->setHandle(udrHandle_);
      m->setRSHandle(rsHandle_);
      // Assign before tracing so the trace shows the new object rather
      // than the initial NULL value of msg
      msg = m;
      UdrDebug2(" TCB %p created UdrRSUnloadMsg %p", this, msg);
    } // case UDR_MSG_RS_UNLOAD
    break;

    case UDR_MSG_RS_CLOSE:
    {
      UdrRSCloseMsg *m = new (h) UdrRSCloseMsg(h);
      m->setHandle(udrHandle_);
      m->setRSHandle(rsHandle_);
      // Assign before tracing so the trace shows the new object rather
      // than the initial NULL value of msg
      msg = m;
      UdrDebug2(" TCB %p created UdrRSCloseMsg %p", this, msg);
    } // case UDR_MSG_RS_CLOSE
    break;

    default:
      ex_assert(FALSE, "Trying to send an invalid control message type");
      break;
  } // switch (t)

  //
  // Send the message object if one was created
  //
  if (msg != NULL && verifyUdrServerProcessId())
  {
    // Now send the control message and keep a pointer to the control
    // stream if callbacks are required
    NABoolean isTransactional = (transIdToSend == -1 ? FALSE : TRUE);
    s = new (getIpcHeap())
      UdrClientControlStream(myIpcEnv(),
                             (callbackRequired ? this : NULL),
                             myExeStmtGlobals(),
                             FALSE,
                             isTransactional);
#ifdef UDR_DEBUG
    if (doIpcTrace_ && traceFile_)
      s->setTraceFile(traceFile_);
    s->setTrustReplies(trustReplies_);
#endif

    if(udrTdb.isTmudf())
    {
      ex_assert(udrServer_->getUdrControlConnection(), "Invalid control connection (IPC error?)");
      s->addRecipient(udrServer_->getUdrControlConnection());
    }
    else
    {
      s->addRecipient(myExeStmtGlobals()->getUdrConnection());
    }

    UdrDebug2(" TCB %p created control stream %p", this, s);
    *s << *msg;

    if (callbackRequired)
    {
      UdrDebug2(" TCB %p storing pointer to control stream %p",
                this, s);
      outstandingControlStream_ = s;
    }

    UdrDebug2(" TCB %p sending data on control stream %p",
              this, s);

    // Only LOAD and RS LOAD messages are counted in the statistics
    if ((t == UDR_MSG_LOAD || t == UDR_MSG_RS_LOAD)
        && udrStats_)
    {
      ComUInt32 sizeOfMsg = msg->packedLength();
      udrStats_->getSentControlBuffers().addEntry(sizeOfMsg);
    }

    if (transIdToSend == -1)
      myExeStmtGlobals()->incrementUdrNonTxMsgsOut();
    else
      myExeStmtGlobals()->incrementUdrTxMsgsOut();

    NABoolean sendWaited = FALSE;
    s->send(sendWaited, transIdToSend);
  } // if (msg != NULL && verifyUdrServerProcessId())

  NABoolean result = TRUE;
  if (getIpcBroken())
  {
    result = FALSE;
  }

  //
  // It is OK for this method to abandon the message now. If the
  // stream is still holding a pointer to the message object then
  // the reference count will not drop to zero and the object will
  // not be destroyed until the stream releases it. The
  // deallocateMessage() call here is basically a wrapper around
  // decrRefCount() and is a no-op if msg is NULL.
  //
  deallocateMessage(msg);

  UdrDebug1(" [END ExUdrTcb::sendControlMessage()] %p", this);
  return result;
} // ExUdrTcb::sendControlMessage()
void ExUdrTcb::setTmUdfInfo(UdrLoadMsg *lm, const ExUdrTdb &tdb)
{
  // Adds table input descriptions to a LOAD message. There is one
  // UdrTableInputInfo per child of this TCB, each carrying one
  // UdrParameterInfo per column of that child table.
  //
  //   lm  - LOAD message to fill in
  //   tdb - source of the table, column and attribute metadata

  // Size the tableInputs_ array in the message and allocate its memory
  lm->setNumInputTables( numChildren());
  lm->allocateTableInputInfo();

  for (Int16 tableIdx = 0; tableIdx < numChildren(); tableIdx++)
  {
    // Table-level metadata comes from the TDB
    const UdrTableDescInfo *tableDesc = tdb.getTableDescInfo(tableIdx);
    UdrTableInputInfo tableInput(tableIdx,
                                 str_len(tableDesc->getCorrName()),
                                 tableDesc->getCorrName(),
                                 tableDesc->getNumColumns(),
                                 tableDesc->getRowLength()
                                 );

    // Describe every column of this child table
    for (short colIdx = 0; colIdx < tableDesc->getNumColumns(); colIdx++)
    {
      const UdrColumnDescInfo *colDesc = tableDesc->getColumnDescInfo(colIdx);
      const char *name = colDesc->getParamName();
      ComUInt16 nameLen = (ComUInt16)str_len(name);
      ComSInt16 fsDatatype = colDesc->getType();
      SQLTYPE_CODE ansiDatatype =
        (SQLTYPE_CODE)Descriptor::ansiTypeFromFSType(fsDatatype);
      Attributes *attrs = tdb.getChildTableAttr(tableIdx,colIdx);

      UdrParameterInfo columnInfo(colIdx,
                                  colDesc->getFlags(),
                                  fsDatatype,
                                  ansiDatatype,
                                  nameLen,
                                  name,
                                  colDesc->getPrecision(),
                                  colDesc->getScale(),
                                  colDesc->getEncodingCharSet(),
                                  colDesc->getCollation(),
                                  attrs->getLength(),
                                  attrs->getNullIndicatorLength(),
                                  attrs->getNullIndOffset(),
                                  attrs->getOffset(),
                                  attrs->getVCLenIndOffset(),
                                  attrs->getVCIndicatorLength()
                                  );
      tableInput.setInTableColumnDesc(colIdx,columnInfo,lm->getHeap());
    } // for each column

    lm->setChildTableInput(tableIdx, tableInput);
  } // for each child table
}
// Forget the current request buffer, if there is one.
//
// Only the TCB's pointer is cleared. Data buffers are managed by a
// buffered stream rather than on a NAMemory heap, so there is no
// reference count to decrement here.
void ExUdrTcb::releaseRequestBuffer()
{
  UdrDebug1(" [BEGIN ExUdrTcb::releaseRequestBuffer() %p]", this);

  if (requestBuffer_ != NULL)
  {
    UdrDebug1(" Request buffer %p is being released",
              requestBuffer_);
    requestBuffer_ = NULL;
  }

  UdrDebug1(" [END ExUdrTcb::releaseRequestBuffer() %p]", this);
}
// Forget the input buffer held for child i, if there is one.
//
// Only the TCB's pointer in childInputBuffers_[i] is cleared. Data
// buffers are managed by a buffered stream rather than on a NAMemory
// heap, so there is no reference count to decrement here.
//
//   i - index into childInputBuffers_; the caller is responsible for
//       passing a valid index (no range check is done here)
void ExUdrTcb::releaseChildInputBuffer(Int32 i)
{
  // The TCB address is printed with %p, like every other trace message
  // in this file. The previous "0x%08x" conversion did not match a
  // pointer argument and dropped the upper half of a 64-bit address.
  UdrDebug1(" [BEGIN ExUdrTcb::releaseChildInputBuffer() %p]", this);

  if (childInputBuffers_[i])
  {
    UdrDebug1(" Child Input buffer %p is being released",
              childInputBuffers_[i]);
    childInputBuffers_[i] = NULL;
  }

  UdrDebug1(" [END ExUdrTcb::releaseChildInputBuffer() %p]", this);
}
// Give up the current reply buffer (if any) and let the data stream
// reclaim whatever receive buffers are no longer in use.
//
// Data buffers are managed by a buffered stream rather than on a
// NAMemory heap, so no reference count is decremented here. The stream
// uses msgObjIsFree() to decide when a receive buffer can be reused.
void ExUdrTcb::releaseReplyBuffer()
{
  UdrDebug1(" [BEGIN ExUdrTcb::releaseReplyBuffer() %p]", this);

  if (replyBuffer_)
  {
    UdrDebug1(" Reply buffer %p is being released", replyBuffer_);

    // Calling bufferFull() on the SqlBuffer (the name is a bit
    // misleading) puts it into a state where it can be freed once the
    // reference counts of all tuple descriptors reach zero. From then
    // on the stream's garbage collection, which asks msgObjIsFree() on
    // the UdrDataBuffer, can release the buffer as soon as nothing
    // references the SqlBuffer any longer.
    //
    // A NULL SqlBuffer pointer means the IPC integrity check failed
    // while this object was being unpacked. Nothing needs to be done in
    // that case: UdrDataBuffer::msgObjIsFree() will return TRUE and the
    // stream will eventually release the object.
    SqlBuffer *sqlBuf = replyBuffer_->getSqlBuffer();
    if (sqlBuf == NULL)
    {
      UdrDebug0( "The sql buffer in reply data buffer is NULL");
    }
    else
    {
      sqlBuf->bufferFull();
    }

    replyBuffer_ = NULL;
  }

  printDataStreamState();

  if (dataStream_)
  {
    UdrDebug0(" About to call dataStream_->cleanupBuffers()");
    dataStream_->cleanupBuffers();
    UdrDebug0(" Done");
    printDataStreamState();
  }

  UdrDebug1(" [END ExUdrTcb::releaseReplyBuffer() %p]", this);
}
// Returns FALSE only when a reply buffer is held and it reports more
// rows. Returns TRUE when there is no reply buffer or when the buffer
// has no more rows available.
NABoolean ExUdrTcb::replyBufferIsEmpty()
{
  if (replyBuffer_ && replyBuffer_->moreRows())
    return FALSE;

  return TRUE;
}
// Allocate a new request in the buffered data stream and return its
// data buffer.
//
// Two message objects are allocated in the stream: a data header
// followed by a data buffer. Stream buffer sizes should have been chosen
// so that both fit in a single stream buffer.
//
// Returns NULL when the header cannot be allocated; that is treated
// like a blocked buffer pool. Failure to allocate the data buffer, or a
// request that spans more than one send buffer, means the stream buffer
// size is too low and triggers an assertion failure.
UdrDataBuffer *ExUdrTcb::getRequestBuffer()
{
  const ULng32 sqlBufferSize = myTdb().getRequestSqlBufferSize();
  NABoolean isResultSet = myTdb().isResultSetProxy();

  // The header class depends on what kind of request this TCB sends
  UdrMessageObj *header = NULL;
  if (isResultSet)
    header = new (*dataStream_) UdrRSDataHeader(udrHandle_, rsHandle_);
  else if (myTdb().isTmudf())
    header = new (*dataStream_) UdrTmudfDataHeader(udrHandle_);
  else
    header = new (*dataStream_) UdrDataHeader(udrHandle_);

  if (header == NULL)
  {
    UdrDebug0(" [WORK] Cannot allocate the data header");
    UdrDebug0(" Stream heap allocation failed");
    return NULL;
  }

  UdrDataBuffer *request = new (*dataStream_, sqlBufferSize)
    UdrDataBuffer (sqlBufferSize, UdrDataBuffer::UDR_DATA_IN, NULL);
  ex_assert(request,
            "Could not allocate request. Stream buffer size may be low.");
  request->setHandle(udrHandle_);

  UdrDebug2(" [WORK] Created request buffer %p. SqlBuffer size %u",
            request, sqlBufferSize);
  UdrDebug1(" [WORK] Number of send buffers in the data stream is %d",
            dataStream_->numOfSendBuffers());
  ex_assert(dataStream_->numOfSendBuffers() == 1,
            "Multiple buffers for single request. Stream buf size too low.");

#ifdef UDR_DEBUG
  // Debug-build hooks: environment variables can deliberately corrupt
  // the header handle or the message type of the outgoing request.
  if (header && (getenv("UDR_INVALID_DATA_HANDLE") != NULL))
  {
    UdrDebug0(" [WORK] *** DATA HEADER HANDLE IS BEING INVALIDATED ***");
    header->setHandle(0);
  }
  if (getenv("UDR_INVALID_DATA_MSGTYPE") != NULL)
  {
    UdrDebug0(" [WORK] *** MESSAGE TYPE IS BEING INVALIDATED ***");
    request->setType(0);
  }
#endif // UDR_DEBUG

  return request;
} // ExUdrTcb::getRequestBuffer()
// Return the reply buffer this TCB is currently consuming. When no
// reply buffer is held, try to take the next message from the data
// stream first.
//
// Message types handled:
//   UDR_MSG_DATA_REPLY  - becomes the new replyBuffer_. A following
//                         UDR_MSG_RS_INFO object, if present, is
//                         extracted and pushed into rsInfo_.
//   UDR_MSG_ERROR_REPLY - the error reply and the diags area that must
//                         follow it are extracted; the diags are merged
//                         into the statement diags. replyBuffer_ stays
//                         NULL.
//   anything else       - treated as an integrity check failure.
//
// On any integrity check failure a diagnostic is added to the statement
// diags area, the reply buffer is released and the TCB state is set to
// IPC_ERROR.
//
// Returns replyBuffer_, which is NULL when nothing was available, when
// an error reply was processed, or when an integrity check failed.
UdrDataBuffer *ExUdrTcb::getReplyBuffer()
{
  NABoolean doIntegrityChecks = TRUE;
  NABoolean integrityCheckResult = TRUE;

#ifdef UDR_DEBUG
  // Integrity checks can be disabled in the debug build by an
  // environment setting. trustReplies_ stores the environment
  // setting and is initialized by initializeDebugVariables().
  doIntegrityChecks = !trustReplies_;
#endif

  if (replyBuffer_ == NULL)
  {
    IpcMessageObjType msgType;
    if (dataStream_ && dataStream_->getNextReceiveMsg(msgType)
        && dataStream_->getNextObjType(msgType))
    {
      switch (msgType)
      {
        case UDR_MSG_DATA_REPLY:
        {
          if (doIntegrityChecks)
          {
            IpcMessageObjSize objSize = dataStream_->getNextObjSize();
            replyBuffer_ = new (dataStream_->receiveMsgObj())
              UdrDataBuffer(dataStream_, objSize, integrityCheckResult);
          }
          else
          {
            replyBuffer_ = new (dataStream_->receiveMsgObj())
              UdrDataBuffer(dataStream_);
          }

          if (integrityCheckResult)
          {
            if (dataStream_->getNextObjType(msgType))
            {
#ifdef UDR_DEBUG
              UdrIpcObjectType udrType = (UdrIpcObjectType) msgType;
              if (udrType > UDR_IPC_FIRST && udrType < UDR_IPC_LAST)
                UdrDebug1("Found an object of type %s in the stream",
                          GetUdrIpcTypeString(udrType));
              else
                UdrDebug1("Found an object of type %d in the stream",
                          (Lng32) msgType);
#endif
              if (msgType == UDR_MSG_RS_INFO)
              {
                NAMemory *h = getIpcHeap();
                UdrRSInfoMsg *rsInfoMsg = new (h) UdrRSInfoMsg(h);

                UdrDebug0("About to extract UDR_MSG_RS_INFO from stream");
                integrityCheckResult =
                  dataStream_->extractNextObj(*rsInfoMsg, doIntegrityChecks);

                if (integrityCheckResult)
                {
                  ex_assert(rsInfo_,
                            "RS INFO msg received but ExRsInfo ptr is NULL");

                  UdrDebug0("About to push RS info into statement globals");
                  ComUInt32 numRs = rsInfoMsg->getNumResultSets();
                  for (ComUInt32 i=0; i < numRs; i++)
                  {
                    ResultSetInfo *rsInfo = &rsInfoMsg->getRSInfo(i);
                    const char *s = rsInfo->getProxySyntax();
                    rsInfo_->populate(i + 1, s);
                  }

                  UdrDebug0("About to push UDR Server info to stmt globals");
                  rsInfo_->setUdrServer(udrServer_);
                  rsInfo_->setIpcProcessId(serverProcessId_);
                  rsInfo_->setUdrHandle(udrHandle_);
                } // if (integrityCheckResult)
                else
                {
                  UdrDebug0("*** ERROR: extractNextObj() returned FALSE");
                }

                // Release the RS info message object whether or not
                // extraction succeeded. Releasing it only on the success
                // path leaked the object each time the integrity check
                // failed.
                rsInfoMsg->decrRefCount();
              } // if (msgType == UDR_MSG_RS_INFO)
            } // if (dataStream_->getNextObjType(msgType))

            if (integrityCheckResult)
            {
#ifdef UDR_DEBUG
              if (doTrace_ || doIpcTrace_)
              {
                FILE *f = (traceFile_ ? traceFile_ : stdout);
                UdrPrintf(f,
                          " [WORK] Received UdrDataBuffer %p, %d tupps",
                          replyBuffer_,
                          replyBuffer_->getSqlBuffer()->getTotalTuppDescs());
              }
#endif
            }
          }
        }
        break;

        case UDR_MSG_ERROR_REPLY:
        {
          // Two objects are expected: the error reply itself and a diags
          // area. Each outcome is tracked separately so that a
          // successful diags extraction cannot hide a failed extraction
          // of the error reply.
          UdrDebug0("About to extract UdrErrorReply from stream");
          UdrErrorReply *reply = new (getIpcHeap()) UdrErrorReply(getIpcHeap());
          NABoolean replyOk =
            dataStream_->extractNextObj(*reply, doIntegrityChecks);
          reply->decrRefCount();
          reply = NULL;

          if (!replyOk)
          {
            UdrDebug0("*** ERROR: extractNextObj() for error reply returned FALSE");
          }

          NABoolean diagsOk = FALSE;
          IpcMessageObjType t;
          if (dataStream_->getNextObjType(t) && t == IPC_SQL_DIAG_AREA)
          {
            ComDiagsArea *returnedDiags = ComDiagsArea::allocate(getHeap());
            diagsOk =
              dataStream_->extractNextObj(*returnedDiags, doIntegrityChecks);

            if (diagsOk)
            {
              UdrDebug1(" [WORK] Diags arrived with this row, count %d",
                        (Lng32) returnedDiags->getNumber());
              getOrCreateStmtDiags()->mergeAfter(*returnedDiags);
            }
            else
            {
              UdrDebug0("*** ERROR: extractNextObj() for diags returned FALSE");
            }

            returnedDiags->decrRefCount();
            returnedDiags = NULL;
          }
          else
          {
            UdrDebug0("*** ERROR: Expected diags in the stream but found none");
          }

          integrityCheckResult = ((replyOk && diagsOk) ? TRUE : FALSE);
        }
        break;

        default:
        {
          //
          // An unexpected message type arrived. Treat this the same
          // as an integrity check failure.
          //
          UdrDebug0(" ***");
          UdrDebug0(" *** WARNING: An unexpected message arrived");
          UdrDebug1(" *** Message type is %d", (Lng32) msgType);
          UdrDebug0(" ***");
          integrityCheckResult = FALSE;
        }
        break;

      } // switch (msgType)
    } // if (there are message objects to process)
  } // if (replyBuffer_ == NULL)

  if (!integrityCheckResult)
  {
    UdrDebug0("*** ERROR: Integrity check failed for data reply");
    ComDiagsArea *diags = getOrCreateStmtDiags();
    addIntegrityCheckFailureToDiagsArea(diags);
    releaseReplyBuffer();
    setUdrTcbState(IPC_ERROR);
  }

  return replyBuffer_;
} // ExUdrTcb::getReplyBuffer()
// Process the next entry of the current reply buffer and, depending on
// its queue status, insert a corresponding entry into the parent up
// queue.
//
// Outline:
//   - If the TDB has an output expression, a tupp for the output row is
//     taken from outputPool_ before the reply entry is read.
//   - The next entry is taken from the reply SqlBuffer with
//     moveOutSendOrReplyData(). Nothing more is done when that call
//     reports end of data.
//   - If the entry's ControlInfo says an external diags area is present,
//     a ComDiagsArea is extracted from the data stream.
//   - Q_OK_MMORE: the row is validated, the output expression and the
//     predicate are evaluated, and the row is inserted in the up queue
//     unless the predicate evaluates to EXPR_FALSE.
//     Q_NO_DATA / Q_SQLERROR: an up queue entry of that status is
//     inserted via insertUpQueueEntry(), carrying any returned diags.
//   - An integrity check failure (no ControlInfo, missing or bad diags,
//     invalid status) inserts Q_NO_DATA, merges a failure diagnostic
//     into the statement diags, releases the reply buffer and sets the
//     TCB state to IPC_ERROR.
//
// Returns WORK_POOL_BLOCKED when the output tupp cannot be allocated,
// WORK_OK otherwise. The integrity failure path does not change the
// return value of this method.
//
// replyBuffer_ is dereferenced without a NULL check and the tail entry
// of the up queue is used without a check for free space.
// NOTE(review): presumably the caller guarantees both -- confirm.
ExWorkProcRetcode ExUdrTcb::returnSingleRow()
{
ExWorkProcRetcode result = WORK_OK;
ControlInfo *ci = NULL;
ex_queue_entry *upEntry = qParent_.up->getTailEntry();
SqlBuffer *sqlBuf = replyBuffer_->getSqlBuffer();
ex_assert(sqlBuf, "UDR reply buffer is corrupt or contains no data");
tupp output_row;
ULng32 numOutputValues = myTdb().getNumOutputValues();
NABoolean moveOutputValues =
(myTdb().getOutputExpression() == NULL ? FALSE : TRUE);
// At this point we do not know if the next entry in the reply
// buffer is data or a Q_NO_DATA indicator. But we allocate a
// tupp for our output row ahead of time to avoid the situation
// where we have extracted a data row from the reply buffer but
// then our output buffer pool is full. In the case where a
// tupp is allocated but never used (because the next reply buffer
// entry is not data) the tupp will eventually be recycled
// because its reference count stays at zero until it gets placed
// into an ATP. If there was a way to "peek" into the reply buffer
// and see if there were more data rows, then we could avoid
// allocating the output tupp until we were sure there was data
// to process.
if (moveOutputValues)
{
short ok = outputPool_->
get_free_tuple(output_row, (Lng32) myTdb().getOutputRowLen());
if (ok != 0)
result = WORK_POOL_BLOCKED;
}
if (result == WORK_OK)
{
// Get a pointer to the next reply row. If there are diags
// associated with the reply row they are not packed into
// the SqlBuffer. We will extract a ComDiagsArea from the
// message stream later if the ControlInfo for this row
// indicates that there are diags.
tupp reply_row;
NABoolean endOfData = sqlBuf->moveOutSendOrReplyData(
FALSE, // [IN] sending? (vs. replying)
&(upEntry->upState), // [OUT] queue state
reply_row, // [OUT] new data tupp_descriptor
&ci, // [OUT] new ControlInfo area
NULL, // [OUT] new diags area
NULL // [OUT] new stats area
);
// Local copy of the status. It may be changed to Q_SQLERROR further
// down without touching upEntry->upState.
ex_queue::up_status status = upEntry->upState.status;
UdrDebug2(" [WORK] Next reply row: eod %s, status %s",
TF_STRING(endOfData), GetUpStatusString(status));
if (!endOfData)
{
// Process a row from the reply buffer
ex_queue_entry *downEntry = qParent_.down->getHeadEntry();
ExUdrPrivateState &down_pstate =
*((ExUdrPrivateState *) downEntry->pstate);
upEntry->upState.downIndex = qParent_.down->getHeadIndex();
upEntry->upState.parentIndex = downEntry->downState.parentIndex;
// Check for a diags area coming back with this row
ComDiagsArea *returnedDiags = NULL;
NABoolean doIntegrityChecks = TRUE;
NABoolean integrityCheckResult = TRUE;
#ifdef UDR_DEBUG
// Integrity checks can be disabled in the debug build by an
// environment setting. trustReplies_ stores the environment
// setting and is initialized by initializeDebugVariables().
doIntegrityChecks = !trustReplies_;
#endif
if (!ci)
{
UdrDebug0("*** ERROR: No ControlInfo available for this reply row");
integrityCheckResult = FALSE;
}
if (ci && ci->getIsExtDiagsAreaPresent())
{
IpcMessageObjType t;
returnedDiags = ComDiagsArea::allocate(getHeap());
if (dataStream_->getNextObjType(t) && t == IPC_SQL_DIAG_AREA)
{
integrityCheckResult =
dataStream_->extractNextObj(*returnedDiags, doIntegrityChecks);
if (!integrityCheckResult)
{
UdrDebug0("*** ERROR: extractNextObj() for diags returned FALSE");
}
}
else
{
UdrDebug0("*** ERROR: Expected diags in the stream but found none");
returnedDiags->decrRefCount();
returnedDiags = NULL;
integrityCheckResult = FALSE;
}
if (integrityCheckResult)
{
UdrDebug1(" [WORK] Diags arrived with this row, count %d",
(Lng32) returnedDiags->getNumber());
}
}
if (integrityCheckResult)
{
// The server should only return error diagnostics with a
// Q_SQLERROR entry. But if we happen to receive error
// diagnostics with a Q_OK_MMORE entry, let's tolerate that
// and treat it as a Q_SQLERROR entry.
if (status == ex_queue::Q_OK_MMORE &&
returnedDiags &&
returnedDiags->getNumber(DgSqlCode::ERROR_) > 0)
status = ex_queue::Q_SQLERROR;
switch (status)
{
case ex_queue::Q_OK_MMORE:
{
// When the down entry is in step CANCEL_AFTER_SEND nothing in this
// case body runs: no validation and no up queue entry for the row.
if (down_pstate.step_ != CANCEL_AFTER_SEND)
{
ComDiagsArea *validationDiags = NULL;
if (validateDataRow(reply_row, validationDiags) == FALSE)
{
ex_assert(validationDiags,
"validateDataRow() did not return diagnostics");
insertUpQueueEntry(ex_queue::Q_SQLERROR, validationDiags);
validationDiags->decrRefCount();
}
else
{
// We have a data row to evaluate.
//
// Steps to perform are:
// 1. Initialize the up queue entry ATP
// 2. Evaluate the output expression if one is present
// 3. Evaluate the predicate if one is present
// 4. Copy server diags to the up queue entry
// 5. Return an up queue entry
//
// Skip steps 4 and 5 if the predicate is not satisfied.
// Skip step 3 if errors are encountered in step 2.
NABoolean outputExprError = FALSE;
NABoolean predSatisfied = TRUE;
// 1. Initialize the up entry ATP. copyAtp() moves any
// diags associated with the down entry ATP to the up
// entry ATP.
upEntry->copyAtp(downEntry);
if (numOutputValues > 0)
{
upEntry->getTupp(myTdb().getNumOutputTupps() - 1) =
(moveOutputValues ? output_row : reply_row);
}
// 2. Evaluate the output expression
if (moveOutputValues)
{
workAtp_->getTupp(myTdb().getReplyTuppIndex()) = reply_row;
// Evaluate the output expression. If diags are
// created they will be stored in the up entry
// ATP. If errors occur during expression evaluation
// we will go ahead and insert a data row plus diags
// in the up queue.
ex_expr::exp_return_type expStatus =
myTdb().getOutputExpression()->eval(upEntry->getAtp(),
workAtp_);
if (expStatus != ex_expr::EXPR_OK)
{
outputExprError = TRUE;
UdrDebug1(" [WORK] outputExpr.eval() returned %s",
GetExpStatusString(expStatus));
}
workAtp_->getTupp(myTdb().getReplyTuppIndex()).release();
}
// 3. Evaluate the predicate
ex_expr *predicate = myTdb().getPredicate();
if (!outputExprError && predicate)
{
ex_expr::exp_return_type expStatus =
predicate->eval(upEntry->getAtp(), 0);
// EXPR_ERROR leaves predSatisfied TRUE, so the row is still inserted
// in the up queue in step 5.
if (expStatus == ex_expr::EXPR_ERROR)
{
UdrDebug1(" [WORK] predicate.eval() returned %s",
GetExpStatusString(expStatus));
}
else if (expStatus == ex_expr::EXPR_FALSE)
{
// Record the fact that the predicate is not
// satisfied. Also decrement the reference count
// on any diags area pointed to by this up queue
// entry because the entry is about to be
// abandoned.
predSatisfied = FALSE;
upEntry->setDiagsArea(NULL);
}
}
// 4. Move server diags to the up queue entry
if (predSatisfied && returnedDiags)
{
ComDiagsArea *atpDiags = upEntry->getDiagsArea();
if (atpDiags == NULL)
{
// setDiagsArea() does not increment the reference count
upEntry->setDiagsArea(returnedDiags);
returnedDiags->incrRefCount();
}
else
{
atpDiags->mergeAfter(*returnedDiags);
}
}
// 5. Return an up queue entry
if (predSatisfied)
{
down_pstate.matchCount_++;
upEntry->upState.setMatchNo(down_pstate.matchCount_);
qParent_.up->insert();
}
// Increment actual row count (1 by default) in
// statistics area
// This statement is outside the predSatisfied check, so rows rejected
// by the predicate are counted too.
if (udrBaseStats_ != NULL)
udrBaseStats_->incActualRowsReturned();
} // if (validateDataRow()) else ...
} // if (down_pstate.step_ != CANCEL_AFTER_SEND)
} // case ex_queue::Q_OK_MMORE
break;
case ex_queue::Q_NO_DATA:
{
insertUpQueueEntry(ex_queue::Q_NO_DATA, returnedDiags);
} // case ex_queue::Q_NO_DATA
break;
case ex_queue::Q_SQLERROR:
{
insertUpQueueEntry(ex_queue::Q_SQLERROR, returnedDiags);
} // case ex_queue::Q_SQLERROR
break;
case ex_queue::Q_INVALID:
default:
{
UdrDebug0("*** ERROR: Invalid queue status for this reply row");
integrityCheckResult = FALSE;
} // case ex_queue::Q_INVALID
break;
} // switch (status)
} // if (integrityCheckResult)
if (!integrityCheckResult)
{
UdrDebug0("*** ERROR: Integrity check failed for this reply row");
ComDiagsArea *newDiags = ComDiagsArea::allocate(getHeap());
addIntegrityCheckFailureToDiagsArea(newDiags);
// Currently our strategy is to treat an integrity check
// failure as an IPC error. This has the side-effect of
// causing the TCB to return WORK_BAD_ERROR to the
// scheduler. When WORK_BAD_ERROR is returned our up queue
// entries get ignored and diagnostics associated with up
// queue entries do not propagate to the application. So even
// though we are working on a specific row right now, we need
// to put diagnostics in the statement diags area so that the
// application is sure to see them. If we changed our error
// handling strategy so that integrity check failures did not
// cause the TCB to return WORK_BAD_ERROR, then we could put
// newDiags in the Q_SQLERROR up queue entry instead of in the
// statement diags area.
// insertUpQueueEntry(ex_queue::Q_SQLERROR, newDiags);
insertUpQueueEntry(ex_queue::Q_NO_DATA);
ComDiagsArea *stmtDiags = getOrCreateStmtDiags();
stmtDiags->mergeAfter(*newDiags);
releaseReplyBuffer();
setUdrTcbState(IPC_ERROR);
newDiags->decrRefCount();
}
// Drop this method's reference to the diags area allocated above. The
// up queue entry took its own reference in step 4 when it kept the
// pointer.
if (returnedDiags)
{
returnedDiags->decrRefCount();
returnedDiags = NULL;
}
} // if (!endOfData)
} // if (result == WORK_OK)
return result;
} // ExUdrTcb::returnSingleRow()
//
// Cancel processing
//
// Walk the down queue entries that are still being processed and mark
// those whose request has been changed to GET_NOMORE as cancelled.
//
// For a result set whose already-sent request was cancelled, an RS
// CLOSE message is sent to the server once every in-flight entry has
// been cancelled. For CALL statements nothing is sent.
//
// Always returns WORK_OK.
ExWorkProcRetcode ExUdrTcb::workCancel()
{
  UdrDebug1("[BEGIN TCB CANCEL] %p", this);

  ExWorkProcRetcode result = WORK_OK;

  // repliesRequired records whether UDR server replies are still needed
  // after cancelled entries have been processed. That is the case when
  // some, but not all, down queue entries were cancelled and at least
  // one not-cancelled entry has already been sent to the server.
  NABoolean anyEntriesCancelled = FALSE;
  NABoolean repliesRequired = FALSE;
  NABoolean needToSendCancelToServer = FALSE;
  NABoolean isResultSet = myTdb().isResultSetProxy();

  if (anyOutstandingQueueRequests())
  {
    // Entries still being processed have an index that logically falls
    // between the head index and nextToSend_.
    for (queue_index qi = qParent_.down->getHeadIndex();
         qi != nextToSend_;
         qi++)
    {
      ex_queue_entry *entry = qParent_.down->getQueueEntry(qi);
      ex_queue::down_request &request = entry->downState.request;
      ExUdrPrivateState &pstate = *((ExUdrPrivateState *) entry->pstate);
      UdrTcbSendStep &step = pstate.step_;

      if (request == ex_queue::GET_N || request == ex_queue::GET_ALL)
      {
        // Not cancelled. If it was already sent, its reply is needed.
        if (step == STARTED)
        {
          repliesRequired = TRUE;
        }
      }
      else if (request == ex_queue::GET_NOMORE)
      {
        // Cancelled. What happens depends on how far the entry got.
        if (step == NOT_STARTED)
        {
          step = CANCEL_BEFORE_SEND;
          anyEntriesCancelled = TRUE;
        }
        else if (step == STARTED)
        {
          step = CANCEL_AFTER_SEND;
          anyEntriesCancelled = TRUE;
          needToSendCancelToServer = TRUE;

          // Pass the cancel on to every child
          for (Int32 child = 0; child < numChildren(); child++)
          {
            qChild_[child].down->cancelRequestWithParentIndex(qi);
          }
        }
        // Any other step: nothing to do
      }
      else
      {
        ex_assert(FALSE, "Invalid request type in TCB down queue");
      }
    } // for each in-flight queue index
  }
  else
  {
    UdrDebug0(" No outstanding entries to cancel");
  }

  if (repliesRequired)
  {
#ifdef UDR_DEBUG
    // Print a message even if the UDR_DEBUG environment variable
    // is not set. This may expose cancel situations that have not
    // yet been considered.
    FILE *f = (traceFile_ ? traceFile_ : stdout);
    UdrPrintf(f, " ***");
    UdrPrintf(f, " *** WARNING: Some but not all queue entries cancelled");
    UdrPrintf(f, " ***");
#endif // UDR_DEBUG
  }
  else if (anyEntriesCancelled)
  {
    UdrDebug0(" ***");
    UdrDebug0(" *** WARNING: All queue entries were cancelled");
    UdrDebug0(" ***");

    if (isResultSet && needToSendCancelToServer)
    {
      // Result set: tell the server that it can stop producing rows.
      // Rows may still continue to arrive. This only asks the server to
      // stop as soon as possible; the TCB is NOT put into a state where
      // it cannot (or will not) process more reply rows.

      // First make sure the server is working in the current
      // transaction
      if (rsInfo_)
        rsInfo_->enterUdrTx(*myExeStmtGlobals());

#ifdef UDR_DEBUG
      // Print a message if either the UDR_DEBUG or
      // UDR_CONTINUE_DEBUG environment variable is set.
      if (doTrace_ || (getenv("UDR_CONTINUE_DEBUG") != NULL))
      {
        FILE *f = (traceFile_ ? traceFile_ : stdout);
        UdrPrintf(f, " [CANCEL] Sending RS CLOSE");
      }
#endif

      // Now send an RS CLOSE request. FALSE means the TCB does not
      // require a callback when I/O completes.
      sendControlMessage(UDR_MSG_RS_CLOSE, FALSE);
    }

    // Not a result set (a CALL statement): deliberately do nothing.
    //
    // There is currently no cancel protocol with the server for CALL
    // statements. The UDR server is a serial, single-threaded server
    // and cannot be told to stop working on our requests.
    //
    // The cancel most likely comes from the executor's own internal
    // cancel requests rather than from the application. On NSK,
    // application-initiated cancel is not yet supported, and Ctrl-C is
    // detected and handled in the IPC layers. Because the cancellation
    // is probably only for internal correctness, no aggressive action
    // such as killing the server is taken. Normal work method
    // processing handles the UDR replies.
    //
    // For what it's worth, the following line could be used to kill
    // the UDR server...
    //   udrServer_->kill(getOrCreateStmtDiags());
  } // else if (anyEntriesCancelled)

  UdrDebug2("[END TCB CANCEL] %p. Return value is %s\n",
            this, GetWorkRetcodeString(result));
  return result;
}
// Returns TRUE if a UDR Server connection exists and the running
// server process ID matches the server process ID stored in this
// TCB. Generates a diagnostic if this TCB was storing a valid
// process ID that does not match the running server.
NABoolean ExUdrTcb::verifyUdrServerProcessId()
{
  // Without a working IPC connection, a server object and a recorded
  // process ID there is nothing to compare. FALSE is returned without
  // generating a diagnostic.
  if (getIpcBroken() || !udrServer_ || ProcessIdIsNull(serverProcessId_))
    return FALSE;

  // The server is good when it is not in the broken state and still
  // has the process ID that this TCB recorded.
  if (udrServer_->getState() != ExUdrServer::EX_UDR_BROKEN &&
      udrServer_->getServerProcessId() == serverProcessId_)
    return TRUE;

  // The server this TCB was talking to is gone. Move to the IPC_ERROR
  // state and report the condition in the statement diags area.
  setUdrTcbState(IPC_ERROR);
  ComDiagsArea *da = getOrCreateStmtDiags();
  *da << DgSqlCode(-EXE_UDR_SERVER_WENT_AWAY);

#ifdef UDR_DEBUG
  char tcbServerText[300];
  serverProcessId_.toAscii(tcbServerText, 300);
  UdrDebug0("***");
  UdrDebug1("*** WARNING: UDR Server for TCB %p is not running", this);
  UdrDebug1("*** TCB state is %s", getUdrTcbStateString(state_));
  UdrDebug1("*** Server for this TCB was %s", tcbServerText);
  UdrDebug0("***");
  if (udrServer_ && udrServer_->getUdrControlConnection())
  {
    char runningServerText[300];
    const IpcProcessId &runningServer =
      udrServer_->getUdrControlConnection()->getOtherEnd();
    runningServer.toAscii(runningServerText, 300);
    UdrDebug1("*** Running server is %s", runningServerText);
  }
  else
  {
    UdrDebug0("*** No server is currently running");
  }
#endif // UDR_DEBUG

  return FALSE;
}
// React to data arriving for this TCB: re-evaluate the TCB state, trace
// the data stream state, and ask the scheduler to run the work method.
void ExUdrTcb::reportDataArrival()
{
  UdrDebug1(" Data arrived for TCB %p. TCB is being scheduled", this);

  // The TCB may be in the WORK_IO_ACTIVE state with the conditions for
  // going back to WORK now satisfied, for example because the last
  // outstanding I/O has just completed. The call below checks those
  // conditions and performs the transition when appropriate.
  attemptTransitionToWorkState();

  printDataStreamState();

  tickleSchedulerWork();
}
// Move the TCB from WORK_IO_ACTIVE back to WORK when all of the
// following hold:
//   * the TCB is currently in WORK_IO_ACTIVE
//   * no down queue requests are being processed by the server
//   * no I/O (such as continue requests) is pending in the data stream
//
// Other methods of this TCB should call this method whenever one of
// these conditions may have changed, for example when a receive
// callback arrives.
//
// Background: sending a data request while the TCB is in WORK_IO_ACTIVE
// can lead to problems. The data stream does not seem to tolerate it:
// callbacks can become unreliable, or stream limits are hit and I/O is
// never driven to completion, leading to hangs. Data requests are sent
// by buildAndSendRequestBuffer(), which contains logic to never send a
// request unless the TCB is in the WORK state.
void ExUdrTcb::attemptTransitionToWorkState()
{
  if (state_ != WORK_IO_ACTIVE)
    return;

  if (anyOutstandingQueueRequests())
    return;

  if (dataStream_->numOfResponsesPending() != 0)
    return;

  setUdrTcbState(WORK);
}
// Change the TCB state to target if the transition from the current
// state is one of the expected ones.
//
// Returns TRUE and updates state_ when the transition is expected.
// Otherwise state_ is left alone and FALSE is returned; for most source
// states an assertion failure is raised first.
//
// An unexpected transition may be valid in itself but one that the code
// does not yet handle correctly or gracefully. In that case it should
// be added to the table below, and ExUdrTcb code should be changed to
// handle it.
//
// Current thinking about expected transitions:
//  - FIXUP should never be skipped
//  - We cannot transition to DONE without sending an UNLOAD if
//    a LOAD has been sent. But if an IpcError occurred then no
//    UNLOAD needs to be sent.
//  - We allow a transition from IPC_ERROR back to FIXUP.
//  - We do not transition out of DONE. DONE means the TCB is going away.
//
// NOTE(review): unexpected transitions out of the four TMUDF states
// (SCALAR_INPUT_READY_TO_SEND, READ_TABLE_INPUT_FROM_CHILD,
// RETURN_ROWS_FROM_CHILD, CHILD_INPUT_READY_TO_SEND) return FALSE
// without asserting, unlike every other source state. That includes a
// transition to IPC_ERROR. Confirm whether this is intentional.
NABoolean ExUdrTcb::setUdrTcbState(UdrTcbState target)
{
  UdrTcbState source = state_;

#ifdef UDR_DEBUG
  if (doTrace_ || doStateTrace_)
  {
    FILE *f = traceFile_;
    UdrPrintf(f, "*** TCB %p state transition: %s -> %s",
              this, getUdrTcbStateString(source),
              getUdrTcbStateString(target));
  }
#endif // UDR_DEBUG

  // allowed:       target is in the expected set for source
  // rejectQuietly: an unexpected transition returns FALSE with no assert
  NABoolean allowed = FALSE;
  NABoolean rejectQuietly = FALSE;

  switch (source)
  {
    case BUILD:
      allowed = (target == FIXUP || target == DONE);
      break;

    case FIXUP:
      allowed = (target == SENDING_LOAD || target == DONE);
      break;

    case SENDING_LOAD:
      allowed = (target == WORK || target == LOAD_FAILED ||
                 target == SENDING_UNLOAD || target == IPC_ERROR);
      break;

    case LOAD_FAILED:
      allowed = (target == SENDING_UNLOAD);
      break;

    case WORK:
      allowed = (target == WORK_IO_ACTIVE || target == SENDING_UNLOAD ||
                 target == IPC_ERROR ||
                 target == SCALAR_INPUT_READY_TO_SEND);
      break;

    case WORK_IO_ACTIVE:
      allowed = (target == WORK || target == SENDING_UNLOAD ||
                 target == IPC_ERROR ||
                 target == READ_TABLE_INPUT_FROM_CHILD);
      break;

    case SENDING_UNLOAD:
      allowed = (target == IPC_ERROR || target == DONE);
      break;

    case IPC_ERROR:
      allowed = (target == DONE || target == FIXUP);
      break;

    case DONE:
      // Nothing is expected after DONE
      break;

    case SCALAR_INPUT_READY_TO_SEND:
      allowed = (target == WORK_IO_ACTIVE);
      rejectQuietly = TRUE;
      break;

    case READ_TABLE_INPUT_FROM_CHILD:
      allowed = (target == RETURN_ROWS_FROM_CHILD);
      rejectQuietly = TRUE;
      break;

    case RETURN_ROWS_FROM_CHILD:
      allowed = (target == CHILD_INPUT_READY_TO_SEND);
      rejectQuietly = TRUE;
      break;

    case CHILD_INPUT_READY_TO_SEND:
      allowed = (target == WORK_IO_ACTIVE);
      rejectQuietly = TRUE;
      break;

    default:
      // Unknown source state: nothing is expected
      break;
  }

  if (allowed)
  {
    state_ = target;
    return TRUE;
  }

  if (rejectQuietly)
    return FALSE;

  char buf[256];
  str_sprintf(buf, "Invalid UDR state transition: %s -> %s",
              getUdrTcbStateString(source), getUdrTcbStateString(target));
  ex_assert(FALSE, buf);
  return FALSE;
}
// Return the printable name of a TCB state. Values with no name of
// their own are passed to ComRtGetUnknownString().
const char *ExUdrTcb::getUdrTcbStateString(UdrTcbState s)
{
  const char *name = NULL;

  switch (s)
  {
    case BUILD:
      name = "BUILD";
      break;
    case FIXUP:
      name = "FIXUP";
      break;
    case SENDING_LOAD:
      name = "SENDING_LOAD";
      break;
    case LOAD_FAILED:
      name = "LOAD_FAILED";
      break;
    case WORK:
      name = "WORK";
      break;
    case WORK_IO_ACTIVE:
      name = "WORK_IO_ACTIVE";
      break;
    case SENDING_UNLOAD:
      name = "SENDING_UNLOAD";
      break;
    case IPC_ERROR:
      name = "IPC_ERROR";
      break;
    case DONE:
      name = "DONE";
      break;
    case SCALAR_INPUT_READY_TO_SEND:
      name = "SCALAR_INPUT_READY_TO_SEND";
      break;
    case READ_TABLE_INPUT_FROM_CHILD:
      name = "READ_TABLE_INPUT_FROM_CHILD";
      break;
    case RETURN_ROWS_FROM_CHILD:
      name = "RETURN_ROWS_FROM_CHILD";
      break;
    case CHILD_INPUT_READY_TO_SEND:
      name = "CHILD_INPUT_READY_TO_SEND";
      break;
    default:
      name = ComRtGetUnknownString((Int32) s);
      break;
  }

  return name;
}
// Send continue requests to the UDR server until a stream limit is
// reached or no further request can be allocated.
//
// Nothing is done unless there are outstanding queue requests and the
// TCB is in the WORK_IO_ACTIVE state.
//
// Returns:
//   WORK_OK           - nothing to do, the send limit was reached, or
//                       (TMUDF only) the stream has input buffers
//   WORK_POOL_BLOCKED - the in-use limit was reached or a continue
//                       message could not be allocated
//   WORK_BAD_ERROR    - the IPC connection is broken
ExWorkProcRetcode ExUdrTcb::continueRequest()
{
  ExWorkProcRetcode result = WORK_OK;

  if (!anyOutstandingQueueRequests() || state_ != WORK_IO_ACTIVE)
    return result;

  // Send continue requests until the limit for outstanding requests
  // or for incoming and used buffers is reached
  dataStream_->cleanupBuffers();

  for (;;)
  {
    if (getIpcBroken())
    {
      result = WORK_BAD_ERROR;
      break;
    }

    if (dataStream_->sendLimitReached())
    {
      UdrDebug0(" [CONTINUE] Cannot send continue request");
      UdrDebug0(" sendLimitReached() returned TRUE");
      printDataStreamState();
      result = WORK_OK;
      break;
    }

    if (dataStream_->inUseLimitReached())
    {
      // Another continue request cannot be sent because there is no
      // room on this side to take the reply. The status returned causes
      // this task to be rescheduled. Next time, cleanupBuffers() and
      // checkReceive() can reduce the number of in-use buffers and of
      // unread buffers in the receive queue.
      UdrDebug0(" [CONTINUE] Cannot send continue request");
      UdrDebug0(" inUseLimitReached() returned TRUE");
      printDataStreamState();
      result = WORK_POOL_BLOCKED;
      break;
    }

    if (myTdb().isTmudf())
    {
      UdrDebug0("Check for input buffers");
      printDataStreamState();
      if (dataStream_->numOfInputBuffers()> 0)
      {
        UdrDebug0("Not sending continue request since input buffers is >0");
        result = WORK_OK;
        break;
      }
    }

    // Allocate the continue message in the data stream
    NABoolean isResultSet = myTdb().isResultSetProxy();
    UdrMessageObj *contMsg = NULL;
    if (isResultSet)
      contMsg = new (*dataStream_) UdrRSContinueMsg(udrHandle_, rsHandle_);
    else
      contMsg = new (*dataStream_) UdrContinueMsg(udrHandle_);

    if (contMsg == NULL)
    {
      UdrDebug0(" [CONTINUE] Cannot send continue request");
      UdrDebug0(" Buffer allocation failed");
      printDataStreamState();
      result = WORK_POOL_BLOCKED;
      break;
    }

    // A non-NULL rsInfo_ pointer indicates a CALL that produces result
    // sets, or a result set. In those cases the UDR server may need to
    // be brought into the current transaction before the continue
    // request is sent.
    if (rsInfo_)
      rsInfo_->enterUdrTx(*myExeStmtGlobals());

    // Determine the transid to send in the message. The statement
    // transid is used if all the following are true
    //  a. the TDB says we require a transaction
    //  b. this is NOT a CALL that returns result sets
    //  c. this is NOT a result set
    //
    // Note: (b) and (c) are both indicated by rsInfo_ == NULL
    Int64 &stmtTransId = myExeStmtGlobals()->getTransid();
    Int64 transIdToSend = -1; // -1 is an invalid transid
    if (dataRequestsAreTransactional())
      transIdToSend = stmtTransId;

#ifdef UDR_DEBUG
    // Print a message if tracing is enabled or
    // UDR_CONTINUE_DEBUG environment variable is set.
    if (doTrace_ ||
        doIpcTrace_ ||
        (getenv("UDR_CONTINUE_DEBUG") != NULL))
    {
      FILE *f = (traceFile_ ? traceFile_ : stdout);
      char stmtTx[256];
      char msgTx[256];
      TransIdToText(stmtTransId, stmtTx, 256);
      TransIdToText(transIdToSend, msgTx, 256);
      if (isResultSet)
        UdrPrintf(f, " [CONTINUE] Sending RS CONTINUE %d",
                  (int) continueMsgsSent_ + 1);
      else
        UdrPrintf(f,
                  " [CONTINUE] Sending CONTINUE %d, msg tx %s",
                  (int) continueMsgsSent_ + 1, msgTx);
    }
    if (getenv("UDR_INVALID_CONTINUE_HANDLE") != NULL)
    {
      UdrDebug0(" [CONTINUE] *** HANDLE IS BEING INVALIDATED ***");
      contMsg->setHandle(0);
    }
#endif // UDR_DEBUG

    // NOTE(review): sizeof is applied through a UdrMessageObj pointer,
    // so the size recorded is that of the base class, not of the
    // concrete continue message -- confirm this is what the statistics
    // are meant to show.
    if (udrStats_)
    {
      udrStats_->getSentContinueBuffers().addEntry(sizeof(*contMsg));
    }

    if (state_ == WORK)
      setUdrTcbState(WORK_IO_ACTIVE);

    if (dataRequestsAreTransactional())
      myExeStmtGlobals()->incrementUdrTxMsgsOut();
    else
      myExeStmtGlobals()->incrementUdrNonTxMsgsOut();

    dataStream_->sendRequest(transIdToSend);
    continueMsgsSent_++;
    printDataStreamState();
  } // for (;;)

  return result;
} // ExUdrTcb::continueRequest()
// Work method for table-mapping UDFs (TMUDFs).
//
// Handles scalar inputs from the parent and table-valued inputs from
// one or more children. Each invocation runs up to three steps, and a
// later step only runs when the previous one returned WORK_OK:
//   1. tmudfCheckReceive() - process reply data and fill the up queue
//   2. tmudfCheckSend()    - build and send scalar/child input
//   3. continueRequest()   - ask the server for more output
// Once the parent down queue is empty, unused IPC buffers in the data
// stream are released.
//
// Returns the ExWorkProcRetcode of the last step that was executed.
ExWorkProcRetcode ExUdrTcb::tmudfWork()
{
#ifdef UDR_DEBUG
  static Lng32 workCount = 0;
  if (doTrace_)
  {
    workCount++;
    FILE *f = traceFile_;
    char text[256];
    TransIdToText(myExeStmtGlobals()->getTransid(), text, 256);
    ex_queue *up = qParent_.up;
    ex_queue *dn = qParent_.down;

    UdrPrintf(f, "[BEGIN TMUDF WORK %d] %p", workCount, this);
    UdrPrintf(f, " [TMUDF WORK] Stmt tx %s", text);
    UdrPrintf(f, " [TMUDF WORK] TCB state %s", getUdrTcbStateString(state_));
    UdrPrintf(f, " [TMUDF WORK] Down queue: head %d, tail %d, len %d, size %d",
              (Lng32) dn->getHeadIndex(), (Lng32) dn->getTailIndex(),
              (Lng32) dn->getLength(), (Lng32) dn->getSize());
    UdrPrintf(f, " [TMUDF WORK] Up queue: head %d, tail %d, len %d, size %d",
              (Lng32) up->getHeadIndex(), (Lng32) up->getTailIndex(),
              (Lng32) up->getLength(), (Lng32) up->getSize());
    UdrPrintf(f, " [TMUDF WORK] Next to send: %d", (Lng32) nextToSend_);
    printDataStreamState();
  }
#endif // UDR_DEBUG

  ExWorkProcRetcode result = WORK_OK;

  result = tmudfCheckReceive();
  UdrDebug1(" [TMUDF WORK] tmudfCheckReceive() returned %s",
            GetWorkRetcodeString(result));

  if (result == WORK_OK)
  {
    result = tmudfCheckSend();
    UdrDebug1(" [TMUDF WORK] tmudfCheckSend() returned %s",
              GetWorkRetcodeString(result));
  }

  if (result == WORK_OK)
  {
    result = continueRequest();
    UdrDebug1(" [TMUDF WORK] continueRequest() returned %s",
              GetWorkRetcodeString(result));
  }

  if (qParent_.down->isEmpty())
  {
    UdrDebug0(" [TMUDF WORK] Down queue is empty");

    // There are no more parent requests to process. We now want to
    // perform garbage collection of buffers in the data stream.
    // There seems to be a bug in stream message processing code
    // that moves the message from RECEIVE queue to IN USE queue only
    // when you try to get the next message from the stream and it
    // only garbage collects the IN USE queue.
    // So we ask the stream for one more buffer even though we know
    // there won't be one. By doing this we guarantee that all
    // incoming buffers have moved off the stream's RECEIVE queue and
    // on to the IN USE queue.
    // If there does happen to be another buffer in the stream,
    // that is an internal error.
    //
    // This cleanup technique is borrowed from the send top node (code
    // is in ex_send_top.cpp) and fixes the memory leak reported in
    // solution 10-050707-9503.
    UdrDebug0(" [TMUDF WORK] Releasing unused IPC buffers in the data stream...");

    // The data stream may not exist (the send path explicitly tests
    // for a NULL stream), so every access below is guarded, not only
    // the probe for an unexpected message.
    if (dataStream_)
    {
      IpcMessageObjType msgType;
      if (dataStream_->getNextReceiveMsg(msgType))
      {
        char buf[256];
        dataStream_->getNextObjType(msgType);
        str_sprintf(buf, "An unexpected message of type %d arrived",
                    (Int32) msgType);
        ex_assert(FALSE, buf);
      }
      dataStream_->releaseBuffers();
    }

    UdrDebug0(" [TMUDF WORK] Done releasing IPC buffers");
  } // if (qParent_.down->isEmpty())

#ifdef UDR_DEBUG
  if (doTrace_)
  {
    FILE *f = traceFile_;
    UdrPrintf(f, " [TMUDF WORK] UDR TX messages outstanding: %d",
              (Lng32) myExeStmtGlobals()->numUdrTxMsgsOut());
    UdrPrintf(f, " [TMUDF WORK] UDR Non-TX messages outstanding: %d",
              (Lng32) myExeStmtGlobals()->numUdrNonTxMsgsOut());
    UdrPrintf(f, " [TMUDF WORK] TCB state: %s", getUdrTcbStateString(state_));
    // Print the TCB address with %p, matching the BEGIN trace line.
    UdrPrintf(f, "[END TMUDF WORK %d] %p. Return value is %s\n",
              workCount, this, GetWorkRetcodeString(result));
  }
#endif // UDR_DEBUG

  return result;
} // ExUdrTcb::tmudfWork()
// Receive-side helper for the TMUDF work method.
//
// Loops over the head entry of the parent down queue and, depending on
// that entry's private-state step, either produces an up queue entry
// directly (cancel / error / EOD) or consumes a reply buffer obtained
// from getReplyBuffer(). The loop runs only while there are outstanding
// queue requests, the parent up queue has room, and the TCB is not in
// one of the "reading from child" states.
//
// Returns WORK_OK unless the IPC connection is reported broken
// (WORK_BAD_ERROR) or returnSingleRow() returns something else.
ExWorkProcRetcode ExUdrTcb::tmudfCheckReceive()
{
// Work method helper function that attempts to process output rows
ExWorkProcRetcode result = WORK_OK;
NABoolean done = FALSE;
UdrDebug0(" [TMUDF WORK] Inside tmudfCheckReceive");
// Stay out of this loop while the TCB is busy pulling rows from a
// child; those states are handled by the send-side helper.
while (!done && result == WORK_OK
&& anyOutstandingQueueRequests()
&& !qParent_.up->isFull()
&& ((state_ != READ_TABLE_INPUT_FROM_CHILD) &&
(state_ != RETURN_ROWS_FROM_CHILD)))
{
if (getIpcBroken())
{
UdrDebug0(" [TMUDF WORK] IPC is broken");
result = WORK_BAD_ERROR;
}
else
{
// All decisions below are driven by the step recorded in the private
// state of the down queue head entry.
ex_queue_entry *downEntry = qParent_.down->getHeadEntry();
ExUdrPrivateState &down_pstate =
*((ExUdrPrivateState *) downEntry->pstate);
switch (down_pstate.step_)
{
case CANCEL_BEFORE_SEND:
// The request was cancelled before anything was sent; reply with
// an end-of-data entry only.
insertUpQueueEntry(ex_queue::Q_NO_DATA);
break;
case PRODUCE_ERROR_REPLY:
// An error occurred before the entry could be sent, probably
// during expression evaluation. If any diags were generated
// they were placed in the down entry ATP and will be copied
// to the up entry ATP by the following function.
insertUpQueueEntry(ex_queue::Q_SQLERROR);
down_pstate.step_ = PRODUCE_EOD_AFTER_ERROR;
break;
case PRODUCE_EOD_AFTER_ERROR:
{
// consume any remaining rows from the children, before
// producing the EOD
for (Int32 i=0; i<numChildren(); i++)
if (tmudfStates_[i] == READING_FROM_CHILD)
{
// remove child up queue entries until we see an EOD
while (!qChild_[i].up->isEmpty() &&
tmudfStates_[i] == READING_FROM_CHILD)
{
ex_queue_entry * centry = qChild_[i].up->getHeadEntry();
ex_queue::up_status child_status = centry->upState.status;
qChild_[i].up->removeHead();
if (child_status == ex_queue::Q_NO_DATA)
{
// This child is finished; count its EOD and reset its state.
tmudfStates_[i] = INITIAL;
down_pstate.numEodsFromChildTcbs_++;
}
}
}
// The EOD for the parent is produced only once the EOD counter
// equals the number of children; otherwise leave the work method
// and try again on a later invocation.
if (down_pstate.numEodsFromChildTcbs_ == numChildren())
// finish up error with an EOD
insertUpQueueEntry(ex_queue::Q_NO_DATA);
else
return WORK_OK;
}
break;
case STARTED:
case CANCEL_AFTER_SEND:
{
UdrDataBuffer *repBuf;
UdrDebug0(" [TMUDF WORK checkreceive] about to call getReplyBuffer()");
repBuf = getReplyBuffer();
// Case 1: a reply buffer with no rows in it, while not all children
// have delivered their EOD yet. Such a buffer may be a request from
// the server for more table input.
if (repBuf && replyBufferIsEmpty() &&
!(down_pstate.numEodsFromChildTcbs_ ==
myTdb().numChildren()))
{
//check if this is a reply to indicate that we can read
// more from child tcbs.
// if TRUE
// {
// setUdrTcbState(READ_TABLE_INPUT_FROM_CHILD);
// releaseReplyBuffer();
//
// done = TRUE;
// }
if (repBuf->sendMoreData())
{
// Trace-only diagnostic; no action is taken for this condition.
if ( down_pstate.numEodsFromChildTcbs_ ==1)
{
UdrDebug0(" !!!! ERROR EOD sent already 1 !!!!");
}
//see if udrserver has specified any table index
if(repBuf->tableIndex() != -1)
{
// Remember which child to read from and switch to the state in
// which the send-side helper reads child rows.
down_pstate.currentChildTcbIndex_ =
repBuf->tableIndex();
// change state_ from WORK_IO_ACTIVE to
setUdrTcbState(READ_TABLE_INPUT_FROM_CHILD);
}
else
{
ex_assert(FALSE,"Udrserver has not specified a valid table index");
}
releaseReplyBuffer();
done = TRUE;
}
}
// Case 2: a reply buffer that is not handled by case 1; try to
// return a row from it.
else if (repBuf)
{
// Now we can return a single up queue entry. If the up
// queue entry is Q_NO_DATA then inside this call to
// returnSingleRow() the head of the down queue will be
// popped.
// the state will remain WORK_IO_ACTIVE at this point
// this ensures that we don't send more data yet but will
// call continueRequest instead.
result = returnSingleRow();
// NOTE(review): down_pstate refers to the entry that was at the head
// of the down queue before returnSingleRow(); per the comment above
// that entry may have been popped by now, yet it is still read
// below - confirm this is safe.
if (replyBufferIsEmpty())
{
//check if this is a reply to indicate that we can read
// more from child tcbs.
// if TRUE
// {
// setUdrTcbState(READ_TABLE_INPUT_FROM_CHILD);
// releaseReplyBuffer();
//
// done = TRUE;
// }
if (repBuf->sendMoreData() &&
!(down_pstate.numEodsFromChildTcbs_ ==
myTdb().numChildren()) )
{
//see if udrserver has specified any table index
if(repBuf->tableIndex() != -1)
{
down_pstate.currentChildTcbIndex_ =
repBuf->tableIndex();
// change state_ from WORK_IO_ACTIVE to
setUdrTcbState(READ_TABLE_INPUT_FROM_CHILD);
}
else
{
ex_assert(FALSE,"Udrserver has sepcified invalid table index");
}
done = TRUE;
releaseReplyBuffer();
}
else
{
// It is possible that we are in the WORK_IO_ACTIVE
// state and conditions to transition back to WORK have
// now been met. For example we might have just finished
// processing all down queue entries that have been sent
// to the server. The following method checks the
// conditions and transitions to WORK if
// appropriate. See commentary in the method for more
// detail.
releaseReplyBuffer();
if (dataStream_->numOfInputBuffers() > 0 )
{
// done is still FALSE on this path, so the assignment only documents
// the intent to stay in the loop for the next reply buffer.
done = FALSE;
UdrDebug0("Received next reply buffer while processing one stay in this method");
}
attemptTransitionToWorkState();
}//else
}// if replyBufferIsEmpty
}// else if repBuf
// Case 3: no reply buffer was returned.
else
{
// check for errors from getReplyBuffer()
if (getStatementDiags() &&
getStatementDiags()->mainSQLCODE() < 0)
{
// Report the statement-level error to the parent and arrange for
// the closing EOD to be produced afterwards.
insertUpQueueEntry(ex_queue::Q_SQLERROR, getStatementDiags());
down_pstate.step_ = PRODUCE_EOD_AFTER_ERROR;
}
else
// Nothing to process right now; leave the loop.
done = TRUE;
}
}
break;
case NOT_STARTED:
default:
ex_assert(FALSE, "Invalid step value in TCB down queue");
break;
} // switch (down_pstate.step_)
} // if (getIpcBroken()) else
} // while (!done && result == WORK_OK && outstanding requests ... )
// NOTE(review): the original comment here said to reschedule when
// another reply buffer is already in the data stream, but no code
// below implements that; the result is returned unchanged.
return result;
}
// Send-side helper for the TMUDF work method. All of the work is
// delegated to buildAndSendTmudfInput(); its return code is passed
// through unchanged.
ExWorkProcRetcode ExUdrTcb::tmudfCheckSend()
{
  const ExWorkProcRetcode sendStatus = buildAndSendTmudfInput();
  return sendStatus;
}
// Send side of the TMUDF state machine.
//
// Starting from the current TCB state, this method performs a sequence
// of actions until the state becomes WORK_IO_ACTIVE, a resource is
// unavailable, or an error occurs. It packs scalar input from the
// parent down queue, or table input read from a child, into a request
// buffer and sends that buffer to the UDR server.
//
// Return values:
//   WORK_OK           nothing more can be done right now
//   WORK_POOL_BLOCKED a stream buffer limit was hit or a buffer could
//                     not be allocated
//   WORK_BAD_ERROR    no data stream / server, load failure, or broken IPC
//   WORK_CALL_AGAIN   errors or cancels were processed and the TCB
//                     should be redriven
ExWorkProcRetcode ExUdrTcb::buildAndSendTmudfInput()
{
  UdrDebug1(" [WORK] BEGIN ExUdrTcb::buildAndSendTmudfInput() %p",
            this);

  ExWorkProcRetcode result = WORK_OK;
  NABoolean done = FALSE;
  ULng32 numErrors = 0;
  ULng32 numCancels = 0;
  NABoolean sendSpaceAvailable = TRUE;

  // Determine the transid we should send in data messages. We
  // use the statement transid if all the following are true
  // a. the TDB says we require a transaction
  // b. this is NOT a CALL that returns result sets
  // c. this is NOT a result set
  //
  // Note: (b) and (c) are both indicated by rsInfo_ == NULL
  //
  Int64 &stmtTransId = myExeStmtGlobals()->getTransid();
  Int64 transIdToSend = -1; // -1 is an invalid transid
  if (dataRequestsAreTransactional())
    transIdToSend = stmtTransId;

  // Process entries from the down queue. nextToSend_
  // will be incremented inside the loop following successful
  // insertion of a request row into the request buffer.
  // This method will handle a sequence of actions until
  // we reach the WORK_IO_ACTIVE state:
  //
  // WORK:
  // - if a down queue entry exists, insert it into request buffer
  // - state_ ==> SCALAR_INPUT_READY_TO_SEND
  // - other possible subsequent states: WORK (for a cancel request)
  // SCALAR_INPUT_READY_TO_SEND:
  // - send a message to the UDR server
  // - state ==> WORK_IO_ACTIVE
  // ...
  // READ_TABLE_INPUT_FROM_CHILD:
  // - send a down queue request to child, if not done already
  // - make sure we have an allocated buffer to store data
  // returned from the child, to send to udrserv
  // - state ==> RETURN_ROWS_FROM_CHILD
  // RETURN_ROWS_FROM_CHILD:
  // - If we have some rows and child up queue is empty,
  // send the buffer to udrserv
  // - Otherwise, try to read more rows
  // - state_ ==> CHILD_INPUT_READY_TO_SEND
  // CHILD_INPUT_READY_TO_SEND:
  // - send message to udrserv
  // - state_ ==> WORK_IO_ACTIVE
  while (result == WORK_OK &&
         !done &&
         state_ != WORK_IO_ACTIVE &&
         (state_ != WORK || qParent_.down->entryExists(nextToSend_)))
  {
    // Make sure we have a data stream and a running server
    if (dataStream_ == NULL || !verifyUdrServerProcessId())
    {
      result = WORK_BAD_ERROR;
      break;
    }

    queue_index parentQueueIndex;
    if (state_ == WORK)
    {
      // point to the next entry to be sent
      parentQueueIndex = nextToSend_;
    }
    else
    {
      // point to the head entry we are already working on
      parentQueueIndex = qParent_.down->getHeadIndex();
    }

    ex_queue_entry *downEntry = qParent_.down->getQueueEntry(parentQueueIndex);
    const ex_queue::down_request request = downEntry->downState.request;
    ExUdrPrivateState &pstate = *((ExUdrPrivateState *) downEntry->pstate);

    switch (state_)
    {
      case LOAD_FAILED:
        result = WORK_BAD_ERROR;
        break;

      case WORK:
      {
        // Allocate a request buffer on the message stream heap. Make
        // sure stream buffer limits have not been reached.
        if (requestBuffer_ == NULL)
        {
          // Free up any receive buffers no longer in use
          dataStream_->cleanupBuffers();

          // Now test the stream buffer limits. If the send queue is full
          // we return WORK_OK and let the operator wake up again when I/O
          // completes. If other limits have been reached we return
          // WORK_POOL_BLOCKED which causes the operator to be scheduled
          // again right away.
          if (dataStream_->sendLimitReached())
          {
            UdrDebug0(" [TMUDF WORK] Cannot allocate request buffer");
            UdrDebug0(" sendLimitReached() returned TRUE");
            printDataStreamState();
            result = WORK_OK;
          }
          else if (dataStream_->inUseLimitReached())
          {
            UdrDebug0(" [TMUDF WORK] Cannot allocate request buffer");
            UdrDebug0(" inUseLimitReached() returned TRUE");
            printDataStreamState();
            result = WORK_POOL_BLOCKED;
          }
          else
          {
            requestBuffer_ = getRequestBuffer();
            if (requestBuffer_ == NULL)
            {
              result = WORK_POOL_BLOCKED;
              printDataStreamState();
            }
          }
        } // if (requestBuffer_ == NULL)

        if (requestBuffer_ == NULL)
        {
          // The break below only leaves the switch. In the send-limit
          // case result is still WORK_OK, so the outer loop must be
          // stopped explicitly or it would spin without making progress.
          done = TRUE;
          break;
        }

        // get ready to put scalar values from down request into the request
        // buffer
        SqlBuffer *sqlBuf = requestBuffer_->getSqlBuffer();
        // Validate the pointer before it is first dereferenced.
        ex_assert(sqlBuf, "UDR request buffer is corrupt or contains no data");
        Lng32 numTuppsBefore = sqlBuf->getTotalTuppDescs();
        ULng32 numRequestsAdded = 0;

        switch (request)
        {
          case ex_queue::GET_N:
          case ex_queue::GET_ALL:
          {
            // First step is to allocate space in the SqlBuffer. After
            // that a move expression will be evaluated to copy data into
            // the buffer. If the expression results in errors then we need
            // to back-out the buffer allocation, which may have been a
            // single data tupp or possibly control + data. So that the
            // correct number are backed out, we will remember how many
            // tupps are in the buffer before the allocation takes place.
            tupp_descriptor *scalarDesc;
            if (sqlBuf->moveInSendOrReplyData(
                  TRUE,                       // [IN] sending? (vs. replying)
                  FALSE,                      // [IN] force move of control info?
                  TRUE,                       // [IN] move data?
                  &(downEntry->downState),    // [IN] queue state
                  sizeof(ControlInfo),        // [IN] length of ControlInfo area
                  NULL,                       // [OUT] new ControlInfo area
                  myTdb().getRequestRowLen(), // [IN] data row length
                  &scalarDesc,                // [OUT] new data tupp_descriptor
                  NULL,                       // [IN] diags area
                  NULL                        // [OUT] new diags tupp_descriptor
                  ) == SqlBuffer::BUFFER_FULL)
            {
              // No more space in request buffer. The assertion here says that
              // we have at least inserted one row into the message buffer.
              ex_assert(numRequestsAdded > 0,
                        "Request buffer not large enough to hold a single row");
              sendSpaceAvailable = FALSE;
            }
            else
            {
              ex_expr::exp_return_type expStatus = ex_expr::EXPR_OK;
              if (myTdb().getInputExpression())
              {
                workAtp_->getTupp(myTdb().getRequestTuppIndex()) = scalarDesc;

                // Evaluate the input expression. If diags are generated they
                // will be left in the down entry ATP to be returned to the parent.
                expStatus =
                  myTdb().getInputExpression()->eval(downEntry->getAtp(),
                                                     workAtp_);
#ifdef UDR_DEBUG
                if (expStatus != ex_expr::EXPR_OK)
                  UdrDebug1(" [TMUDF WORK] scalar inputExpr.eval() returned %s",
                            GetExpStatusString(expStatus));
#endif
                workAtp_->getTupp(myTdb().getRequestTuppIndex()).release();

                if (expStatus == ex_expr::EXPR_ERROR)
                {
                  // Back out whatever was allocated for this row.
                  while (sqlBuf->getTotalTuppDescs() > numTuppsBefore)
                  {
                    sqlBuf->remove_tuple_desc();
                  }
                  if (insertUpQueueEntry(ex_queue::Q_SQLERROR))
                  {
                    pstate.step_ = ExUdrTcb::PRODUCE_EOD_AFTER_ERROR;
                    numErrors++;
                    nextToSend_++;
                  }
                  else
                  {
                    // The error could not be handed to the parent. Nothing
                    // has advanced, so come back later instead of
                    // re-evaluating the same row in a tight loop.
                    done = TRUE;
                  }
                }
              } // if (myTdb().getInputExpression())

              if (expStatus != ex_expr::EXPR_ERROR)
              {
                pstate.step_ = ExUdrTcb::STARTED;
                numRequestsAdded++;
                // Note: this will prevent us from adding
                // a second request from the parent down queue,
                // so we process those one at a time for now
                setUdrTcbState(SCALAR_INPUT_READY_TO_SEND);
                nextToSend_++;
              }
            } // if (no space in the request buffer) else
          } // case GET_N, GET_ALL
          break;

          case ex_queue::GET_NOMORE:
          {
            pstate.step_ = ExUdrTcb::CANCEL_BEFORE_SEND;
            numCancels++;
            nextToSend_++;
          }
          break;

          case ex_queue::GET_EMPTY:
          case ex_queue::GET_EOD:
          default:
          {
            ex_assert(FALSE, "Invalid request type in TCB down queue");
          }
          break;
        } // switch (request)
      } // case WORK
      break;

      case SCALAR_INPUT_READY_TO_SEND:
      {
        SqlBuffer *sqlBuf = requestBuffer_->getSqlBuffer();
        if (sqlBuf->getTotalTuppDescs() > 0)
        {
#ifdef UDR_DEBUG
          // Print a message if tracing is enabled or the
          // UDR_CONTINUE_DEBUG environment variable is set
          if (doTrace_ ||
              doIpcTrace_ ||
              (getenv("UDR_CONTINUE_DEBUG") != NULL))
          {
            char stmtTx[256];
            char msgTx[256];
            TransIdToText(stmtTransId, stmtTx, 256);
            TransIdToText(transIdToSend, msgTx, 256);
            UdrPrintf(traceFile_ ? traceFile_ : stdout,
                      " [TMUDF WORK] Sending %s, %d tupps, msg tx %s",
                      "INVOKE",
                      (Lng32) sqlBuf->getTotalTuppDescs(), msgTx);
          }
#endif
          // set the scalar flag to indicate we're sending a scalar input
          requestBuffer_->setSendingScalarValues(TRUE);
          sqlBuf->drivePack();
          setUdrTcbState(WORK_IO_ACTIVE);

          if (getStatsEntry() && getStatsEntry()->castToExUDRStats())
          {
            getStatsEntry()->castToExUDRStats()->
              bufferStats()->sentBuffers().addEntry(sizeof(*requestBuffer_) +
                                                    sqlBuf->get_used_size());
            getStatsEntry()->castToExUDRStats()->
              bufferStats()->totalSentBytes() += sqlBuf->getSendBufferSize();
          }

          if (dataRequestsAreTransactional())
            myExeStmtGlobals()->incrementUdrTxMsgsOut();
          else
            myExeStmtGlobals()->incrementUdrNonTxMsgsOut();

          dataStream_->sendRequest(transIdToSend);
          dataMsgsSent_++;
          UdrDebug0(" [WORK] The data stream has returned control to the TCB");

          // Now that sendRequest() has completed the request buffer is in
          // packed form and owned by the stream. This TCB should not
          // access the request buffer again so we release it now.
          releaseRequestBuffer();
          UdrDebug0(" [WORK] Done sending the scalarrequest buffer");
        } // if (sqlBuf->getTotalTuppDescs() > 0)
      }
      break;

      case READ_TABLE_INPUT_FROM_CHILD:
      {
        // read rows from child specified in the reply from server
        // if nothing returned from any child. Get outta here.
        Int32 currChild = pstate.currentChildTcbIndex_;
        UdrDebug1("TMUDF READING FROM CHILD %d", currChild);

        // If this table's buffer is empty, allocate it on the stream
        if (childInputBuffers_[currChild] == NULL)
        {
          // Free up any receive buffers no longer in use
          dataStream_->cleanupBuffers();

          // Now test the stream buffer limits. If the send queue is full
          // we return WORK_OK and let the operator wake up again when I/O
          // completes. If other limits have been reached we return
          // WORK_POOL_BLOCKED which causes the operator to be scheduled
          // again right away.
          if (dataStream_->sendLimitReached())
          {
            UdrDebug0(" [TMUDF WORK] Cannot allocate request buffer");
            UdrDebug0(" sendLimitReached() returned TRUE");
            printDataStreamState();
            return WORK_OK;
          }
          else if (dataStream_->inUseLimitReached())
          {
            UdrDebug0(" [TMUDF WORK] Cannot allocate request buffer");
            UdrDebug0(" inUseLimitReached() returned TRUE");
            printDataStreamState();
            return WORK_POOL_BLOCKED;
          }
          else
          {
            childInputBuffers_[currChild] = getRequestBuffer();
            if (childInputBuffers_[currChild] == NULL)
            {
              // Trace the stream state before returning; previously this
              // call sat after the return and could never run.
              printDataStreamState();
              return WORK_POOL_BLOCKED;
            }
          }
        }

        // pass the parent request to the child downqueue, if not already done
        if (tmudfStates_[currChild] != READING_FROM_CHILD)
        {
          if (!qChild_[currChild].down->isFull())
          {
            ex_queue_entry * centry = qChild_[currChild].down->getTailEntry();
            if (request == ex_queue::GET_N)
              centry->downState.request = ex_queue::GET_ALL;
            else
              centry->downState.request = request;
            centry->downState.requestValue = downEntry->downState.requestValue;
            centry->downState.parentIndex = parentQueueIndex;
            // set the child's input atp
            centry->passAtp(downEntry->getAtp());
            qChild_[currChild].down->insert();
            tmudfStates_[currChild] = READING_FROM_CHILD;
          }
          else
          {
            // couldn't pass request to child, return
            return WORK_OK;
          }
        }

        // we are now ready to read (more) rows from this child
        setUdrTcbState(RETURN_ROWS_FROM_CHILD);
      } // case READ_TABLE_INPUT_FROM_CHILD
      break;

      case RETURN_ROWS_FROM_CHILD:
      {
        // Each child will fill it's own dedicated buffer
        // if we fill up a buffer, send it
        // and get out of this while loop
        // OR if we reach EOD on one particular table input, just send
        // whatever we have in that buffer. This is because each tables
        // input should reach in it's own buffer
        Int32 currChild = pstate.currentChildTcbIndex_;

        // Check the buffer pointer itself before reaching through it.
        ex_assert(childInputBuffers_[currChild],
                  "UDR childinput buffer is missing");
        SqlBuffer *childSqlBuf = childInputBuffers_[currChild]->getSqlBuffer();
        ex_assert(childSqlBuf, "UDR childinput buffer is missing");

        if (qChild_[currChild].up->isEmpty())
        {
          // if we have a partially filled buffer send it to the udrserver
          if (childSqlBuf->getTotalTuppDescs() > 0)
          {
            childInputBuffers_[currChild]->setTableIndex(currChild);
            setUdrTcbState(CHILD_INPUT_READY_TO_SEND);
            break;
          }
          // if the buffer is empty we can't send anything
          // go back to the scheduler
          return WORK_OK;
        }

        ex_queue_entry * centry = qChild_[currChild].up->getHeadEntry();
        ex_queue::up_status child_status = centry->upState.status;

        switch (child_status)
        {
          case ex_queue::Q_OK_MMORE:
          {
            // First step is to allocate space in the SqlBuffer.
            // After that a move expression will be evaluated to
            // copy data into the buffer. If the expression results
            // in errors then we need to back-out the buffer
            // allocation, which may have been a single data tupp
            // or possibly control + data. So that the correct
            // number are backed out, we will remember how many
            // tupps are in the buffer before the allocation takes
            // place.
            Lng32 numTuppsBefore = childSqlBuf->getTotalTuppDescs();
            UdrDebug0(" Received OK_MMORE from child");
            tupp_descriptor *dataDesc = NULL;
            NABoolean processedUpEntry = TRUE;

            if (childSqlBuf->moveInSendOrReplyData(
                  TRUE,                 // [IN] sending? (vs. replying)
                  FALSE,                // [IN] force move of control info?
                  TRUE,                 // [IN] move data?
                  &(centry->upState),   // [IN] queue state
                  sizeof(ControlInfo),  // [IN] length of ControlInfo area
                  NULL,                 // [OUT] new ControlInfo area
                  myTdb().getTableDescInfo(currChild)->getRowLength(), // [IN] data row length
                  &dataDesc,            // [OUT] new data tupp_descriptor
                  NULL,                 // [IN] diags area
                  NULL                  // [OUT] new diags tupp_descriptor
                  )
                == SqlBuffer::BUFFER_FULL)
            {
              // No more space in request buffer. The assertion here
              //says that we have at least inserted one row into
              // the message buffer.
              ex_assert(childSqlBuf->getTotalTuppDescs()
                        > 0,
                        "Request buffer not large enough to hold a single row");
              UdrDebug0("No more space in send buffer");
              sendSpaceAvailable = FALSE;
              processedUpEntry = FALSE;
            }
            else
            {
              ex_expr::exp_return_type expStatus = ex_expr::EXPR_OK;
              if (myTdb().getChildInputExpr(currChild))
              {
                UInt32 childTuppIndex =
                  myTdb().getTableDescInfo(currChild)->
                  getOutputTuppIndex();
                workAtp_->getTupp(childTuppIndex)
                  = dataDesc;

                // Evaluate the input expression.
                expStatus =
                  myTdb().getChildInputExpr(currChild)->
                  eval(centry->getAtp(), workAtp_);
#ifdef UDR_DEBUG
                if (expStatus != ex_expr::EXPR_OK)
                  UdrDebug1(" [TMUDF WORK] childInputExpr.eval() returned %s",
                            GetExpStatusString(expStatus));
#endif
                // SS TBD change to getChildTuppIndex
                workAtp_->getTupp(childTuppIndex).release();

                if (expStatus == ex_expr::EXPR_ERROR)
                {
                  // Back out whatever was allocated for this row.
                  while (childSqlBuf->
                         getTotalTuppDescs() > numTuppsBefore)
                  {
                    childSqlBuf->remove_tuple_desc();
                  }
                  if (insertUpQueueEntry(ex_queue::Q_SQLERROR,
                                         centry->getDiagsArea()))
                  {
                    pstate.step_ = ExUdrTcb::PRODUCE_EOD_AFTER_ERROR;
                    numErrors++;
                  }
                  else
                  {
                    // The child row stays at the head of its up queue.
                    // Leave the loop and retry later rather than
                    // re-evaluating the same row over and over.
                    processedUpEntry = FALSE;
                    done = TRUE;
                  }
                }
              } // if (myTdb().getChildInputExpression())

              if (expStatus != ex_expr::EXPR_ERROR)
                pstate.step_ = ExUdrTcb::STARTED;
            } // else

            if (sendSpaceAvailable && processedUpEntry)
            {
              qChild_[currChild].up->removeHead();
            }
            else if (!sendSpaceAvailable)
            {
              // set the tableindex in the header
              UdrDebug1("Ready to send child input buffer, numtupps = %d", childSqlBuf->getTotalTuppDescs());
              childInputBuffers_[currChild]->setTableIndex(currChild);
              setUdrTcbState(CHILD_INPUT_READY_TO_SEND);
            }
          }
          break;

          case ex_queue::Q_NO_DATA:
          {
            // indicate that this is the last buffer for this table
            childInputBuffers_[currChild]->setLastBuffer(TRUE);
            UdrDebug1("EOD reached for child input %u", currChild);
            //set the tableindex in the buffer.
            childInputBuffers_[currChild]->setTableIndex(currChild);
            pstate.numEodsFromChildTcbs_++;
            UdrDebug1(" NUm Child EOD's so far = %u",
                      pstate.numEodsFromChildTcbs_);
            qChild_[currChild].up->removeHead();
            tmudfStates_[currChild] = INITIAL;
            setUdrTcbState(CHILD_INPUT_READY_TO_SEND);
          }
          break;

          case ex_queue::Q_SQLERROR:
          {
            // pass the diags are on to the parent if there is space in
            // the up queue
            if (insertUpQueueEntry(ex_queue::Q_SQLERROR,
                                   centry->getDiagsArea()))
            {
              pstate.step_ = ExUdrTcb::PRODUCE_EOD_AFTER_ERROR;
              qChild_[currChild].up->removeHead();
              numErrors++;
            }
            else
            {
              // come back later, when the up queue has room
              done = TRUE;
            }
          }
          break;

          case ex_queue::Q_INVALID:
            ex_assert(0,"ExUdrTcb::tmdudfWork() Invalid state returned by child");
            break;
        } // switch (child_status)
      }
      break;

      case CHILD_INPUT_READY_TO_SEND:
      {
        Int32 currChild = pstate.currentChildTcbIndex_;
        //send request in buffer
        SqlBuffer *sqlBuf = childInputBuffers_[currChild]->getSqlBuffer();
        if (sqlBuf->getTotalTuppDescs() > 0)
        {
#ifdef UDR_DEBUG
          // Print a message if tracing is enabled or the
          // UDR_CONTINUE_DEBUG environment variable is set
          if (doTrace_ ||
              doIpcTrace_ ||
              (getenv("UDR_CONTINUE_DEBUG") != NULL))
          {
            char stmtTx[256];
            char msgTx[256];
            TransIdToText(stmtTransId, stmtTx, 256);
            TransIdToText(transIdToSend, msgTx, 256);
            UdrPrintf(traceFile_ ? traceFile_ : stdout,
                      " [TMUDF WORK] Sending %s, %d tupps, msg tx %s",
                      "CHILD TABLE INPUT",
                      (Lng32) sqlBuf->getTotalTuppDescs(), msgTx);
          }
#endif
          sqlBuf->drivePack();

          if (getStatsEntry() && getStatsEntry()->castToExUDRStats())
          {
            getStatsEntry()->castToExUDRStats()->
              bufferStats()->sentBuffers().
              addEntry(sizeof(*requestBuffer_) +
                       sqlBuf->get_used_size());
            getStatsEntry()->castToExUDRStats()->
              bufferStats()->totalSentBytes() +=
              sqlBuf->getSendBufferSize();
          }

          if (dataRequestsAreTransactional())
            myExeStmtGlobals()->incrementUdrTxMsgsOut();
          else
            myExeStmtGlobals()->incrementUdrNonTxMsgsOut();

          dataStream_->sendRequest(transIdToSend);
          dataMsgsSent_++;
          UdrDebug0(" [TMUDF WORK] The data stream has returned control to the TCB");
          UdrDebug1(" [TMUDF WORK] Done sending the request buffer for child %u", currChild);
        } // if (sqlBuf->getTotalTuppDescs() > 0)

        // Now that sendRequest() has completed the request buffer is in
        // packed form and owned by the stream. This TCB should not
        // access the request buffer again so we release it now.
        releaseChildInputBuffer(currChild);
        setUdrTcbState(WORK_IO_ACTIVE);
      }
      break;

      default:
      {
        // other states cause us to return back to the caller
        done = TRUE;
      }
      break;
    } // switch(state_)
  } // while

  if (result == WORK_OK)
  {
    if (getIpcBroken())
    {
      result = WORK_BAD_ERROR;
    }
  }

  //
  // If errors were encountered then there may be down queue
  // entries that were not sent but still need to be processed.
  // Changing the return code to WORK_CALL_AGAIN makes sure the
  // scheduler redrives this TCB.
  //
  if (result == WORK_OK && (numErrors > 0 || numCancels > 0))
  {
    result = WORK_CALL_AGAIN;
  }

  return result;
}
#ifdef UDR_DEBUG
// (Re)initializes the debug-build tracing members from the environment:
//   UDR_DEBUG         enables all tracing
//   UDR_STATE_DEBUG   enables state tracing
//   UDR_IPC_DEBUG     enables IPC tracing
//   UDR_DEBUG_FILE    names a file the trace is appended to
//   UDR_TRUST_REPLIES disables integrity checks on incoming replies
void ExUdrTcb::initializeDebugVariables()
{
  // Close a trace file left over from an earlier call. stdout and
  // stderr are never passed to fclose.
  if (traceFile_)
  {
    if ((traceFile_ != stdout) && (traceFile_ != stderr))
      fclose(traceFile_);
  }

  doTrace_ = (getenv("UDR_""DEBUG") != NULL);
  doStateTrace_ = (doTrace_ || getenv("UDR_STATE_DEBUG"));
  doIpcTrace_ = (doTrace_ || getenv("UDR_IPC_DEBUG"));
  traceFile_ = NULL;

  // When any kind of tracing is on, the trace goes to the file named
  // in the environment. If no name is given, or the file cannot be
  // opened, the trace goes to stdout.
  if (doTrace_ || doStateTrace_ || doIpcTrace_)
  {
    FILE *opened = NULL;
    const char *requestedName = getenv("UDR_""DEBUG_FILE");
    if (requestedName && requestedName[0])
      opened = fopen(requestedName, "a");

    if (opened)
      traceFile_ = opened;
    else
      traceFile_ = stdout;
  }

  // ESP debugging . uncomment the following block of code
  /*
  // begin esp debugging
  doTrace_ = TRUE;
  long numInstances = 0;
  long instanceNum =0;
  myExeStmtGlobals()->getMyNodeLocalInstanceNumber(instanceNum,numInstances);
  char esptraceFileName[30] = "";
  char *logdir = "C:/temp"; //change if needed
  sprintf(esptraceFileName, "%s/espudrtrace.log%d",logdir,instanceNum);
  traceFile_ = fopen(esptraceFileName,"a");
  */
  // end esp debugging

  // Integrity checks in incoming reply messages can be disabled in
  // the debug build by an environment setting.
  if (getenv("UDR_TRUST_REPLIES"))
    trustReplies_ = TRUE;
  else
    trustReplies_ = FALSE;
}
// Debug-build helper that traces the buffer counters of the data
// stream, or a short notice when there is no data stream.
void ExUdrTcb::printDataStreamState()
{
  if (dataStream_ != NULL)
  {
    UdrDebug6(" Data stream: "
              "inUse %d, in %d, out %d, send %d, replyTag %d, pending %d",
              (Int32) dataStream_->numOfInUseBuffers(),
              (Int32) dataStream_->numOfInputBuffers(),
              (Int32) dataStream_->numOfOutputBuffers(),
              (Int32) dataStream_->numOfSendBuffers(),
              (Int32) dataStream_->numOfReplyTagBuffers(),
              (Int32) dataStream_->numOfResponsesPending());
    return;
  }

  UdrDebug0(" Data stream is NULL");
}
#endif
// Appends the conditions for a failed reply integrity check to *diags.
// When a server process id is known, condition -2037 is added first,
// together with this process's id and the server's id. The
// EXE_UDR_INVALID_OR_CORRUPT_REPLY condition is always added.
// diags must not be NULL.
void ExUdrTcb::addIntegrityCheckFailureToDiagsArea(ComDiagsArea *diags) const
{
  ex_assert(diags, "Should not call this function with a NULL diags pointer");

  ComDiagsArea &target = *diags;

  if (!ProcessIdIsNull(serverProcessId_))
  {
    target << DgSqlCode(-2037);
    myIpcEnv()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
      addProcIdToDiagsArea(target, 0);
    serverProcessId_.addProcIdToDiagsArea(target, 1);
  }

  target << DgSqlCode(-EXE_UDR_INVALID_OR_CORRUPT_REPLY);
}
// For every child currently in the READING_FROM_CHILD state, asks the
// child's down queue to cancel the request associated with the given
// parent queue index. Other children are left untouched.
void ExUdrTcb::tmudfCancelChildRequests(queue_index parentIndex)
{
  int childNum = 0;
  while (childNum < numChildren())
  {
    const NABoolean childIsActive =
      (tmudfStates_[childNum] == READING_FROM_CHILD);
    if (childIsActive)
    {
      qChild_[childNum].down->cancelRequestWithParentIndex(parentIndex);
    }
    childNum++;
  }
}
// Performs integrity checks on the output values in a reply row.
//
// For each output parameter the following are verified, in order:
//   1. the null indicator (when the value is nullable) is 0 or -1
//   2. for VARCHAR types, the stored length does not exceed the
//      declared maximum length
//   3. for double-byte character types, the data consists of valid
//      UCS2 code points
// Checks 2 and 3 are skipped for NULL values because the length and
// data bytes of a NULL value may be uninitialized.
//
// Parameters:
//   replyTupp  the reply row to validate
//   diags      [in/out] diags area; allocated from the TCB heap if it
//              is NULL and a problem needs to be reported
//
// Returns TRUE if the row passed all checks, FALSE after the first
// failure. On failure an EXE_UDR_INVALID_DATA condition describing the
// problem has been added to diags.
NABoolean ExUdrTcb::validateDataRow(const tupp &replyTupp,
                                    ComDiagsArea *&diags)
{
  const ExUdrTdb &tdb = myTdb();
  ComUInt32 numOutParams = tdb.getNumOutputValues();
  if (numOutParams == 0)
    return TRUE;

  NABoolean result = TRUE;
  ComUInt32 numParams = tdb.getNumParams();
  ComUInt32 numInParams = tdb.getNumInputValues();
  char *dataPtr = replyTupp.getDataPointer();

  // We walk the parameter array in one of two ways:
  // * if this is a stored procedure, look at every element
  // * otherwise jump past the input parameters
  NABoolean isProcedure =
    (tdb.getUdrType() == COM_PROCEDURE_TYPE ? TRUE : FALSE);
  ComUInt32 outPosition = 0;
  ComUInt32 startIndex = 0;
  if (!isProcedure)
    startIndex = numInParams;

  for (ComUInt32 i = startIndex; result && i < numParams; i++)
  {
    const UdrFormalParamInfo &formal = *tdb.getFormalParamInfo(i);
    if (formal.isOut())
    {
      Attributes &actual = *(tdb.getReplyAttr(outPosition));
      ComSInt16 fsType = formal.getType();
      ComSInt16 nullValue = 0;

      // Position reported in diagnostics: 1-based parameter number for
      // procedures, 1-based output position otherwise.
      Lng32 pos = (Lng32) (isProcedure ? (i + 1) : (outPosition + 1));

      // Check for a valid null indicator. Null indicators returned
      // from MXUDR are 2 bytes. Valid null indicator values are 0 and
      // -1.
      if (result && actual.getNullFlag())
      {
        const char *nullIndPtr = dataPtr + actual.getNullIndOffset();
        nullValue = *((ComSInt16 *) nullIndPtr);
        if (nullValue != 0 && nullValue != -1)
        {
          if (diags == NULL)
            diags = ComDiagsArea::allocate(getHeap());
          *diags << DgSqlCode(-EXE_UDR_INVALID_DATA)
                 << DgString0(myTdb().getSqlName())
                 << DgInt0(pos)
                 << DgString1("Invalid null indicator");
          result = FALSE;
        }
      }

      // The remaining checks look at the value itself and only make
      // sense for non-NULL values.
      const NABoolean valueIsNull = (nullValue == -1 ? TRUE : FALSE);

      // do not check for varchar length when the value is null as the
      // varchar length could be garbage/uninitialized
      if (!valueIsNull && result && DFS2REC::isAnyVarChar(fsType))
      {
        // Check for valid VARCHAR length
        const char *vcIndPtr = dataPtr + actual.getVCLenIndOffset();
        ComUInt32 vcLen = actual.getLength((char *) vcIndPtr);
        if (vcLen > (ComUInt32) actual.getLength())
        {
          if (diags == NULL)
            diags = ComDiagsArea::allocate(getHeap());
          char msg[100];
          str_sprintf(msg, "VARCHAR length should not exceed %d",
                      (Int32) actual.getLength());
          *diags << DgSqlCode(-EXE_UDR_INVALID_DATA)
                 << DgString0(myTdb().getSqlName())
                 << DgInt0(pos)
                 << DgString1(msg);
          result = FALSE;
        }
      }

      // The character check uses the VARCHAR length to decide how many
      // characters to scan, so it must be skipped for NULL values for
      // the same reason as the length check above.
      if (!valueIsNull && result && DFS2REC::isDoubleCharacter(fsType))
      {
        // Check for valid UCS2 characters
        const char *source = dataPtr + actual.getOffset();
        ComUInt32 sourceLen = actual.getLength();
        if (DFS2REC::isAnyVarChar(fsType))
          sourceLen = actual.getLength(dataPtr + actual.getVCLenIndOffset());
        ComUInt32 sourceChars = sourceLen / 2;
        if (CharInfo::checkCodePoint((wchar_t *) source, (Int32) sourceChars,
                                     CharInfo::UNICODE) == FALSE)
        {
          if (diags == NULL)
            diags = ComDiagsArea::allocate(getHeap());
          *diags << DgSqlCode(-EXE_UDR_INVALID_DATA)
                 << DgString0(myTdb().getSqlName())
                 << DgInt0(pos)
                 << DgString1("Invalid UCS2 character");
          result = FALSE;
        }
      }

      // Advance the output parameter position
      outPosition++;
    } // if (formal.isOut())
  } // for each param

  return result;
}
// Returns TRUE when state_ is one of WORK, WORK_IO_ACTIVE,
// READ_TABLE_INPUT_FROM_CHILD or RETURN_ROWS_FROM_CHILD, and FALSE for
// every other state.
NABoolean ExUdrTcb::serverResourcesAreLoaded() const
{
  if (state_ == WORK)
    return TRUE;
  if (state_ == WORK_IO_ACTIVE)
    return TRUE;
  if (state_ == READ_TABLE_INPUT_FROM_CHILD)
    return TRUE;
  if (state_ == RETURN_ROWS_FROM_CHILD)
    return TRUE;

  return FALSE;
}
// Returns TRUE only when both of these hold:
//   1. the TDB's transaction attribute is COM_TRANSACTION_REQUIRED
//   2. rsInfo_ is NULL
//
// A NULL rsInfo_ stands for two conditions at once: this is not a CALL
// that returns result sets, and this is not itself a result set.
NABoolean ExUdrTcb::dataRequestsAreTransactional() const
{
  if (myTdb().getTransactionAttrs() != COM_TRANSACTION_REQUIRED)
    return FALSE;

  return (rsInfo_ == NULL) ? TRUE : FALSE;
}
//----------------------------------------------------------------------
// class ExUdrPrivateState
//----------------------------------------------------------------------
// Constructor: all member initialization is delegated to init().
ExUdrPrivateState::ExUdrPrivateState()
{
  this->init();
}
// Put the private state back to its starting values: step_ is
// ExUdrTcb::NOT_STARTED, both counters are zero, and the current child
// TCB index is -1.
void ExUdrPrivateState::init()
{
  currentChildTcbIndex_ = -1; // no table input
  numEodsFromChildTcbs_ = 0;
  matchCount_ = 0;
  step_ = ExUdrTcb::NOT_STARTED;
}
// Destructor: the body is intentionally empty.
ExUdrPrivateState::~ExUdrPrivateState() {}
#pragma warn(262) // warning elimination