blob: 49d5bb10518e426edf08ca348857bc8fd27de471 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ExPackedRows.cpp
* Description: Methods for the tdb and tcb of a UNPACKROWS operation
*
*
* Created: 6/19/97
* Language: C++
*
*
*
*
******************************************************************************
*/
//
// This file contains all the executor methods associated
// with a unPackRows operator
//
#include "ExPackedRows.h"
#include "ExSimpleSqlBuffer.h"
#include "ex_error.h"
#include "ExpError.h"
//////////////////////////////////////////////////////////////////////////////
//
// TDB procedures
//
//////////////////////////////////////////////////////////////////////////////
// ExUnPackRowsTdb::build() --------------------------------------------
// Build the TCB tree for this node and all nodes below.
//
// Parameters
//
// ex_globals *glob - contains references to global executor information,
// notably the space object used to allocate objects.
//
// Returns - returns the TCB tree build for this node and all nodes below.
//
ex_tcb *
ExUnPackRowsTdb::build(ex_globals *glob)
{
// Build the Tcb tree below the unPackRows node.
//
ex_tcb *childTcb = NULL;
if (childTdb_)
childTcb = childTdb_->build(glob);
// Build the unPackRows Tcb, given the TDB.
//
ex_tcb *unPackRowsTcb = NULL;
if (rowwiseRowset())
unPackRowsTcb =
new(glob->getSpace()) ExUnPackRowwiseRowsTcb(*this, glob);
else
unPackRowsTcb =
new(glob->getSpace()) ExUnPackRowsTcb(*this, *childTcb, glob);
// add the unPackRowsTcb and all its work procedures to the schedule
//
unPackRowsTcb->registerSubtasks();
// Return the result to my parent.
//
return unPackRowsTcb;
}
/////////////////////////////////////////////////////////////////////////
//
// TCB procedures
//
/////////////////////////////////////////////////////////////////////////
// ExUnPackRowsTcb::ExUnPackRowsTcb() ------------------------------------
// Constructor for unPackRows TCB
//
ExUnPackRowsTcb::ExUnPackRowsTcb(const ExUnPackRowsTdb &unPackRowsTdbLocal,
const ex_tcb &childTcb,
ex_globals *glob) :
ex_tcb(unPackRowsTdbLocal, 1, glob)
{
childTcb_ = &childTcb;
CollHeap *space = glob->getSpace();
// Allocate the buffer pool. Allocate a buffer big enough to hold
// twice the size of my upqueue.
//
pool_ = new(space) ExSimpleSQLBuffer(unPackRowsTdb().queueSizeUp_ * 2,
unPackRowsTdb().unPackColsTupleLen_,
space);
// get the queue used by my child to communicate with me
//
childQueue_ = childTcb_->getParentQueue();
// Allocate the queue to communicate with parent
// (Child allocates queue to communicate with Child)
//
qParent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
unPackRowsTdb().queueSizeDown_,
unPackRowsTdb().criDescDown_,
space);
// Allocate the private state in each entry of the down queue
//
ExUnPackRowsPrivateState privateState(this);
qParent_.down->allocatePstate(&privateState, this);
// Initialize processedInputs_ to refer to the queue entry
// which will be used next.
//
processedInputs_ = qParent_.down->getHeadIndex();
// Allocate a queue to communicate with the parent node.
//
qParent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
unPackRowsTdb().queueSizeUp_,
unPackRowsTdb().criDescUp_,
space);
// fixup all expressions expressions
//
if(unPackColsExpr())
unPackColsExpr()->fixup(0,
getExpressionMode(), this,
glob->getSpace(),
glob->getDefaultHeap(), FALSE, glob);
// Initialize the workAtp_ and the Tupp used to hold the
// index value. This index value is an integer local to the
// workUp method.
//
indexValueTuppDesc_.init(4, 0, (char *)&indexValue_);
indexValueTupp_.init();
indexValueTupp_ = &indexValueTuppDesc_;
workAtp_ = allocateAtp(unPackRowsTdb().workCriDesc_,
glob->getSpace());
// Must always have a packingFactor expression
//
ex_assert(packingFactor(),
"ExUnPackRowsTcb::ExUnPackRowsTcb(): Internal Error");
packingFactor()->fixup(0,
getExpressionMode(), this,
glob->getSpace(),
glob->getDefaultHeap(), FALSE, glob);
// Initialize the numRowsAtp_ and the numRowsTupp. These are used
// to hold the result of the move expression (packingFactor()) used
// to extract the packing Factor from one of the packed rows.
//
numRowsTuppDesc_.init(4, 0, (char *)&numRows_);
numRowsTupp_.init();
numRowsTupp_ = &numRowsTuppDesc_;
numRowsAtp_ = allocateAtp(unPackRowsTdb().criDescUp_,
glob->getSpace());
}
// ExUnPackRowsTcb::~ExUnPackRowsTcb() ---------------------------------------
// Destructor for ExUnPackRowsTcb
// Free up the queues and the sql buffer and the child TCB tree.
//
ExUnPackRowsTcb::~ExUnPackRowsTcb()
{
delete qParent_.up;
delete qParent_.down;
freeResources();
}
// ExUnPackRowsTcb::freeResources() ----------------------------------------
// Free any run time resources.
// For unPackRows, this means freeing up the buffer pool.
// The queues are not freed here. (Should they be?).
// This method is call by the TCB destructor.
//
void ExUnPackRowsTcb::freeResources()
{
// Add this when space_ is added;
// if (workAtp_) {
// deallocateAtp(workAtp_, space_);
// workAtp_ = NULL;
// }
// if (numRowsAtp_) {
// deallocateAtp(numRowsAtp_, space_);
// numRowsAtp_ = NULL;
// }
if (pool_) {
delete pool_;
pool_ = NULL;
}
}
// ExUnPackRowsTcb::registerSubtasks() -------------------------------------
// Register all the unPackRows subtasks with the scheduler.
//
void ExUnPackRowsTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
// down queues are handled by workDown()
//
// Schedule this routine if a new entry is inserted into the
// parents down queue and we are waiting on an event.
// (had returned WORK_OK).
//
sched->registerInsertSubtask(sWorkDown, this, qParent_.down, "DN");
// Schedule this routine if the child's down queue changes from being
// full to being not full and we are waiting on an event.
// (had returned WORK_OK).
//
sched->registerUnblockSubtask(sWorkDown, this, childQueue_.down, "DN");
// Schedule this routine if a cancel request occurs on the
// parents down queue.
//
sched->registerCancelSubtask(sCancel, this, qParent_.down, "CN");
// up queues are handled by workUp()
//
// Schedule this routine if the parent's up queue changes from being
// full to being not full and we are waiting on an event.
// (had returned WORK_OK).
//
sched->registerUnblockSubtask(sWorkUp, this, qParent_.up,"UP");
// Schedule this routine if a new entry is inserted into the
// child's up queue and we are waiting on an event.
// (had returned WORK_OK).
//
sched->registerInsertSubtask(sWorkUp, this, childQueue_.up);
}
// ExUnPackRowsTcb::start() ------------------------------------------
// Starts up the next requested operation by sending
// requests to children. Child queue must have room!
// Called by workDown().
//
void
ExUnPackRowsTcb::start()
{
// Advance processedInputs_ by one, all checks need to be done
// by the caller. No down request is sent for a cancelled message.
//
ex_queue_entry *childEntry = childQueue_.down->getTailEntry();
ex_queue_entry *pEntryDown = qParent_.down->getQueueEntry(processedInputs_);
const ex_queue::down_request request = pEntryDown->downState.request;
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*)pEntryDown->pstate);
// pass request to children
//
childEntry->downState.request = ex_queue::GET_ALL;
childEntry->downState.requestValue = pEntryDown->downState.requestValue;
childEntry->downState.parentIndex = processedInputs_;
childEntry->passAtp(pEntryDown);
// no matches yet for this parent row
//
pState.matchCount_ = 0;
// Insert request in child queue.
//
childQueue_.down->insert();
// Record the state of this request.
//
pState.childState_ = STARTED_;
// If this is a CANCEL request, Cancel the request we just gave to
// our child.
//
if (request == ex_queue::GET_NOMORE) {
// immediately cancel the request (requests are already in
// cancelled state but the cancel callback isn't activated yet)
//
childQueue_.down->cancelRequestWithParentIndex(processedInputs_);
pState.childState_ = CANCELLED_;
// $$$$ should find a smarter way to do this
}
// We are now ready to look at the next request.
//
processedInputs_++;
}
// ExUnPackRowsTcb::stop() ----------------------------------------------
// All child rows have been returned.
// Now return an EOD indication to parent.
//
void ExUnPackRowsTcb::stop()
{
// Remove the head entries of the parent down queue and the
// child up queue. All checks are done by the caller.
//
ex_queue_entry *pEntryDown = qParent_.down->getHeadEntry();
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*) pEntryDown->pstate);
ex_queue_entry *pEntry = qParent_.up->getTailEntry();
pEntry->upState.status = ex_queue::Q_NO_DATA;
pEntry->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntry->upState.downIndex = qParent_.down->getHeadIndex();
pEntry->upState.setMatchNo(pState.matchCount_);
// insert EOD into parent
//
qParent_.up->insert();
// consume the child row
//
childQueue_.up->removeHead();
// Reinitialize the state of this processed request.
// Pstate will be initialized for when this entry gets used again.
//
pState.init();
// this parent request has been processed.
//
qParent_.down->removeHead();
}
ExWorkProcRetcode
ExUnPackRowsTcb::processCancel()
{
// Check the down queue from the parent for cancellations. Propagate
// cancel requests and remove all requests that are completely cancelled.
// loop over all requests that have been sent down
// (the others are handled by start() later)
queue_index ix = qParent_.down->getHeadIndex();
while (ix != processedInputs_) {
ex_queue_entry *pEntryDown = qParent_.down->getQueueEntry(ix);
// check whether the current down request is cancelled
//
if (pEntryDown->downState.request == ex_queue::GET_NOMORE) {
// yes, cancel this down request if not already done so
//
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*) pEntryDown->pstate);
// cancel child requests if not already done so and if they
// aren't already finished
//
if (pState.childState_ == ExUnPackRowsTcb::STARTED_) {
// cancel request to this child
//
childQueue_.down->cancelRequestWithParentIndex(ix);
pState.childState_ = ExUnPackRowsTcb::CANCELLED_;
}
} // cancelled rows request
ix++; // may wrap around
} // parent queue is not empty
// checked all active entries nothing more to do
return WORK_OK;
}
// -----------------------------------------------------------------------
// Generic work procedure should never be called
// -----------------------------------------------------------------------
ExWorkProcRetcode ExUnPackRowsTcb::work()
{
ex_assert(0,"Should never reach ExUnPackRowsTcb::work()");
return WORK_BAD_ERROR;
}
// -----------------------------------------------------------------------
// Work procedure to send requests down
// -----------------------------------------------------------------------
ExWorkProcRetcode ExUnPackRowsTcb::workDown()
{
// while we have unprocessed down requests and while
// child's down ComQueue.has room, start more child requests
//
while (qParent_.down->entryExists(processedInputs_) &&
! childQueue_.down->isFull())
start();
return WORK_OK;
}
// -----------------------------------------------------------------------
// generic up-processing for unPackRows
// -----------------------------------------------------------------------
ExWorkProcRetcode
ExUnPackRowsTcb::workUp()
{
// while there is a chance that we have work (may exit via return)
// (If processedInputs_ equals the HeadIndex(), then there are no
// entries in the parent down queue.)
//
const NABoolean rowsetIterator = unPackRowsTdb().isRowsetIterator();
const NABoolean tolerateNonFatalError = unPackRowsTdb().isNonFatalErrorTolerated();
while (qParent_.down->getHeadIndex() != processedInputs_) {
// get head entry and pState
//
ex_queue_entry *pEntryDown = qParent_.down->getHeadEntry();
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*)pEntryDown->pstate);
// Set up the numRowsAtp_ to recieve the packing factor.
//
numRowsAtp_->copyAtp(pEntryDown->getAtp());
numRowsAtp_->
getTupp(unPackRowsTdb().criDescUp_->noTuples() - 1)
= numRowsTupp_;
// We are not returning any of the childs returned values.
// So we copy from the parents request.
//
workAtp_->copyAtp(pEntryDown->getAtp());
// Set up the Tupp for the index value. The index value is
// a data member of this tcb and is input to the unPackExpr.
//
workAtp_->getTupp(unPackRowsTdb().indexValueAtpIndex_)
= indexValueTupp_;
// Get the current value of the index for this request.
//
indexValue_ = pState.unPackCount_;
// for non-atomic rowsets store the next rownumber where a CLI
// error was raised. This row will be skipped later.
if (tolerateNonFatalError) {
ComDiagsArea * da = workAtp_->getDiagsArea();
if (da) {
pState.nextCLIErrorRowNum_ = da->getNextRowNumber(indexValue_);
}
}
// while we have room in the up queue and rows to process
//
while (!qParent_.up->isFull() && !childQueue_.up->isEmpty()) {
ex_queue_entry *cEntry = childQueue_.up->getHeadEntry();
atp_struct * childAtp = cEntry->getAtp();
if(cEntry->upState.status == ex_queue::Q_NO_DATA) {
// Send EOD to parent and clean up the current request.
// (will loop back to to of routine and try the next
// request if there is one)
//
stop();
// Must break here (rather than continue, so that we will
// reset the pEntryDown and pState.)
//
break;
} else if(pState.childState_ == CANCELLED_) {
} else if(cEntry->upState.status == ex_queue::Q_SQLERROR) {
processError(childAtp,FALSE,0);
continue;
} else if (pEntryDown->downState.request == ex_queue::GET_N &&
pEntryDown->downState.requestValue <=
(Lng32)pState.matchCount_) {
qParent_.down->cancelRequest(qParent_.down->getHeadIndex());
processCancel();
continue;
} else {
ex_expr::exp_return_type retCode = ex_expr::EXPR_TRUE;
// If this is the first row to be unpacked from this packed row.
//
if(indexValue_ == 0) {
// Extract the packing factor (numRows) from the packed column.
//
retCode = packingFactor()->eval(numRowsAtp_, childAtp);
if(retCode == ex_expr::EXPR_ERROR) {
processError(workAtp_,FALSE,0);
continue;
}
// Register the numRows in the private state, in case we
// start working on an other request.
//
pState.numRows_ = numRows_;
// If the number of entries in the rowset is negative, we
// signal an error
if (numRows_ < 0) {
ComDiagsArea * da = workAtp_->getDiagsArea();
if (!da)
{
da = ComDiagsArea::allocate(getHeap());
workAtp_->setDiagsArea(da);
}
*da << DgSqlCode(-EXE_ROWSET_INDEX_OUTOF_RANGE);
processError(workAtp_,FALSE,0);
continue;
}
}
// Only allocate a new tuple if needed. If a tuple has
// been allocated, but not inserted into the up queue
// because the predicate did not pass, reuse the tuple.
//
Int32 needTuple = (unPackColsExpr() ? TRUE : FALSE);
// While we still have expressions to apply to the current
// child entry and the parent can accept rows.
// Each expression that is applied to the child entry
// produces one row to be put onto the parents up queue.
//
while(indexValue_ < pState.numRows_ && !qParent_.up->isFull()) {
// Handle GET_N processing
//
if (pEntryDown->downState.request == ex_queue::GET_N &&
pEntryDown->downState.requestValue <= (Lng32)pState.matchCount_) {
qParent_.down->cancelRequest(qParent_.down->getHeadIndex());
processCancel();
indexValue_ = pState.numRows_;
continue;
}
// For Non-atomic rowsets skips over rows that raised an error
// in the CLI
if (pState.nextCLIErrorRowNum_ == indexValue_) {
processSkippedRow(workAtp_);
indexValue_++;
ComDiagsArea * da = workAtp_->getDiagsArea();
pState.nextCLIErrorRowNum_ = da->getNextRowNumber(indexValue_);
continue ;
}
// UnPackRows also adds one TP for the generated columns. A
// tuple must be allocated if one isn't already available.
//
if(needTuple) {
if (pool_->
getFreeTuple(workAtp_->
getTupp(unPackRowsTdb().unPackColsAtpIndex_))) {
// Return. Will be called again when some space frees up.
//
pState.unPackCount_ = indexValue_;
return WORK_POOL_BLOCKED;
}
// Set flag to indicate that we have allocated a tuple.
//
needTuple = FALSE;
}
// Apply any selection predicate.
//
// The unPackColsExpr() will be NULL when there are no
// columns to unPack, but we still need to produce the
// proper number of rows. For example count(*).
//
// Apply the unPack expression to this row. This will
// generate the proper values for the generated columns.
//
Lng32 markValue = 0;
ComDiagsArea * da = workAtp_->getDiagsArea();
if (da)
markValue = da->mark();
retCode = (unPackColsExpr() ?
unPackColsExpr()->eval(workAtp_, childAtp) :
ex_expr::EXPR_TRUE);
switch(retCode) {
case ex_expr::EXPR_TRUE:
{
// Get the entry on the parent up queue.
//
ex_queue_entry *pEntry = qParent_.up->getTailEntry();
pEntry->copyAtp(workAtp_);
// for non-atomic rowsets do NOT flow digas area which we get from parent
// this diags area contains errors from the CLI. We do not want to flow them up
// with OK_MMORE replies as this will lead to duplication.
// We will flow them up with the Q_REC_SKIPPED replies.
if (tolerateNonFatalError)
pEntry->setDiagsArea(NULL);
// Release the reference to the tupp in the workAtp_
//
workAtp_->getTupp(unPackRowsTdb().unPackColsAtpIndex_).release();
// Set the flag to indicate that the tuple has been used and
// a new tuple needs to be allocated.
//
needTuple = (unPackColsExpr() ? TRUE : FALSE);
// Finialize the queue entry, then insert it
//
pEntry->upState.status = cEntry->upState.status;
pEntry->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntry->upState.setMatchNo(++pState.matchCount_);
qParent_.up->insert();
break;
}
case ex_expr::EXPR_FALSE:
if (rowsetIterator)
processSkippedRow(NULL);
else {
// Do nothing
}
break;
case ex_expr::EXPR_ERROR:
processError(workAtp_,tolerateNonFatalError,markValue);
// Release the reference to the tupp in the workAtp_
//
workAtp_->getTupp(unPackRowsTdb().unPackColsAtpIndex_).release();
// Set the flag to indicate that the tuple has been used and
// a new tuple needs to be allocated.
//
needTuple = (unPackColsExpr() ? TRUE : FALSE);
break;
default:
break;
}
if(retCode == ex_expr::EXPR_ERROR && !tolerateNonFatalError)
break;
// Advance the expression counter.
//
indexValue_++;
}
// This will cause the EOD to be sent after error processing.
//
if(retCode == ex_expr::EXPR_ERROR && !tolerateNonFatalError)
continue;
if(indexValue_ < pState.numRows_) {
// We still have more expressions to apply. We must have
// reached this point because the parent up queue is full.
// Return, will be called again when the queue becomes non-full.
//
pState.unPackCount_ = indexValue_;
return WORK_OK;
}
}
// We have applied all the expressions on this child entry,
// reset the expression counter, remove the child entry
// and then see if there is another entry to be processed.
//
indexValue_ = 0;
childQueue_.up->removeHead();
} // while parent up ComQueue.has room and child up ComQueue.has replies
// If we got here because we finished a request and stop()ed,
// then try again on a new request. But if we got here because
// the parent queue is full or the child queue is empty, then
// we must return.
//
if(qParent_.up->isFull() || childQueue_.up->isEmpty()) {
pState.unPackCount_ = indexValue_;
return WORK_OK;
}
pState.unPackCount_ = indexValue_;
} // while parent ComQueue.has entries
// parent down queue is empty
//
return WORK_OK;
}
void
ExUnPackRowsTcb::processError(atp_struct *atp, NABoolean isNonFatalError, Lng32 markValue)
{
ex_queue_entry * pEntryDown = qParent_.down->getHeadEntry();
ex_queue_entry * pEntry = qParent_.up->getTailEntry();
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*) pEntryDown->pstate);
pEntry->upState.status = ex_queue::Q_SQLERROR;
pEntry->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntry->upState.downIndex = qParent_.down->getHeadIndex();
pEntry->upState.setMatchNo(pState.matchCount_);
if (!isNonFatalError) {
pEntry->copyAtp(atp);
// cancel this request and cancel all the children.
qParent_.down->cancelRequest(qParent_.down->getHeadIndex());
pState.childState_ = CANCELLED_;
while(!childQueue_.up->isEmpty() &&
(childQueue_.up->getHeadEntry()->upState.status
!= ex_queue::Q_NO_DATA))
childQueue_.up->removeHead();
}
else {
ComDiagsArea * fromDa = atp->getDiagsArea();
ex_assert(fromDa, "We have an expression error in UnPack but no diags area");
ComDiagsArea* toDa = ComDiagsArea::allocate(getHeap());
fromDa->rewindAndMergeIfDifferent(markValue, toDa);
pEntry->setDiagsArea(toDa);
toDa->setAllRowNumber(ComCondition::NONFATAL_ERROR) ;
}
qParent_.up->insert();
}
void
ExUnPackRowsTcb::processSkippedRow(atp_struct *atp)
{
ex_queue_entry * pEntryDown = qParent_.down->getHeadEntry();
ex_queue_entry * pEntry = qParent_.up->getTailEntry();
ExUnPackRowsPrivateState &pState =
*((ExUnPackRowsPrivateState*) pEntryDown->pstate);
pEntry->upState.status = ex_queue::Q_REC_SKIPPED;
pEntry->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntry->upState.downIndex = qParent_.down->getHeadIndex();
pEntry->upState.setMatchNo(pState.matchCount_);
if (atp)
pEntry->copyAtp(atp);
qParent_.up->insert();
}
///////////////////////////////////////////////////////////////
//
// Private state procedures
//
///////////////////////////////////////////////////////////////
// Constructor and destructor for ExUnPackRowsPrivateState
//
ExUnPackRowsPrivateState::ExUnPackRowsPrivateState(const ExUnPackRowsTcb *)
{
init();
}
// Initialize the data members of the private state.
// Called by the constructor when the private state is
// initially allocated and when we are now with a parent
// request, so that the next time the queue entry is used,
// it's private state is already initialized.
//
void
ExUnPackRowsPrivateState::init()
{
matchCount_ = 0;
unPackCount_ = 0;
numRows_ = 0;
childState_ = ExUnPackRowsTcb::EMPTY_;
// row number is a zero based index, so initialize to -1.
nextCLIErrorRowNum_ = -1;
}
ExUnPackRowsPrivateState::~ExUnPackRowsPrivateState()
{
}
ex_tcb_private_state *
ExUnPackRowsPrivateState::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace())
ExUnPackRowsPrivateState((ExUnPackRowsTcb *)tcb);
}
// ExUnPackRowwiseRowsTcb::ExUnPackRowwiseRowsTcb() -------------------------
//
ExUnPackRowwiseRowsTcb::ExUnPackRowwiseRowsTcb(
const ExUnPackRowsTdb &unPackRowsTdbLocal,
ex_globals *glob) :
ex_tcb(unPackRowsTdbLocal, 1, glob)
{
CollHeap *space = glob->getSpace();
pool_ = new(space) sql_buffer_pool(uprTdb().numBuffers_,
uprTdb().bufferSize_,
space);
// Allocate the queue to communicate with parent
// (Child allocates queue to communicate with Child)
//
qParent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
uprTdb().queueSizeDown_,
uprTdb().criDescDown_,
space);
// Allocate the private state in each entry of the down queue
//
ExUnPackRowwiseRowsPrivateState privateState(this);
qParent_.down->allocatePstate(&privateState, this);
// Allocate a queue to communicate with the parent node.
//
qParent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
uprTdb().queueSizeUp_,
uprTdb().criDescUp_,
space);
// fixup all expressions
if (uprTdb().rwrsInputSizeExpr())
uprTdb().rwrsInputSizeExpr()->fixup(0,
getExpressionMode(), this,
glob->getSpace(),
glob->getDefaultHeap(), FALSE, glob);
if (uprTdb().rwrsMaxInputRowlenExpr())
uprTdb().rwrsMaxInputRowlenExpr()->fixup(0,
getExpressionMode(), this,
glob->getSpace(),
glob->getDefaultHeap(), FALSE, glob);
if (uprTdb().rwrsBufferAddrExpr())
uprTdb().rwrsBufferAddrExpr()->fixup(0,
getExpressionMode(), this,
glob->getSpace(),
glob->getDefaultHeap(), FALSE, glob);
rwrsInputValuesTuppDesc_.init();
//rwrsInputValuesTupp_.init();
workAtp_ = allocateAtp(uprTdb().workCriDesc_,
glob->getSpace());
workAtp_->getTupp(uprTdb().rwrsWorkIndex()) =
&rwrsInputValuesTuppDesc_;
rwrsNumRows_ = 0;
rwrsMaxInputRowlen_ = -1;
rwrsBufferAddr_ = NULL;
step_ = INITIAL_;
}
ExUnPackRowwiseRowsTcb::~ExUnPackRowwiseRowsTcb()
{
delete qParent_.up;
delete qParent_.down;
}
ExWorkProcRetcode
ExUnPackRowwiseRowsTcb::work()
{
ex_expr::exp_return_type retCode;
// if no parent request, return
if (qParent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qParent_.down->getHeadEntry();
ExUnPackRowwiseRowsPrivateState * pstate
= (ExUnPackRowwiseRowsPrivateState*)pentry_down->pstate;
ex_queue::down_request request = pentry_down->downState.request;
while (TRUE) // exit via return
{
switch (step_)
{
case INITIAL_:
{
rwrsNumRows_ = 0;
rwrsMaxInputRowlen_ = -1;
rwrsBufferAddr_ = NULL;
currentRowNum_ = 0;
step_ = GET_INPUT_VALUES_;
}
break;
case GET_INPUT_VALUES_:
{
// get number of rows in the input rwrs buffer
workAtp_->getTupp(uprTdb().rwrsWorkIndex()).
setDataPointer((char *)&rwrsNumRows_);
retCode =
uprTdb().rwrsInputSizeExpr()->eval(pentry_down->getAtp(),
workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
step_ = ERROR_;
break;
}
if (rwrsNumRows_ == 0)
{
step_ = DONE_;
break;
}
if (rwrsNumRows_ < 0)
{
ExHandleErrors(qParent_,
pentry_down,
0,
getGlobals(),
NULL,
EXE_NUMERIC_OVERFLOW,
NULL,
NULL
);
step_ = DONE_;
break;
}
// get max length of each row
workAtp_->getTupp(uprTdb().rwrsWorkIndex()).
setDataPointer((char *)&rwrsMaxInputRowlen_);
retCode =
uprTdb().rwrsMaxInputRowlenExpr()->eval(pentry_down->getAtp(),
workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
step_ = ERROR_;
break;
}
if (rwrsMaxInputRowlen_ <= 0)
{
ExHandleErrors(qParent_,
pentry_down,
0,
getGlobals(),
NULL,
EXE_NUMERIC_OVERFLOW,
NULL,
NULL
);
step_ = DONE_;
break;
}
// get address of rwrs buffer
workAtp_->getTupp(uprTdb().rwrsWorkIndex()).
setDataPointer((char *)&rwrsBufferAddr_);
retCode =
uprTdb().rwrsBufferAddrExpr()->eval(pentry_down->getAtp(),
workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
step_ = ERROR_;
break;
}
step_ = RETURN_ROW_;
}
break;
case RETURN_ROW_:
{
if (qParent_.up->isFull())
return WORK_OK;
ex_queue_entry * up_entry = qParent_.up->getTailEntry();
// get pointer to current row
char * currRow = (char*)(rwrsBufferAddr_ +
currentRowNum_ * rwrsMaxInputRowlen_);
// allocate an empty tupp descriptor
tupp_descriptor * td = pool_->get_free_tupp_descriptor(0);
if (! td)
return WORK_POOL_BLOCKED; // couldn't allocate, try again later.
// initialize it with the addr of the row to be returned
td->init(rwrsMaxInputRowlen_, NULL, currRow);
up_entry->copyAtp(pentry_down);
// move this row to up queue
up_entry->getTupp(uprTdb().unPackColsAtpIndex_) = td;
up_entry->upState.status = ex_queue::Q_OK_MMORE;
up_entry->upState.parentIndex
= pentry_down->downState.parentIndex;
up_entry->upState.downIndex = qParent_.down->getHeadIndex();
up_entry->upState.setMatchNo(1);
qParent_.up->insert();
currentRowNum_++;
if (currentRowNum_ == rwrsNumRows_)
{
step_ = DONE_;
break;
}
}
break;
case ERROR_:
{
if (qParent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qParent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = pentry_down->getDiagsArea();
if (diagsArea != up_entry->getDiagsArea())
{
up_entry->setDiagsArea (diagsArea);
if (diagsArea != NULL)
diagsArea->incrRefCount();
}
// insert into parent
qParent_.up->insert();
step_ = DONE_;
}
break;
case DONE_:
{
if (qParent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qParent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qParent_.up->insert();
step_ = INITIAL_;
qParent_.down->removeHead();
return WORK_OK;
}
break;
}
} // while
return WORK_OK;
}
///////////////////////////////////////////////////////////////
//
// Private state procedures
//
///////////////////////////////////////////////////////////////
// Constructor and destructor for ExUnPackRowsPrivateState
//
ExUnPackRowwiseRowsPrivateState::ExUnPackRowwiseRowsPrivateState(const ExUnPackRowwiseRowsTcb *)
{
init();
}
// Initialize the data members of the private state.
// Called by the constructor when the private state is
// initially allocated and when we are now with a parent
// request, so that the next time the queue entry is used,
// it's private state is already initialized.
//
void
ExUnPackRowwiseRowsPrivateState::init()
{
}
ExUnPackRowwiseRowsPrivateState::~ExUnPackRowwiseRowsPrivateState()
{
}
ex_tcb_private_state *
ExUnPackRowwiseRowsPrivateState::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace())
ExUnPackRowwiseRowsPrivateState((ExUnPackRowwiseRowsTcb *)tcb);
}