blob: 30eb1ae05d37bb95f984c6a41351b7385e30044a [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ExVPJoin.cpp
* Description: VP (Vertical Partition) Join
*
* Created: 5/3/94
* Language: C++
*
*
******************************************************************************
*/
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExVPJoin.h"
#include "ex_expr.h"
// Exclude this code from coverage analysis since this feature is
// obsolete and not used.
ex_tcb * ExVPJoinTdb::build(ex_globals * glob)
{
  // Build the TCB tree for this VP join: children first, then the join
  // TCB itself. Returns the newly built ExVPJoinTcb.

  // Array that receives one TCB pointer per child TDB.
  //
  const ex_tcb **children =
    (const ex_tcb**) new (glob->getSpace()) ex_tcb* [numChildren_];

  short childIx = 0;
  while (childIx < numChildren_)
    {
      children[childIx] = childTdbs_[childIx]->build(glob);
      childIx++;
    }

  // With all children built, construct the VP join TCB on top of them.
  //
  ExVPJoinTcb *joinTcb =
    new (glob->getSpace()) ExVPJoinTcb(*this, children, glob);

  // The work procedures must be registered with the scheduler before
  // the TCB is handed back to the caller.
  joinTcb->registerSubtasks();

  return joinTcb;
}
ExVPJoinTcb::ExVPJoinTcb(const ExVPJoinTdb & vpjTdb,
                         const ex_tcb ** childTcbs,
                         ex_globals *glob)
  : ex_tcb(vpjTdb, 1, glob),
    childTcbs_(childTcbs)
{
  // Sets up the queues used to talk to the children and to the parent,
  // fixes up the optional filter predicate, and records where the next
  // unprocessed parent request will appear.

  CollHeap * heap = glob->getSpace();
  numChildren_ = vpjTdb.numChildren();

  // One queue pair per child (the queues used for communicating with
  // that child), taken from the child's own parent queue pair.
  //
  qChild_ = (ex_queue_pair*) new (heap) ex_queue_pair[numChildren_];

  Int32 childIx = 0;
  while (childIx < numChildren_)
    {
      qChild_[childIx] = childTcbs_[childIx]->getParentQueue();
      ++childIx;
    }

  // Down queue: carries requests from the parent to this node. Each
  // entry gets private state associated with it.
  //
  qParent_.down = new (heap) ex_queue(ex_queue::DOWN_QUEUE,
                                      vpjTdb.fromParent_,
                                      vpjTdb.givenCriDesc_,
                                      heap);
  ExVPJoinPrivateState pstateTemplate(this);
  qParent_.down->allocatePstate(&pstateTemplate, this);

  // Up queue: carries replies from this node back to the parent.
  //
  qParent_.up = new (heap) ex_queue(ex_queue::UP_QUEUE,
                                    vpjTdb.toParent_,
                                    vpjTdb.returnedCriDesc_,
                                    heap);

  // The filter predicate is optional; fix it up only if one exists.
  if (filterPred())
    (void) filterPred()->fixup(0, getExpressionMode(), this,
                               glob->getSpace(), glob->getDefaultHeap());

  // Remember the index of the next down request to be processed.
  nextReqIx_ = qParent_.down->getHeadIndex();
}
// Destructor: deletes the up and down queues to the parent (the ones
// allocated by the constructor) and then calls freeResources().
ExVPJoinTcb::~ExVPJoinTcb()
{
delete qParent_.up;
delete qParent_.down;
freeResources();
}
// Returns (by value) the queue pair through which this node exchanges
// requests and replies with its parent.
ex_queue_pair ExVPJoinTcb::getParentQueue() const
{
return qParent_;
}
// Returns the child TCB at position pos.
//
// pos must satisfy 0 <= pos < numChildren(); this is checked with
// ex_assert, which now carries a descriptive message so that a failure
// identifies the offending call.
const ex_tcb* ExVPJoinTcb::getChild(Int32 pos) const
{
  ex_assert((pos >= 0) && (pos < numChildren()),
            "ExVPJoinTcb::getChild(): child position out of range");
  return childTcbs_[pos];
}
// Returns the number of children, as reported by this TCB's TDB.
Int32 ExVPJoinTcb::numChildren() const
{
return vpJoinTdb().numChildren();
}
// Called from the destructor; currently does nothing.
void ExVPJoinTcb::freeResources()
{}
void ExVPJoinTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
// parent and child down queues are handled by workDown()
sched->registerInsertSubtask(sWorkDown, this, qParent_.down, "Work Down");
sched->registerCancelSubtask(sWorkCancel, this, qParent_.down, "Work Cancel");
Int32 i;
for (i=0; i<numChildren_; i++)
{
sched->registerUnblockSubtask(sWorkDown, this, qChild_[i].down);
}
// parent and child up queues are handled by workUp()
sched->registerUnblockSubtask(sWorkUp, this, qParent_.up, "Work Up");
for (i=0; i<numChildren_; i++)
{
sched->registerInsertSubtask(sWorkUp, this, qChild_[i].up, "Work Up");
}
// make an event through which we can tell the scheduler to call
// workUp().
exceptionEvent_ = sched->registerNonQueueSubtask(sWorkUp, this);
}
// Main work procedure. It is never expected to be called: the real work
// is done by workDown() and workUp(). Reaching this method trips an
// assertion; WORK_OK is returned after the assertion.
short ExVPJoinTcb::work()
{
// The work procedures for VPJoin first pass a request from the
// parent node to all child nodes. Replies are then merged (the
// i'th reply from each child is merged to form the i'th reply to
// the parent) and passed to the parent. Work procedure workDown()
// handles the first stage, passing requests down to children, while
// procedure workUp() handles the second.
//
// The work procedures can be called in any order.
// The main work procedure (this one) is never called.
//
ex_assert(0, "Should never reach ExVPJoinTcb::work()");
return WORK_OK;
}
ExWorkProcRetcode ExVPJoinTcb::workDown()
{
  // Forwards pending parent requests to every child. nextReqIx_ is the
  // index of the next parent request that has not yet been looked at;
  // it advances once per request handled here. Always returns WORK_OK.
  //
  const queue_index downTail = qParent_.down->getTailIndex();

  while (nextReqIx_ != downTail)
    {
      // A request is only passed down when every child has room for it.
      // This is stricter than necessary (some children could be started
      // while others are still full), but it avoids the bookkeeping that
      // a partial hand-off would require.
      //
      Int32 childIx;
      for (childIx = 0; childIx < numChildren_; childIx++)
        {
          if (qChild_[childIx].down->isFull())
            return WORK_OK;
        }

      ex_queue_entry * parentReq = qParent_.down->getQueueEntry(nextReqIx_);
      const ex_queue::down_request reqType = parentReq->downState.request;
      ExVPJoinPrivateState &reqState =
        *((ExVPJoinPrivateState*) parentReq->pstate);
      reqState.init();

      if (reqType == ex_queue::GET_NOMORE)
        {
          // Already cancelled: nothing goes to the children. Schedule
          // workUp() so that it can deal with the cancellation.
          //
          exceptionEvent_->schedule();
        }
      else
        {
          // Live request: hand a copy to each child.
          //
          for (childIx = 0; childIx < numChildren_; childIx++)
            {
              ex_queue_entry * childReq = qChild_[childIx].down->getTailEntry();

              // same request type and value as the parent's request
              childReq->downState.request = reqType;
              childReq->downState.requestValue =
                parentReq->downState.requestValue;

              // record which parent down-queue entry this belongs to
              childReq->downState.parentIndex = nextReqIx_;

              // the child sees the parent's ATP
              childReq->passAtp(parentReq);

              qChild_[childIx].down->insert();
            }

          reqState.started_ = 1;
        }

      nextReqIx_++;
    }

  return WORK_OK;
}
// Work procedure for the "up" direction: merges the replies of all
// children into replies for the parent, one parent request at a time.
// Returns WORK_OK in every case; it returns early (leaving the current
// request at the head of the parent down queue) whenever a child up
// queue is empty or the parent up queue is full.
ExWorkProcRetcode ExVPJoinTcb::workUp()
{
// Move replies from child nodes up to parent. The replies at the
// heads of child up-queues are merged into a single reply to the
// parent. If a filter predicate is specified, then a reply is sent
// to the parent node only if the predicate expression evaluates to
// TRUE.
//
queue_index ix;
// Handle parent requests in queue order, from the head of the parent
// down queue up to (but not including) nextReqIx_, i.e. only requests
// that workDown() has already processed. The head request is removed
// only after its end-of-data reply has been inserted at the bottom of
// the loop body.
for(; (ix=qParent_.down->getHeadIndex()) != nextReqIx_;
qParent_.down->removeHead())
{
// Entry in the parent up queue that is currently being filled in;
// 0 means no entry has been claimed yet.
ex_queue_entry * upEntry = 0;
ex_queue_entry * pentry = qParent_.down->getHeadEntry();
ExVPJoinPrivateState &pstate = *((ExVPJoinPrivateState*) pentry->pstate);
// Held by reference, so a later change made to the request by
// cancelParentRequest() is visible through this name.
const ex_queue::down_request & request = pentry->downState.request;
Int32 requestCancelled = (request == ex_queue::GET_NOMORE);
Int32 isGetNRowsRequest = (request == ex_queue::GET_N);
Lng32 numRowsOfGetNRequest = pentry->downState.requestValue;
if (pstate.started_)
{
// Request has been started, which means that child nodes
// are producing replies for current request. Read replies,
// merge them, and, if request hasn't been cancelled and
// filter expression evaluates to TRUE (if there is a filter
// expression), pass reply to parent.
//
Lng32 eodReply = 0; // is current reply from children end-of-data?
// Loop until there are no more replies to process or until
// end-of-data is read for current request.
//
while (!eodReply)
{
// If a child up-queue is empty, then return.
//
Int32 i;
for (i=0; i<numChildren_; i++)
if (qChild_[i].up->isEmpty())
return WORK_OK;
// Since all children produce the same number of results for
// any given request, we just need to look at the first child
// to determine if the current reply from each child is EOD.
//
eodReply =
qChild_[0].up->getHeadEntry()->upState.status==ex_queue::Q_NO_DATA;
if (!upEntry)
{
// Get a pointer to the entry in this node's parent up queue
// that will receive the current reply.
if (qParent_.up->isFull())
// no room in parent up queue; try again later
return WORK_OK;
upEntry = qParent_.up->getTailEntry();
upEntry->copyAtp(pentry);
}
if (!requestCancelled && !eodReply)
{
// Request isn't cancelled and it isn't end-of-data.
// Evaluate filter predicate, if there is one.
//
// Build ATP that is of the right structure both for
// predicate evaluation and replying to parent.
//
// The result ATP from each child looks as follows:
//
// ------------------------------
// |input data|child output data|
// ------------------------------
//
// Data member firstReplyAtpIndex_ records the atp index
// where output data starts. Output from child nodes is
// placed in the result ATP in the following manner:
//
// ----------------------------------------------
// |input data|child 1 output|child 2 output|...|
// ----------------------------------------------
//
// This is also the ATP structure that is assumed by
// the predicate expression.
//
atp_struct * resAtp = upEntry->getAtp(); // result ATP
short outIx = vpJoinTdb().firstReplyAtpIx_;
// where output data begins,
// both in result ATP and ATPs
// returned from child nodes
for (i=0; i<numChildren_; i++)
{
// current reply from i'th child
const ex_queue_entry *reply = qChild_[i].up->getHeadEntry();
// Append this child's output to result ATP.
//
// Number of output tuples in this child's reply: everything
// from firstReplyAtpIx_ onwards.
short numResTups = reply->numTuples()-vpJoinTdb().firstReplyAtpIx_;
short dest_start_tupp = outIx;
short src_start_tupp = vpJoinTdb().firstReplyAtpIx_;
short src_end_tupp = numResTups + vpJoinTdb().firstReplyAtpIx_ - 1;
resAtp->copyPartialAtp(reply->getAtp(), // source atp
dest_start_tupp, // starting ix in resAtp
src_start_tupp, // starting ix in reply
src_end_tupp); // ending ix in reply
// The next child's output goes right after this child's.
outIx += numResTups;
}
if (!filterPred() ||
filterPred()->eval(resAtp,0)==ex_expr::EXPR_TRUE)
{
// There is either no filter predicate or there is one and
// it evaluated to TRUE for the current reply, so return reply
// to parent node.
//
upEntry->upState.parentIndex = pentry->downState.parentIndex;
upEntry->upState.downIndex = ix;
upEntry->upState.setMatchNo(pstate.matchCount_++);
upEntry->upState.status = ex_queue::Q_OK_MMORE;
qParent_.up->insert();
// The entry has been handed to the parent; a fresh one is
// claimed on the next iteration.
upEntry = 0;
if (isGetNRowsRequest &&
(Lng32) pstate.matchCount_ >= numRowsOfGetNRequest)
{
// Current request is of the type "get N rows" and
// we have returned at least N rows, so cancel the
// request.
//
cancelParentRequest(pentry, ix);
requestCancelled = 1;
}
}
// If the filter rejected the row, upEntry is left set and the
// same up-queue entry is reused for the next reply.
}
// The current reply of every child has been consumed (or is being
// discarded); remove it from each child's up queue.
for (i=0; i<numChildren_; i++)
qChild_[i].up->removeHead();
}
}
else
{
// Request hasn't been started, which means that it is
// cancelled and hasn't been sent to children.
//
ex_assert(request==ex_queue::GET_NOMORE,
"Expected request to be cancelled");
if (qParent_.up->isFull())
// no room in parent up queue; try again later
return WORK_OK;
upEntry = qParent_.up->getTailEntry();
upEntry->copyAtp(pentry);
}
// Return end-of-data to parent.
upEntry->upState.parentIndex = pentry->downState.parentIndex;
upEntry->upState.downIndex = ix;
upEntry->upState.setMatchNo(pstate.matchCount_);
upEntry->upState.status = ex_queue::Q_NO_DATA;
qParent_.up->insert();
upEntry = 0;
}
return WORK_OK;
}
void ExVPJoinTcb::cancelParentRequest(ex_queue_entry * pentry,
                                      queue_index pix)
{
  // Marks the given parent request as cancelled (GET_NOMORE) and, if it
  // was already passed down, cancels the matching request in every
  // child down queue.
  //
  //   pentry - the parent down-queue entry to cancel
  //   pix    - index of that entry in the parent down queue

  ExVPJoinPrivateState & reqState = *((ExVPJoinPrivateState*) pentry->pstate);

  pentry->downState.request = ex_queue::GET_NOMORE;

  // Nothing was sent to the children, so there is nothing to cancel.
  if (!reqState.started_)
    return;

  for (Int32 childIx = 0; childIx < numChildren_; ++childIx)
    {
      qChild_[childIx].down->cancelRequestWithParentIndex(pix);
    }
}
ExWorkProcRetcode ExVPJoinTcb::workCancel()
{
  // Walk through already processed down requests (from the head of the
  // parent down queue up to, but not including, nextReqIx_) and cancel
  // those with request type GET_NOMORE (even if this was already done).
  // Always returns WORK_OK.
  //
  // The loop stops on inequality rather than on "<", matching how
  // workDown() and workUp() compare queue indexes. An ordered comparison
  // would skip pending entries if the index counter wraps around between
  // the head index and nextReqIx_.
  // NOTE(review): assumes queue_index is a wrapping counter and that the
  // head index never passes nextReqIx_ -- confirm against ex_queue.
  for (queue_index ci = qParent_.down->getHeadIndex();
       ci != nextReqIx_;
       ci++)
    {
      ex_queue_entry * pentry = qParent_.down->getQueueEntry(ci);
      if (pentry->downState.request == ex_queue::GET_NOMORE)
        cancelParentRequest(pentry, ci);
    }
  return WORK_OK;
}
// Static trampoline registered with the scheduler: forwards to
// workDown() on the given TCB, which must be an ExVPJoinTcb.
short ExVPJoinTcb::sWorkDown(ex_tcb *tcb)
{
  ExVPJoinTcb *self = static_cast<ExVPJoinTcb *>(tcb);
  return self->workDown();
}
// Static trampoline registered with the scheduler: forwards to
// workUp() on the given TCB, which must be an ExVPJoinTcb.
short ExVPJoinTcb::sWorkUp(ex_tcb *tcb)
{
  ExVPJoinTcb *self = static_cast<ExVPJoinTcb *>(tcb);
  return self->workUp();
}
// Static trampoline registered with the scheduler: forwards to
// workCancel() on the given TCB, which must be an ExVPJoinTcb.
short ExVPJoinTcb::sWorkCancel(ex_tcb *tcb)
{
  ExVPJoinTcb *self = static_cast<ExVPJoinTcb *>(tcb);
  return self->workCancel();
}
// Constructs the per-request private state by calling init().
// The tcb argument is not used in this constructor.
ExVPJoinPrivateState::ExVPJoinPrivateState(const ExVPJoinTcb * tcb)
{
init();
}
// Creates a new ExVPJoinPrivateState for the given TCB, allocated from
// the space returned by the TCB's getSpace(). The TCB is expected to be
// an ExVPJoinTcb.
ex_tcb_private_state * ExVPJoinPrivateState::allocate_new(const ex_tcb * tcb)
{
  // getSpace() is invoked through a non-const pointer, as before.
  ex_tcb *mutableTcb = const_cast<ex_tcb*>(tcb);
  const ExVPJoinTcb *joinTcb = static_cast<const ExVPJoinTcb*>(tcb);

  return new (mutableTcb->getSpace()) ExVPJoinPrivateState(joinTcb);
}
// Destructor: empty body, nothing is released here.
ExVPJoinPrivateState::~ExVPJoinPrivateState()
{}