/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
******************************************************************************
*
* File: ex_queue.c
* Description: Methods for classes ex_queue, ex_queue_entry, atp_struct
*
*
* Created: 5/3/94
*
* Language: C++
* Status: Experimental
*
*
*
*
******************************************************************************
*/
#include "ex_stdh.h"
#include "stdlib.h"
#include "ComTdb.h" // For cancel/error injection testing.
#include "ex_tcb.h" // For cancel/error injection testing.
#include "ex_error.h" // For cancel/error injection testing.
ex_queue::ex_queue(const queue_type type,
queue_index initialSize,
ex_cri_desc * cri_desc,
CollHeap * space,
enum queueAllocType allocType) :
//ExGod(space),
upDown_(type), size_(initialSize),
numTuples_(cri_desc->noTuples()),
queue_(NULL),
resizePoints_(0),
queueWasFull_(FALSE),
needsResize_(FALSE),
resizeLimit_(0),
needsPstates_(FALSE),
needsAtps_(FALSE)
{
ULng32 i;
// round the size up to a power of 2 that is greater than 1.
//
ULng32 count = 1;
queue_index s = size_ - 1;
while (s > 1) {
count++;
s = s >> 1;
};
if (count > (sizeof(queue_index) * 8))
size_ = maxQueueSize;
else
size_ = (1 << count);
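// Worked example of the rounding above: an initialSize of 5 leaves
// count at 3, so size_ becomes 1 << 3 = 8; an initialSize of 4 stays
// at 4. If count would exceed the bit width of queue_index, size_ is
// capped at maxQueueSize instead.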
ex_assert(size_ > 1, "invalid queue size");
ex_assert((space != 0), "Must pass in a valid space pointer.");
queue_ = (ex_queue_entry *)(space->allocateMemory(size_ *
sizeof(ex_queue_entry)));
// Since code should tolerate head_ and tail_ wrapping
// around, initializing them to these values ensures that
// this assumption is always tested in our developer regressions.
head_ = UINT_MAX - 1;
tail_ = UINT_MAX - 1;
mask_ = size_ - 1;
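// Because size_ is a power of two, mask_ = size_ - 1 lets the code map
// a monotonically increasing (and wrapping) index to a slot with a
// cheap AND: (i & mask_) == (i % size_). For example, with size_ = 8,
// index 13 maps to slot 13 & 7 = 5.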
criDesc_ = cri_desc;
atpsAllocated_ = FALSE;
insertSubtask_ = NULL;
unblockSubtask_ = NULL;
cancelSubtask_ = NULL;
// BertBert VV
nextSubtask_ = NULL;
// BertBert ^^
resizeSubtask_ = NULL;
for(i = 0; i<size_; i++)
{
// Initialize private state pointer
queue_[i].pstate = NULL;
// initialize the public state
queue_[i].initializeState(upDown_);
// Initialize the pointer to the atp_struct
queue_[i].atp_ = NULL;
}
// allocate space for the array of tuple pointers if required to do so
if(allocType == ALLOC_ATP ||
(allocType == ALLOC_DEFAULT && upDown_ == UP_QUEUE))
allocateAtps(space);
}
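// Minimal usage sketch (hypothetical caller code, not part of this file):
// an operator's TCB typically allocates one down queue and one up queue
// toward its parent, e.g.
//   down = new(space) ex_queue(ex_queue::DOWN_QUEUE, downSize, criDescDown, space);
//   up   = new(space) ex_queue(ex_queue::UP_QUEUE,   upSize,   criDescUp,   space);
// The names down, up, downSize, upSize, criDescDown and criDescUp are
// illustrative only.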
ex_queue::~ex_queue()
{
if(atpsAllocated_)
deallocateAtps();
deallocatePstate();
}
// some weight factors for state transitions of queues
#define SUB_EMPTY_EMPTY_TRANSITION 2
#define ADD_EMPTY_FULL_TRANSITION 3
#define ADD_FULL_EMPTY_TRANSITION 3
#define SUB_FULL_FULL_TRANSITION 1
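// These weights drive a simple heuristic for dynamic queue resizing:
// every empty->full or full->empty transition adds points, while a
// repeated full->full or empty->empty transition subtracts a smaller
// amount. Once resizePoints_ reaches resizeLimit_, needsResize_ is set
// and the resize subtask is scheduled. For example, a queue that keeps
// oscillating between empty and full gains 3 points per transition and
// soon hits the limit, while a queue that mostly stays empty slowly
// loses points and is never resized.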
void ex_queue::handleFullQueue()
{
// The head was the same as the tail, which means the queue was full
// (blocked). Advance the head and notify the task that is waiting for
// the queue to become unblocked, before running the resize logic below.
head_++; // ok to wrap around
unblockSubtask_->schedule();
// logic for dynamic resizing of queues
if (queueWasFull_)
{
// the queue went from full to full without reaching
// the empty state, decrement the count of empty/full
// transitions
if (resizePoints_ > SUB_FULL_FULL_TRANSITION)
resizePoints_ -= SUB_FULL_FULL_TRANSITION;
}
else
{
// the queue became empty in the recent past, increment
// the count of empty/full transitions
resizePoints_ += ADD_EMPTY_FULL_TRANSITION;
if (resizePoints_ >= resizeLimit_ && resizeSubtask_)
{
// We have reached the resize limit and should resize.
needsResize_ = TRUE;
resizeSubtask_->schedule();
}
queueWasFull_ = TRUE;
} // end of logic for dynamic resizing of queues
// at this point the queue was full and queueWasFull_ is TRUE
}
void ex_queue::handleEmptyQueue()
{
if (queueWasFull_)
{
// the queue became full in the recent past, increment
// the count of empty/full transitions
resizePoints_ += ADD_FULL_EMPTY_TRANSITION;
if (resizePoints_ >= resizeLimit_ && resizeSubtask_)
{
needsResize_ = TRUE;
resizeSubtask_->schedule();
}
queueWasFull_ = FALSE;
}
else
{
// the queue went from empty to empty without reaching
// the full state, decrement the count of empty/full
// transitions
if (resizePoints_ > SUB_EMPTY_EMPTY_TRANSITION)
resizePoints_ -= SUB_EMPTY_EMPTY_TRANSITION;
}
// at this point the queue is empty and queueWasFull_ is FALSE
}
#ifdef _DEBUG
void ex_queue::removeHead()
{
// make sure the queue is not empty
ExQueueAssert(!isEmpty(), "ex_queue::removeHead() on an empty queue");
ex_queue_entry *headEntry = &queue_[head_ & mask_]; //(size_ - 1)];
#ifdef _DEBUG
logRemoveHead();
#endif
if (atpsAllocated_ && headEntry->getAtp())
headEntry->getAtp()->release();
if (isFull())
handleFullQueue();
else
{
head_++; // ok to wrap around
// logic for dynamic resizing of queues
if (isEmpty())
{
handleEmptyQueue();
} // end of logic for dynamic resizing of queues
}
} // ex_queue::removeHead()
#endif
// LCOV_EXCL_START
//soln 10-040111-2308 start
void ex_queue::deleteHeadEntry()
{
// Removes the head entry. Same as removeHead() except that it skips
// the queue resizing logic.
// To be used to remove head entries ONLY after a cancel request has been
// sent by the operator. This is used to handle special cases like
// cancelling a request after a GET_N has been satisfied. After that,
// removing entries from the queue is just looking for a Q_NO_DATA, so
// these removals shouldn't count toward queue resizing, since the
// request has already been satisfied.
// make sure the queue is not empty
ExQueueAssert(!isEmpty(), "ex_queue::deleteHeadEntry() on an empty queue");
ex_queue_entry *headEntry = &queue_[head_ & mask_]; //(size_ - 1)];
//headEntry->initializeState(upDown_);
#ifdef _DEBUG
logRemoveHead();
#endif
if (atpsAllocated_ && headEntry->getAtp())
{
headEntry->getAtp()->release();
}
if (isFull())
{
head_++; //ok to wrap around
unblockSubtask_->schedule();
}
else
head_++; //ok to wrap around
// Since this is called in the cancel routine, set the resize points to
// zero so that we do not increase them.
// Note that resize points are persistent across requests, so it is
// essential to reset them to 0 here; otherwise they would accumulate
// across requests.
resizePoints_ = 0;
} // ex_queue::deleteHeadEntry()
//soln 10-040111-2308 end
// LCOV_EXCL_STOP
void ex_queue_entry::passAtp(const ex_queue_entry *from)
{
ExQueueAssert(atp_ == NULL || criDesc() == from->criDesc(),
"incompatible queue entries");
atp_ = from->atp_;
}
void ex_queue_entry::passAtp(atp_struct *from)
{
ExQueueAssert(atp_ == NULL || criDesc() == from->getCriDesc(),
"incompatible queue entries");
atp_ = from;
}
NABoolean ex_queue::allocateAtps(CollHeap * space,
atp_struct * atps,
queue_index numNewAtps,
Int32 atpSize,
NABoolean failureIsFatal)
{
queue_index i;
// remember that we need ATPs for this queue
needsAtps_ = TRUE;
// if the number of needed ATPs is not passed in, assume that all
// queue entries need an ATP
if (atps == NULL)
{
// Initially try to get a whole array of ATPs rather than one at a
// time. The call also returns the size of the atp struct, since it
// allocates space for the number of tuples per criDesc.
numNewAtps = size_;
#pragma nowarn(1506) // warning elimination
atps = allocateAtpArray( criDesc_, numNewAtps, &atpSize, space, failureIsFatal);
#pragma warn(1506) // warning elimination
}
// check whether the caller provided the ATPs or whether we were
// successful in allocating them as an array
if (atps)
{
ex_assert(atpSize >= sizeof(atp_struct),
"Invalid ATP size passed to ex_queue::allocateAtps()");
for(i = 0; i < size_; i++)
{
if (queue_[i].atp_ == NULL)
{
ex_assert(numNewAtps != 0,
"There are more empty queue entries than new ATPs");
numNewAtps--;
queue_[i].atp_ = atps;
atps = (atp_struct *) ((char *)atps + atpSize);
}
}
}
else
// Didn't get enough space to get all of them in one shot, so do
// it one at a time.
// Allocate space for the tuples in each atp
// Allocate the atp if it has not been allocated yet
for(i = 0; i<size_; i++)
{
if (queue_[i].atp_ == NULL)
{
queue_[i].atp_ = allocateAtpArray(criDesc_, 1, &atpSize, space, failureIsFatal);
if (!failureIsFatal && queue_[i].atp_ == NULL)
return FALSE; // let user handle the failure
ex_assert(queue_[i].atp_,"Memory Allocation Fatal Failure Expected but Jump Buffer does not seem to be set!");
}
}
atpsAllocated_ = TRUE;
return TRUE;
}
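// Note: the constructor calls this method with only a space pointer, so
// the ATP array is allocated here; resize() instead passes in an array
// of ATPs it has already allocated for the newly added queue entries.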
void ex_queue::allocatePstate(ex_tcb_private_state *p, ex_tcb * tcb)
{
queue_index i;
// Allocate space for the private state in each queue entry
for(i = 0; i<size_; i++)
{
if (queue_[i].pstate == NULL)
queue_[i].pstate = p->allocate_new(tcb);
}
needsPstates_ = TRUE;
}
NABoolean ex_queue::allocatePstate(ex_tcb * tcb,
ex_tcb_private_state *pstates,
queue_index numNewPstates,
Lng32 pstateLength,
NABoolean failureIsFatal)
{
queue_index i;
ex_tcb_private_state *actualPstates = pstates;
#pragma nowarn(1506) // warning elimination
Lng32 actualNumPstates = numNewPstates;
#pragma warn(1506) // warning elimination
Lng32 actualPstateLength = pstateLength;
if (actualPstates == NULL)
{
#pragma nowarn(1506) // warning elimination
actualNumPstates = size_;
#pragma warn(1506) // warning elimination
// NOTE: actualNumPstates may be changed (reduced) by this call
actualPstates = tcb->allocatePstates(actualNumPstates,
actualPstateLength);
if (!failureIsFatal && actualNumPstates == 0)
return FALSE; // out of memory and user wants to handle this
ex_assert(actualNumPstates > 0, "Unable to allocate even one pstate");
}
else
{
// As Lenin said: trusting is good, checking is better
ex_assert(actualNumPstates > 0,"Can't preallocate 0 pstates");
ex_assert(actualPstateLength >= sizeof(ex_tcb_private_state),
"Pstate size is too small");
}
// Allocate space for the private state in each queue entry
for(i=0; i<size_; i++)
{
if (queue_[i].pstate == NULL)
{
// need a pstate
if (actualNumPstates <= 0)
{
// the array doesn't have enough pstates in it, allocate
// the entries one by one
actualNumPstates = 1;
actualPstates =
tcb->allocatePstates(actualNumPstates, actualPstateLength);
if (!failureIsFatal && actualNumPstates == 0)
return FALSE; // out of memory and user wants to handle this
ex_assert(actualNumPstates,"Unable to allocate a pstate");
}
// let the current queue entry point to the first entry of
// the array and advance the array pointer to the next element
queue_[i].pstate = actualPstates;
actualPstates = (ex_tcb_private_state *)
(((char *)actualPstates) + actualPstateLength);
actualNumPstates--;
}
}
needsPstates_ = TRUE;
return TRUE;
}
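// Note: resize() calls this method with an array of pstates it has
// already allocated for the newly added queue entries; when pstates is
// NULL the TCB is asked to allocate one pstate per queue entry, falling
// back to one-at-a-time allocation if the bulk request comes up short.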
void ex_queue::deallocateAtps()
{
queue_index i;
for(i = 0; i<size_; i++)
{
// release all the tupps before deallocating the array of tupps
if (queue_[i].atp_)
queue_[i].atp_->release();
// de-allocate array of pointers to tuples
//::deallocateAtp(queue_[i].atp_, collHeap());
queue_[i].atp_ = 0;
}
atpsAllocated_ = FALSE;
}
void ex_queue::deallocatePstate()
{
// don't explicitly delete the pstate objects since they came
// from the space and since they may have been allocated as
// arrays
needsPstates_ = FALSE;
}
queue_index ex_queue::resize(ex_tcb * tcb,
queue_index newSize)
{
queue_index sizeDelta;
Space *space = tcb->getSpace();
ex_queue_entry *newQueue;
queue_index newMask;
Int32 queueWasFull = isFull();
NABoolean needAtps = needsAtps();
Int32 atpSize = 0;
atp_struct *atps = NULL;
queue_index numAllocatedAtps = 0;
ex_tcb_private_state *pstates = NULL;
Lng32 numAllocatedPstates = 0;
Lng32 pstateLength = 0;
queue_index qi;
queue_index lastEntryToCopy;
ex_queue_entry *oldQueue = queue_;
const queue_index oldSize = size_;
const queue_index oldMask = mask_;
// the new size must be a power of 2, round up to the next power of 2
queue_index p2 = size_;
while (p2 < newSize && p2 != 0)
p2 = p2 << 1;
// p2 is now a power of 2 that is >= newSize, except that
// it is 0 in some unsupported cases, like size_ being 0 or newSize
// being too large to be rounded up to a power of 2
newSize = p2;
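// Worked example: growing a queue with size_ = 8 to newSize = 11 rounds
// p2 (and therefore newSize) up to 16; a request of 8 or less falls
// through the check below and leaves the queue unchanged.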
// can't resize to a smaller size
if (newSize <= size_)
return size_;
sizeDelta = newSize - size_;
newMask = newSize - 1;
// try to allocate the needed memory first and give up if that's not
// possible
// allocate new array of ex_queue_entry structs
newQueue = (ex_queue_entry *) (space->allocateMemory(
newSize * sizeof(ex_queue_entry),FALSE));
if (newQueue == NULL)
return size_; // out of memory
// allocate array of ATPs if needed
if (needAtps)
{
#pragma nowarn(1506) // warning elimination
atps = allocateAtpArray(criDesc_, sizeDelta, &atpSize, space, FALSE);
#pragma warn(1506) // warning elimination
// for now, give up if it wasn't possible to allocate the ATPs in
// one piece (one could try to allocate them one by one)
if (atps == NULL)
return size_;
numAllocatedAtps = sizeDelta;
}
// allocate an array of pstate objects if needed
if (needsPstates_)
{
#pragma nowarn(1506) // warning elimination
numAllocatedPstates = sizeDelta;
#pragma warn(1506) // warning elimination
pstates = tcb->allocatePstates(numAllocatedPstates,pstateLength);
if (pstates == NULL)
{
// either the TCB doesn't support the allocatePstates()
// method yet (it would be a dumb thing for a TCB not to
// support this AND to call the resize method) or we don't
// have enough memory. Give up without changing the size.
return size_;
}
}
// at this point we have allocated everything we need, so we
// should be in good shape from now on
// initialize the new queue
for(qi = 0; qi < newSize; qi++)
{
// Initialize private state pointer
newQueue[qi].pstate = NULL;
// initialize the public state
newQueue[qi].initializeState(upDown_);
// Initialize the pointer to the atp_struct
newQueue[qi].atp_ = NULL;
}
// calculate last entry to copy (the addition here may wrap around
// such that <lastEntryToCopy> is actually smaller than <head_>)
lastEntryToCopy = head_ + size_;
// Copy all queue entries (whether they are in use or not) over to
// the new queue. Do this in such a way that the used entries move
// to the new queue entry that corresponds to their index. Note
// that we will start copying the used entries (if any) first, and
// that those are the first entries until (but not including) the
// entry where qi is equal to <tail_>.
for (qi = head_; qi != lastEntryToCopy; qi++)
{
newQueue[qi & newMask] = queue_[qi & mask_];
}
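// Worked example of the copy above: with old size_ = 4 (mask_ = 3), new
// size 8 (newMask = 7) and head_ = 5, the loop copies logical indexes
// 5 through 8; the in-use entry at old slot 5 & 3 = 1 lands in new slot
// 5 & 7 = 5, so every entry keeps its logical queue index.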
// At this point, <size_> elements of the new queue are initialized
// with actual queue entries, while <sizeDelta> entries may still
// need ATPs and/or pstates (if applicable). These sizeDelta entries
// will be taken care of next by using the regular allocation methods
// for new queues. Note that the ATPs and PSTATEs of the remaining
// entries may or may not be contiguous.
// reset counter that counts empty/full transitions
resizePoints_ = 0;
needsResize_ = FALSE;
// set the data member to point to the newly allocated structures
queue_ = newQueue;
size_ = newSize;
mask_ = newMask;
NABoolean finalAllocSucceeded = TRUE;
// initialize the uninitialized ATPs if required to do so
if (needAtps)
finalAllocSucceeded = allocateAtps(space,
atps,
numAllocatedAtps,
atpSize,
FALSE);
// allocate PSTATEs for the uninitialized queue entries if this
// queue needs PSTATEs
if (needsPstates_ && finalAllocSucceeded)
{
finalAllocSucceeded = allocatePstate(tcb,
pstates,
numAllocatedPstates,
pstateLength,
FALSE);
}
if (finalAllocSucceeded)
{
// schedule the unblock subtask of the queue if the queue was full
// before the resize and now is no longer full (a deadlock could
// occur if we wouldn't schedule the unblock task for a full to
// not full transition of a queue)
if (queueWasFull)
unblockSubtask_->schedule();
}
else
{
// We failed during allocation of ATPs or PStates. Undo the switch
// to the new, resized queue and restore the queue to its old state.
queue_ = oldQueue;
size_ = oldSize;
mask_ = oldMask;
}
return size_;
}
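// Usage sketch (hypothetical caller code): an operator whose resize
// subtask has fired might grow its up queue to twice the current size,
// e.g.
//   upQueue->resize(this, 2 * currentUpQueueSize);
// upQueue and currentUpQueueSize are illustrative names; real operators
// track their own queue handles and choose their own target sizes.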
void ex_queue::cancelRequestWithParentIndex(queue_index pindex)
{
queue_index i = head_;
// search through all existing entries for the given parent index
while (!isVacant(i))
{
// if this entry has the given parent index, cancel it
if (queue_[i&mask_].downState.parentIndex == pindex)
cancelRequest(i);
i++;
}
}
// Cancel all requests with a parent index in the range from startPI to endPI (inclusive)
void ex_queue::cancelRequestWithParentIndexRange(queue_index startPI, queue_index endPI)
{
queue_index i = head_;
// search through all existing entries for the parent indexes within the range
if(endPI >= startPI) {
while (!isVacant(i))
{
// if this entry is within the range, cancel it
if (queue_[i&mask_].downState.parentIndex >= startPI &&
queue_[i&mask_].downState.parentIndex <= endPI)
cancelRequest(i);
i++;
}
} else if(startPI > endPI) {
// If endPI has wrapped around, the range also wraps.
while (!isVacant(i))
{
// if this entry is within the range, cancel it
if (queue_[i&mask_].downState.parentIndex >= startPI ||
queue_[i&mask_].downState.parentIndex <= endPI)
cancelRequest(i);
i++;
}
}
}
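// Example of the wrapped case above: with startPI = UINT_MAX - 1 and
// endPI = 2, the range covers parent indexes UINT_MAX - 1, UINT_MAX,
// 0, 1 and 2, which is why the test becomes ">= startPI || <= endPI".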
// used to initialize an empty queue entry
void ex_queue_entry::initializeState(const ex_queue::queue_type type)
{
switch(type)
{
case ex_queue::UP_QUEUE:
upState.status = ex_queue::Q_INVALID;
break;
case ex_queue::DOWN_QUEUE:
downState.request = ex_queue::GET_EMPTY;
downState.requestValue = 0;
// BertBert VV
downState.numGetNextsIssued = 0;
// BertBert ^^
break;
}
}
NA_EIDPROC
const char * NodeTypeToString(ComTdb::ex_node_type nodeType)
{
const char * tdbName;
switch (nodeType)
{
case ComTdb::ex_DDL:
tdbName = "ex_ddl";
break;
case ComTdb::ex_DESCRIBE:
tdbName = "ex_describe";
break;
case ComTdb::ex_EXE_UTIL:
tdbName = "ExExeUtil";
break;
case ComTdb::ex_ROOT:
tdbName = "ex_root";
break;
case ComTdb::ex_ONLJ:
tdbName = "ex_onlj";
break;
case ComTdb::ex_MJ:
tdbName = "ex_mj";
break;
case ComTdb::ex_HASHJ:
tdbName = "ex_hashj";
break;
case ComTdb::ex_HASH_GRBY:
tdbName = "ex_hash_grby";
break;
case ComTdb::ex_LOCK:
tdbName = "ExLock";
break;
case ComTdb::ex_SORT:
tdbName = "ExSort";
break;
case ComTdb::ex_SORT_GRBY:
tdbName = "ex_sort_grby";
break;
case ComTdb::ex_SEQUENCE_FUNCTION:
tdbName = "ex_sequence_function";
break;
case ComTdb::ex_TRANSACTION:
tdbName = "ExTrans";
break;
case ComTdb::ex_SET_TIMEOUT:
tdbName = "ExTimeout";
break;
case ComTdb::ex_UNION:
tdbName = "ex_union";
break;
case ComTdb::ex_TUPLE:
tdbName = "ex_tuple";
break;
case ComTdb::ex_SPLIT_TOP:
tdbName = "ex_split_top";
break;
case ComTdb::ex_SPLIT_BOTTOM:
tdbName = "ex_split_bottom";
break;
case ComTdb::ex_PARTN_ACCESS:
tdbName = "ex_partn_access";
break;
case ComTdb::ex_SEND_TOP:
tdbName = "ex_send_top";
break;
case ComTdb::ex_SEND_BOTTOM:
tdbName = "ex_send_bottom";
break;
case ComTdb::ex_EXPLAIN:
tdbName = "exExplain";
break;
case ComTdb:: ex_TUPLE_FLOW:
tdbName = "ExTupleFlow";
break;
case ComTdb::ex_FAST_EXTRACT:
tdbName = "ExFastExtract";
break;
case ComTdb::ex_UDR:
tdbName = "ExUdr";
break;
case ComTdb::ex_PROBE_CACHE:
tdbName = "ExProbeCache";
break;
case ComTdb::ex_HDFS_SCAN:
tdbName = "ExHdfsScan";
break;
case ComTdb::ex_ARQ_WNR_INSERT:
tdbName = "ExExeUtilAqrWnrInsert";
break;
default:
tdbName = "NULL";
}
return tdbName;
}
void ex_queue::injectErrorOrCancel()
{
#ifdef _DEBUG
ex_queue_entry *qe = getQueueEntry(tail_-1);
// DO the ol' switcheroo, but not every time.
ULng32 freq = insertSubtask_->getTcb()->getGlobals()->getInjectErrorAtQueue();
if (freq == 0)
return;
if (upDown_ == UP_QUEUE)
{
if ((rand() & (freq-1)) != 0)
return;
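// Note: (rand() & (freq-1)) == 0 selects roughly one out of every freq
// insertions, assuming the configured frequency is a power of two
// (e.g. freq = 32 injects on about 3% of insertions).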
#pragma nowarn(269) // warning elimination
NABoolean needsError = FALSE;
#pragma warn(269) // warning elimination
switch (qe->upState.status)
{
case Q_OK_MMORE:
{
needsError = TRUE;
qe->upState.status = Q_SQLERROR;
#if (!defined (__EID) && !defined(NA_C89))
cerr << "Converting a Q_OK_MMORE to a Q_SQLERROR, from "
<< NodeTypeToString(unblockSubtask_->getTcb()->getNodeType())
<< "(" << unblockSubtask_->getTcb() << ")"
<< " to "
<< NodeTypeToString(insertSubtask_->getTcb()->getNodeType())
<< "(" << insertSubtask_->getTcb() << ")"
<< endl;
#endif
break;
}
case Q_NO_DATA:
if (!isFull())
{
needsError = TRUE;
qe->upState.status = Q_SQLERROR;
ex_queue_entry *newQNODATA = getTailEntry();
newQNODATA->upState = qe->upState;
newQNODATA->upState.status = Q_NO_DATA;
newQNODATA->getAtp()->copyAtp(qe->getAtp());
tail_++;
#if (!defined (__EID) && !defined(NA_C89))
cerr << "Injecting a Q_SQLERROR before a Q_NO_DATA, from "
<< NodeTypeToString(unblockSubtask_->getTcb()->getNodeType())
<< "(" << unblockSubtask_->getTcb() << ")"
<< " to "
<< NodeTypeToString(insertSubtask_->getTcb()->getNodeType())
<< "(" << insertSubtask_->getTcb() << ")"
<< endl;
#endif
}
break;
default:
break;
}
if (needsError)
{
ComDiagsArea * da = qe->getDiagsArea();
if (!da)
da = ComDiagsArea::allocate(insertSubtask_->getTcb()->getHeap());
else
da = da->copy();
qe->setDiagsArea(da);
*da << DgSqlCode(-EXE_ERROR_INJECTED)
<< DgString0(__FILE__)
<< DgInt0(__LINE__);
}
}
#endif
return;
}
void ex_queue::logHeader()
{
#if (!defined (__EID) && !defined(NA_C89))
if (upDown_ == UP_QUEUE)
{
cerr
<< NodeTypeToString(unblockSubtask_->getTcb()->getNodeType())
<< "(" << unblockSubtask_->getTcb() << ")"
<< " up to "
<< NodeTypeToString(insertSubtask_->getTcb()->getNodeType())
<< "(" << insertSubtask_->getTcb() << ")"
;
}
else
{
cerr << NodeTypeToString(unblockSubtask_->getTcb()->getNodeType())
<< "(" << unblockSubtask_->getTcb() << ")"
<< " down to "
<< NodeTypeToString(insertSubtask_->getTcb()->getNodeType())
<< "(" << insertSubtask_->getTcb() << ")"
;
}
#endif
}
void ex_queue::logRemoveHead()
{
#if 0
#if (!defined (__EID) && !defined(NA_C89))
ex_queue_entry *qe = &queue_[head_ & mask_];
logHeader();
cerr << "remove ";
if (upDown_ == UP_QUEUE)
{
switch (qe->upState.status)
{
case Q_OK_MMORE:
cerr << "Q_OK_MMORE";
break;
case Q_SQLERROR:
cerr << "Q_SQLERROR";
break;
case Q_NO_DATA:
cerr << "Q_NO_DATA";
break;
default:
cerr << "unknown";
break;
}
cerr << ", pi " << qe->upState.parentIndex;
}
else
{
switch (qe->downState.request)
{
case GET_ALL:
cerr << "GET_ALL";
break;
case GET_N:
cerr << "GET_N";
break;
case GET_EOD:
cerr << "GET_EOD";
break;
case GET_NOMORE:
cerr << "GET_NOMORE";
break;
default:
cerr << "unknown";
break;
}
cerr << ", pi " << qe->downState.parentIndex;
}
cerr << endl;
#endif
#endif
}