blob: c6093abbb80530f966f7dec97e9a4c79bfa6fbb4 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExFirstN.cpp
* Description: class to return N rows.
*
*
* Created: 5/2/2003
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExFirstN.h"
//
// Build a firstN tcb
//
ex_tcb * ExFirstNTdb::build(ex_globals * glob)
{
// first build the child
ex_tcb * child_tcb = tdbChild_->build(glob);
ExFirstNTcb * firstn_tcb =
new(glob->getSpace()) ExFirstNTcb(*this, *child_tcb, glob);
// add this tcb to the schedule
firstn_tcb->registerSubtasks();
return (firstn_tcb);
}
//
// Constructor for firstn_tcb
//
ExFirstNTcb::ExFirstNTcb(const ExFirstNTdb & firstn_tdb,
const ex_tcb & child_tcb, // child queue pair
ex_globals *glob
) : ex_tcb( firstn_tdb, 1, glob),
step_(INITIAL_)
{
childTcb_ = &child_tcb;
Space * space = glob->getSpace();
CollHeap * heap = (glob ? glob->getDefaultHeap() : NULL);
// Allocate the buffer pool
#pragma nowarn(1506) // warning elimination
pool_ = new(space) sql_buffer_pool(firstn_tdb.numBuffers_,
firstn_tdb.bufferSize_,
space);
#pragma warn(1506) // warning elimination
// get the queue that child use to communicate with me
qchild_ = child_tcb.getParentQueue();
// Allocate the queue to communicate with parent
qparent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
firstn_tdb.queueSizeDown_,
firstn_tdb.criDescDown_,
space);
// Allocate the private state in each entry of the down queue
ExFirstNPrivateState p(this);
qparent_.down->allocatePstate(&p, this);
qparent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
firstn_tdb.queueSizeUp_,
firstn_tdb.criDescUp_,
space);
workAtp_ = NULL;
firstNParamVal_ = 0;
effectiveFirstN_ = -1;
returnedSoFar_ = 0;
if (firstn_tdb.workCriDesc_)
{
workAtp_ = allocateAtp(firstn_tdb.workCriDesc_, space);
pool_->get_free_tuple(workAtp_->getTupp(firstn_tdb.workCriDesc_->noTuples()-1), 0);
workAtp_->getTupp(firstn_tdb.workCriDesc_->noTuples()-1).
setDataPointer((char*)&firstNParamVal_);
}
if (firstn_tdb.firstNRowsExpr_)
{
firstn_tdb.firstNRowsExpr_->fixup(0, getExpressionMode(), this, space, heap,
FALSE, glob);
}
};
ExFirstNTcb::~ExFirstNTcb()
{
freeResources();
};
////////////////////////////////////////////////////////////////////////
// Free Resources
//
void ExFirstNTcb::freeResources()
{
delete qparent_.up;
delete qparent_.down;
};
////////////////////////////////////////////////////////////////////////
// Register subtasks
//
void ExFirstNTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
ex_queue_pair pQueue = getParentQueue();
// register events for parent queue
ex_assert(pQueue.down && pQueue.up,"Parent down queue must exist");
sched->registerInsertSubtask(ex_tcb::sWork, this, pQueue.down);
sched->registerCancelSubtask(sCancel, this, pQueue.down);
sched->registerUnblockSubtask(ex_tcb::sWork,this, pQueue.up);
// register events for child queues
const ex_queue_pair cQueue = getChild(0)->getParentQueue();
sched->registerUnblockSubtask(ex_tcb::sWork,this, cQueue.down);
sched->registerInsertSubtask(ex_tcb::sWork, this, cQueue.up);
}
short ExFirstNTcb::moveChildDataToParent()
{
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry * cUpEntry = qchild_.up->getHeadEntry();
ex_queue_entry * pUpEntry = qparent_.up->getTailEntry();
pUpEntry->copyAtp(cUpEntry);
pUpEntry->upState.status = cUpEntry->upState.status;
pUpEntry->upState.downIndex = qparent_.down->getHeadIndex();
pUpEntry->upState.parentIndex = pentry_down->downState.parentIndex;
pUpEntry->upState.setMatchNo(cUpEntry->upState.getMatchNo());
// insert into parent up queue
qparent_.up->insert();
qchild_.up->removeHead();
return 0;
}
////////////////////////////////////////////////////////////////////////////
// This is where the action is.
////////////////////////////////////////////////////////////////////////////
#pragma nowarn(262) // warning elimination
short ExFirstNTcb::work()
{
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExFirstNPrivateState *pstate = (ExFirstNPrivateState*) pentry_down->pstate;
const ex_queue::down_request & request = pentry_down->downState.request;
while (1) // exit via return
{
switch (step_)
{
case INITIAL_:
{
if (qchild_.down->isFull())
return WORK_OK;
ex_queue_entry * centry = qchild_.down->getTailEntry();
// effectiveFirstN_ is set to a positive number
// if FIRST N rows are requested.
// It is set to 0 or a negative number, if last N rows are needed.
// 0 means process all but don't return any rows.
// -1 means get all rows. Should not reach this state.
// <-1 means return the last '-(N+2)' rows.
effectiveFirstN_ = firstnTdb().firstNRows();
returnedSoFar_ = 0;
if (firstnTdb().firstNRowsExpr_)
{
ex_expr::exp_return_type evalRetCode =
firstnTdb().firstNRowsExpr_->eval(pentry_down->getAtp(), workAtp_);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
step_ = CANCEL_;
break;
}
effectiveFirstN_ = firstNParamVal_;
}
if (effectiveFirstN_ >= 0)
{
centry->downState.request = ex_queue::GET_N;
// if I got a GET_ALL request then send a GET_N request to
// my child with the N value being effectiveFirstN_.
// if I got a GET_N request, then send the minimum of the
// GET_N request value and effectiveFirstN_ to my child.
if ((pentry_down->downState.request != ex_queue::GET_N) ||
(pentry_down->downState.requestValue == effectiveFirstN_))
centry->downState.requestValue = effectiveFirstN_;
else
{
centry->downState.requestValue =
MINOF(pentry_down->downState.requestValue, effectiveFirstN_);
}
step_ = PROCESS_FIRSTN_;
}
else
{
// last N processing, retrieve all rows.
centry->downState.request = ex_queue::GET_ALL;
centry->downState.requestValue = 11;
requestedLastNRows_ = -(effectiveFirstN_ + 2);
returnedLastNRows_ = 0;
step_ = PROCESS_LASTN_;
}
centry->downState.parentIndex = qparent_.down->getHeadIndex();
centry->passAtp(pentry_down);
qchild_.down->insert();
}
break;
case PROCESS_FIRSTN_:
{
if ((qchild_.up->isEmpty()) ||
(qparent_.up->isFull()))
return WORK_OK;
ex_queue_entry * cUpEntry = qchild_.up->getHeadEntry();
ex_queue_entry * pUpEntry = qparent_.up->getTailEntry();
switch (cUpEntry->upState.status)
{
case ex_queue::Q_OK_MMORE:
{
if (returnedSoFar_ < effectiveFirstN_)
{
moveChildDataToParent();
returnedSoFar_++;
}
else
{
// looks like the child may not honor our
// GET_N request, so send a cancel, maybe
// that will work better
qchild_.down->cancelRequest();
step_ = CANCEL_;
}
}
break;
case ex_queue::Q_NO_DATA:
{
moveChildDataToParent();
qparent_.down->removeHead();
step_ = DONE_;
}
break;
case ex_queue::Q_SQLERROR:
{
qchild_.down->cancelRequest();
moveChildDataToParent();
step_ = CANCEL_;
}
break;
case ex_queue::Q_INVALID:
{
ex_assert(0, "ExFirstNTcb::work() Invalid state return by child.");
}
break;
} // switch cUpEntry status
}
break;
case PROCESS_LASTN_:
{
if ((qchild_.up->isEmpty()) ||
(qparent_.up->isFull()))
return WORK_OK;
ex_queue_entry * cUpEntry = qchild_.up->getHeadEntry();
ex_queue_entry * pUpEntry = qparent_.up->getTailEntry();
switch (cUpEntry->upState.status)
{
case ex_queue::Q_OK_MMORE:
{
if (requestedLastNRows_ == 0) // last 0
{
// ignore any upcoming rows.
qchild_.up->removeHead();
}
else if (requestedLastNRows_ == 1) // last 1
{
// We know that current entry is Q_OK_MMORE.
// Need atleast 1 more entry than requested to process
// last N. Note that there is a small chance that this
// will lead to a buffer deadlock (child's buffer pool
// is full, child expects us to consume a row before it
// can produce another one).
if (qchild_.up->getLength() < 1 + 1)
return WORK_OK;
queue_index headIndex = qchild_.up->getHeadIndex();
ex_queue_entry * nextCupEntry =
qchild_.up->getQueueEntry(headIndex + 1);
switch (nextCupEntry->upState.status)
{
case ex_queue::Q_NO_DATA:
{
// if the next entry is Q_NO_DATA, return the
// current child head entry. Its the last row.
// This call will also remove the child head entry.
moveChildDataToParent();
}
break;
case ex_queue::Q_SQLERROR:
case ex_queue::Q_OK_MMORE:
{
// just remove the current head entry. We don't
// need it.
qchild_.up->removeHead();
}
break;
case ex_queue::Q_INVALID:
{
ex_assert(0, "ExFirstNTcb::work() Invalid state return by child.");
}
break;
} // switch
}
else
{
// not supported.
ex_assert(0, "ExFirstNTcb::work(): only last 0 and last 1 supported.");
}
}
break;
case ex_queue::Q_SQLERROR:
{
qchild_.down->cancelRequest();
moveChildDataToParent();
step_ = CANCEL_;
}
break;
case ex_queue::Q_NO_DATA:
{
moveChildDataToParent();
if (cUpEntry->upState.status == ex_queue::Q_NO_DATA)
{
qparent_.down->removeHead();
step_ = DONE_;
}
}
break;
case ex_queue::Q_INVALID:
{
ex_assert(0, "ExFirstNTcb::work() Invalid state return by child.");
}
break;
} // switch cUpEntry status
}
break;
case DONE_:
{
step_ = INITIAL_;
return WORK_CALL_AGAIN;
}
break;
case CANCEL_:
{
// ignore all up rows from child. wait for Q_NO_DATA.
if (qchild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * cUpEntry = qchild_.up->getHeadEntry();
ex_queue_entry * pUpEntry = qparent_.up->getTailEntry();
switch(cUpEntry->upState.status)
{
case ex_queue::Q_OK_MMORE:
case ex_queue::Q_SQLERROR:
{
qchild_.up->removeHead();
}
break;
case ex_queue::Q_NO_DATA:
{
moveChildDataToParent();
qparent_.down->removeHead();
step_ = DONE_;
}
break;
case ex_queue::Q_INVALID:
{
ex_assert(0, "ExFirstNTcb::work() Invalid state returned by child");
}; break;
}
}
break;
case ERROR_:
{
}
break;
} // switch
} // while
return 0;
}
#pragma warn(262) // warning elimination
short ExFirstNTcb::cancel()
{
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExFirstNPrivateState *pstate = (ExFirstNPrivateState*) pentry_down->pstate;
if (pentry_down->downState.request == ex_queue::GET_NOMORE)
{
step_ = CANCEL_;
qchild_.down->cancelRequest();
}
return WORK_OK;
}
///////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for sort_private_state
///////////////////////////////////////////////////////////////////////////////
ExFirstNPrivateState::ExFirstNPrivateState(const ExFirstNTcb * tcb)
{
}
ExFirstNPrivateState::~ExFirstNPrivateState()
{
};
ex_tcb_private_state * ExFirstNPrivateState::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace()) ExFirstNPrivateState((ExFirstNTcb *) tcb);
};