blob: 9458c1919c0a1f013d36ad507f72aa07dae4bb19 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: $File$
* RCS: $Id: exsequence.cpp,v 1.1 2006/11/01 01:44:14 Exp $
* Description: Implementation of ExSequenceTdb::build, ExSequenceTcb and the history-row callback functions
* Created:
* Modified: $Date: 2006/11/01 01:44:14 $
* Language: C++
* Status: $State: Exp $
*
*
*
*
******************************************************************************
*/
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExSequence.h"
#include "ex_expr.h"
#include "ExpSequenceFunction.h"
#include "ExSimpleSqlBuffer.h"
#include "ExStats.h"
#include "memorymonitor.h"
#include "logmxevent.h"
char *GetHistoryRowOLAP(void *data, Int32 n,
NABoolean leading, Lng32 winSize, Int32 &retcode)
{
ExSequenceTcb *tcb = (ExSequenceTcb*)data;
retcode = 0;
if(!leading)
retcode = -2;
// For OLAP functions, when fetching the row from the history
// buffer, it is an error if:
// - The index is negative (a future row, not supported yet)
// - The history buffer is too small. This can only happen if
// the buffer is full and the index is beyond the size of the
// buffer. It is not an error if the index is greater than
// the buffer size, but the buffer is not yet full. Until the
// buffer is full, we get the right answer. For example, if
// we were using a history buffer of size 1000 and a moving
// window of size 4000. If the source data only had 100 row,
// then we would get the right answer. No need to raise an error
// in this case.
//
if (n < 0) {
// this path in the code is a safe garde and may not be hit
// maybe we can chage to assert or may be we can remove in the future
retcode = -1; // ERROR condition
return NULL;
} else if (n >= tcb->maxNumberHistoryRows_ &&
tcb->numberHistoryRows_ == tcb->maxNumberHistoryRows_) {
retcode = -1; // ERROR condition
return NULL;
} else if (n >= tcb->numberHistoryRows_) {
if(leading || (n - tcb->numberHistoryRows_) >= winSize)
retcode = 0;
return NULL;
}
if ( n <= tcb->currentHistRowInOLAPBuffer_) // we're within the current buffer
{
return tcb->currentHistRowPtr_ - n * tcb->recLen() ; // offset back from first
}
// search the previous buffers
HashBuffer * tmpBuf = tcb->currentOLAPBuffer_->getPrev();
if (tmpBuf == NULL)
{
tmpBuf = tcb->lastOLAPBuffer_;
}
n -= tcb->currentHistRowInOLAPBuffer_; // make n relative to buffer top, not to firstHistoryRow
while ( n > tcb->maxRowsInOLAPBuffer_ ) { // keep shifting one buffer back at a time
n -= tcb->maxRowsInOLAPBuffer_ ;
tmpBuf = tmpBuf->getPrev();
if (tmpBuf == NULL)
{
tmpBuf = tcb->lastOLAPBuffer_;
}
}
return tmpBuf->getFirstRow() + ( tcb->maxRowsInOLAPBuffer_ - n ) * tcb->recLen();
};
char *GetHistoryRowFollowingOLAP(void *data, Int32 n,
NABoolean leading, Lng32 winSize, Int32 &retcode)
{
ExSequenceTcb *tcb = (ExSequenceTcb*)data;
// flip the sign of n for now. The logic to handle negative offsets should be done
// in GetHistoryRowPrecedingOLAP() (aka GetHistoryRowOLAP()).
if ( n < 0 )
n = -n;
retcode = 0;
if(!leading)
retcode = -2;
if (winSize !=0)
{
if (n == INT_MAX && tcb->isUnboundedFollowing())
{
return tcb->lastRow_;
}
if (n >= tcb->numFollowingRows())
{
if(!leading || (n - tcb->numFollowingRows()) >= winSize && n != INT_MAX)
{
retcode = 0;
return NULL;
} else
n = tcb->numFollowingRows();
}
}
else
{
if (n > tcb->numFollowingRows())
{
retcode = -3;
return NULL;
}
}
n += tcb->currentRetHistRowInOLAPBuffer_; // n become absolute positive offset (i.e. not relative)
if ( n < tcb->maxRowsInOLAPBuffer_ ) // we're within the current buffer
return tcb->currentRetOLAPBuffer_->getFirstRow() + n * tcb->recLen() ;
// search the following buffers
HashBuffer * tmpBuf = tcb->currentRetOLAPBuffer_;
do {
n -= tcb->maxRowsInOLAPBuffer_ ; // shift one buffer back
tmpBuf = tmpBuf->getNext();
if (tmpBuf == NULL)
{
tmpBuf = tcb->firstOLAPBuffer_;
}
} while ( n >= tcb->maxRowsInOLAPBuffer_ ) ;
return tmpBuf->getFirstRow() + n * tcb->recLen();
};
char *GetHistoryRow(void *data, Int32 n,
NABoolean leading, Lng32 winSize, Int32 &retcode)
{
ExSequenceTcb *tcb = (ExSequenceTcb*)data;
retcode = 0;
if (n < 0) return NULL;
else if (n >= tcb->numberHistoryRows_) return NULL;
if ( n <= tcb->currentHistRowInOLAPBuffer_) // we're within the current buffer
{
return tcb->currentHistRowPtr_ - n * tcb->recLen() ; // offset back from first
}
// this code is used for the legacy sequence functions-- Not sure whether to add a test
// for this as the sequence functions are supposed to be replaced by OLAP functions
// I am hiding this code from code coverage tool for now.
// search the previous buffers
HashBuffer * tmpBuf = tcb->currentOLAPBuffer_->getPrev();
if (tmpBuf == NULL)
{
tmpBuf = tcb->lastOLAPBuffer_;
}
n -= tcb->currentHistRowInOLAPBuffer_; // make n relative to buffer top, not to firstHistoryRow
while ( n > tcb->maxRowsInOLAPBuffer_ ) { // keep shifting one buffer back at a time
n -= tcb->maxRowsInOLAPBuffer_ ;
tmpBuf = tmpBuf->getPrev();
if (tmpBuf == NULL)
{
tmpBuf = tcb->lastOLAPBuffer_;
}
}
return tmpBuf->getFirstRow() + ( tcb->maxRowsInOLAPBuffer_ - n ) * tcb->recLen();
};
//
// build - construct the TCB subtree for this TDB.
//
// Builds the child's TCB first, then allocates the ExSequenceTcb from
// the globals' space and registers its work and resize subtasks.
// Returns the new ExSequenceTcb.
//
ex_tcb * ExSequenceTdb::build(ex_globals * glob)
{
  // The child subtree has to exist before the sequence TCB is created.
  ex_tcb *childTcb = tdbChild_->build(glob);

  ExSequenceTcb *seqTcb =
    new(glob->getSpace()) ExSequenceTcb(*this, *childTcb, glob);

  seqTcb->registerSubtasks();
  seqTcb->registerResizeSubtasks();

  return seqTcb;
}
// ExSequenceTcb constructor
//
// 1. Allocates the buffer pool.
// 2. Copies the history/OLAP buffer settings from the TDB and
//    initializes the history (initializeHistory()).
// 3. Allocates the parent queues, the private state and the work ATP.
// 4. Fixes up the expressions, installing the history-row callbacks on
//    the sequence expressions.
//
ExSequenceTcb::ExSequenceTcb (const ExSequenceTdb & myTdb,
                              const ex_tcb & child_tcb,
                              ex_globals * glob)
  : ex_tcb(myTdb, 1, glob),
    lastRow_(NULL),
    clusterDb_(NULL),
    cluster_(NULL),
    ioEventHandler_(NULL),
    OLAPBuffersFlushed_(FALSE),
    firstOLAPBuffer_(NULL),
    lastOLAPBuffer_(NULL),
    rc_(EXE_OK),
    olapBufferSize_(0),
    maxNumberOfOLAPBuffers_(0),
    numberOfOLAPBuffers_(0),
    minNumberOfOLAPBuffers_(0),
    memoryPressureDetected_(FALSE)
{
  Space * space = (glob ? glob->getSpace() : 0);
  heap_ = (glob ? glob->getDefaultHeap() : 0);

  childTcb_ = &child_tcb;

  // Buffer pool used for the rows returned to the parent.
  pool_ = new(space) sql_buffer_pool(myTdb.numBuffers_,
                                     myTdb.bufferSize_,
                                     space);

  allocRowLength_ = ROUND8(myTdb.recLen_);

  // Settings for the row history used to compute the sequence
  // functions; all of them come from the TDB.
  maxNumberHistoryRows_   = myTdb.maxHistoryRows_;
  minFollowing_           = myTdb.minFollowing_;
  unboundedFollowing_     = myTdb.isUnboundedFollowing();
  maxNumberOfOLAPBuffers_ = myTdb.maxNumberOfOLAPBuffers_;
  olapBufferSize_         = myTdb.OLAPBufferSize_;
  maxRowsInOLAPBuffer_    = myTdb.maxRowsInOLAPBuffer_;
  minNumberOfOLAPBuffers_ = myTdb.minNumberOfOLAPBuffers_;
  numberOfWinOLAPBuffers_ = myTdb.numberOfWinOLAPBuffers_;
  overflowEnabled_        = ! myTdb.isNoOverflow();

  ex_assert( maxNumberOfOLAPBuffers_ >= minNumberOfOLAPBuffers_ ,
             "maxNumberOfOLAPBuffers is too small");

  // Set up the history state. For unbounded following this also
  // creates/initializes clusterDb and cluster.
  initializeHistory();

  // Queue pair the child uses to talk to this TCB.
  qchild_ = child_tcb.getParentQueue();

  // Down queue from the parent, with a private state in every entry.
  // The prototype state object is only needed while the entries are
  // being allocated.
  qparent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
                                      myTdb.initialQueueSizeDown_,
                                      myTdb.criDescDown_,
                                      space);

  ExSequencePrivateState *protoState = new(space) ExSequencePrivateState(this);
  qparent_.down->allocatePstate(protoState, this);
  delete protoState;

  // Up queue to the parent.
  qparent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
                                    myTdb.initialQueueSizeUp_,
                                    myTdb.criDescUp_,
                                    space);

  // The next request to process is the current tail of the down queue.
  processedInputs_ = qparent_.down->getTailIndex();

  workAtp_ = allocateAtp(myTdb.criDescUp_, space);

  // Expression fixup. The sequence expressions additionally need the
  // callbacks that fetch rows from the history buffers.
  if (sequenceExpr())
    {
      ExpSequenceExpression *readPhaseExpr =
        (ExpSequenceExpression*)sequenceExpr();
      readPhaseExpr->seqFixup((void*)this, GetHistoryRow, GetHistoryRowOLAP);
      sequenceExpr()->fixup(0, getExpressionMode(), this, space, heap_, FALSE, glob);
    }

  if (returnExpr())
    {
      ExpSequenceExpression *returnPhaseExpr =
        (ExpSequenceExpression*)returnExpr();
      returnPhaseExpr->seqFixup((void*)this, GetHistoryRow, GetHistoryRowFollowingOLAP);
      returnExpr()->fixup(0, getExpressionMode(), this, space, heap_, FALSE, glob);
    }

  if (postPred())
    postPred()->fixup(0, getExpressionMode(), this, space, heap_, FALSE, glob);

  if (cancelExpr())
    cancelExpr()->fixup(0, getExpressionMode(), this, space, heap_, FALSE, glob);

  if (checkPartitionChangeExpr())
    {
      ExpSequenceExpression *partitionExpr =
        (ExpSequenceExpression*)checkPartitionChangeExpr();
      partitionExpr->seqFixup((void*)this, GetHistoryRow, GetHistoryRowOLAP);
      checkPartitionChangeExpr()->fixup(0, getExpressionMode(), this, space, heap_, FALSE, glob);
    }

  // Descriptor that work() points at the current history row.
  workAtp_->getTupp(myTdb.tuppIndex_) = new(space) tupp_descriptor;
}
// Destructor
//
// Deletes the parent queues and the buffer pool, then releases the
// remaining resources through freeResources().
//
ExSequenceTcb::~ExSequenceTcb()
{
  if (qparent_.up != NULL)
    {
      delete qparent_.up;
      qparent_.up = NULL;
    }

  if (qparent_.down != NULL)
    {
      delete qparent_.down;
      qparent_.down = NULL;
    }

  if (pool_ != NULL)
    {
      delete pool_;
      pool_ = NULL;
    }

  freeResources();
}
void ExSequenceTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
ex_tcb :: registerSubtasks();
// Regsiter the I/O event, if overflow is possible
if ( isUnboundedFollowing() ) {
ioEventHandler_ = sched->registerNonQueueSubtask(sWork,this);
// work around: The handler was just created, while clusterDb was created
// earlier (in the TCB ctor), so update clusterDb now with the handler
ex_assert( clusterDb_ , "Unlimited following and no clusterDb_") ;
clusterDb_->ioEventHandler_ = ioEventHandler_ ;
}
};
// Free Resources
//
//
void ExSequenceTcb::freeResources()
{
if (cluster_) delete cluster_;
cluster_ = NULL;
if (clusterDb_) delete clusterDb_;
clusterDb_ = NULL;
if (lastRow_&&heap_) heap_->deallocateMemory(lastRow_);
lastRow_ = NULL;
while ( firstOLAPBuffer_ ) {
HashBuffer * temp = firstOLAPBuffer_;
firstOLAPBuffer_ = firstOLAPBuffer_->getNext();
NADELETEBASIC( temp ,getHeap());
}
firstOLAPBuffer_ = NULL;
lastOLAPBuffer_ = NULL;
workAtp_->getTupp(myTdb().tuppIndex_).release();
}
// work - doit...
//
// Work method of the sequence operator. It runs in two phases:
//
// 1. Every parent request that has not been looked at yet
//    (processedInputs_ .. tail of the parent down queue) is either
//    marked ExSeq_DONE (request already GET_NOMORE) or passed to the
//    child as a GET_ALL request, as long as the child's down queue has
//    room.
// 2. The request at the head of the parent down queue is driven through
//    the state machine kept in its private state (pstate->step_):
//    ExSeq_WORKING_READ, ExSeq_WORKING_RETURN, ExSeq_END_OF_PARTITION,
//    ExSeq_OVERFLOW_WRITE, ExSeq_OVERFLOW_READ, ExSeq_ERROR,
//    ExSeq_CANCELLED and ExSeq_DONE.
//
// Returns:
//   WORK_OK           - nothing more can be done now (no parent request,
//                       a queue is empty/full, or an overflow I/O is
//                       still pending).
//   WORK_POOL_BLOCKED - no free tuple could be obtained from pool_.
//   WORK_CALL_AGAIN   - the next parent request has not been passed to
//                       the child yet.
//
short ExSequenceTcb::work()
{
// If there are no parent requests on the queue, then there cannot
// be anything to do here.
//
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down;
ExSequencePrivateState * pstate;
ex_queue::down_request request;
// Take any new parent requests and pass them on to the child as long
// as the child's queue is not full. processedInputs_ maintains the
// Queue index of the last request that was passed on.
//
for(queue_index tail = qparent_.down->getTailIndex();
(processedInputs_ != tail) && (!qchild_.down->isFull());
processedInputs_++ )
{
pentry_down = qparent_.down->getQueueEntry(processedInputs_);
pstate = (ExSequencePrivateState*) pentry_down->pstate;
request = pentry_down->downState.request;
// If the request has already been cancelled don't pass it to the
// child. Instead, just mark the request as done. This will trigger
// a EOD reply when this request gets worked on.
//
if (request == ex_queue::GET_NOMORE)
{
pstate->step_ = ExSeq_DONE;
}
else
{
pstate->step_ = ExSeq_WORKING_READ;
// Pass the request to the child
//
ex_queue_entry * centry = qchild_.down->getTailEntry();
centry->downState.request = ex_queue::GET_ALL;
// NOTE(review): the meaning of the value 11 is not visible in this
// file; presumably it is ignored for GET_ALL requests -- confirm.
centry->downState.requestValue = 11;
centry->downState.parentIndex = processedInputs_;
centry->passAtp(pentry_down);
qchild_.down->insert();
}
} // end for processedInputs_
// From here on, work on the request at the head of the parent queue.
pentry_down = qparent_.down->getHeadEntry();
pstate = (ExSequencePrivateState*) pentry_down->pstate;
request = pentry_down->downState.request;
workAtp_->copyPartialAtp(pentry_down->getAtp(),0,
MINOF(myTdb().tuppIndex_,
pentry_down->numTuples())-1);
// copy temp and input tupps. Last tupp history row
// Take any child replies and process them. Return the processed
// rows as long the parent queue has room.
//
while (1)
{
// If we have satisfied the parent request (or it was cancelled),
// then stop processing rows, cancel any outstanding child
// requests, and set this request to the CANCELLED state.
//
if ((pstate->step_ == ExSeq_WORKING_READ) ||
(pstate->step_ == ExSeq_WORKING_RETURN))
{
if ((request == ex_queue::GET_NOMORE) ||
((request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue
<= (Lng32)pstate->matchCount_)))
{
qchild_.down->cancelRequestWithParentIndex
(qparent_.down->getHeadIndex());
pstate->step_ = ExSeq_CANCELLED;
}
}
switch (pstate->step_)
{
// ExSeq_CANCELLED
//
// Transition to this state from ...
// 1. ExSeq_Error - After the error has been processed.
// 2. ExSeq_Working - If enough rows have been returned.
// 3. ExSeq_Working - If the request was cancelled.
//
// Remain in this state until ..
// 1. All rows from the child including EOD are consumed
//
// Transition from this state to ...
// 1. ExSeq_DONE - In all cases.
//
case ExSeq_CANCELLED:
{
// There are no extra rows to process from the child yet,
// so try again later.
//
if (qchild_.up->isEmpty())
{
return WORK_OK;
}
ex_queue_entry * centry = qchild_.up->getHeadEntry();
ex_queue::up_status child_status = centry->upState.status;
// If this is the EOD, transition to the ExSeq_DONE state.
//
if (child_status == ex_queue::Q_NO_DATA)
pstate->step_ = ExSeq_DONE;
// Discard the child row.
qchild_.up->removeHead();
break;
}
// ExSeq_ERROR
//
// Transition to this state from ...
// 1. ExSeq_WORKING_READ - a child reply with the type SQLERROR.
// 2. ExSeq_WORKING_RETURN
// 3. ExSeq_OVERFLOW_READ
// 4. ExSeq_OVERFLOW_WRITE
// Remain in this state until ..
// 1. The error row has been returned to the parent.
//
// Transition from this state to ...
// 1. ExSeq_CANCELLED - In all cases.
//
case ExSeq_ERROR:
{
// If there is no room in the parent queue for the reply,
// try again later.
//
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry *pentry_up = qparent_.up->getTailEntry();
// Cancel the child request - there must be a child request in
// progress to get to the ExSeq_ERROR state.
//
qchild_.down->cancelRequestWithParentIndex
(qparent_.down->getHeadIndex());
// Construct and return the error row.
//
// Hand the diags area collected in the work ATP over to the reply.
if (workAtp_->getDiagsArea()) {
ComDiagsArea * da = workAtp_->getDiagsArea();
pentry_up->setDiagsArea(da);
da->incrRefCount();
workAtp_->setDiagsArea(0);
}
pentry_up->upState.status = ex_queue::Q_SQLERROR;
pentry_up->upState.parentIndex
= pentry_down->downState.parentIndex;
pentry_up->upState.downIndex = qparent_.down->getHeadIndex();
pentry_up->upState.setMatchNo(pstate->matchCount_);
qparent_.up->insert();
// Transition to the ExSeq_CANCELLED state.
//
pstate->step_ = ExSeq_CANCELLED;
break;
}
// ExSeq_WORKING_READ
//
// Transition to this state from ...
// 1. ExSeq_EMPTY - If a request is started.
// 2. ExSeq_WORKING_RETURN -
// 3. ExSeq_OVERFLOW_WRITE -
// Remain in this state until ...
// 1. All child replies including EOD have been processed.
// 2. A SQLERROR row is received.
// 3. Enough rows have been returned.
// 4. The request is cancelled.
// 5. End of partition is reached
// Transition from this state to ...
// 2. ExSeq_ERROR - If an SQLERROR rows is received.
// 3. ExSeq_CANCELLED - If the request is cancelled.
// 4. ExSeq_WORKING_RETURN
// 5. ExSeq_OVERFLOW_WRITE -
case ExSeq_WORKING_READ:
{
// With a bounded window, a full history means rows have to be
// returned before more can be read.
if(!isUnboundedFollowing() && isHistoryFull()) {
pstate->step_ = ExSeq_WORKING_RETURN;
break;
}
// If there are no replies, try again later.
//
if (qchild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * centry = qchild_.up->getHeadEntry();
switch (centry->upState.status)
{
// A data row from the child.
//
case ex_queue::Q_OK_MMORE:
{
// The partition check compares the new child row against the
// current history row, so it needs at least one row in the history.
if ( checkPartitionChangeExpr() &&
currentHistRowPtr_)
{
workAtp_->getTupp
(myTdb().tuppIndex_).setDataPointer(currentHistRowPtr_);
// Check whether the partition changed
ex_expr::exp_return_type retCode = checkPartitionChangeExpr()->eval(workAtp_, centry->getAtp());
if (retCode == ex_expr::EXPR_ERROR)
{
updateDiagsArea(centry);
pstate->step_ = ExSeq_ERROR;
break;
}
if ( retCode == ex_expr::EXPR_FALSE)
{
// The child row is left in the queue; it is read again after the
// current partition has been returned.
setPartitionEnd(TRUE);
pstate->step_ = ExSeq_END_OF_PARTITION;
break;
}
}
if (isUnboundedFollowing() )
{
if (OLAPBuffersFlushed_)
{
OLAPBuffersFlushed_ = FALSE;// current row is the first one in first buffer already
}
else
{
NABoolean noMemory =
advanceHistoryRow( TRUE /* checkMemoryPressure */);
if (noMemory)
{
// No new buffer could be added: flush all buffers to overflow.
// The child row stays in the queue and is processed after the flush.
pstate->step_ = ExSeq_OVERFLOW_WRITE;
cluster_->nextBufferToFlush_ = firstOLAPBuffer_;
cluster_->afterLastBufferToFlush_ = NULL;//flush them all
// If it is the first overflow, for this partition
if ( ! memoryPressureDetected_ ) {
memoryPressureDetected_ = TRUE;
} // memory pressure detected
break;
}
}
}
else
{
advanceHistoryRow();
}
workAtp_->getTupp
(myTdb().tuppIndex_).setDataPointer(currentHistRowPtr_);
ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
// Apply the read phase sequence function expression to compute
// the values of the sequence functions.
if (sequenceExpr())
{
retCode = sequenceExpr()->eval(workAtp_, centry->getAtp());
if (retCode == ex_expr::EXPR_ERROR)
{
updateDiagsArea(centry);
pstate->step_ = ExSeq_ERROR;
break;
}
}
// merge the child's diags area into the work atp
updateDiagsArea(centry);
qchild_.up->removeHead();
break;
}
// The EOD from the child. Transition to ExSeq_DONE.
//
case ex_queue::Q_NO_DATA:
{
setPartitionEnd(TRUE);
if (isHistoryEmpty())
{
pstate->step_ = ExSeq_DONE;
qchild_.up->removeHead();
}
else
{
// Rows are still buffered; the EOD entry stays in the child queue
// until they have been returned.
pstate->step_ = ExSeq_END_OF_PARTITION;
}
}
break;
// An SQLERROR from the child. Transition to ExSeq_ERROR.
//
case ex_queue::Q_SQLERROR:
updateDiagsArea(centry);
pstate->step_ = ExSeq_ERROR;
break;
}
}
break;
// ExSeq_WORKING_RETURN
//
// Transition to this state from ...
// 1. ExSeq_WORKING_READ -
// 2. ExSeq_OVERFLOW_READ -
// 3. ExSeq_END_OF_PARTITION -
// Remain in this state until ...
// 1. All rows are returned.
// 2. A SQLERROR row is received.
// 3. Enough rows have been returned.
//
// Transition from this state to ...
// 1. ExSeq_DONE - If all the child rows including EOD have
// been processed.
// 2. ExSeq_ERROR - If an SQLERROR rows is received.
// 3. ExSeq_CANCELLED - If enough rows have been returned.
// 4. ExSeq_CANCELLED - If the request is cancelled.
// 5. ExSeq_WORKING_RETURN
// 6. ExSeq_DONE
// 7. ExSeq_OVERFLOW_READ
case ExSeq_WORKING_RETURN:
{
// If there is not room in the parent Queue for the reply,
// try again later.
//
if (qparent_.up->isFull())
return WORK_OK;
// Nothing left to return: finish the request if the child delivered
// its EOD, otherwise go back to reading.
if(isHistoryEmpty())
{
ex_queue_entry * centry = NULL;
if(!qchild_.up->isEmpty())
{
centry = qchild_.up->getHeadEntry();
}
if(centry && (centry->upState.status == ex_queue::Q_NO_DATA))
{
pstate->step_ = ExSeq_DONE;
qchild_.up->removeHead();
}
else
{
pstate->step_ = ExSeq_WORKING_READ;
// A finished partition starts over with a fresh history.
if (getPartitionEnd())
{
initializeHistory();
}
}
break;
}
if(!canReturnRows() &&
!getPartitionEnd() &&
!isUnboundedFollowing() &&
!isOverflowStarted()) // redundant? because not unbounded ...
{
pstate->step_ = ExSeq_WORKING_READ;
break;
}
ex_queue_entry * pentry_up = qparent_.up->getTailEntry();
pentry_up->copyAtp(pentry_down);
// Try to allocate a tupp.
//
if (pool_->get_free_tuple(pentry_up->getTupp(myTdb().tuppIndex_),
recLen()))
return WORK_POOL_BLOCKED;
// Remember the SQL buffer location; the tupp is temporarily pointed
// at the history row while the expressions are evaluated.
char *tuppData = pentry_up->getTupp
(myTdb().tuppIndex_).getDataPointer();
advanceReturnHistoryRow();
char *histData = currentRetHistRowPtr_;
pentry_up->getTupp
(myTdb().tuppIndex_).setDataPointer(histData);
ex_expr::exp_return_type retCode = ex_expr::EXPR_OK;
// Apply the return phase expression
if(returnExpr())
{
retCode = returnExpr()->eval(pentry_up->getAtp(),workAtp_);
if (retCode == ex_expr::EXPR_ERROR)
{
// NOTE(review): this path leaves the tupp pointing at histData (the
// restore to tuppData further down is skipped) -- confirm this is
// intended.
pstate->step_ = ExSeq_ERROR;
break;
}
}
retCode = ex_expr::EXPR_OK;
//Apply post predicate expression
if (postPred())
{
retCode = postPred()->eval(pentry_up->getAtp(),pentry_up->getAtp());
if (retCode == ex_expr::EXPR_ERROR)
{
// NOTE(review): as above, the tupp is not pointed back at tuppData
// on this path -- confirm.
pstate->step_ = ExSeq_ERROR;
break;
}
}
//pentry_up->getAtp()->display("return eval result", myTdb().getCriDescUp());
//
// Case-10-030724-7963: we are done pointing the tupp at the
// history buffer, so point it back to the SQL buffer.
//
pentry_up->getTupp
(myTdb().tuppIndex_).setDataPointer(tuppData);
switch(retCode) {
case ex_expr::EXPR_OK:
case ex_expr::EXPR_TRUE:
case ex_expr::EXPR_NULL:
// Copy the row that was computed in the history buffer,
// to the space previously allocated in the SQL buffer.
str_cpy_all(tuppData, histData, recLen());
// Return the processed row.
//
// Finalize the queue entry, then insert it
//
pentry_up->upState.status = ex_queue::Q_OK_MMORE;
pentry_up->upState.parentIndex
= pentry_down->downState.parentIndex;
pentry_up->upState.downIndex =
qparent_.down->getHeadIndex();
pstate->matchCount_++;
pentry_up->upState.setMatchNo(pstate->matchCount_);
qparent_.up->insert();
break;
// If the selection predicate returns FALSE,
// do not return the child row.
//
case ex_expr::EXPR_FALSE:
break;
// If the selection predicate returns an ERROR,
// go to the error processing state.
//
case ex_expr::EXPR_ERROR:
pstate->step_ = ExSeq_ERROR;
break;
}
// MV --
// Now, if there are no errors so far, evaluate the
// cancel expression
if ((pstate->step_ != ExSeq_ERROR) && cancelExpr())
{
// Temporarily point the tupp to the tail of the
// history buffer for evaluating the
// expressions.
//
pentry_up->getTupp
(myTdb().tuppIndex_).setDataPointer(histData);
retCode =
cancelExpr()->eval(pentry_up->getAtp(),pentry_up->getAtp());
// We are done pointing the tupp at the history
// buffer, so point it back to the SQL buffer.
//
pentry_up->getTupp
(myTdb().tuppIndex_).setDataPointer(tuppData);
if (retCode == ex_expr::EXPR_TRUE)
{
qchild_.down->cancelRequestWithParentIndex
(qparent_.down->getHeadIndex());
pstate->step_ = ExSeq_CANCELLED;
}
}
updateHistRowsToReturn();
// While overflowing, count the returned rows; once
// maxNumberOfRowsReturnedBeforeReadOF_ rows have gone out, set up the
// range of buffers to refill and switch to ExSeq_OVERFLOW_READ.
if ( isOverflowStarted() )
{
numberOfRowsReturnedBeforeReadOF_ ++;
if (numberOfRowsReturnedBeforeReadOF_ == maxNumberOfRowsReturnedBeforeReadOF_)
{
// Start after the current return buffer, wrapping to the first
// buffer at the end of the list ...
firstOLAPBufferFromOF_ = currentRetOLAPBuffer_->getNext();
if (firstOLAPBufferFromOF_ == NULL)
{
firstOLAPBufferFromOF_ = firstOLAPBuffer_;
}
// ... and skip numberOfWinOLAPBuffers_ further buffers.
for( Int32 i = 0; i < numberOfWinOLAPBuffers_; i++)
{
firstOLAPBufferFromOF_ = firstOLAPBufferFromOF_->getNext();
if (firstOLAPBufferFromOF_ == NULL)
{
firstOLAPBufferFromOF_ = firstOLAPBuffer_;
}
}
numberOfOLAPBuffersFromOF_ = numberOfOLAPBuffers_ - numberOfWinOLAPBuffers_;
cluster_->nextBufferToRead_ = firstOLAPBufferFromOF_;
HashBuffer * afterLast = firstOLAPBufferFromOF_;
// last buffer to read into is the current buffer - maybe ?
for ( Lng32 bufcount = numberOfOLAPBuffersFromOF_ ;
bufcount ;
bufcount-- ) {
afterLast = afterLast->getNext() ;
// Don't cycle back if bufcount == 1 because the logic in
// Cluster::read relies on the NULL ptr to stop reading
if ( bufcount > 1 && ! afterLast )
afterLast = firstOLAPBuffer_; // cycle back
}
// The last buffer to read to is always the current buffer
// ex_assert ( afterLast == currentRetOLAPBuffer_->getNext(),
// "Miscalculated the last buffer to read into");
cluster_->afterLastBufferToRead_ = afterLast;
pstate->step_ = ExSeq_OVERFLOW_READ;
}
}
}
break;
// ExSeq_END_OF_PARTITION
//
// Transition to this state from ...
// 1. ExSeq_WORKING_READ -
// Transition from this state to ...
// 1. ExSeq_OVERFLOW_WRITE
// 2. ExSeq_WORKING_RETURN
case ExSeq_END_OF_PARTITION:
{
setPartitionEnd(TRUE);
// For unbounded following, save a copy of the partition's last row.
if (lastRow_ && isUnboundedFollowing())
{
ex_assert(currentHistRowPtr_ != NULL, "ExSequenceTcb::work() - currentHistRowPtr_ is a NULL pointer");
str_cpy_all(lastRow_, currentHistRowPtr_, recLen());
}
if ( isOverflowStarted() ) // we are overflowing
{
cluster_->nextBufferToFlush_ = firstOLAPBuffer_;
// do not flush beyond the current buffer
cluster_->afterLastBufferToFlush_ = currentOLAPBuffer_->getNext();
pstate->step_ = ExSeq_OVERFLOW_WRITE;
}
else
{
pstate->step_ = ExSeq_WORKING_RETURN;
}
}
break;
// ExSeq_OVERFLOW_WRITE
//
// Transition to this state from ...
// 1. ExSeq_WORKING_READ -
// 2. ExSeq_END_OF_PARTITION -
// Remain in this state until ...
// 1. OLAPbuffers are written to overflow space.
// 2. An error occurs
//
// Transition from this state to ...
// 1. ExSeq_OVERFLOW_READ
// 2. ExSeq_ERROR - If an error occurs
case ExSeq_OVERFLOW_WRITE:
{
if (!overflowEnabled_)
{
// used for debugging when CmpCommon::getDefault(EXE_BMO_DISABLE_OVERFLOW)is set to off ;
updateDiagsArea(EXE_OLAP_OVERFLOW_NOT_SUPPORTED);
pstate->step_ = ExSeq_ERROR;
break;
}
ex_assert(isUnboundedFollowing(),"");
if ( ! cluster_->flush(&rc_) ) { // flush the buffers
// if no errors this code path is not visited
if ( rc_ )
{ // some error
updateDiagsArea( rc_);
pstate->step_ = ExSeq_ERROR;
break;
}
// not all the buffers are completely flushed. An I/O is pending
// maybe we can remove in the future
return WORK_OK;
}
// At this point -- all the buffers were completely flushed
OLAPBuffersFlushed_ = TRUE;
if (getPartitionEnd())
{
firstOLAPBufferFromOF_ = firstOLAPBuffer_;
numberOfOLAPBuffersFromOF_ = numberOfOLAPBuffers_;
cluster_->nextBufferToRead_ = firstOLAPBufferFromOF_;
// First time we read and fill all the buffers
cluster_->afterLastBufferToRead_ = NULL;
pstate->step_ = ExSeq_OVERFLOW_READ;
}
else
{
pstate->step_ = ExSeq_WORKING_READ;
}
}
break;
// ExSeq_OVERFLOW_READ
//
// Transition to this state from ...
// 1. ExSeq_OVERFLOW_WRITE
// 2. ExSeq_WORKING_RETURN
// Remain in this state until ...
// 1. OLAPbuffers are read from overflow space.
// 2. An error occurs
//
// Transition from this state to ...
// 1. ExSeq_WORKING_RETURN
// 2. ExSeq_ERROR - If an error occurs
case ExSeq_OVERFLOW_READ:
{
// NOTE(review): plain assert here, while the rest of this method
// uses ex_assert -- confirm whether that is intentional.
assert(firstOLAPBufferFromOF_ &&
isUnboundedFollowing() );
if ( ! cluster_->read(&rc_) ) {
if ( rc_ ) { // some error
updateDiagsArea( rc_);
pstate->step_ = ExSeq_ERROR;
break;
}
// not all the buffers are completely read. An I/O is pending
return WORK_OK;
}
numberOfRowsReturnedBeforeReadOF_ = 0;
pstate->step_ = ExSeq_WORKING_RETURN;
}
break;
// ExSeq_DONE
//
// Transition to the state from ...
// 1. ExSeq_WORKING_RETURN - if all child rows have been processed.
// 2. ExSeq_CANCELLED - if all child rows have been consumed.
// 3. ExSeq_EMPTY - if the request was DOA.
//
// Remain in this state until ...
// 1. The EOD is returned to the parent.
//
// Transition from this state to ...
// 1. ExSeq_EMPTY - In all cases.
//
case ExSeq_DONE:
{
// If there is not any room in the parent's queue,
// try again later.
//
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_up = qparent_.up->getTailEntry();
pentry_up->upState.status = ex_queue::Q_NO_DATA;
pentry_up->upState.parentIndex
= pentry_down->downState.parentIndex;
pentry_up->upState.downIndex = qparent_.down->getHeadIndex();
pentry_up->upState.setMatchNo(pstate->matchCount_);
// Release the tupps copied from the request before it is removed.
workAtp_->releasePartialAtp(0,MINOF(myTdb().tuppIndex_,
pentry_down->numTuples())-1);
qparent_.down->removeHead();
qparent_.up->insert();
// Re-initialize pstate
//
pstate->step_ = ExSeq_EMPTY;
pstate->matchCount_ = 0;
// Initialize the history buffer in preparation for the
// next request.
//
initializeHistory();
// If there are no more requests, simply return.
//
if (qparent_.down->isEmpty())
return WORK_OK;
// If we haven't given to our child the new head
// index return and ask to be called again.
//
if (qparent_.down->getHeadIndex() == processedInputs_)
return WORK_CALL_AGAIN;
// Position at the new head of the request queue.
//
pentry_down = qparent_.down->getHeadEntry();
pstate = (ExSequencePrivateState*) pentry_down->pstate;
request = pentry_down->downState.request;
}
break;
} // switch pstate->step_
} // while
}
// advanceHistoryRow
//
// Moves the current history row (the row being written) one position
// forward in the OLAP buffer list and counts it as a row to return.
// When the end of the last buffer is reached, a new buffer is added if
// that is allowed and possible; otherwise the position wraps to the
// first buffer.
//
// Returns TRUE if the position wrapped to the first buffer because no
// new buffer was added (the caller uses this to start overflow), FALSE
// otherwise. On a TRUE return numberHistoryRows_ is not incremented.
//
NABoolean ExSequenceTcb::advanceHistoryRow(NABoolean checkMemoryPressure)
{
  histRowsToReturn_++;

  if (currentOLAPBuffer_ == NULL)
    {
      // Very first row: start at the top of the first buffer.
      currentOLAPBuffer_ = firstOLAPBuffer_;
      ex_assert (currentHistRowPtr_ == NULL,"ExSequenceTcb::advanceHistoryRow() - currentHistRowPtr_ is not a NULL pointer");
      currentHistRowPtr_ = currentOLAPBuffer_->getFirstRow();
      currentHistRowInOLAPBuffer_ = 0;
    }
  else
    {
      ++currentHistRowInOLAPBuffer_;

      if (currentHistRowInOLAPBuffer_ != maxRowsInOLAPBuffer_)
        {
          // Still room in the current buffer.
          currentHistRowPtr_ += recLen();
        }
      else
        {
          // The current buffer is exhausted: continue in the next one.
          currentHistRowInOLAPBuffer_ = 0;

          HashBuffer *target = currentOLAPBuffer_->getNext();
          if (target == NULL)
            {
              // End of the list. Once overflow has started no new
              // buffers are added.
              const NABoolean grew =
                canAllocateOLAPBuffer() &&
                ! isOverflowStarted() &&
                addNewOLAPBuffer(checkMemoryPressure);

              if (!grew)
                {
                  currentOLAPBuffer_ = firstOLAPBuffer_;
                  currentHistRowPtr_ = currentOLAPBuffer_->getFirstRow();
                  return TRUE; // no memory -- start overflow
                }

              target = lastOLAPBuffer_;
            }

          currentOLAPBuffer_ = target;
          currentHistRowPtr_ = currentOLAPBuffer_->getFirstRow();
        }
    }

  if (numberHistoryRows_ < maxNumberHistoryRows_)
    numberHistoryRows_++;

  return FALSE;
}
void ExSequenceTcb::advanceReturnHistoryRow()
{
if (currentRetOLAPBuffer_ == NULL) //initially currentRetOLAPBuffer_ is null
{
ex_assert(firstOLAPBuffer_ !=NULL,"ExSequenceTcb::advanceReturnHistoryRow() - firstOLAPBuffer is a NULL pointer");
currentRetOLAPBuffer_ = firstOLAPBuffer_;
ex_assert(currentRetHistRowPtr_ == NULL,"ExSequenceTcb::advanceReturnHistoryRow() - currentRetHistRowPtr_ is not a NULL pointer");
currentRetHistRowPtr_ = currentRetOLAPBuffer_->getFirstRow();
currentRetHistRowInOLAPBuffer_ = 0;
return;
}
currentRetHistRowInOLAPBuffer_ ++;
if ( currentRetHistRowInOLAPBuffer_ == maxRowsInOLAPBuffer_ )
{ // reached end of this buffer
currentRetHistRowInOLAPBuffer_ = 0;
if ( currentRetOLAPBuffer_->getNext() == NULL )
{ //last buffer
currentRetOLAPBuffer_ = firstOLAPBuffer_;
currentRetHistRowPtr_ = currentRetOLAPBuffer_->getFirstRow();
}
else
{
currentRetOLAPBuffer_ = currentRetOLAPBuffer_->getNext();
currentRetHistRowPtr_ = currentRetOLAPBuffer_->getFirstRow();
}
}
else
{
currentRetHistRowPtr_ = currentRetHistRowPtr_ + recLen();
}
};
// removeOLAPBuffer
//
// Frees the last buffer of the OLAP buffer list and updates the buffer
// count and maxNumberHistoryRows_. Returns FALSE if the list is empty,
// TRUE otherwise.
//
NABoolean ExSequenceTcb::removeOLAPBuffer()
{
  if (firstOLAPBuffer_ == NULL || lastOLAPBuffer_ == NULL)
    return FALSE;

  if (firstOLAPBuffer_ == lastOLAPBuffer_)
    {
      // Only one buffer left. Since a minimum number of buffers is
      // kept, this path is not expected to be taken; it must stay in
      // case that minimum is ever dropped.
      NADELETEBASIC( lastOLAPBuffer_ ,heap_);
      numberOfOLAPBuffers_ = 0;
      maxNumberHistoryRows_ = myTdb().maxHistoryRows_;
      firstOLAPBuffer_ = NULL;
      lastOLAPBuffer_ = NULL;
      return TRUE;
    }

  // Unlink the tail, free it and terminate the list at its predecessor.
  HashBuffer *oldTail = lastOLAPBuffer_;
  lastOLAPBuffer_ = oldTail->getPrev();
  NADELETEBASIC( oldTail ,heap_);
  lastOLAPBuffer_->setNext(NULL);

  --numberOfOLAPBuffers_;
  maxNumberHistoryRows_ = numberOfOLAPBuffers_ * maxRowsInOLAPBuffer_;
  return TRUE;
}
// shrinkOLAPBufferList
//
// Removes buffers from the end of the OLAP buffer list until only
// minNumberOfOLAPBuffers_ remain. Returns FALSE as soon as a buffer
// cannot be removed, TRUE otherwise.
//
NABoolean ExSequenceTcb::shrinkOLAPBufferList()
{
  NABoolean removed = TRUE;

  while (removed && numberOfOLAPBuffers_ > minNumberOfOLAPBuffers_)
    removed = removeOLAPBuffer();

  return removed;
}
// Append one new HashBuffer to the tail of the OLAP buffer list.
//
// checkMemoryPressure - when TRUE, clusterDb_->enoughMemory() is consulted
//                       first and the buffer is not allocated if it
//                       answers FALSE.
//
// Returns TRUE when a buffer was appended, FALSE when the memory check
// failed or the buffer (or its data area) could not be allocated.
//
// NOTE(review): if the HashBuffer object itself is allocated but its data
// pointer is NULL, the object is not released before returning FALSE -
// confirm whether that is reclaimed elsewhere.
NABoolean ExSequenceTcb::addNewOLAPBuffer(NABoolean checkMemoryPressure)
{
  if (checkMemoryPressure)
  {
    if (!clusterDb_->enoughMemory(olapBufferSize_))
      return FALSE; // not enough memory (e.g., memory pressure)
  }

  // With a cluster available the buffer is built from it; otherwise the
  // buffer is sized explicitly from the TCB's own settings.
  HashBuffer *newBuf = cluster_
    ? new(heap_) HashBuffer(cluster_)
    : new(heap_) HashBuffer(olapBufferSize_, allocRowLength_, false, heap_, clusterDb_, &rc_);

  if (newBuf == NULL)
    return FALSE; // no memory
  if (newBuf->getDataPointer() == NULL)
    return FALSE; // no memory for the buffer's data area

  // Link the buffer in at the tail (or make it the sole element).
  if (firstOLAPBuffer_ != NULL)
  {
    lastOLAPBuffer_->setNext(newBuf);
    newBuf->setPrev(lastOLAPBuffer_);
  }
  else
  {
    firstOLAPBuffer_ = newBuf;
  }
  lastOLAPBuffer_ = newBuf;

  // Keep the derived capacity figures in step with the list length.
  ++numberOfOLAPBuffers_;
  maxNumberHistoryRows_ = numberOfOLAPBuffers_ * maxRowsInOLAPBuffer_;

  if (isUnboundedFollowing())
  {
    maxNumberOfRowsReturnedBeforeReadOF_ =
      (numberOfOLAPBuffers_ - numberOfWinOLAPBuffers_) * maxRowsInOLAPBuffer_;
  }

  return TRUE; // no memory problems
}
void ExSequenceTcb::initializeHistory()
{
currentOLAPBuffer_ = NULL;
currentHistRowInOLAPBuffer_ = 0;
currentHistRowPtr_ = NULL;
currentRetOLAPBuffer_ = NULL;
currentRetHistRowInOLAPBuffer_ = 0;
currentRetHistRowPtr_ = NULL;
numberHistoryRows_ = 0;
histRowsToReturn_ = 0;
partitionEnd_ = FALSE;
// allocate the minimum number of olap buffers
if (firstOLAPBuffer_ == NULL)
{
for (Int32 i = 0 ; i < minNumberOfOLAPBuffers_; i++)
{
if ( ! addNewOLAPBuffer( FALSE /* No Memory Pressure Check */ ) )
ex_assert(0, "No memory for minimal OLAP window!");
}
}
else
{
if (!shrinkOLAPBufferList())
ex_assert(0,"initializeHistory-- can not shrink buffer list");
}
// Initialize all the settings needed for unbounded following (and overflow)
if ( isUnboundedFollowing() )
{
//last row in partition
if (lastRow_ == NULL)
{
lastRow_ = new(heap_) char[recLen()];
ex_assert( lastRow_ , "No memory available for OLAP Operator");
}
// initialize parameters needed for overflow handling
OLAPBuffersFlushed_ = FALSE;
firstOLAPBufferFromOF_ = NULL;
numberOfOLAPBuffersFromOF_ = 0;
memoryPressureDetected_ = FALSE;
numberOfRowsReturnedBeforeReadOF_ = 0;
createCluster();
// Cluster needs to know the first buffer in the OLAP list of buffers
// (Currently only used when reading buffers from overflow for
// "bounded following", where Cluster::read() may need to jump back to
// first buffer in the list.)
cluster_->firstBufferInList_ = firstOLAPBuffer_ ;
} // if ( isUnboundedFollowing() )
}
// (Re)create the ClusterDB and the single Cluster used by this operator.
//
// Steps performed:
//  1. Derive the memory budget from the memory monitor's physical memory
//     figure and the TDB's memUsagePercent_, capped by the TDB's memory
//     quota when one is set and it is the lower figure.
//  2. Delete any previously created cluster_ / clusterDb_.
//  3. Build a new ClusterDB of kind SEQUENCE_OLAP on heap_ and configure its
//     scratch I/O vector size, scratch overflow mode and BMO max memory
//     threshold from the TDB.
//  4. Build a new Cluster in state IN_MEMORY that is attached to that
//     ClusterDB.
// Both allocations are checked with ex_assert.
//
// NOTE(review): the budget arithmetic is carried out in ULng32; if that type
// is 32 bits wide, "memoryQuotaMB() * ONE_MEG" and the percentage
// computation could wrap for large memory sizes - confirm.
void ExSequenceTcb::createCluster()
{
MemoryMonitor * memMonitor =
getGlobals()->castToExExeStmtGlobals()->getMemoryMonitor();
// Budget = (physical memory / 100) * configured usage percentage.
ULng32 availableMemory = memMonitor->getPhysMemInBytes() / 100
* myTdb().memUsagePercent_;
// if quota, and it's less than avail memory, then use that lower figure
if ( myTdb().memoryQuotaMB() > 0 &&
myTdb().memoryQuotaMB() * ONE_MEG < availableMemory )
availableMemory = myTdb().memoryQuotaMB() * ONE_MEG ;
// The quota is passed as the minimum quota only when the TDB reports
// possible multiple calls; otherwise the minimum is zero.
ULng32 minMemQuotaMB = myTdb().isPossibleMultipleCalls() ?
myTdb().memoryQuotaMB() : 0 ;
// in case we recreate, delete the old objects (incl. old scratch file)
if ( cluster_ ) delete cluster_;
if ( clusterDb_ ) delete clusterDb_;
// Convert the TDB's "min memory before pressure check" from MB to bytes.
ULng32 minB4Chk = myTdb().getBmoMinMemBeforePressureCheck() * ONE_MEG;
// The arguments below are positional; the trailing comments name the
// hash-join counterpart of each slot that is unused (0 / NULL) here.
clusterDb_ =
new(heap_) ClusterDB(ClusterDB::SEQUENCE_OLAP,
myTdb().OLAPBufferSize_,
NULL, // workAtp_,
tdb.getExplainNodeId(),
0, // ... hashJoinTdb().extRightRowAtpIndex1_,
0, // ... hashJoinTdb().extRightRowAtpIndex2_,
NULL, // ... rightSearchExpr_,
NULL, // ... buckets_,
1, // bucketCount_ - must be > 0
availableMemory,
memMonitor,
myTdb().pressureThreshold_,
getGlobals()->castToExExeStmtGlobals(),
&rc_,
myTdb().isNoOverflow(),
FALSE, /*isPartialGroupBy*/
0, // ... minBuffersToFlush_,
0, // ... numInBatch_,
myTdb().forceOverflowEvery(),
0, // ... forceHashLoopAfterNumBuffers()
0, // ... forceClusterSplitAfterMB(),
// first time it's uninitialized - would setup later!
ioEventHandler_, // set up at registerSubtasks
this, // the calling tcb
myTdb().scratchThresholdPct_,
myTdb().logDiagnostics(),
FALSE, // bufferedWrites(),
TRUE, // No early overflow based on cmp hints
myTdb().memoryQuotaMB(),
minMemQuotaMB,
minB4Chk, // BmoMinMemBeforePressureCheck
// next 4 are for early overflow (not used for OLAP)
0,
0,
0,
0,
0, // ... Hash-Table not resizable
NULL // getStatsEntry()
);
ex_assert( clusterDb_ , "No memory available for OLAP Operator");
clusterDb_->setScratchIOVectorSize(myTdb().scratchIOVectorSize());
// Map the TDB's overflow mode onto the scratch overflow mode. Any value
// other than SSD or MMAP (including the explicit DISK case) selects
// SCRATCH_DISK, because "default" shares the DISK branch.
switch(myTdb().getOverFlowMode())
{
case SQLCLI_OFM_SSD_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_SSD);
break;
case SQLCLI_OFM_MMAP_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_MMAP);
break;
default:
case SQLCLI_OFM_DISK_TYPE:
clusterDb_->setScratchOverflowMode(SCRATCH_DISK);
break;
}
clusterDb_->setBMOMaxMemThresholdMB(myTdb().getBMOMaxMemThresholdMB());
// The one cluster of this operator; it uses the TDB's record length as its
// row length and reports errors through rc_.
cluster_ = new(heap_) Cluster(Cluster::IN_MEMORY,
clusterDb_,
NULL, // ... &buckets_[bucketIdx],
0, // ... bucketsPerCluster,
myTdb().recLen_, // Row Length
false,
false,
0, // ... extRightRowAtpIndex1_,
FALSE, // Use as an "outer cluster"
FALSE, // ... no bit map
NULL, // next cluster,
&rc_);
ex_assert( cluster_ , "No memory available for OLAP Operator");
}
// Fold the diagnostics area attached to a queue entry into workAtp_.
//
// - Nothing happens when the entry carries no diagnostics area.
// - If workAtp_ already has a diagnostics area, the entry's area is merged
//   after it.
// - Otherwise the entry's area is installed in workAtp_, its reference
//   count is incremented, and the entry's diagnostics area is then cleared.
void ExSequenceTcb::updateDiagsArea(ex_queue_entry * centry)
{
  ComDiagsArea *incoming = centry->getDiagsArea();
  if (!incoming)
    return;

  ComDiagsArea *existing = workAtp_->getDiagsArea();
  if (existing)
  {
    existing->mergeAfter(*incoming);
    return;
  }

  // Take over the entry's area. The reference count is raised before the
  // entry lets go of it, keeping the same call order as before.
  workAtp_->setDiagsArea(incoming);
  incoming->incrRefCount();
  centry->setDiagsArea(0);
}
// Record an executor error code in workAtp_'s diagnostics area.
//
// rc_ - the error code; it is negated before being stored as a SQL code.
//
// A diagnostics area is allocated on heap_ and attached to workAtp_ if none
// exists yet. The code is added only when the area does not already
// contain it.
void ExSequenceTcb::updateDiagsArea( ExeErrorCode rc_)
{
  ComDiagsArea *diags = workAtp_->getDiagsArea();

  if (diags == NULL)
  {
    diags = ComDiagsArea::allocate(heap_);
    workAtp_->setDiagsArea(diags);
  }

  // Already reported: nothing more to add.
  if (diags->contains((Lng32) -rc_))
    return;

  *diags << DgSqlCode(-rc_);
}
//
// Constructor and destructor private state
//
// A freshly constructed private state starts in step ExSeq_EMPTY with a
// match count of zero. The tcb argument is not used by the constructor.
ExSequencePrivateState::ExSequencePrivateState
(const ExSequenceTcb * tcb)
{
  step_ = ExSequenceTcb::ExSeq_EMPTY;
  matchCount_ = 0;
}
// Nothing to release: the destructor body is intentionally empty.
ExSequencePrivateState::~ExSequencePrivateState()
{
}
// Create another ExSequencePrivateState for the given TCB, placing it in
// the object returned by that TCB's getSpace().
//
// tcb - the owning TCB; it is passed on to the constructor as an
//       ExSequenceTcb.
ex_tcb_private_state * ExSequencePrivateState::allocate_new
(const ex_tcb *tcb)
{
  // getSpace() is called through a non-const pointer, so the constness of
  // the argument is cast away first.
  ex_tcb *ownerTcb = (ex_tcb*)tcb;

  return new(ownerTcb->getSpace())
    ExSequencePrivateState((ExSequenceTcb*) tcb);
}