blob: 5d5ed0ac7512706bd02c8927c027cf3b7eb1bbb4 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: sql_buffer.cpp
* Description:
*
*
* Created: 7/10/95
* Language: C++
*
*
*
*
*****************************************************************************
*/
//
// NOTE: Some of the code in this file is excluded from the UDR server
// build using the UDRSERV_BUILD macro. The excluded code is:
// - All of the sql_buffer_pool, SqlBufferOlt, and SqlBufferOltSmall code
// - Code in the SqlBuffer class that deals with ATPs, expression
// evaluation, and statistics
//
#ifdef UDRSERV_BUILD
#include "sql_buffer.h"
#else
#include "Platform.h"
#include "ex_stdh.h"
#include "str.h"
#include "ComQueue.h"
#include "exp_expr.h"
#include "ComTdb.h"
#include "ExStats.h"
#endif // UDRSERV_BUILD
#include "sql_buffer_size.h"
// to use msg_mon_dump_process_id
#include "seabed/ms.h"
//
// Define a local assert macro. We define UdrExeAssert to something
// other than ex_assert in the UDR Server build.
//
#ifdef UDRSERV_BUILD
extern void udrAssert(const char *, Int32, const char *);
#define UdrExeAssert(p, msg) \
if (!(p)) { udrAssert( __FILE__ , __LINE__ , msg); }
#else
#define UdrExeAssert(a,b) ex_assert(a,b)
#endif // UDRSERV_BUILD
////////////////////////////////////////////////////////////////////////////
// class SqlBufferBase
////////////////////////////////////////////////////////////////////////////
SqlBufferBase::SqlBufferBase(BufferType bufType)
: SqlBufferHeader(bufType),
baseFlags_(0),
sizeInBytes_(0)
{
};
void SqlBufferBase::init(ULng32 in_size_in_bytes,
NABoolean clear)
{
baseFlags_ = 0;
#pragma nowarn(1506) // warning elimination
sizeInBytes_ = in_size_in_bytes;
#pragma warn(1506) // warning elimination
SqlBufferHeader::init();
}
void SqlBufferBase::init()
{
baseFlags_ = 0;
SqlBufferHeader::init();
}
void SqlBufferBase::driveInit(ULng32 in_size_in_bytes,
NABoolean clear,
BufferType bt)
{
setBufType(bt);
fixupVTblPtr();
init(in_size_in_bytes, clear);
}
void SqlBufferBase::driveInit()
{
fixupVTblPtr();
init();
}
void SqlBufferBase::drivePack()
{
if (NOT packed())
pack();
setPacked(TRUE);
}
void SqlBufferBase::driveUnpack()
{
if (packed())
{
fixupVTblPtr();
unpack();
}
setPacked(FALSE);
}
NABoolean SqlBufferBase::driveVerify(ULng32 maxBytes)
{
fixupVTblPtr();
return verify(maxBytes);
}
void SqlBufferBase::fixupVTblPtr()
{
switch (bufType())
{
#ifndef UDRSERV_BUILD
case OLT_:
{
SqlBufferOlt sb;
str_cpy_all ((char *)this, (char*)&sb, sizeof(char *));
}
break;
#endif // UDRSERV_BUILD
case DENSE_:
{
SqlBufferDense sb;
str_cpy_all ((char *)this, (char*)&sb, sizeof(char *));
}
break;
case NORMAL_:
{
SqlBufferNormal sb;
str_cpy_all ((char *)this, (char*)&sb, sizeof(char *));
}
break;
default:
{
}
break;
}
}
void SqlBufferBase::pack()
{
}
void SqlBufferBase::unpack()
{
}
NABoolean SqlBufferBase::verify(ULng32 maxBytes) const
{
return TRUE;
}
// positions to the first tupp descriptor in the buffer.
void SqlBufferBase::position()
{
}
void * SqlBufferBase::operator new(size_t size, char*s)
{
return s;
}
SqlBufferBase::moveStatus
SqlBufferBase::moveInSendOrReplyData(NABoolean isSend, // TRUE = send
NABoolean doMoveControl, // TRUE = move
// control always
NABoolean doMoveData, // TRUE = move data.
void * currQState, // up_state(reply) or
// down_state(send)
ULng32 controlInfoLen,
ControlInfo ** controlInfo,
ULng32 projRowLen,
tupp_descriptor ** outTdesc,
ComDiagsArea * diagsArea,
tupp_descriptor ** diagsDesc,
ex_expr_base * expr,
atp_struct * atp1,
atp_struct * workAtp,
atp_struct * destAtp,
unsigned short tuppIndex,
NABoolean doMoveStats,
ExStatisticsArea * statsArea,
tupp_descriptor ** statsDesc,
NABoolean useExternalDA,
NABoolean callerHasExternalDA,
tupp_descriptor * defragTd
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
,ex_tcb * tcb
#endif
,NABoolean noMoveWarnings
)
{
return MOVE_ERROR;
}
NABoolean SqlBufferBase::moveOutSendOrReplyData(NABoolean isSend,
void * currQState,
tupp &outTupp,
ControlInfo ** controlInfo,
ComDiagsArea ** diagsArea,
ExStatisticsArea ** statsArea,
Int64 * numStatsBytes,
NABoolean noStateChange,
NABoolean unpackDA,
CollHeap * heap)
{
return FALSE;
}
// positions to the "tupp_desc_num"th tupp descriptor.
void SqlBufferBase::position(Lng32 ){};
// returns the 'next' tupp descriptor. Increments the
// position. Returns null, if none found.
tupp_descriptor * SqlBufferBase::getNext(){return NULL;};
// returns the 'current' tupp descriptor. Does not increment
// position. Returns null, if none found.
tupp_descriptor * SqlBufferBase::getCurr(){return NULL;};
// advances the current tupp descriptor
void SqlBufferBase::advance(){};
// add a new tuple descriptor to the end of the buffer
tupp_descriptor *SqlBufferBase::add_tuple_desc(Lng32 tup_data_size)
{
return NULL;
}
// remove the tuple desc with the highest number from the buffer
void SqlBufferBase::remove_tuple_desc()
{
}
////////////////////////////////////////////////////////////////////////////
// class SqlBuffer
////////////////////////////////////////////////////////////////////////////
void SqlBuffer::init(ULng32 in_size_in_bytes,
NABoolean clear)
{
SqlBufferBase::init(in_size_in_bytes, clear);
Lng32 sb_size = getClassSize();
freeSpace_ = sizeInBytes_ - ROUND8(sb_size);
bufferStatus_ = EMPTY;
maxTuppDesc_ = 0;
tuppDescIndex_ = 0;
srFlags_ = DEFAULT_;
flags_ = 0;
setPacked(FALSE);
}
void SqlBuffer::init()
{
Lng32 sb_size = getClassSize();
SqlBufferBase::init();
freeSpace_ = sizeInBytes_ - ROUND8(sb_size);
bufferStatus_ = EMPTY;
maxTuppDesc_ = 0;
tuppDescIndex_ = 0;
srFlags_ = DEFAULT_;
flags_ = 0;
setPacked(FALSE);
}
// RETURNS: -1 if this buffer is free and reinitialized, 0 if not.
short SqlBuffer::freeBuffer()
{
// First check if the buffer is in use. If it is in use, do not call
// isFree() or any other virtual method, as the vptr may not be valid,
// i.e., the object may be in a packed state.
if (bufferStatus_ == IN_USE)
return 0;
static SqlBufferOlt sv_sb_olt;
static SqlBufferDense sv_sb_dense;
static SqlBufferNormal sv_sb_normal;
static SqlBufferOltSmall sv_sb_olt_small;
if ((memcmp(this, &sv_sb_normal, sizeof(char *)) == 0) ||
(memcmp(this, &sv_sb_dense, sizeof(char *)) == 0) ||
(memcmp(this, &sv_sb_olt_small, sizeof(char *)) == 0) ||
(memcmp(this, &sv_sb_olt, sizeof(char *)) == 0)) {
;
}
else {
printf("SQ: Buffer virtual function pointer not yet set\n");fflush(stdout);
return 0;
}
short freeState = isFree();
if (freeState && (bufferStatus_ != EMPTY) )
init(sizeInBytes_);
return freeState;
}
void SqlBuffer::compactBuffer()
{
}
tupp_descriptor * SqlBuffer::getPrev(){return NULL;};
short SqlBuffer::space_available(Lng32 tup_data_size)
{
if (freeSpace_ >= (Lng32)(tup_data_size + sizeof(tupp_descriptor)))
return -1;
else
return 0;
};
tupp_descriptor * SqlBuffer::getTuppDescriptor(Lng32 tupp_desc_num)
{
return NULL;
}
// print buffer info. For debugging
void SqlBuffer::printInfo() {
cerr << this << " Status: " << bufferStatus_
<< " maxTuppDesc: " << maxTuppDesc_
<< " freeSpace: " << freeSpace_;
Lng32 refcount = 0;
Lng32 firstref = -1;
#pragma warning (disable : 4018) //warning elimination
for (Int32 i = 0; (i < maxTuppDesc_); i++)
if (tupleDesc(i)->getReferenceCount()) {
if (firstref == -1)
firstref = i;
refcount++;
};
#pragma warning (default : 4018) //warning elimination
cerr << " ref tupps: " << refcount << " first: " << firstref << endl;
};
////////////////////////////////////////////////////////////////////
// Moves control info to send(input) or reply buffer, if needed.
// Moves control info if previous state is different than the
// current state. As long as state doesn't change, all tuples
// in the send/reply buffer are data tupps. A new control tupp is
// added when state changed.
// Allocates space for data, if needed (projRowLen > 0). Returns
// pointer to allocated tupp_descriptor.
// If diagsLen is greater than 0, allocates space for diags area.
// The tupp_descriptor contains info
// indicating whether it is a control or data tupp.
// Returns TRUE if buffer is full, FALSE otherwise.
////////////////////////////////////////////////////////////////////
SqlBuffer::moveStatus SqlBuffer::moveInSendOrReplyData(NABoolean isSend,
NABoolean doMoveControl,
NABoolean doMoveData,
void * currQState,
ULng32 controlInfoLen,
ControlInfo ** controlInfo,
ULng32 projRowLen,
tupp_descriptor ** outTdesc,
ComDiagsArea * diagsArea,
tupp_descriptor ** diagsDesc,
ex_expr_base * expr,
atp_struct * atp1,
atp_struct * workAtp,
atp_struct * destAtp,
unsigned short tuppIndex,
NABoolean doMoveStats,
ExStatisticsArea * statsArea,
tupp_descriptor ** statsDesc,
NABoolean useExternalDA,
NABoolean callerHasExternalDA,
tupp_descriptor * defragTd
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
,ex_tcb * tcb
#endif
,NABoolean noMoveWarnings
)
{
// if this is the first record to be moved in,
// or current state is different than previous
// state, then move in control info first.
tupp_descriptor * cdesc = NULL;
tupp_descriptor * tdesc = NULL;
NABoolean controlInfoMoved = FALSE;
Lng32 daMark = -1;
Lng32 atpDaMark = -1;
SqlBuffer::moveStatus retcode = MOVE_SUCCESS;
NABoolean moveControlInfo = FALSE;
NABoolean setUpStatusToError = FALSE;
if ((srFlags() == REPLY_GET_UNIQUE_REQUEST) &&
(NOT doMoveData))
{
doMoveData = TRUE;
projRowLen = 0;
}
if ((getTotalTuppDescs() == 0) ||
(diagsArea != NULL) ||
((isSend == TRUE) && (NOT (srControlInfo_.getDownState() ==
*((down_state *)currQState)))))
{
moveControlInfo = TRUE;
}
if (moveControlInfo == FALSE)
{
if (srFlags() == REPLY_GET_UNIQUE_REQUEST)
{
if ((((up_state *)currQState)->parentIndex -
srControlInfo_.getUpState().parentIndex) == 1)
{
if (NOT doMoveData)
{
doMoveData = TRUE;
projRowLen = 0;
}
}
else if (((up_state *)currQState)->parentIndex !=
srControlInfo_.getUpState().parentIndex)
{
moveControlInfo = TRUE;
}
else
{
doMoveData = FALSE;
}
}
else
{
if ((doMoveControl) ||
((isSend == FALSE) && (NOT (srControlInfo_.getUpState() ==
*((up_state *)currQState)))))
moveControlInfo = TRUE;
}
}
NABoolean prevHadExternalDA = flags_ & CURR_HAS_EDA;
if (callerHasExternalDA)
{
// External DiagsAreas always need a ControlInfo so that
// moveOutSendOrReplyData will notice them.
moveControlInfo = TRUE;
flags_ |= CURR_HAS_EDA;
}
else if (!callerHasExternalDA && prevHadExternalDA)
{
// If previous entry in this buffer had an External DiagsAreas
// but this one does not, make sure there is a ControlInfo so that
// moveOutSendOrReplyData can reset srControlInfo_.flags_
moveControlInfo = TRUE;
flags_ &= ~CURR_HAS_EDA;
}
else
{
// Have not changed state of SqlBuffer WRT whether curr entry has
// an external diags area. Hence, no need to force a ControlInfo
// to note absence of external DA.
}
if (moveControlInfo)
{
if (controlInfoLen > 0)
{
#pragma nowarn(1506) // warning elimination
cdesc = add_tuple_desc(controlInfoLen);
#pragma warn(1506) // warning elimination
if (!cdesc)
{
return SqlBuffer::BUFFER_FULL; // buffer is full
}
// mark that this is a control tupp descriptor
cdesc->setControlTupleType();
controlInfoMoved = TRUE;
ControlInfo *ci;
ci = (ControlInfo *)(cdesc->getTupleAddress());
ci->resetFlags(); // no constructor got called for ci
ci->setIsExtDiagsAreaPresent(callerHasExternalDA);
if (currQState)
{
if (isSend == TRUE)
{
ci->getDownState() = *(down_state *)currQState;
}
else
{
ci->getUpState() = *(up_state *)currQState;
}
if (controlInfo)
*controlInfo = ci;
}
} // controlInfoLen > 0
}
// now move data. Note that if space for data is to be allocated,
// and the previous tupp moved was control, then this data MUST
// be moved in the same reply buffer. In other words, the last
// tuple in a reply buffer cannot be a control, if that control
// has associated data.
UInt32 defragLength = 0;
UInt32 rowLen = projRowLen;
UInt32 *rowLenPtr = &rowLen;
#ifndef UDRSERV_BUILD
ex_expr::exp_return_type expr_retcode = ex_expr::EXPR_OK;
#endif
if (doMoveData == TRUE)
{
#pragma nowarn(1506) // warning elimination
#ifndef UDRSERV_BUILD
if (defragTd &&
projRowLen > 0 &&
SqlBufferGetTuppSize(projRowLen, bufType()) > (ULng32)getFreeSpace() )
{ // if we are here then there is not enough space to hold the max row size.
// we apply the expression in the defrag buffer and get the actual length
// which may be smaller then the max row size and may fit in the remaining
// buffer space
destAtp->getTupp(tuppIndex) = defragTd;
defragTd->setReferenceCount(1);
expr_retcode = expr->eval(atp1, workAtp, 0, -1, rowLenPtr);
// if no errors then we allocate the actual size. If errors occur then we do not
// allocate and skip the defragmentation for this row
if (expr_retcode != ex_expr::EXPR_ERROR)
{
defragLength = rowLen;
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
char txt[] = "exchange";
sql_buffer_pool::logDefragInfo(txt,
SqlBufferGetTuppSize(projRowLen, bufType()),
SqlBufferGetTuppSize(rowLen, bufType()),
getFreeSpace(),
this,
this->getTotalTuppDescs(),
tcb);
#endif
}
}
#endif
tdesc = add_tuple_desc(rowLen);
#pragma warn(1506) // warning elimination
if (! tdesc)
{
// no space to move data. Release the control
// information, if moved, from the reply buffer.
if (controlInfoMoved == TRUE)
remove_tuple_desc();
#if defined(__EID)
if (getTotalTuppDescs() == 0)
{
if (diagsArea == NULL)
{
// tbd - is expr ever null? Is expr->getHeap() ?
diagsArea = ComDiagsArea::allocate(expr->getHeap());
}
*diagsArea << DgSqlCode(-EXE_ROWLENGTH_EXCEEDS_BUFFER);
up_state errorUpState;
errorUpState.status = ex_queue::Q_SQLERROR;
tupp_descriptor * dDesc = NULL;
Int32 moveRetcode = 0;
moveRetcode = moveInSendOrReplyData(isSend,// false b/c this is EID
doMoveControl,
FALSE, // <-------- doMoveData.
&errorUpState,
controlInfoLen,
controlInfo,
projRowLen,
outTdesc,
diagsArea ,
&dDesc,
expr, atp1, workAtp,
destAtp,
tuppIndex,
doMoveStats,
statsArea,
statsDesc,
useExternalDA,
useExternalDA,
NULL,
noMoveWarnings
);
ex_assert((moveRetcode != SqlBuffer::BUFFER_FULL),
"sending an empty buffer from EID");
if (dDesc)
{
diagsArea->packObjIntoMessage(dDesc->getTupleAddress());
diagsArea->decrRefCount();
}
}
#endif
return SqlBuffer::BUFFER_FULL;
}
// mark that this is a data tupp descriptor
tdesc->setDataTupleType();
if (outTdesc != NULL)
*outTdesc = tdesc;
}
#ifndef UDRSERV_BUILD
if ((expr != NULL) && (tdesc != NULL))
{
destAtp->getTupp(tuppIndex) = tdesc;
if (diagsArea != NULL)
daMark = diagsArea->mark();
NABoolean backoutAndRestart = FALSE;
NABoolean atp1HadNoDiagsAreaBefore = TRUE;
if (atp1->getDiagsArea() != NULL)
{
atpDaMark = atp1->getDiagsArea()->mark();
atp1HadNoDiagsAreaBefore = FALSE;
}
//Clear the last 4 bytes of row buffer. This is to ensure the last 4
//filler bytes are initialized to zero. Also check to make sure the
//rowlength is >= 4 before clearing the bytes.
if(rowLen >= 4) {
char* fillerBytes = (char*)(tdesc->getTupleAddress() + (rowLen -4));
fillerBytes[0] = 0;
fillerBytes[1] = 0;
fillerBytes[2] = 0;
fillerBytes[3] = 0;
}
if (defragLength != 0)
{
str_cpy_all(tdesc->getTupleAddress(),
defragTd->getTupleAddress(),
defragLength);
}
else
{
// get row length after evaluating the expression. If the rowlen is modified,
// then this must be a CIF row and can be resized.
// Initialize rowLen to projRowLen because if the expr evaluation
// decides not to update the row length, there is no need to resize.
expr_retcode = expr->eval(atp1, workAtp,
0, -1, rowLenPtr);
if (expr_retcode == ex_expr::EXPR_ERROR)
{
if (isSend)
{
// No point flowing request down further. Back out and return the error.
destAtp->getTupp(tuppIndex).release();
remove_tuple_desc();
if (controlInfoMoved == TRUE)
remove_tuple_desc();
if (workAtp == destAtp)
destAtp->release();
return MOVE_ERROR;
}
else
{
// Errors raised while replying have to be processed here...
if (controlInfoMoved == FALSE)
{
// ... and we must have a ControlInfo to process errors.
backoutAndRestart = TRUE;
}
}
}
else // success
{ // resize the row if needed
if (rowLenPtr && (*rowLenPtr != projRowLen))
resize_tupp_desc(*rowLenPtr, tdesc->getTupleAddress());
}
}
if ( // Has a new diags area appeared?
(atp1->getDiagsArea() != NULL) && (atp1HadNoDiagsAreaBefore)
&& // And we are sending diags areas externally?
(useExternalDA)
&& // And state transistion from no DA to yes DA?
((flags_ & CURR_HAS_EDA) == FALSE)
)
{
backoutAndRestart = TRUE;
if (prevHadExternalDA)
{
flags_ |= CURR_HAS_EDA;
}
else
{
flags_ &= ~CURR_HAS_EDA;
}
}
// Sometimes we backout and restart, so that we guarantee a ControlInfo:
//
// 1. If an error condition is raised from the move expr, and we have no
// ControlInfo into which we can record Q_SQLERROR;
// 2. If a new diags area appears from the move expr, and the caller is using
// external diags areas and the current tuple placed into the buffer did
// not have an external diags area associated with it.
//
// Logic for this test is above.
if (backoutAndRestart)
{
// Start over, this time ask for a new ctrl info.
destAtp->getTupp(tuppIndex).release();
remove_tuple_desc();
if (controlInfoMoved == TRUE)
remove_tuple_desc();
atp1->getDiagsArea()->rewind(atpDaMark);
return moveInSendOrReplyData(isSend,
TRUE, // force a new ctrl info
doMoveData,
currQState,
controlInfoLen,
controlInfo,
projRowLen,
outTdesc,
NULL, diagsDesc,
expr, atp1, workAtp,
destAtp,
tuppIndex,
doMoveStats,
statsArea,
statsDesc,
useExternalDA,
useExternalDA, // always have ext. DA if used
defragTd
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
,tcb
#endif
,noMoveWarnings
);
}
if (expr_retcode == ex_expr::EXPR_ERROR)
{
if (isSend == FALSE)
{
((ControlInfo *)(cdesc->getTupleAddress()))->getUpState().status
= ex_queue::Q_SQLERROR;
setUpStatusToError = TRUE;
retcode = MOVE_ERROR;
}
}
if (diagsArea == NULL)
{
diagsArea = atp1->getDiagsArea();
}
if (workAtp == destAtp)
destAtp->release();
} // if ((expr != NULL) && (tdesc != NULL))
#endif // UDRSERV_BUILD
// if warnings are not to be sent, dont send them.
// this is done when a vsbb buffer is being shipped.
// vsbb and sidetree eid code cannot handle input warnings.
// one day we will teach it to handle them.
if ((diagsArea != NULL && useExternalDA == FALSE) &&
(diagsArea->getNumber(DgSqlCode::ERROR_) == 0) &&
((diagsArea->getNumber(DgSqlCode::WARNING_) > 0) && noMoveWarnings))
{
// do nothing.
}
else if (diagsArea != NULL && useExternalDA == FALSE)
{
// allocate space for diags area.
tupp_descriptor * tempDiagsDesc = add_tuple_desc(
#pragma nowarn(1506) // warning elimination
diagsArea->packedLength());
#pragma warn(1506) // warning elimination
if (tempDiagsDesc == NULL)
{
// don't have enough space in this input buffer for diags.
// remove the tuple data desc, if any.
if (tdesc != NULL)
remove_tuple_desc();
// remove the control desc.
if (controlInfoMoved == TRUE)
remove_tuple_desc();
if (daMark >= 0)
diagsArea->rewind(daMark);
#if defined(__EID)
if (getTotalTuppDescs() == 0)
{
//there is no room for DA even, clear DA and send one condition only
if(!space_available(diagsArea->packedLength()))
diagsArea->clear();
*diagsArea << DgSqlCode(-EXE_ROWLENGTH_EXCEEDS_BUFFER);
up_state errorUpState;
errorUpState.status = ex_queue::Q_SQLERROR;
tupp_descriptor * dDesc = NULL;
Int32 moveRetcode ;
moveRetcode = moveInSendOrReplyData(isSend,// false b/c this is EID
doMoveControl,
FALSE, // <-------- doMoveData.
&errorUpState,
controlInfoLen,
controlInfo,
projRowLen,
outTdesc,
diagsArea ,
&dDesc,
expr, atp1, workAtp,
destAtp,
tuppIndex,
doMoveStats,
statsArea,
statsDesc,
useExternalDA,
useExternalDA,
NULL,
noMoveWarnings
);
ex_assert((moveRetcode != SqlBuffer::BUFFER_FULL),
"Sending an empty buffer from EID");
if (dDesc)
{
diagsArea->packObjIntoMessage(dDesc->getTupleAddress());
diagsArea->decrRefCount();
}
}
#endif
return SqlBuffer::BUFFER_FULL; // not enough space.
}
// mark that this is a diags tupp descriptor
(tempDiagsDesc)->setDiagsTupleType();
if (diagsDesc)
*diagsDesc = tempDiagsDesc;
}
#ifndef UDRSERV_BUILD
if ((doMoveStats) && (statsArea != NULL))
{
// stats can only be returned with EOD. So no data should
// be allocated at this time.
ex_assert((tdesc == NULL), "Cannot have data with stats");
// allocate space for stats area.
#pragma nowarn(1506) // warning elimination
*statsDesc = add_tuple_desc(statsArea->packedLength());
#pragma warn(1506) // warning elimination
if (*statsDesc == NULL)
{
// don't have enough space in this input buffer for stats.
// remove the control desc.
remove_tuple_desc();
return SqlBuffer::BUFFER_FULL; // not enough space.
}
// mark that this is a stats tupp descriptor
(*statsDesc)->setStatsTupleType();
}
#endif // UDRSERV_BUILD
if (controlInfoMoved && currQState)
{
// Set srControlInfo_ to the queue state of the most recent
// row in the SqlBuffer. Do this last, because only at this
// point can we be sure that the row really fits into the
// buffer.
if (isSend)
srControlInfo_.getDownState() = *(down_state *)currQState;
else
{
srControlInfo_.getUpState() = *(up_state *)currQState;
if (setUpStatusToError)
srControlInfo_.getUpState().status = ex_queue::Q_SQLERROR;
}
}
else if ((srFlags() == REPLY_GET_UNIQUE_REQUEST) &&
(doMoveData) &&
(NOT isSend) &&
(currQState))
{
srControlInfo_.getUpState() = *(up_state *)currQState;
}
return retcode;
}
/////////////////////////////////////////////////////////////////////////
// This method returns pairs of control and data(outTupp). The data
// returned may not be present for some cases, like Q_NO_DATA up state.
// The control and data inside the SqlBuffer are stored in a packed
// form while sending from/to one process to/from another. This method
// unpacks the control and data depending on the format described by
// srFlags_ field, and returns the control/data pair to caller.
//
// if noStateChange is set, then this moveOut is being called in read
// mode and has not yet been shipped. State of tuppDescs
// should not be changed in this case.
//
// See comments in sql_buffer.h too.
//
// In the following comments,
// C means a control tuple.
// D means a data tuple.
// P means the parent index. Pn means parent index 'n'.
//
// Buffer Layout is how tuples were stored inside the buffer by
// the method moveInSendOrReplyData. A NULL data tuple stored means
// that this is a data entry of length zero.
//
// Returned Tuple Pair is how this method returns data. A NULL data
// returned means no data was moved to outTupp.
//
// SEND_SAME_REQUEST:
// Buffer Layout: C1 D11 D12 D13 C2 C3 D31 D32
// C1, C2 and C3 differ in their downState.
//
// Returned tuples: (C1 D11), (C1 D12), (C1 D13), (C2 NULL),
// (C3 D31), (C3 D32)
//
// SEND_CONSECUTIVE_REQUEST:
// Buffer Layout: C1P1 D11 D12 D13 C2P9 C3P2 D31 D32
// A new control tuple is moved if the difference in
// parent index between two control is not equal to the
// number of data tuples in between.
//
// Returned tuples: (C1P1 D11), (C1P2 D12), (C1P3 D13), (C2P9 NULL),
// (C3P2 D31), (C3P3 D32)
//
//
// REPLY_GET_UNIQUE_REQUEST:
// Buffer Layout: C1P1 D11 D_NULL D13 C2P9 D_NULL C3P2 D31 D32
//
// Returned tuples: (C1P1 D11), (C1P1_Q_NO_DATA NULL),
// (C1P2_Q_NO_DATA NULL),
// (C1P3 D13), (C1P3_Q_NO_DATA NULL),
// (C2P9_Q_NO_DATA NULL),
// (C3P2 D31), (C3P2_Q_NO_DATA NULL),
// (C3P3 D32), (C3P2_Q_NO_DATA NULL)
//
//
// REPLY_GET_SUBSET_REQUEST:
// Buffer Layout: C1 D11 D12 D13 C2 C3 D31 D32
// C1, C2 and C3 differ in their downState.
// Returned tuples: (C1M1 D11), (C1M2 D12), (C1M3 D13),
// (C2 NULL),
// (C3M1 D31), (C3M2 D32)
//
//
// REPLY_VSBB_INSERT_REQUEST:
// In the following comments,
// N means a control tuple, standing for a Q_NO_DATA
// E means a control tuple, standing for a Q_SQLERROR
// D means a data tuple
// C means a diags area
// M means a matchNo, associated with a control tuple.
// In the following example, VSBB Insert was asked to insert 7 rows.
// Buffer Layout: N1M7 N2M0
// Returned Tuples: (N1M1), (N2M1), (N3M1), (N4M1), (N5M1), (N6M1),
// (N7M1), (N8M0)
// The astute reader may wonder about the presence of the extra (8th)
// Q_NO_DATA. This is indeed present, probably to respond to the GET_EOD
// that was sent by partition access.
//
// Q_SQLERROR control tuples use their matchNo member to generate Q_NO_DATA
// returned tuples. In this way, the Q_SQLERROR is returned in proper order.
// In the following example, VSBB Insert was asked to insert 7 rows,
// but an error occurred on the 4th.
// Buffer Layout: E1M3 D1 C1 N2M4 N2M0
// Returned Tuples: (N1M1), (N2M1), (N3M1), (E1M0 DATA DIAGS),
// (N4M1), (N5M1), (N6M1), (N7M1), (N8M0)
//
/////////////////////////////////////////////////////////////////////////
NABoolean SqlBuffer::moveOutSendOrReplyData(NABoolean isSend,
void * currQState,
tupp &outTupp,
ControlInfo ** controlInfo,
ComDiagsArea ** diagsArea,
ExStatisticsArea ** statsArea,
Int64 * numStatsBytes,
NABoolean noStateChange,
NABoolean unpackDA,
CollHeap * heap)
{
if (atEOTD())
return TRUE;
// Locate consecutive tupps.
tupp_descriptor *dataTuppDesc = 0;
tupp_descriptor *diagsTuppDesc = 0;
tupp_descriptor *statsTuppDesc = 0;
tupp_descriptor *tuppDesc = 0;
unsigned short dataDescIndex = 0;
TupleDescInfo *dataTuppDescInfo = 0;
ComDiagsArea * unpackedDiagsArea = NULL;
if (tuppDescIndex_ < maxTuppDesc_)
{
switch (bufType())
{
case NORMAL_:
{
#pragma nowarn(1506) // warning elimination
tuppDesc = tupleDesc(tuppDescIndex_);
#pragma warn(1506) // warning elimination
if (tuppDesc->isControlTuple())
{
if (flags_ & IGNORE_CONTROL_FLAG)
break; // case done and break out.
if (setupSrControlInfo( isSend, tuppDesc )
== DEFER_MORE_TUPPS)
break;
if (NOT noStateChange)
tuppDesc->resetCommFlags();
if (++tuppDescIndex_ < maxTuppDesc_)
tuppDesc = tupleDesc(tuppDescIndex_);
else
break; // tupp is control tuple.
}
if (tuppDesc->isDataTuple())
{
dataTuppDesc = tuppDesc;
dataDescIndex = tuppDescIndex_;
if (++tuppDescIndex_ < maxTuppDesc_)
tuppDesc = tupleDesc(tuppDescIndex_);
else
break; // tupp is data tuple.
}
if (tuppDesc->isDiagsTuple())
{
if (NOT noStateChange)
tuppDesc->resetCommFlags();
diagsTuppDesc = tuppDesc;
if (++tuppDescIndex_ < maxTuppDesc_)
tuppDesc = tupleDesc(tuppDescIndex_);
else
break; // tupp is diags tuple.
}
if (tuppDesc->isStatsTuple())
{
if (NOT noStateChange)
tuppDesc->resetCommFlags();
statsTuppDesc = tuppDesc;
++tuppDescIndex_;
}
break;
} // case NORMAL_
case DENSE_:
{
SqlBufferDense* denseBuf = (SqlBufferDense*)(this);
tuppDesc = denseBuf->getCurrDense();
if (tuppDesc && tuppDesc->isControlTuple())
{
if (flags_ & IGNORE_CONTROL_FLAG)
break; // case done and break out.
if (setupSrControlInfo( isSend, tuppDesc )
== DEFER_MORE_TUPPS)
break;
if (NOT noStateChange)
tuppDesc->resetCommFlags();
denseBuf->advanceDense();
tuppDesc = denseBuf->getCurrDense();
}
if (tuppDesc && tuppDesc->isDataTuple())
{
dataTuppDesc = tuppDesc;
dataDescIndex = tuppDescIndex_;
dataTuppDescInfo = castToSqlBufferDense()->currTupleDesc();
denseBuf->advanceDense();
tuppDesc = denseBuf->getCurrDense();
}
if (tuppDesc && tuppDesc->isDiagsTuple())
{
if (NOT noStateChange)
tuppDesc->resetCommFlags();
diagsTuppDesc = tuppDesc;
denseBuf->advanceDense();
tuppDesc = denseBuf->getCurrDense();
}
if (tuppDesc && tuppDesc->isStatsTuple())
{
if (NOT noStateChange)
tuppDesc->resetCommFlags();
statsTuppDesc = tuppDesc;
denseBuf->advanceDense();
tuppDesc = denseBuf->getCurrDense();
}
break;
} // case DENSE_
default:
UdrExeAssert(0, "SqlBuffer::moveOutSendOrReplyData() invalid case");
} // switch
} // more tupp descs
if (controlInfo)
{
*controlInfo = &srControlInfo_;
}
srControlInfo_.setIsDataRowPresent(FALSE);
if (isSend == TRUE)
*(down_state *)currQState = srControlInfo_.getDownState();
else
*(up_state *)currQState = srControlInfo_.getUpState();
switch (srFlags_)
{
case DEFAULT_:
case SEND_SAME_REQUEST:
{
// if next tuple (if present) is data, return it. And advance
// position.
if (dataTuppDesc)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
}
}
break;
case SEND_CONSECUTIVE_REQUEST:
{
if (dataTuppDesc)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
// advance the parent index
srControlInfo_.getDownState().parentIndex++;
}
}
break;
case SEND_VSBB_WITH_EOD:
{
// if next tuple (if present) is data, return it. And advance
// position.
if (dataTuppDesc)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
// need to send an EOD on the next call.
flags_ |= IGNORE_CONTROL_FLAG;
}
else
{
((down_state *)currQState)->request = ex_queue::GET_EOD;
flags_ &= ~IGNORE_CONTROL_FLAG;
}
}
break;
case REPLY_GET_UNIQUE_REQUEST:
{
if (dataTuppDesc)
{
if (srControlInfo_.getUpState().status == ex_queue::Q_OK_MMORE)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
// Restore dataDescIndex, dataTuppDesc.
// Start at the same data tupple on the next call.
// The elseif part will be executed on the next call.
tuppDescIndex_ = dataDescIndex;
if (castToSqlBufferDense()) // DENSE_ buffer
castToSqlBufferDense()->currTupleDesc() =
dataTuppDescInfo;
statsTuppDesc = diagsTuppDesc = 0;
srControlInfo_.getUpState().status = ex_queue::Q_NO_DATA;
// We need to return two control info for each data row.
// Ignore next tuple on next call to this proc.
flags_ |= IGNORE_CONTROL_FLAG;
}
else if (srControlInfo_.getUpState().status ==
ex_queue::Q_NO_DATA)
{
srControlInfo_.getUpState().parentIndex++;
srControlInfo_.getUpState().status = ex_queue::Q_OK_MMORE;
flags_ &= ~IGNORE_CONTROL_FLAG;
}
}
}
break;
case REPLY_GET_SUBSET_REQUEST:
{
if (dataTuppDesc)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
// advance the match no
if (srControlInfo_.getUpState().getMatchNo() != OverflowedMatchNo)
{
Int64 matchNo64 = srControlInfo_.getUpState().getMatchNo();
srControlInfo_.getUpState().setMatchNo(matchNo64 + 1);
}
}
}
break;
case REPLY_VSBB_INSERT_REQUEST:
{
// if next tuple (if present) is data, return it.
if (dataTuppDesc)
{
srControlInfo_.setIsDataRowPresent(TRUE);
outTupp = dataTuppDesc;
}
if ((diagsTuppDesc) &&
(unpackDA) &&
(heap))
{
ComDiagsArea * d =
(ComDiagsArea *)(diagsTuppDesc->getTupleAddress());
unpackedDiagsArea = ComDiagsArea::allocate(heap);
unpackedDiagsArea->unpackObj(d->getType(),
d->getVersion(),
TRUE, // sameEndianness,
0, // dummy for now...
(IpcConstMessageBufferPtr) d);
if (unpackedDiagsArea->containsRowCountFromEID())
{
flags_ |= RETURN_ROWCOUNT;
}
if (controlInfo)
{
(*controlInfo)->setIsDiagsAreaUnpacked(TRUE);
}
}
if (srControlInfo_.getUpState().matchNo > 0)
{
if (flags_ & RETURN_ROWCOUNT)
{
if (flags_ & ROWCOUNT_RETURNED)
((up_state *)currQState)->matchNo = 0;
else
{
((up_state *)currQState)->matchNo =
(Lng32)unpackedDiagsArea->getRowCount();
unpackedDiagsArea->setContainsRowCountFromEID(FALSE);
unpackedDiagsArea->setRowCount(0);
flags_ |= ROWCOUNT_RETURNED;
}
}
else
((up_state *)currQState)->matchNo = 1;
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
}
else
{
((up_state *)currQState)->matchNo = 0;
}
srControlInfo_.getUpState().downIndex++;
// We need to return matchNo control info tuples.
// Ignore next tuple on next call to this proc.
// It will be advanced after returning all control tuples.
flags_ |= IGNORE_CONTROL_FLAG;
if (srControlInfo_.getUpState().downIndex >=
(queue_index) srControlInfo_.getUpState().matchNo)
{
flags_ &= ~IGNORE_CONTROL_FLAG;
}
}
break;
} // switch on srFlags_
// if next tuple (if present) is diags, return it.
if (diagsTuppDesc)
{
srControlInfo_.setIsDiagsAreaPresent(TRUE);
if (unpackedDiagsArea)
{
*diagsArea = unpackedDiagsArea;
}
else
*diagsArea = (ComDiagsArea *)(diagsTuppDesc->getTupleAddress());
}
else
{
srControlInfo_.setIsDiagsAreaPresent(FALSE);
if (diagsArea)
*diagsArea = NULL;
}
// if next tuple (if present) is stats, return it.
if (statsArea != NULL)
{
if (statsTuppDesc)
{
*statsArea = (ExStatisticsArea *)(statsTuppDesc->getTupleAddress());
*numStatsBytes = sizeof(*statsTuppDesc) + statsTuppDesc->getAllocatedSize();
}
else
{
*statsArea = NULL;
*numStatsBytes = 0;
}
}
return FALSE;
}
Int32 SqlBuffer::setupSrControlInfo( NABoolean isSend, tupp_descriptor *tuppDesc )
{
// remember that flags_ were overloaded with relocatedAddress
// in the tupp_descriptor (just trying to save some space).
// Now that the buffer has reached its destination, the
// flags_ field could not be used.
ControlInfo *ci = (ControlInfo *)(tuppDesc->getTupleAddress());
srControlInfo_.copyFlags(*ci);
if (isSend == TRUE)
{
srControlInfo_.getDownState() = ci->getDownState();
}
else
{
srControlInfo_.getUpState() = ci->getUpState();
}
if (srFlags_ == REPLY_VSBB_INSERT_REQUEST)
{
// Use downIndex to keep track of how many tuples have been
// returned.
srControlInfo_.getUpState().downIndex = 0;
// SQLERROR reply implies fatal error and OK_MMORE reply implies
// nonfatal error for VSBB insert
if ((srControlInfo_.getUpState().status == ex_queue::Q_SQLERROR ||
srControlInfo_.getUpState().status == ex_queue::Q_OK_MMORE )
&& srControlInfo_.getUpState().matchNo > 0)
{
// Will eventually process this ci again, so mark it
// so that only one sequence of Q_NO_DATAs are unpacked.
ci->getUpState().matchNo = 0;
return DEFER_MORE_TUPPS;
}
}
return PROCEED_MORE_TUPPS;
}
NABoolean SqlBuffer::findAndCancel( queue_index pindex, NABoolean startFromBeginning)
{
// start at the beginning or at curr, depending on argument.
// for each tuple, see if it is a control tuple.
// if so, see if downstate.parentIndex matches pindex.
// if so, change downstate.request to GET_NOMORE & return TRUE,
// if not found, return FALSE.
// save and restore tuppDescIndex_, regardless of outcome.
#pragma nowarn(1506) // warning elimination
Lng32 saveTuppDescIndex = tuppDescIndex_;
#pragma warn(1506) // warning elimination
NABoolean wasFound = FALSE;
if (startFromBeginning)
position();
while ( ! atEOTD() )
{
if (getCurr()->isControlTuple())
{
down_state *ds = &((ControlInfo *)(getCurr()->getTupleAddress()))->
getDownState();
if (ds->parentIndex == pindex)
{
// eureka
ds->request = ex_queue::GET_NOMORE;
wasFound = TRUE;
break;
}
}
advance();
}
position(saveTuppDescIndex);
return wasFound;
}
unsigned short SqlBuffer::getNumInvalidTuplesUptoNow()
{
UdrExeAssert(0, "Do not call the base class method");
return 0;
}
void SqlBuffer::setNumInvalidTuplesUptoNow(Lng32 val)
{
UdrExeAssert(0, "Do not call the base class method");
}
void SqlBuffer::incNumInvalidTuplesUptoNow()
{
UdrExeAssert(0, "Do not call the base class method");
}
void SqlBuffer::decNumInvalidTuplesUptoNow()
{
UdrExeAssert(0, "Do not call the base class method");
}
void SqlBuffer::advanceToNextValidTuple()
{
advance();
while ((!(atEOTD())) && getCurr()->isInvalidTuple())
{
advance();
incNumInvalidTuplesUptoNow();
}
}
static Int64 debugCnt = 0;
void SqlBuffer::setSignature()
{
signature_ = ++debugCnt;
}
NABoolean SqlBuffer::checkSignature(const Int32 nid, const Int32 pid,
Int64 *signature, const Int32 action)
{
if (signature_ <= *signature)
{
if (action > 0)
{
char coreFile[512];
msg_mon_dump_process_id(NULL, nid, pid, coreFile);
}
return FALSE;
}
*signature = signature_;
return TRUE;
}
////////////////////////////////////////////////////////////////////////////
// class SqlBuffer
////////////////////////////////////////////////////////////////////////////
void SqlBufferNormal::init(ULng32 in_size_in_bytes,
NABoolean clear)
{
SqlBuffer::init(in_size_in_bytes, clear);
dataOffset_ = ROUND8(sizeof(*this)); // data starts after this object
// last byte in buffer, rounded to previous 4 byte boundary
tupleDescOffset_ = ROUND4(sizeInBytes_ - 3);
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) NO_INVALID_TUPLES;
if (clear)
memset((char*)this+dataOffset_, 0, freeSpace_);
// adjust freeSpace_ to account for the unused bytes due to rounding.
freeSpace_ = freeSpace_ - (sizeInBytes_ - tupleDescOffset_);
normalBufFlags_ = 0;
}
void SqlBufferNormal::init()
{
SqlBuffer::init();
dataOffset_ = ROUND8(sizeof(*this)); // data starts after this object
// last byte in buffer, rounded to previous 4 byte boundary
tupleDescOffset_ = ROUND4(sizeInBytes_ - 3);
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) NO_INVALID_TUPLES;
// adjust freeSpace_ to account for the unused bytes due to rounding.
freeSpace_ = freeSpace_ - (sizeInBytes_ - tupleDescOffset_);
normalBufFlags_ = 0;
}
// allocate a new tupp descriptor
tupp_descriptor *SqlBufferNormal::allocate_tuple_desc(Lng32 tup_data_size)
{
// NOTE: SqlBufferNormal::allocate_tuple_desc and
// SqlBufferNormal::remove_tuple_desc must be kept
// consistent. remove_tuple_desc should correctly reverse any
// modifications done by the most recent allocate_tuple_desc.
UInt32 rounded_size = ROUND8(tup_data_size);
Lng32 freeSpaceNeeded = rounded_size + sizeof(tupp_descriptor);
if (freeSpace_ < freeSpaceNeeded) // no free space to allocate this tuple
return NULL;
freeSpace_ -= freeSpaceNeeded;
tupp_descriptor * td = tupleDesc(maxTuppDesc_);
maxTuppDesc_++;
td->init((ULng32)tup_data_size,
0,
&((char *)this)[dataOffset_]);
dataOffset_ += rounded_size;
// if buffer is empty, then change status to partially full.
if (bufferStatus_ == EMPTY)
bufferStatus_ = PARTIAL;
return td;
}
// adds a tuple descriptor
tupp_descriptor *SqlBufferNormal::add_tuple_desc(Lng32 tup_data_size)
{
return allocate_tuple_desc(tup_data_size);
}
// removes the last tuple descriptor.
void SqlBufferNormal::remove_tuple_desc()
{
// NOTE: SqlBufferNormal::allocate_tuple_desc and
// SqlBufferNormal::remove_tuple_desc must be kept
// consistent. remove_tuple_desc should correctly reverse any
// modifications done by the most recent allocate_tuple_desc.
if (maxTuppDesc_ > 0)
{
maxTuppDesc_--;
tupp_descriptor * td = tupleDesc(maxTuppDesc_);
UInt32 rounded_size = ROUND8(td->getAllocatedSize());
dataOffset_ -= rounded_size;
freeSpace_ += rounded_size + sizeof(tupp_descriptor);
if (maxTuppDesc_ == 0)
bufferStatus_ = EMPTY;
}
}
// Resize the last allocated tuple in the Dense buffer.
void SqlBufferDense::resize_tupp_desc(UInt32 tup_data_size, char *dataPointer)
{
#ifndef UDRSERV_BUILD
ex_assert(lastTupleDesc(), "resizing non-existant td");
#endif
tupp_descriptor * td = lastTupleDesc()->tupleDesc();
UInt32 rounded_new_size = ROUND8(tup_data_size);
UInt32 rounded_old_size = ROUND8(td->getAllocatedSize());
Int32 diff = rounded_old_size - rounded_new_size;
#ifndef UDRSERV_BUILD
ex_assert((diff >= 0), "trying to increase tupp size");
ex_assert(dataPointer == td->getTupleAddress(), "Bad resize");
#endif
freeSpace_ += diff;
td->setAllocatedSize(tup_data_size);
}
// adjusts data size in the last tupp descriptor
void SqlBufferNormal::resize_tupp_desc(UInt32 tup_data_size, char *dataPointer)
{
tupp_descriptor * td = tupleDesc(maxTuppDesc_ - 1);
UInt32 rounded_new_size = ROUND8(tup_data_size);
UInt32 rounded_old_size = ROUND8(td->getAllocatedSize());
Int32 diff = rounded_old_size - rounded_new_size;
#ifndef UDRSERV_BUILD
ex_assert((diff >= 0), "trying to increase tupp size");
ex_assert(dataPointer == td->getTupleAddress(), "Bad resize");
#endif
dataOffset_ -= diff;
freeSpace_ += diff;
td->setAllocatedSize(tup_data_size);
}
// RETURNS: -1 if this buffer is free, 0 if not.
short SqlBufferNormal::isFree()
{
// if this buffer is in use by an I/O operation
// then don't free it. It has not been filled up yet.
if (bufferStatus_ == IN_USE)
return 0;
// if the buffer is empty then it's free, of course
if (bufferStatus_ == EMPTY)
return -1;
// if all tuple descriptors in this SqlBuffer are not being referenced,
// then set the buffer status to EMPTY.
// Backward search for efficiency. (eric)
for (Int32 i = (maxTuppDesc_ - 1); i >= 0; i--)
{
if (tupleDesc(i)->getReferenceCount() != 0)
// this tuple descriptor is still in use
return 0;
}
return -1;
}
// positions to the first tuple descriptor in the buffer.
void SqlBufferNormal::position()
{
tuppDescIndex_ = 0;
}
// positions to the "tupp_desc_num"th tupp descriptor.
void SqlBufferNormal::position(Lng32 tupp_desc_num)
{
tuppDescIndex_ = (unsigned short)tupp_desc_num;
}
// print buffer info. For debugging
void SqlBufferNormal::printInfo() {
cerr << this << " Status: " << bufferStatus_
<< " maxTuppDesc: " << maxTuppDesc_
<< " freeSpace: " << freeSpace_
<< " numInvalidTuplesUptoCurrentPosition: " << numInvalidTuplesUptoCurrentPosition_ ;
#pragma warn(161) // warning elimination
Lng32 refcount = 0;
Lng32 firstref = -1;
#pragma warning (disable : 4018) //warning elimination
for (Int32 i = 0; (i < maxTuppDesc_); i++)
if (tupleDesc(i)->getReferenceCount()) {
if (firstref == -1)
firstref = i;
refcount++;
};
#pragma warning (default : 4018) //warning elimination
cerr << " ref tupps: " << refcount << " first: " << firstref << endl;
};
// returns the 'next' tuple descriptor. Increments the
// position. Returns null, if none found.
tupp_descriptor * SqlBufferNormal::getNext()
{
if (tuppDescIndex_ < maxTuppDesc_)
#pragma nowarn(1506) // warning elimination
return tupleDesc(tuppDescIndex_++);
#pragma warn(1506) // warning elimination
else
return 0;
}
// returns the 'prev' tuple descriptor. Decrements the
// position. Returns null, if none found.
tupp_descriptor * SqlBufferNormal::getPrev()
{
if (tuppDescIndex_ > 0)
#pragma nowarn(1506) // warning elimination
return tupleDesc(tuppDescIndex_--);
#pragma warn(1506) // warning elimination
else
return 0;
}
// Returns the "tupp_desc_num"th tupp descriptor. The input
// tupp_desc_num is 1-based, so the first tupp_descriptor
// is tupp_desc_num = 1.
// Returns NULL, if tupp_desc_num is greater than maxTuppDesc_.
tupp_descriptor * SqlBufferNormal::getTuppDescriptor(Lng32 tupp_desc_num)
{
#pragma warning (disable : 4018) //warning elimination
if (tupp_desc_num > maxTuppDesc_)
#pragma warning (default : 4018) //warning elimination
return 0;
else
return tupleDesc(tupp_desc_num-1);
}
void SqlBufferNormal::pack()
{
UInt32 i = 0;
while (i < maxTuppDesc_)
{
tupleDesc(i)->setTupleOffset((Lng32)(
(char *)tupleDesc(i)->getTupleAddress() -
(char *)this));
i++;
}
}
void SqlBufferNormal::unpack()
{
UInt32 i = 0;
while (i < maxTuppDesc_)
{
tupleDesc(i)->setTupleAddress((char *)this +
tupleDesc(i)->getTupleOffset());
i++;
}
}
// This verify() method will be called only on packed objects. It
// should return FALSE if it detects that unpacking the object could
// create any internal inconsistencies, or bad pointers that reference
// memory outside the bounds of this object. Currently verify() is
// only called by the ExUdrTcb class when it receives a data reply
// from the non-trusted UDR server.
NABoolean SqlBufferNormal::verify(ULng32 maxBytes) const
{
if ((ULng32) sizeInBytes_ > maxBytes)
return FALSE;
if (tupleDescOffset_ < sizeof(*this))
return FALSE;
ULng32 bytesForTupps = (maxTuppDesc_ * sizeof(tupp_descriptor));
if (bytesForTupps > maxBytes)
return FALSE;
// We are going to loop through all the tuples and keep track of how
// far their data buffers extend. If any data buffer extends beyond
// maxBytes or beyond sizeInBytes_ then we must return
// FALSE. bytesSeen will represent the end of the furthest reaching
// data buffer seen so far.
ULng32 bytesSeen = sizeof(*this);
tupp_descriptor *tdBase = (tupp_descriptor *)
((char *) this + tupleDescOffset_);
for (Int32 i = 0; i < (Int32)maxTuppDesc_; i++)
{
ULng32 offset = tdBase[-(i+1)].getTupleOffset();
if (offset > dataOffset_)
return FALSE;
ULng32 endOfData = offset + tdBase[-(i+1)].getAllocatedSize();
bytesSeen = MAXOF(bytesSeen, endOfData);
}
if (bytesSeen > dataOffset_)
return FALSE;
if (bytesSeen > maxBytes)
return FALSE;
if (bytesSeen > (ULng32) sizeInBytes_)
return FALSE;
return TRUE;
}
unsigned short SqlBufferNormal::getNumInvalidTuplesUptoNow()
{
return numInvalidTuplesUptoCurrentPosition_;
}
void SqlBufferNormal::setNumInvalidTuplesUptoNow(Lng32 val)
{
UdrExeAssert(val < USHRT_MAX, "total invalid tupps in buffer is larger than USHRT_MAX");
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) val;
}
void SqlBufferNormal::incNumInvalidTuplesUptoNow()
{
UdrExeAssert(numInvalidTuplesUptoCurrentPosition_ < USHRT_MAX,
"total invalid tupps in buffer is larger than USHRT_MAX");
numInvalidTuplesUptoCurrentPosition_++;
}
void SqlBufferNormal::decNumInvalidTuplesUptoNow()
{
numInvalidTuplesUptoCurrentPosition_--;
}
/////////////////////////////////////////////////////////////////////
// class SqlBufferDense
/////////////////////////////////////////////////////////////////////
SqlBufferDense::SqlBufferDense() : SqlBuffer(DENSE_)
{
currTupleDesc() = NULL;
lastTupleDesc() = NULL;
#pragma nowarn(161) // warning elimination
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) NO_INVALID_TUPLES;
#pragma warn(161) // warning elimination
str_pad(filler_, 2, '\0');
}
void SqlBufferDense::init(ULng32 in_size_in_bytes,
NABoolean clear)
{
SqlBuffer::init(in_size_in_bytes, FALSE);
lastTupleDesc_ = NULL;
currTupleDesc_ = NULL;
#pragma nowarn(161) // warning elimination
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) NO_INVALID_TUPLES;
#pragma warn(161) // warning elimination
str_pad(filler_, 2, '\0');
}
void SqlBufferDense::init()
{
SqlBuffer::init();
lastTupleDesc_ = NULL;
currTupleDesc_ = NULL;
#pragma nowarn(161) // warning elimination
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) NO_INVALID_TUPLES;
#pragma warn(161) // warning elimination
str_pad(filler_, 2, '\0');
}
tupp_descriptor *SqlBufferDense::add_tuple_desc(Lng32 tup_data_size)
{
ULng32 rounded_size = ROUND8(tup_data_size);
short td_size = ROUND8(sizeof(TupleDescInfo));
#pragma nowarn(1506) // warning elimination
Lng32 freeSpaceNeeded = td_size + rounded_size;
#pragma warn(1506) // warning elimination
if (freeSpace_ < freeSpaceNeeded) // no free space to allocate this tuple
return NULL;
maxTuppDesc_ += 1;
freeSpace_ -= freeSpaceNeeded;
// if buffer is empty, then change status to partially full.
if (bufferStatus_ == EMPTY)
bufferStatus_ = PARTIAL;
TupleDescInfo * tdi = NULL;
if (lastTupleDesc())
tdi =
(TupleDescInfo *)(lastTupleDesc()->tupleDesc()->getTupleAddress()
+ ROUND8(lastTupleDesc()->tupleDesc()->getAllocatedSize()));
else
tdi = firstTupleDesc();
tupp_descriptor * td = tdi->tupleDesc();
td->init((unsigned short)tup_data_size,
0,
(char *)td + td_size);
setPrevTupleDesc(tdi, lastTupleDesc());
if (lastTupleDesc())
setNextTupleDesc(lastTupleDesc(), tdi);
lastTupleDesc() = tdi;
setNextTupleDesc(tdi, NULL);
return td;
}
tupp_descriptor *SqlBufferDense::allocate_tuple_desc(Lng32 tup_data_size)
{
return add_tuple_desc(tup_data_size);
}
// removes the last tuple descriptor.
void SqlBufferDense::remove_tuple_desc()
{
if (maxTuppDesc_ > 0)
{
// we got rid of the last tupp_descriptor
maxTuppDesc_--;
freeSpace_ += (ROUND8(sizeof(tupp_descriptor))
+ ROUND8(lastTupleDesc()->tupleDesc()->getAllocatedSize()));
if (lastTupleDesc() = getPrevTupleDesc(lastTupleDesc())) // =assignment, NOT ==
setNextTupleDesc(lastTupleDesc(), NULL);
compactBuffer();
if (maxTuppDesc_ == 0)
bufferStatus_ = EMPTY;
}
}
short SqlBufferDense::isFree()
{
// if this buffer is in use by an I/O operation
// then don't free it. It has not been filled up yet.
if (bufferStatus_ == IN_USE)
return 0;
// if the buffer is empty then it's free, of course
if (bufferStatus_ == EMPTY)
return -1;
// if all tuple descriptors in this SqlBuffer are not being referenced,
// then set the buffer status to EMPTY.
TupleDescInfo * tdi = lastTupleDesc();
while (tdi)
{
if (tdi->tupleDesc()->getReferenceCount() != 0)
// this tuple descriptor is still in use
return 0;
tdi = getPrevTupleDesc(tdi);
}
return -1;
}
// positions to the first tuple descriptor in the buffer.
void SqlBufferDense::position()
{
tuppDescIndex_ = 0;
currTupleDesc() = firstTupleDesc();
}
void SqlBufferDense::position(Lng32 tupp_desc_num)
{
position();
#pragma warning (disable : 4018) //warning elimination
while ((tuppDescIndex_ != tupp_desc_num) &&
(tuppDescIndex_ < maxTuppDesc_))
advanceDense();
#pragma warning (default : 4018) //warning elimination
}
// returns the 'next' tuple descriptor. Increments the
// position. Returns null, if none found.
tupp_descriptor * SqlBufferDense::getNext()
{
if (tuppDescIndex_ < maxTuppDesc_)
{
TupleDescInfo * tdi = currTupleDesc();
currTupleDesc() = getNextTupleDesc(tdi);
tuppDescIndex_++;
return tdi->tupleDesc();
}
else
return NULL;
}
// returns the 'prev' tuple descriptor. Decrements the
// position. Returns null, if none found.
tupp_descriptor * SqlBufferDense::getPrev()
{
if (tuppDescIndex_ > 0)
{
TupleDescInfo * tdi = currTupleDesc();
currTupleDesc() = getPrevTupleDesc(tdi);
tuppDescIndex_--;
return tdi->tupleDesc();
}
else
return NULL;
}
// Returns the "tupp_desc_num"th tupp descriptor. The input
// tupp_desc_num is 1-based, so the first tupp_descriptor
// is tupp_desc_num = 1.
// Returns NULL, if tupp_desc_num is greater than maxTuppDesc_.
tupp_descriptor * SqlBufferDense::getTuppDescriptor(Lng32 tupp_desc_num)
{
#pragma warning (disable : 4018) //warning elimination
if (tupp_desc_num > maxTuppDesc_)
return 0;
#pragma warning (default : 4018) //warning elimination
else
return tupleDesc(tupp_desc_num-1);
}
SqlBufferHeader::moveStatus SqlBufferDense::moveInVsbbEOD(void * down_q_state)
{
srFlags_ = SEND_VSBB_WITH_EOD;
return MOVE_SUCCESS;
}
void TupleDescInfo::pack(Long base)
{
tupleDesc()->setTupleOffset
((Long)tupleDesc()->getTupleAddress() - base);
}
void TupleDescInfo::unpack(Long base)
{
tupleDesc()->setTupleAddress((char *)base +
tupleDesc()->getTupleOffset());
}
void SqlBufferDense::pack()
{
if (maxTuppDesc_ > 0)
{
TupleDescInfo * temp = firstTupleDesc();
while (temp)
{
TupleDescInfo * nextTDI = getNextTupleDesc(temp);
temp->pack((Long)this);
temp = nextTDI;
}
}
if (lastTupleDesc())
lastTupleDesc() = (TupleDescInfo *)((char *)lastTupleDesc() - (char *)this);
if (currTupleDesc())
currTupleDesc() = (TupleDescInfo *)((char *)currTupleDesc() - (char *)this);
}
void SqlBufferDense::unpack()
{
if (lastTupleDesc())
lastTupleDesc() = (TupleDescInfo *)((Long)lastTupleDesc() + (char *)this);
if (currTupleDesc())
currTupleDesc() = (TupleDescInfo *)((Long)currTupleDesc() + (char *)this);
if (maxTuppDesc_ > 0)
{
TupleDescInfo * temp = firstTupleDesc();
while (temp)
{
temp->unpack((Long)this);
temp = getNextTupleDesc(temp);
}
}
}
unsigned short SqlBufferDense::getNumInvalidTuplesUptoNow()
{
return numInvalidTuplesUptoCurrentPosition_;
}
void SqlBufferDense::setNumInvalidTuplesUptoNow(Lng32 val)
{
UdrExeAssert(val < USHRT_MAX, "total invalid tupps in buffer is larger than USHRT_MAX");
numInvalidTuplesUptoCurrentPosition_ = (unsigned short) val;
}
void SqlBufferDense::incNumInvalidTuplesUptoNow()
{
UdrExeAssert(numInvalidTuplesUptoCurrentPosition_ < USHRT_MAX,
"total invalid tupps in buffer is larger than USHRT_MAX");
numInvalidTuplesUptoCurrentPosition_++;
}
void SqlBufferDense::decNumInvalidTuplesUptoNow()
{
numInvalidTuplesUptoCurrentPosition_--;
}
#define FAILURE { DebugBreak(); }
#ifdef DO_INTEGRITY_CHECK
void SqlBuffer::integrityCheck( NABoolean ignore_omm_check)
{
const Lng32 savedTuppDescIndex = getProcessedTuppDescs();
tupp p;
ControlInfo * ci = 0;
ComDiagsArea * d = NULL;
ExStatisticsArea * s = NULL;
Int64 numStatsBytes = 0;
up_state upState;
enum okMMoreType
{
NOT_KNOWN_ = 0,
YES_ = 1,
NO_ = 3
} okMMoreHasData = NOT_KNOWN_;
position();
while (atEOTD() != -1)
{
moveOutSendOrReplyData(
FALSE, // reply data
&upState,
p,
&ci, &d, &s,
&numStatsBytes,
TRUE, // noStateChange
FALSE, // unpack DA if bufInBuf
NULL // CollHeap (needed if unpack DA)
);
switch (upState.status)
{
case ex_queue::Q_NO_DATA:
{
if (ci->getIsDataRowPresent())
FAILURE;
break;
}
case ex_queue::Q_GET_DONE:
case ex_queue::Q_REC_SKIPPED:
{
if (ci->getIsDataRowPresent())
FAILURE;
if (ci->getIsDiagsAreaPresent())
FAILURE;
break;
}
case ex_queue::Q_OK_MMORE:
{
if (ignore_omm_check)
break;
switch (okMMoreHasData)
{
case NOT_KNOWN_:
{
okMMoreHasData = p.isAllocated() ? YES_ : NO_;
break;
}
case NO_:
{
if (p.isAllocated() == TRUE)
FAILURE;
break;
}
case YES_:
{
if (p.isAllocated() == FALSE)
FAILURE;
break;
}
default:
{
FAILURE;
break;
}
}
break;
}
case ex_queue::Q_STATS:
{
if (ci->getIsDataRowPresent())
FAILURE;
if (ci->getIsDiagsAreaPresent())
FAILURE;
break;
}
case ex_queue::Q_SQLERROR:
{
if (!(ci->getIsDiagsAreaPresent()))
FAILURE;
break;
}
case ex_queue::Q_INVALID:
default:
FAILURE;
}
}
position(savedTuppDescIndex);
return;
}
#else
// See sql_buffer.h for SqlBufferBase no-op implementation.
#endif
#ifndef UDRSERV_BUILD
////////////////////////////////////////////////////////////////////////////
// class sql_buffer_pool
//
// Allocate in_number_of_buffers each with a size of buffer_size.
// Chain them anchored at buffer_list.
////////////////////////////////////////////////////////////////////////////
sql_buffer_pool::sql_buffer_pool(Lng32 numberOfBuffers,
Lng32 bufferSize,
CollHeap * space,
SqlBufferBase::BufferType bufType)
: bufType_(bufType),
defragBuffer_(NULL),
defragTd_(NULL)
{
ex_assert((space != 0), "Must pass in a non-null Space pointer");
currNumberOfBuffers_ = 0;
maxNumberOfBuffers_ = numberOfBuffers;
memoryUsed_ = 0;
staticBufferList_ = NULL;
dynBufferList_ = NULL;
space_ = space;
defaultBufferSize_ = bufferSize;
flags_ = 0;
staticBufferList_ = new(space_) Queue(space_);
dynBufferList_ = new(space_) Queue(space_);
}
sql_buffer_pool::~sql_buffer_pool()
{
char * currBuffer;
if (staticBufferList_)
{
staticBufferList_->position();
while (currBuffer = (char *)(staticBufferList_->getCurr()))
{
space_->deallocateMemory(currBuffer);
// move to the next entry
staticBufferList_->advance();
}
//delete staticBufferList_;
NADELETE(staticBufferList_, Queue, space_);
staticBufferList_ = NULL;
}
if (dynBufferList_)
{
dynBufferList_->position();
while (currBuffer = (char *)(dynBufferList_->getCurr()))
{
space_->deallocateMemory(currBuffer);
// move to the next entry
dynBufferList_->advance();
}
//delete dynBufferList_;
NADELETE(dynBufferList_, Queue, space_);
dynBufferList_ = NULL;
}
if (defragBuffer_)
{
space_->deallocateMemory(defragBuffer_);
}
}
SqlBufferBase * sql_buffer_pool::addBuffer(Lng32 totalBufferSize,
SqlBufferBase::BufferType bufType,
CollHeap * space)
{
// allocate a size that is divisible by 8, otherwise the data that
// is allocated from the end of the buffer won't be aligned properly
Lng32 rounded_size = ROUND8(totalBufferSize);
// allocate a buffer of the right size
char * buf = new(space) char[rounded_size];
SqlBufferBase *newBuffer = NULL;
if (bufType == SqlBufferBase::NORMAL_)
newBuffer = new(buf) SqlBufferNormal();
else if (bufType == SqlBufferBase::DENSE_)
newBuffer = new(buf) SqlBufferDense();
else
newBuffer = new(buf) SqlBufferOlt();
// now make the raw data buffer into an sql_buffer
newBuffer->init(rounded_size);
return newBuffer;
}
SqlBufferBase * sql_buffer_pool::addBuffer(Queue * bufferList,
Lng32 totalBufferSize,
SqlBufferBase::BufferType bufType,
bool failureIsFatal)
{
// allocate a size that is divisible by 8, otherwise the data that
// is allocated from the end of the buffer won't be aligned properly
Lng32 rounded_size = ROUND8(totalBufferSize);
// allocate a buffer of the right size
char * buf = (char*)space_->allocateMemory(rounded_size, failureIsFatal);
if (buf == NULL)
{
return NULL;
}
SqlBufferBase *newBuffer = NULL;
if (bufType == SqlBufferBase::NORMAL_)
newBuffer = new(buf) SqlBufferNormal();
else if (bufType == SqlBufferBase::DENSE_)
newBuffer = new(buf) SqlBufferDense();
else
newBuffer = new(buf) SqlBufferOlt();
// now make the raw data buffer into an sql_buffer
newBuffer->init(rounded_size);
// add the newly created buffer to the end of the queue
bufferList->insert(newBuffer);
// update # of buffers in this pool and memory usage
if (bufferList == dynBufferList_)
currNumberOfBuffers_++;
memoryUsed_ += rounded_size;
return newBuffer;
}
// create a new buffer that is used to hold row temporarily in order
// to do defragmentation and create a tuple descriptor
tupp_descriptor * sql_buffer_pool::addDefragTuppDescriptor(Lng32 dataSize )
{
if (defragTd_)
{
return defragTd_;
}
if (!defragBuffer_)
{
Lng32 neededBufferSize =
(Lng32) SqlBufferNeededSize( 1, dataSize );
// allocate a size that is divisible by 8, otherwise the data that
// is allocated from the end of the buffer won't be aligned properly
Lng32 rounded_size = ROUND8(neededBufferSize);
// allocate a buffer of the right size
char * buf = (char*)space_->allocateMemory(rounded_size, FALSE);
if (buf == NULL)
{
return NULL;
}
SqlBufferBase *newBuffer = NULL;
if (bufType_ == SqlBufferBase::NORMAL_)
newBuffer = new(buf) SqlBufferNormal();
else if (bufType_ == SqlBufferBase::DENSE_)
newBuffer = new(buf) SqlBufferDense();
else
newBuffer = new(buf) SqlBufferOlt();
// now make the raw data buffer into an sql_buffer
newBuffer->init(rounded_size);
defragBuffer_ = newBuffer;
}
tupp_descriptor * td = defragBuffer_->add_tuple_desc(dataSize);
defragTd_ = td;
return defragTd_;
}
SqlBufferBase * sql_buffer_pool::addBuffer(Lng32 totalBufferSize,
bool failureIsFatal)
{
return addBuffer(dynBufferList_, totalBufferSize, bufType_,
failureIsFatal);
}
SqlBufferBase * sql_buffer_pool::addBuffer(Lng32 totalBufferSize,
SqlBufferBase::BufferType bufType,
bool failureIsFatal)
{
return addBuffer(dynBufferList_, totalBufferSize, bufType,
failureIsFatal);
}
SqlBufferBase * sql_buffer_pool::addBuffer(Lng32 numTupps, Lng32 recordLength)
{
return addBuffer((Lng32) SqlBufferNeededSize(numTupps,recordLength,
((bufType_ == SqlBufferBase::DENSE_)
? SqlBuffer::DENSE_
: SqlBuffer::NORMAL_)));
}
SqlBufferBase *sql_buffer_pool::get_free_buffer(Lng32 freeSpace)
{
return getBuffer(freeSpace,-1);
}
// RETURNS: 0, if tuple found. -1, if not found.
short sql_buffer_pool::get_free_tuple(tupp &tp, Lng32 tupDataSize,
SqlBuffer **buf)
{
if(buf)
*buf = NULL;
// there is a bug in SqlBufferDense::add_tuple_desc(), when
// tup_data_size is 0, the tuple address of next allocated
// tupp_descriptor will overlap to the tuple address of
// previous allocated tupp_descriptor.
// Until fixed, set tupDataSize to 8 if it is 0
if (tupDataSize <= 0)
tupDataSize = 8;
#pragma nowarn(1506) // warning elimination
Lng32 neededSpace = SqlBufferGetTuppSize(tupDataSize,bufType_);
#pragma warn(1506) // warning elimination
SqlBuffer *currBuffer;
if (!(currBuffer =getCurrentBuffer(neededSpace)))
{
currBuffer = (SqlBuffer*)getBuffer(neededSpace);
}
if (currBuffer) // buffer found
{
tupp_descriptor *tuple_desc =
currBuffer->allocate_tuple_desc(tupDataSize);
// failed to set tp to a tuple_descriptor from the pool
if (!tuple_desc)
return -1;
tuple_desc->setReferenceCount(0);
tp = tuple_desc;
if(buf)
*buf = currBuffer;
return 0;
///return tuple_desc;
}
// failed to set tp to a tuple_descriptor from the pool
return -1;
}
//check if the current buffer in the pool has enough space
short sql_buffer_pool::currentBufferHasEnoughSpace( Lng32 tupDataSize)
{
SqlBuffer * currBuffer;
if (!(currBuffer = (SqlBuffer *)(activeBufferList()->getCurr())))
{
return -1;
}
Lng32 neededSpace = SqlBufferGetTuppSize(tupDataSize,bufType_);
if (neededSpace <= currBuffer->getFreeSpace())
{
return 1;
}
else
{
return 0;
}
}
SqlBuffer * sql_buffer_pool::getCurrentBuffer()
{
return ((SqlBuffer *)(activeBufferList()->getCurr()));
}
// returns a new tupp_descriptor in the pool
// that has a data length of tupDataSize.
// Sets the reference count to 1.
// RETURNS: tupp_descriptor, if tuple found. NULL, otherwise.
// If buf is non-Null, also sets buf to the buffer from which
// the tupp_descriptor was allocated. This is used to later
// resize the tuple if needed.
tupp_descriptor * sql_buffer_pool::get_free_tupp_descriptor(Lng32 tupDataSize,
SqlBuffer **buf)
{
if(buf)
*buf = NULL;
#pragma nowarn(1506) // warning elimination
Lng32 neededSpace = SqlBufferGetTuppSize(tupDataSize,bufType_);
#pragma warn(1506) // warning elimination
// get a buffer which is not FULL and can allocate tup_data_size
//SqlBuffer *currBuffer = (SqlBuffer*)getBuffer(neededSpace);
// this change is not really related to CIF changes but it may imrove
// performance. I did a similar change to get_free_tuple function
// few years ago and it improved performance
SqlBuffer *currBuffer;
if (!(currBuffer =getCurrentBuffer(neededSpace)))
{
currBuffer = (SqlBuffer*)getBuffer(neededSpace);
}
if (currBuffer) // buffer found
{
// allocate a tuple_desc of the desired size and assign its
// address to tp
tupp_descriptor *tuple_desc =
currBuffer->allocate_tuple_desc(tupDataSize);
// failed to set tp to a tuple_descriptor from the pool
if (!tuple_desc)
return NULL;
tuple_desc->setReferenceCount(1);
if(buf)
*buf = currBuffer;
return tuple_desc;
}
else
// failed to set tp to a tuple_descriptor from the pool
return NULL;
}
void sql_buffer_pool::free_buffers()
{
SqlBuffer *currBuffer;
// position to the first entry
activeBufferList()->position();
while (currBuffer = ((SqlBuffer *)(activeBufferList()->getCurr())))
{
if (!currBuffer->freeBuffer())
{
// couldn't free up this buffer.
// Well, at least compact it.
currBuffer->compactBuffer();
}
// move to the next entry
activeBufferList()->advance();
}
}
void sql_buffer_pool::compact_buffers()
{
SqlBuffer *currBuffer;
// position to the first entry
activeBufferList()->position();
while (currBuffer = ((SqlBuffer *)(activeBufferList()->getCurr())))
{
currBuffer->compactBuffer();
// move to the next entry
activeBufferList()->advance();
}
}
#if (!defined(NA_NSK) && defined(_DEBUG)) || !defined(__EID)
// for debugging purposes
void sql_buffer_pool::printAllBufferInfo() {
staticBufferList_->position();
SqlBuffer * buf;
while (buf = (SqlBuffer *)staticBufferList_->getNext())
buf->printInfo();
dynBufferList_->position();
while (buf = (SqlBuffer *)dynBufferList_->getNext())
buf->printInfo();
};
#endif
SqlBufferBase * sql_buffer_pool::findBuffer(Lng32 freeSpace,
Int32 mustBeEmpty)
{
SqlBuffer *currBuffer;
// walk the list until we find a buffer that we like.
activeBufferList()->position();
while (currBuffer = ((SqlBuffer *)(activeBufferList()->getCurr())))
{
// We like a buffer if it isn't in use by an I/O operation and
// if it has enough space. Some callers also require that it must
// be empty.
if ((currBuffer->bufferStatus_ != SqlBuffer::IN_USE) &&
(!mustBeEmpty || (currBuffer->bufferStatus_ == SqlBuffer::EMPTY)) &&
(freeSpace <= currBuffer->getFreeSpace()))
{
return currBuffer;
}
else
// move to the next entry
activeBufferList()->advance();
}
// out of luck
return NULL;
}
SqlBuffer * sql_buffer_pool::getCurrentBuffer(Lng32 freeSpace,Int32 mustBeEmpty)
{
SqlBuffer * currBuffer;
if ((currBuffer = (SqlBuffer *)(activeBufferList()->getCurr())) &&
(currBuffer->bufferStatus_ != SqlBuffer::IN_USE) &&
(!mustBeEmpty || (currBuffer->bufferStatus_ == SqlBuffer::EMPTY)) &&
(freeSpace <= currBuffer->getFreeSpace()))
{
return currBuffer;
}
else
return NULL;
}
SqlBufferBase * sql_buffer_pool::getBuffer(Lng32 freeSpace,
Int32 mustBeEmpty)
{
SqlBufferBase * result = findBuffer(freeSpace,mustBeEmpty);
if (result)
return result;
// buffer not found. Free up buffers which do not have anyone
// pointing to them and search again.
free_buffers();
result = findBuffer(freeSpace,mustBeEmpty);
// #if defined(NA_WINNT) && defined(_DEBUG)
// if (! result)
// printAllBufferInfo();
// #endif
if (! result)
{
if (staticMode())
{
// allocate a buffer for some static data structures,
// but try to use a denseer buffer size because it is
// likely that there won't be a lot of such static structures
// (guess that there are 10)
Lng32 neededStaticBufferSize;
#pragma warning (disable : 4018) //warning elimination
if (freeSpace > 0)
neededStaticBufferSize =
MINOF((Lng32) SqlBufferNeededSize(
10,
freeSpace,
bufType_),
defaultBufferSize_);
#pragma warning (default : 4018) //warning elimination
else
neededStaticBufferSize = defaultBufferSize_;
result = addBuffer(staticBufferList_,
neededStaticBufferSize,
bufType_);
}
else if (currNumberOfBuffers_ < maxNumberOfBuffers_)
{
// The requested size may be wrong if the buffer size isn't computed
// correctly in codegen.
Lng32 bufferHeaderSize = (Lng32) SqlBufferHeaderSize(bufType_);
// The space needed supplied by the caller (freeSpace) does not
// account for the buffer header (buffer object) size. Add this
// space to the needed space
Lng32 buffSize = MAXOF(freeSpace+bufferHeaderSize,
defaultBufferSize_);
// just add another buffer to the dynamic buffer list
result = addBuffer(dynBufferList_, buffSize, bufType_);
// Possible we can't even get the first buffer due to memory pressure.
UdrExeAssert((result != NULL) && (get_number_of_buffers() != 0),
"Buffer size not computed correctly at codegen");
}
}
return result;
}
void sql_buffer_pool::getUsedMemorySize(UInt32 &staticMemSize,
UInt32 &dynMemSize)
{
staticMemSize = sizeof(sql_buffer_pool);
staticMemSize += 2 * sizeof(Queue);
char * currBuffer;
staticBufferList_->position();
while (currBuffer = (char *)(staticBufferList_->getCurr()))
{
staticMemSize += ((SqlBuffer *)currBuffer)->get_used_size();
// move to the next entry
staticBufferList_->advance();
}
dynBufferList_->position();
dynMemSize = 0;
while (currBuffer = (char *)(dynBufferList_->getCurr()))
{
dynMemSize += ((SqlBuffer *)currBuffer)->get_used_size();
// move to the next entry
dynBufferList_->advance();
}
}
void sql_buffer_pool::resizeLastTuple(UInt32 tup_data_size, char *dataPointer)
{
SqlBuffer *currBuffer = (SqlBuffer *)(activeBufferList()->getCurr());
currBuffer->resize_tupp_desc(tup_data_size, dataPointer);
}
SqlBufferHeader::moveStatus
sql_buffer_pool::moveIn(atp_struct *atp1,
atp_struct *atp2,
UInt16 tuppIndex,
Lng32 tupDataSize,
ex_expr_base *moveExpr,
NABoolean addBufferIfNeeded,
Lng32 bufferSize)
{
if (defragTd_ &&
!currentBufferHasEnoughSpace(tupDataSize))
{
// get row length after evaluating the expression and try to
// to get new tuple with the actual row size. The actual row
// size may be smaller that the max row size and may fit in the remaining
// buffer space
UInt32 defMaxRowLen = tupDataSize;
UInt32 defRowLen = defMaxRowLen;
defragTd_->setReferenceCount(1);
atp2->getTupp(tuppIndex)= defragTd_;
ex_expr::exp_return_type retCode;
retCode = moveExpr->eval(atp1, atp2, 0, -1, &defRowLen);
if (retCode == ex_expr::EXPR_ERROR)
{
return SqlBufferHeader::MOVE_ERROR;
}
if (!get_free_tuple(atp2->getTupp(tuppIndex), defRowLen))
{
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
char txt[] = "hashj";
SqlBuffer *buf = getCurrentBuffer();
sql_buffer_pool::logDefragInfo(txt,
SqlBufferGetTuppSize(tupDataSize, buf->bufType()),
SqlBufferGetTuppSize(defRowLen, buf->bufType()),
buf->getFreeSpace(),
buf,
buf->getTotalTuppDescs());
#endif
char * defragDataPointer = defragTd_->getTupleAddress();
char * dataPointer = atp2->getTupp(tuppIndex).getDataPointer();
str_cpy_all(dataPointer,
defragDataPointer,
defRowLen);
return SqlBufferHeader::MOVE_SUCCESS;
}
}
if (get_free_tuple(atp2->getTupp(tuppIndex), tupDataSize)) {
if(addBufferIfNeeded) {
addBuffer(bufferSize);
if (get_free_tuple(atp2->getTupp(tuppIndex), tupDataSize)) {
ex_assert(0, "sql_buffer_pool::moveIn() No more space for tuples");
}
} else {
return SqlBufferHeader::BUFFER_FULL;
}
}
UInt32 maxRowLen = tupDataSize;
UInt32 rowLen = maxRowLen;
ex_expr::exp_return_type retCode;
retCode = moveExpr->eval(atp1, atp2, 0, -1, &rowLen);
if (retCode == ex_expr::EXPR_ERROR)
{
return SqlBufferHeader::MOVE_ERROR;
}
// Resize the row if the actual size is different from the max size (leftRowLen)
if(rowLen != maxRowLen) {
resizeLastTuple(rowLen,
atp2->getTupp(tuppIndex).getDataPointer());
}
return SqlBufferHeader::MOVE_SUCCESS;
}
SqlBufferHeader::moveStatus
sql_buffer_pool::moveIn(atp_struct *atp,
UInt16 tuppIndex,
Lng32 tupDataSize,
char *srcData,
NABoolean addBufferIfNeeded,
Lng32 bufferSize)
{
if (get_free_tuple(atp->getTupp(tuppIndex), tupDataSize)) {
if(addBufferIfNeeded) {
addBuffer(bufferSize);
if (get_free_tuple(atp->getTupp(tuppIndex), tupDataSize)) {
ex_assert(0, "sql_buffer_pool::moveIn() No more space for tuples");
}
} else {
return SqlBufferHeader::BUFFER_FULL;
}
}
str_cpy_all(atp->getTupp(tuppIndex).getDataPointer(),
srcData,
tupDataSize);
return SqlBufferHeader::MOVE_SUCCESS;
}
/////////////////////////////////////////////////////////////////////
// class SqlBufferOlt
//
// SqlBufferOlt is used to minimize the number of bytes replied
// from dp2 to exe. A special version of this is SqlBufferOltSmall
// which is used to send data from exe to dp2 and could also be used
// during reply if certain conditions are met.
//
// An OLT buffer can send or reply atmost one data tuple.
// No down or up state is sent in the buffer. The OLT PA node
// remembers the down state info (parent index, etc) and returns
// that to its parent along with data returned by DP2.
//
// SqlBufferOlt has a header that has send/reply, buffer and content flags.
// For description of these flags, see the header file sql_buffer.h.
//
// Following the header, it can have data + warning diag or an error diag.
// It can also have rows affected, row count or stats. Appropriate
// flags are set in the header. A tupp descriptor is allocated for each
// of the above.
//
// In the moveReply functions, the diagsArea can either be passed in
// as a parameter or can be generated when evaluating an expression.
//
/////////////////////////////////////////////////////////////////////
void SqlBufferOlt::init(ULng32 in_size_in_bytes, NABoolean clear)
{
SqlBufferBase::init(in_size_in_bytes, clear);
oltBufFlags_ = 0;
contents_ = NOTHING_YET_;
filler_ = 0;
}
// reinitialize data members; note that this version of init()
// can only be used after a buffer has been initialized via
// init(unsigned long).
void SqlBufferOlt::init()
{
SqlBufferBase::init();
oltBufFlags_ = 0;
contents_ = NOTHING_YET_;
filler_ = 0;
}
// converts all pointers in tuple descriptors to offset relative
// to the beginning of SqlBuffer. NOTE: SqlBuffer is not derived
// from the ExGod class but has similar pack and unpack methods.
void SqlBufferOlt::pack()
{
if (containsOltSmall())
return;
Int32 numTupps = 0;
switch (getContents())
{
case ERROR_:
case DATA_:
case NODATA_ROWCOUNT_:
case NODATA_WARNING_:
case NODATA_ROWAFFECTED_WARNING_:
numTupps = 1;
break;
case DATA_WARNING_:
case NODATA_ROWCOUNT_WARNING_:
numTupps = 2;
break;
}
if (isStats())
numTupps++;
tupp_descriptor * nextTdesc = NULL;
tupp_descriptor * tdesc = getNextTuppDesc(NULL);
Int32 i = 0;
while (i++ < numTupps)
{
if (i < numTupps)
nextTdesc = getNextTuppDesc(tdesc);
tdesc->setTupleOffset((char *)tdesc->getTupleAddress() -
(char *)this);
tdesc = nextTdesc;
}
SqlBufferBase::pack();
}
void SqlBufferOlt::unpack()
{
Int32 numTupps = 0;
switch (getContents())
{
case ERROR_:
case DATA_:
case NODATA_ROWCOUNT_:
case NODATA_WARNING_:
case NODATA_ROWAFFECTED_WARNING_:
numTupps = 1;
break;
case DATA_WARNING_:
case NODATA_ROWCOUNT_WARNING_:
numTupps = 2;
break;
}
if (isStats())
numTupps++;
tupp_descriptor * tdesc = NULL;
for (Int32 i = 0; i < numTupps; i++)
{
tdesc = getNextTuppDesc(tdesc);
tdesc->setTupleAddress((char *)this + tdesc->getTupleOffset());
}
SqlBufferBase::unpack();
}
SqlBufferBase::moveStatus
SqlBufferOlt::moveInSendData(ULng32 projRowLen,
ex_expr_base * expr,
atp_struct * atp1,
atp_struct * workAtp,
unsigned short tuppIndex)
{
SqlBufferOltSmall * smallBuf = (SqlBufferOltSmall*)getSendDataPtr();
smallBuf->setBufType(SqlBufferHeader::OLT_SMALL_);
smallBuf->init();
if (projRowLen > 0)
{
smallBuf->setSendData(TRUE);
tupp_descriptor td;
td.init(projRowLen, NULL, smallBuf->sendDataPtr());
workAtp->getTupp(tuppIndex) = &td;
ex_expr::exp_return_type rc = expr->eval(atp1, workAtp);
workAtp->getTupp(tuppIndex).release();
if (rc == ex_expr::EXPR_ERROR)
{
return MOVE_ERROR;
} // error
}
// since we are not allocating a tupp desc to indicate the input
// row length, set this value in the sizeInBytes_ field.
#pragma nowarn(1506) // warning elimination
sizeInBytes_ = sizeof(SqlBufferOltSmall) + projRowLen;
#pragma warn(1506) // warning elimination
return MOVE_SUCCESS;
}
#pragma nowarn(770) // warning elimination
SqlBufferBase::moveStatus
SqlBufferOlt::moveInReplyData(NABoolean doMoveControl,
NABoolean doMoveData,
void * currQState,
ULng32 projRowLen,
ComDiagsArea * diagsArea,
tupp_descriptor ** diagsDesc,
ex_expr_base * expr,
atp_struct * atp1,
atp_struct * workAtp,
atp_struct * destAtp,
unsigned short tuppIndex,
NABoolean doMoveStats,
ExStatisticsArea * statsArea,
tupp_descriptor ** statsDesc)
{
if ((((up_state *)currQState)->matchNo > 1) || (diagsArea) ||
(getContents() == ERROR_) || (getContents() == DATA_WARNING_) ||
((statsArea) && (NOT statsArea->smallStatsObj())))
{
return moveInSendOrReplyData(FALSE, // reply
doMoveControl, doMoveData, currQState,
0, NULL,
projRowLen, NULL,
diagsArea, diagsDesc,
expr, atp1, workAtp, destAtp,
tuppIndex,
doMoveStats, statsArea, statsDesc);
}
if (doMoveData)
setDataProcessed(TRUE);
setContainsOltSmall(TRUE);
SqlBufferOltSmall * smallBuf = (SqlBufferOltSmall*)getReplyDataPtr();
if ((doMoveData) ||
(doMoveControl && (NOT dataProcessed())))
{
smallBuf->setBufType(SqlBufferHeader::OLT_SMALL_);
smallBuf->init();
// since we are not allocating a tupp desc to indicate the input
// row length, set this value in the sizeInBytes_ field.
sizeInBytes_ = sizeof(SqlBufferOltSmall);
}
if (((up_state *)currQState)->status == ex_queue::Q_STATS)
{
smallBuf->setReplyStatsOnly(TRUE);
UdrExeAssert(doMoveStats && statsArea && statsDesc && *statsDesc,
"Incorrect parameters for Q_STATS");
}
else if (((up_state *)currQState)->matchNo > 0)
smallBuf->setReplyRowAffected(TRUE);
if (doMoveData)
{
smallBuf->setReplyData(TRUE);
if (expr)
{
atp_struct * atp = ((destAtp != workAtp) ? destAtp : workAtp);
tupp_descriptor td;
NABoolean tuppAllocated = FALSE;
if (atp->getTupp(tuppIndex).isAllocated())
{
atp->getTupp(tuppIndex).setDataPointer((char *)smallBuf+sizeInBytes_);
}
else
{
tuppAllocated = TRUE;
td.init(projRowLen, NULL, (char *)smallBuf+sizeInBytes_);
atp->getTupp(tuppIndex) = &td;
}
ex_expr::exp_return_type rc = ex_expr::EXPR_OK;
if (expr)
rc = expr->eval(atp1, workAtp);
if (tuppAllocated)
{
atp->getTupp(tuppIndex).release();
}
// an error or warning was returned by expr eval method.
// Cannot use small olt buf.
if (atp1->getDiagsArea())
{
// clean diags area and call moveInSendOrReplyData.
// It will reevaluate the expression.
atp1->getDiagsArea()->clear();
setContainsOltSmall(FALSE);
return moveInSendOrReplyData(
FALSE, // reply
doMoveControl, doMoveData, currQState,
0, NULL,
projRowLen, NULL,
atp1->getDiagsArea(), diagsDesc,
expr, atp1, workAtp, destAtp,
tuppIndex,
doMoveStats, statsArea, statsDesc);
} // error or warning
} // expr passed in
sizeInBytes_ += projRowLen;
}
if ((doMoveStats) && (statsArea) && (statsDesc) && (*statsDesc))
{
// allocate space for stats area.
short statsAreaLen = (short)statsArea->packedLength();
sizeInBytes_ = ROUND2(sizeInBytes_);
*(short*)((char *)smallBuf + sizeInBytes_)
= statsAreaLen;
sizeInBytes_ += sizeof(short);
char * statsAreaLoc =
(statsArea->smallStatsObj() ? ((char *)smallBuf+sizeInBytes_)
: (char*)( ROUND8 (Long((char*)smallBuf+sizeInBytes_) ) ));
(*statsDesc)->init(statsAreaLen, 0, statsAreaLoc);
sizeInBytes_ = (statsAreaLoc - (char *)smallBuf) + statsAreaLen;
smallBuf->setReplyStats(TRUE);
}
return MOVE_SUCCESS;
}
#pragma warn(770) // warning elimination
SqlBufferBase::moveStatus
SqlBufferOlt::moveInSendOrReplyData(NABoolean isSend,
NABoolean doMoveControl,
NABoolean doMoveData,
void * currQState,
ULng32 controlInfoLen,
ControlInfo ** controlInfo,
ULng32 projRowLen,
tupp_descriptor ** outTdesc,
ComDiagsArea * diagsArea,
tupp_descriptor ** diagsDesc,
ex_expr_base * expr,
atp_struct * atp1,
atp_struct * workAtp,
atp_struct * destAtp,
unsigned short tuppIndex,
NABoolean doMoveStats,
ExStatisticsArea * statsArea,
tupp_descriptor ** statsDesc,
NABoolean useExternalDA,
NABoolean callerHasExternalDA,
tupp_descriptor * defragTd
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
,ex_tcb * tcb
#endif
,NABoolean noMoveWarnings
)
{
// this should be used for reply only.
if (isSend)
return MOVE_ERROR;
if ((! diagsArea) && (((up_state *)currQState)->matchNo <= 1) &&
(getContents() != ERROR_) && (getContents() != DATA_WARNING_) &&
((! statsArea) || (statsArea->smallStatsObj())))
{
return moveInReplyData(doMoveControl, doMoveData, currQState,
projRowLen, diagsArea, diagsDesc,
expr, atp1, workAtp, destAtp, tuppIndex,
doMoveStats, statsArea, statsDesc);
}
setContainsOltSmall(FALSE);
SqlBuffer::moveStatus retcode = MOVE_SUCCESS;
tupp_descriptor * tdesc = NULL;
// Genesis 10-040126-2683 and 10-040126-0354.
// On entry into this function, sizeInBytes_ represents the max size of the
// buffer. This function will later change it to the actual number of bytes
// used. Note the max size so that we can ensure the number of bytes used does
// not exceed this limit.
ULng32 maxBufSize = sizeInBytes_;
if ((! doMoveData) && (getContents() != NOTHING_YET_))
{
// either one tupp(data or error) or two tupps (data + warning)
// have been moved in. Skip them.
tdesc = getNextTuppDesc(tdesc); // skip data or error
if (getContents() == DATA_WARNING_)
tdesc = getNextTuppDesc(tdesc); // skip the warning tupp
}
if (doMoveData) // (doMoveData) && (projLen > 0))
{
setContents(DATA_);
tdesc = getNextTuppDesc(tdesc, projRowLen);
if (outTdesc != NULL)
*outTdesc = tdesc;
if (destAtp)
destAtp->getTupp(tuppIndex) = tdesc;
if ((expr != NULL) &&
((expr->eval(atp1, workAtp)) == ex_expr::EXPR_ERROR))
{
if ((destAtp) && (workAtp == destAtp))
destAtp->release();
retcode = MOVE_ERROR;
} // expr present and error
else
setDataProcessed(TRUE);
// the expr eval could have produced a warning or error diag.
// copy it to the diagsArea parameter. A tupp for diag will
// be allocated below based on the diagsArea.
if (diagsArea == NULL)
diagsArea = atp1->getDiagsArea();
}
else if ((doMoveControl) && (getContents() == NOTHING_YET_))
// else if ((doMoveControl) && (NOT dataProcessed()))
{
if (((up_state *)currQState)->status == ex_queue::Q_STATS)
{
setContents(STATSONLY_);
UdrExeAssert(doMoveStats && statsArea && statsDesc && *statsDesc,
"Incorrect parameters for Q_STATS");
}
else if (((up_state *)currQState)->matchNo == 0)
setContents(NODATA_);
else if (((up_state *)currQState)->matchNo == 1)
setContents(NODATA_ROWAFFECTED_);
else
{
setContents(NODATA_ROWCOUNT_);
tdesc = getNextTuppDesc(tdesc, sizeof(Lng32));
*(Lng32*)tdesc->getTupleAddress() = ((up_state *)currQState)->matchNo;
}
}
if (diagsArea != NULL)
{
if (diagsArea->mainSQLCODE() < 0)
{
setContents(ERROR_);
// allocate space for error diags area.
#pragma nowarn(1506) // warning elimination
tdesc = getNextTuppDesc(NULL, diagsArea->packedLength());
#pragma warn(1506) // warning elimination
}
else
{
// Genesis 10-040126-2683 and 10-040126-0354.
// When multiple warnings are generated, the total size of the packed
// buffer sometimes exceeds the allocated size. When this happens, the
// receiving mxci process gets only a partial buffer. And when this
// partial buffer is unpacked, it causes an assertion failure. Since
// the buffers cant be resized, we have to ensure the diags area
// always fits into the space available. So, if there isnt enough
// space, we will first try reducing the number of warnings to just
// one. If there isnt space for even a single warning, the diags area
// wont be packed at all.
if ((UInt32) sizeInBytes_ == maxBufSize)
{
// No data rows are being returned. This diags area is the first
// thing to be packed into the buffer.
sizeInBytes_ = 0;
}
if (diagsArea->packedLength() + sizeInBytes_ > maxBufSize)
{
// We cannot fit all the warnings into the reply buffer.
// Delete all but the first warning.
while (diagsArea->getNumber(DgSqlCode::WARNING_) > 1)
diagsArea->deleteWarning(1);
if (diagsArea->packedLength() + sizeInBytes_ > maxBufSize)
{
// No space for any warnings.
// Should never come here because the buffer should always be big
// enough to contain at least one warning.
diagsArea->deleteWarning(0);
}
}
// move diags area plus warning(s)
#pragma nowarn(1506) // warning elimination
tdesc = getNextTuppDesc(tdesc, diagsArea->packedLength());
#pragma warn(1506) // warning elimination
switch (getContents())
{
case DATA_:
setContents(DATA_WARNING_);
break;
case NODATA_:
setContents(NODATA_WARNING_);
break;
case NODATA_ROWAFFECTED_:
setContents(NODATA_ROWAFFECTED_WARNING_);
break;
case NODATA_ROWCOUNT_:
setContents(NODATA_ROWCOUNT_WARNING_);
break;
}
}
if (diagsDesc)
*diagsDesc = tdesc;
}
if ((doMoveStats) && (statsArea != NULL))
{
// allocate space for stats area.
#pragma nowarn(1506) // warning elimination
tdesc = getNextTuppDesc(tdesc, statsArea->packedLength());
#pragma warn(1506) // warning elimination
if (statsDesc)
*statsDesc = tdesc;
setIsStats(TRUE);
}
return retcode;
}
NABoolean SqlBufferOlt::moveOutSendOrReplyData(NABoolean isSend,
void * currQState,
tupp &outTupp,
ControlInfo ** controlInfo,
ComDiagsArea ** diagsArea,
ExStatisticsArea ** statsArea,
Int64 * numStatsBytes,
NABoolean noStateChange,
NABoolean unpackDA,
CollHeap * heap)
{
tupp_descriptor * tDesc = NULL;
switch (getContents())
{
case ERROR_:
{
tDesc = getNextTuppDesc(tDesc);
*diagsArea = (ComDiagsArea *)(tDesc->getTupleAddress());
((up_state *)currQState)->status = ex_queue::Q_SQLERROR;
}
break;
case DATA_:
{
tDesc = getNextTuppDesc(tDesc);
outTupp = tDesc;
((up_state *)currQState)->status = ex_queue::Q_OK_MMORE;
((up_state *)currQState)->matchNo = 1;
}
break;
case DATA_WARNING_:
{
tDesc = getNextTuppDesc(tDesc);
outTupp = tDesc;
((up_state *)currQState)->status = ex_queue::Q_OK_MMORE;
((up_state *)currQState)->matchNo = 1;
tDesc = getNextTuppDesc(tDesc);
*diagsArea = (ComDiagsArea *)(tDesc->getTupleAddress());
}
break;
case NODATA_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
((up_state *)currQState)->matchNo = 0;
}
break;
case NODATA_ROWAFFECTED_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
((up_state *)currQState)->matchNo = 1;
}
break;
case NODATA_ROWCOUNT_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
tDesc = getNextTuppDesc(tDesc);
((up_state *)currQState)->matchNo = *(Lng32*)tDesc->getTupleAddress();
}
break;
case NODATA_WARNING_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
((up_state *)currQState)->matchNo = 0;
tDesc = getNextTuppDesc(tDesc);
*diagsArea = (ComDiagsArea *)(tDesc->getTupleAddress());
}
break;
case NODATA_ROWAFFECTED_WARNING_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
((up_state *)currQState)->matchNo = 1;
tDesc = getNextTuppDesc(tDesc);
*diagsArea = (ComDiagsArea *)(tDesc->getTupleAddress());
}
break;
case NODATA_ROWCOUNT_WARNING_:
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
tDesc = getNextTuppDesc(tDesc);
((up_state *)currQState)->matchNo = *(Lng32*)tDesc->getTupleAddress();
tDesc = getNextTuppDesc(tDesc);
*diagsArea = (ComDiagsArea *)(tDesc->getTupleAddress());
}
break;
case STATSONLY_:
{
((up_state *)currQState)->status = ex_queue::Q_STATS;
}
break;
}
if ((isStats()) && (statsArea))
{
tDesc = getNextTuppDesc(tDesc);
*statsArea = (ExStatisticsArea *)(tDesc->getTupleAddress());
*numStatsBytes = sizeof(*tDesc) + tDesc->getAllocatedSize();
}
return FALSE;
}
// add a new tuple descriptor to the end of the buffer
tupp_descriptor *SqlBufferOlt::add_tuple_desc(Lng32 tup_data_size)
{
return NULL;
}
// remove the tuple desc with the highest number from the buffer
void SqlBufferOlt::remove_tuple_desc()
{
}
////////////////////////////////////////////////////////////////////////////
// class SqlBufferOltSmall
////////////////////////////////////////////////////////////////////////////
NABoolean SqlBufferOltSmall::moveOutSendData(tupp &outTupp,
ULng32 returnedRowLen)
{
if (sendData())
{
// this input value didn't come with a tupp descriptor.
// Allocate one now in this buffer.
tupp_descriptor * td =
(tupp_descriptor *)((char*)this+
sizeof(SqlBufferOltSmall)+
ROUND8(returnedRowLen));
td->init(returnedRowLen, NULL, (char *)this+sizeof(SqlBufferOltSmall));
outTupp = td;
}
return FALSE;
}
NABoolean SqlBufferOltSmall::moveOutReplyData(void * currQState,
tupp &outTupp,
ULng32 returnedRowLen,
ComDiagsArea ** diagsArea,
ExStatisticsArea ** statsArea,
Int64 * numStatsBytes)
{
Long replyDataLoc = 0;
Long statsAreaLoc = 0;
Long dataLoc = 0;
short statsAreaLen = 0;
if ((replyData()) || (replyStats()))
{
#pragma nowarn(1506) // warning elimination
dataLoc = (Long)this + sizeof(SqlBufferOltSmall);
#pragma warn(1506) // warning elimination
if (replyData())
{
replyDataLoc = dataLoc;
if (replyStats())
#pragma nowarn(1506) // warning elimination
dataLoc = dataLoc + ROUND2(returnedRowLen);
#pragma warn(1506) // warning elimination
else
#pragma nowarn(1506) // warning elimination
dataLoc = dataLoc + ROUND4(returnedRowLen);
#pragma warn(1506) // warning elimination
}
if (replyStats())
{
statsAreaLen = *(short*)dataLoc;
#pragma nowarn(1506) // warning elimination
statsAreaLoc = dataLoc + sizeof(short);
#pragma warn(1506) // warning elimination
// statsAreaLoc = ROUND8(statsAreaLoc);
dataLoc = statsAreaLoc + ROUND8(statsAreaLen);
}
}
if (replyData())
{
// this reply value didn't come with a tupp descriptor.
// Allocate one now in this buffer.
tupp_descriptor * td = (tupp_descriptor *)((char*)dataLoc);
td->init(returnedRowLen, NULL, (char *)replyDataLoc);
outTupp = td;
((up_state *)currQState)->status = ex_queue::Q_OK_MMORE;
}
else if (replyStatsOnly())
{
((up_state *)currQState)->status = ex_queue::Q_STATS;
}
else
{
((up_state *)currQState)->status = ex_queue::Q_NO_DATA;
}
if (replyRowAffected())
((up_state *)currQState)->matchNo = 1;
else
((up_state *)currQState)->matchNo = 0;
if (replyStats())
{
*statsArea = (ExStatisticsArea *)statsAreaLoc;
*numStatsBytes = sizeof(short) + statsAreaLen;
}
return FALSE;
}
#if (defined (NA_LINUX) && defined(_DEBUG) && !defined(__EID))
void sql_buffer_pool::logDefragInfo(char * txt,
Lng32 neededSpace,
Lng32 actNeededSpace,
Lng32 freeBuffSpace,
void *p,
Lng32 NumRowsInBuff,
ex_tcb * tcb)
{
char * envCifLoggingLocation= getenv("CIF_DEFRAG_LOG_LOC");
if (envCifLoggingLocation)
{
char file2 [255];
snprintf(file2,255,"%s%s",envCifLoggingLocation,"/cif_defrag_logging.log");
FILE * p2 = NULL;
p2 =fopen(file2, "a");
if (p2== NULL)
{
printf("Error in opening a file.."); // file2
}
//fprintf(p2,"%s\n","defragmentation");
time_t rawtime;
struct tm * timeinfo;
time ( &rawtime );
timeinfo = localtime ( &rawtime );
fprintf(p2,"%s %s explainId: %d buffer: %p Neededspace: %d AcualSpace: %d freeSapce: %d NumRowsInBuff: %d\n",
asctime (timeinfo),
tcb ? tcb->getTdb()->getNodeName() : txt,
tcb ? tcb->getTdb()->getExplainNodeId() : -1,
p,neededSpace,actNeededSpace,freeBuffSpace,NumRowsInBuff);
fclose(p2);
}
}
#endif
#endif //UDRSERV_BUILD