blob: 6bcce6908170313b4352a2d14314ecdbb20a987c [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ExPack.cpp
* Description: Methods for ExPackTdb and ExPackTcb
*
* Created: 6/16/97
* Language: C++
*
*
*
*
******************************************************************************
*/
#include "ExPack.h"
#include "ExSimpleSqlBuffer.h"
#include "ex_exe_stmt_globals.h"
#include "ComDefs.h"
//////////////////////////////////////////////////////////////////////////////
//
// TDB procedures
//
//////////////////////////////////////////////////////////////////////////////
// Build the TCB tree for this Pack node and all of its descendents.
ex_tcb* ExPackRowsTdb::build(ex_globals* glob)
{
ex_tcb* childTcb = childTdb_->build(glob);
ExPackRowsTcb* packTcb = new(glob->getSpace()) ExPackRowsTcb(*this,*childTcb,glob);
packTcb->registerSubtasks();
return packTcb;
}
//////////////////////////////////////////////////////////////////////////////
//
// TCB procedures
//
//////////////////////////////////////////////////////////////////////////////
// Constructor.
ExPackRowsTcb::ExPackRowsTcb(const ExPackRowsTdb& packTdb,
const ex_tcb& childTcb,
ex_globals* glob)
: ex_tcb(packTdb,1,glob)
{
childTcb_ = &childTcb;
Space* space = (glob ? glob->getSpace() : 0);
CollHeap* heap = (glob ? glob->getDefaultHeap() : 0);
// Allocate the buffer pool.
pool_ = new (space) ExSimpleSQLBuffer(packTdb.noOfBuffers_,
packTdb.packTuppLen_,
space);
// Get the queue used by my child to communicate with me.
qChild_ = childTcb_->getParentQueue();
// Allocate an extra input tupp the Pack node adds to its down queues.
qChild_.down->allocateAtps(glob->getSpace());
// Allocate the queue to communicate with parent.
qParent_.down = new (space) ex_queue(ex_queue::DOWN_QUEUE,
packTdb.fromParent_,
packTdb.givenCriDesc_,
space);
// Allocate the private state in each entry of the down queue.
ExPackPrivateState privateState(this);
qParent_.down->allocatePstate(&privateState,this);
// Initialize nextRqst_ to refer to the queue entry used next.
nextRqst_ = qParent_.down->getHeadIndex();
// Allocate a queue to communicate with the parent node.
qParent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
packTdb.toParent_,
packTdb.returnedCriDesc_,
space);
// Fix up the expressions.
packExpr()->fixup(0,getExpressionMode(),this,space,heap, FALSE, glob);
if(predExpr()) predExpr()->fixup(0,getExpressionMode(),this,space,heap, FALSE, glob);
}
// Destructor.
ExPackRowsTcb::~ExPackRowsTcb()
{
delete qParent_.up;
delete qParent_.down;
freeResources();
}
void ExPackRowsTcb::freeResources()
{
if(pool_) delete pool_;
pool_ = 0;
}
// -----------------------------------------------------------------------
// Register all the pack subtasks with the scheduler.
// -----------------------------------------------------------------------
void ExPackRowsTcb::registerSubtasks()
{
ExScheduler* sched = getGlobals()->getScheduler();
// Schedule this routine if a new entry is inserted into the parent's
// down queue and we are waiting on an event.
sched->registerInsertSubtask(sWorkDown,this,qParent_.down,"DN");
// Schedule this routine if the child's down queue changes from being
// full to being not full and we are waiting on an event.
sched->registerUnblockSubtask(sWorkDown,this,qChild_.down,"DN");
// Schedule this routine if a cancel request occurs on the parent's
// down queue.
sched->registerCancelSubtask(sCancel,this,qParent_.down,"CN");
// Schedule this routine if the parent's up queue changes from being
// full to being not full and we are waiting on an event.
sched->registerUnblockSubtask(sWorkUp,this,qParent_.up,"UP");
// Schedule this routine if a new entry is inserted into the child's
// up queue and we are waiting on an event.
sched->registerInsertSubtask(sWorkUp,this,qChild_.up);
// BertBert VV
// Schedule this routine if a GET_NEXT_N request is updated in the parent's
// down queue and we are waiting on an event.
sched->registerNextSubtask(sWorkDown,this,qParent_.down,"DN");
// BertBert ^^
}
// -----------------------------------------------------------------------
// Send next request down to children. Called by workDown().
// -----------------------------------------------------------------------
void ExPackRowsTcb::start()
{
// Caller needs to make sure we have room in the child's down queue.
ex_queue_entry* cEntryDown = qChild_.down->getTailEntry();
// Caller needs to make sure we have a new request in parent's down queue.
ex_queue_entry* pEntryDown = qParent_.down->getQueueEntry(nextRqst_);
const ex_queue::down_request request = pEntryDown->downState.request;
ExPackPrivateState& pState = *((ExPackPrivateState*)pEntryDown->pstate);
// don't forward cancelled request.
// if(request == ex_queue::GET_NOMORE)
// {
// pState.childState_ = CANCELLED_;
// nextRqst_++;
// return;
// }
// Pass request to children.
cEntryDown->downState.request = request;
cEntryDown->downState.requestValue = pEntryDown->downState.requestValue;
// BertBert VV
// Note, there is no complete support for the GET_NEXT_N protocol in this operator.
cEntryDown->downState.numGetNextsIssued = pEntryDown->downState.numGetNextsIssued;
// BertBert ^^
cEntryDown->downState.parentIndex = nextRqst_;
cEntryDown->copyAtp(pEntryDown->getAtp());
pState.matchCount_ = 0;
qChild_.down->insert();
pState.childState_ = STARTED_;
// If this is a CANCEL request, cancel request we just gave to our child.
if(request == ex_queue::GET_NOMORE)
{
// immediately cancel the request (requests are already in cancelled
// state but the cancel callback isn't activated yet)
qChild_.down->cancelRequestWithParentIndex(nextRqst_);
pState.childState_ = CANCELLED_;
}
nextRqst_++;
}
// -----------------------------------------------------------------------
// All child rows have been returned. Return an EOD indication to parent.
// -----------------------------------------------------------------------
void ExPackRowsTcb::stop()
{
// Caller needs to make sure there is an outstanding parent request.
ex_queue_entry* pEntryDown = qParent_.down->getHeadEntry();
ExPackPrivateState& pState = *((ExPackPrivateState*)pEntryDown->pstate);
// Caller needs to make sure we have room in the parent's up queue.
ex_queue_entry* pEntryUp = qParent_.up->getTailEntry();
// Insert EOD to parent's up queue.
pEntryUp->upState.status = ex_queue::Q_NO_DATA;
pEntryUp->upState.parentIndex = pEntryDown->downState.parentIndex;
qParent_.up->insert();
// child row removed outside of stop().
// Consume the child row.
qChild_.up->removeHead();
// Reinitialize the state of this processed request.
pState.init();
// This parent request has been processed.
qParent_.down->removeHead();
}
// -----------------------------------------------------------------------
// For those requests my parent cancel, cancel the corresponding requests
// that have already been forwarded to my child.
// -----------------------------------------------------------------------
ExWorkProcRetcode ExPackRowsTcb::processCancel()
{
queue_index ix = qParent_.down->getHeadIndex();
// Loop over all requests that have been sent down.
while(ix != nextRqst_)
{
ex_queue_entry* pEntryDown = qParent_.down->getQueueEntry(ix);
// Check whether this is a request cancelled by the parent.
if(pEntryDown->downState.request == ex_queue::GET_NOMORE)
{
ExPackPrivateState& pState =
*((ExPackPrivateState*)pEntryDown->pstate);
// Cancel corresponding request for my child if it's been forwarded.
if(pState.childState_ == ExPackRowsTcb::STARTED_)
{
qChild_.down->cancelRequestWithParentIndex(ix);
pState.childState_ = ExPackRowsTcb::CANCELLED_;
}
}
ix++;
}
return WORK_OK;
}
// -----------------------------------------------------------------------
// Generic work procedure should never be called
// -----------------------------------------------------------------------
ExWorkProcRetcode ExPackRowsTcb::work()
{
ex_assert(0,"Should never reach ExPackRowsTcb::work()");
return WORK_BAD_ERROR;
}
// -----------------------------------------------------------------------
// Work procedure to send requests down
// -----------------------------------------------------------------------
ExWorkProcRetcode ExPackRowsTcb::workDown()
{
// While we have unprocessed down requests and the child's down queue
// has room, start more child requests.
while(
qParent_.down->entryExists(nextRqst_) && !qChild_.down->isFull())
start();
return WORK_OK;
}
// -----------------------------------------------------------------------
// Work procedure to process child rows.
// -----------------------------------------------------------------------
ExWorkProcRetcode ExPackRowsTcb::workUp()
{
// While there is an outstanding parent request not finished.
while(qParent_.down->getHeadIndex() != nextRqst_)
{
// Get the request and retrieve its private state.
ex_queue_entry* pEntryDown = qParent_.down->getHeadEntry();
ExPackPrivateState& pState = *((ExPackPrivateState*)pEntryDown->pstate);
// While we have room in the up queue and rows to process.
while(!qParent_.up->isFull() && !qChild_.up->isEmpty())
{
// New row produced by child.
ex_queue_entry* cEntryUp = qChild_.up->getHeadEntry();
// The current request has already been cancelled.
if(pState.childState_ == ExPackRowsTcb::CANCELLED_)
{
if(cEntryUp->upState.status == ex_queue::Q_NO_DATA)
{
// No more rows for the cancelled request.
// stop() returns Q_NO_DATA and remove the cancelled request.
stop();
// Break out of the inner while-loop and check for next request.
break;
}
else
{
// Child is returning data for a request already cancelled.
// Just ignore them and continue to check whether child's up
// ComQueue.has any more entries.
qChild_.up->removeHead();
continue;
}
}
if(cEntryUp->upState.status == ex_queue::Q_NO_DATA)
{
// Apply selection predicates to and return row being partially
// packed in buffer if there is one.
workReturnRow();
// Send EOD to parent and clean up the current request.
stop();
// Break out of the inner while-loop and check for next request.
break;
}
else if(cEntryUp->upState.status == ex_queue::Q_SQLERROR)
{
// Child has produced an error. Copy diagnostic area to parent.
ex_queue_entry* pEntryUp = qParent_.up->getTailEntry();
pEntryUp->copyAtp(cEntryUp);
pEntryUp->upState.status = ex_queue::Q_SQLERROR;
pEntryUp->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntryUp->upState.downIndex = qParent_.down->getHeadIndex();
pEntryUp->upState.setMatchNo(pState.matchCount_); // (+1?)
qParent_.up->insert();
// Cancel the whole request on child. (may be optional)
qChild_.down->cancelRequestWithParentIndex(
qParent_.down->getHeadIndex());
// Mark the corresponding request cancelled.
pState.childState_ = ExPackRowsTcb::CANCELLED_;
qChild_.up->removeHead();
continue;
}
//
// Now, we got a new row to be packed into the current packed row.
// Here's the core stuff.
// Check if the packTupp has been allocated. If not, allocate it.
if(pState.packTupp_.getDataPointer() == NULL)
{
// Cannot allocate space. Call again when some space free up.
if(pool_->getFreeTuple(pState.packTupp_))
return WORK_POOL_BLOCKED;
// A cheat to initialize contents of the packed row. We need this
// since the logic of packExpr evaluation relies on the fact that
// the no of rows already packed in a packed row (which is 0 to
// begin with) is stored in the packed row itself, even initially.
//
char* packTuppPtr = pState.packTupp_.getDataPointer();
for(Int32 i=0; i < packTdb().packTuppLen_; i++) packTuppPtr[i] = 0;
// $$$ could potentially make it faster by initializing only the
// $$$ first four bytes (which stores the no of rows) to 0.
}
// Put the pack tupp directly in child's returned ATP.
atp_struct * cEntryUpAtp = cEntryUp->getAtp();
cEntryUpAtp->getTupp(packTdb().packTuppIndex_) = pState.packTupp_;
// Apply the pack expression to pack child row into packed row.
ex_expr::exp_return_type retCode = packExpr()->eval(cEntryUpAtp,0);
if(!cEntryUpAtp->getDiagsArea());
else if(!pEntryDown->getDiagsArea())
{
pEntryDown->setDiagsArea(cEntryUpAtp->getDiagsArea());
pEntryDown->getDiagsArea()->incrRefCount();
}
else
pEntryDown->getDiagsArea()->mergeAfter(*cEntryUpAtp->getDiagsArea());
if(retCode == ex_expr::EXPR_TRUE)
{
// for debugging.
// pState.printPackTupp();
// eval() returns TRUE if the packed record is full after the
// child row is packed into it. Return the full packed record.
workReturnRow();
}
else
{
// for debugging.
// pState.printPackTupp();
// Error occurs during expression evaluation.
if(retCode == ex_expr::EXPR_ERROR)
{
// There must be room in the up queue to have reached here.
ex_queue_entry* pEntryUp = qParent_.up->getTailEntry();
// cEntryUp's atp has diagnostic area set up. copy it to parent.
pEntryUp->copyAtp(cEntryUp);
pEntryUp->upState.status = ex_queue::Q_SQLERROR;
pEntryUp->upState.downIndex = qParent_.down->getHeadIndex();
pEntryUp->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntryUp->upState.setMatchNo(pState.matchCount_); //(+1?)
qParent_.up->insert();
// mark this request cancelled in the down queue to child.
qChild_.down->cancelRequestWithParentIndex(
qParent_.down->getHeadIndex());
pState.childState_ = ExPackRowsTcb::CANCELLED_;
}
}
// Child row is no longer useful after the eval, get rid of it.
qChild_.up->removeHead();
} // while parent's up queue is not full and child's up queue not empty.
// If we got here because we finished a request and stop()ed, then
// try again on a new request. If we got here because the parent
// queue is full or the child queue is empty, then we must return.
//
if(qParent_.up->isFull() || qChild_.up->isEmpty()) {
return WORK_OK;
}
} // while parent's down ComQueue.has entries.
// Parent down queue is empty.
return WORK_OK;
}
// -----------------------------------------------------------------------
// Work procedure to apply the selection predicates on the current packed
// record and return it.
// -----------------------------------------------------------------------
ExWorkProcRetcode ExPackRowsTcb::workReturnRow()
{
// Caller needs to make sure there is an outerstanding request.
ex_queue_entry* pEntryDown = qParent_.down->getHeadEntry();
ExPackPrivateState& pState = *((ExPackPrivateState*)pEntryDown->pstate);
// Nothing in packed record. Nothing to return.
if(pState.packTupp_.getDataPointer() == NULL) return WORK_OK;
// Copy parent's ATP from down queue to its up queue and add last tupp.
ex_queue_entry* pEntryUp = qParent_.up->getTailEntry();
atp_struct * pEntryUpAtp = pEntryUp->getAtp();
pEntryUpAtp->copyAtp(pEntryDown->getAtp());
pEntryUpAtp->getTupp(packTdb().packTuppIndex_) = pState.packTupp_;
// Release the tupp in the private state so that it stops referencing
// the packed record.
pState.packTupp_.release();
// Evaluate the selection predicates.
ex_expr::exp_return_type predVal = ex_expr::EXPR_TRUE;
if(predExpr()) predVal = predExpr()->eval(pEntryUpAtp,0);
if(predVal)
{
// Formally insert the ATP into the parent's up queue.
pEntryUp->upState.status = ex_queue::Q_OK_MMORE;
pEntryUp->upState.parentIndex = pEntryDown->downState.parentIndex;
pEntryUp->upState.setMatchNo(pState.matchCount_ + 1);
pState.matchCount_++;
qParent_.up->insert();
}
else
{
// Release the ATP formed in the parent's up queue entry.
pEntryUpAtp->release();
}
return WORK_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
// Private state procedures
//
//////////////////////////////////////////////////////////////////////////////
// Constructor.
ExPackPrivateState::ExPackPrivateState(const ExPackRowsTcb *)
{
packTupp_.init();
init();
}
// Destructor.
ExPackPrivateState::~ExPackPrivateState()
{}
// Initialization.
void ExPackPrivateState::init()
{
matchCount_ = 0;
childState_ = ExPackRowsTcb::EMPTY_;
packTupp_.release();
}
// Allocation.
ex_tcb_private_state* ExPackPrivateState::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace()) ExPackPrivateState((ExPackRowsTcb *)tcb);
}
void ExPackPrivateState::printPackTupp()
{
/*
char* env = getenv("DEBUG_PACKING");
if(env != NULL)
{
char* data = packTupp_.getDataPointer();
printf("Pack tupp in HEX:\n");
if(data == NULL) return;
int len = packTupp_.getAllocatedSize();
for(int i=0;i<len;i++) printf("%x ",data[i]);
printf("\n");
}
*/
}