blob: 982d5c1a25b61049dfc1a0784b987952e86e8de2 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_stored_proc.cpp
* Description:
*
*
* Created: 3/15/1997
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "cli_stdh.h"
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_stored_proc.h"
#include "ex_exe_stmt_globals.h"
#include "ex_io_control.h"
#include "ex_expr.h"
#include "exp_tuple_desc.h"
#include "exp_attrs.h"
#include "exp_clause_derived.h"
#include "ExSqlComp.h"
#include "ErrorMessage.h"
#include "CmpStatement.h"
#include "CmpStoredProc.h"
ex_tcb * ExStoredProcTdb::build(ex_globals * glob)
{
ExStoredProcTcb * spTcb = new(glob->getSpace())
ExStoredProcTcb(*this, glob);
// add the stored_proc_tcb to the schedule
spTcb->registerSubtasks();
return (spTcb);
}
ExStoredProcTcb::ExStoredProcTcb(const ExStoredProcTdb & sp_tdb,
ex_globals * glob
)
: ex_tcb( sp_tdb, 1, glob)
{
Space * space = (glob ? glob->getSpace() : 0);
// Allocate the buffer pool
pool_ = new(space) sql_buffer_pool(sp_tdb.numBuffers_,
sp_tdb.bufferSize_,
space);
// Allocate the queue to communicate with parent
qparent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
sp_tdb.queueSizeDown_,
sp_tdb.criDescDown_,
space);
// Allocate the private state in each entry of the down queue
ExStoredProcPrivateState *p = new(space) ExStoredProcPrivateState(this);
qparent_.down->allocatePstate(p, this);
delete p;
qparent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
sp_tdb.queueSizeUp_,
sp_tdb.criDescUp_,
space);
inputBuffer_ = pool_->get_free_buffer(0);
returnedBuffer_ = NULL;
step_ = BUILD_INPUT_BUFFER_;
workAtp_ = allocateAtp(sp_tdb.workCriDesc_, space);
// create the arkcmp that will be used to process this
// SP request.
ExMasterStmtGlobals *master_glob = getGlobals()->castToExExeStmtGlobals()
->castToExMasterStmtGlobals();
ex_assert((master_glob != NULL), "Invalid reference in ExStoredProcTcb.");
// Get the arkcmp_ that contains the query cache
if (spTdb().getUseExistingArkcmp()) {
arkcmp_ = master_glob->getCliGlobals()->getArkcmp();
}
else {
arkcmp_ =
new(glob->getDefaultHeap()) ExSqlComp(0,
glob->getDefaultHeap(),
master_glob->getCliGlobals(),
this,
COM_VERS_COMPILER_VERSION,
NULL,
master_glob->getStatement()->
getContext()->getEnvironment());
}
ioSubtask_ = NULL;
numInputRequests_ = 0;
if (sp_tdb.inputExpr_)
(void) sp_tdb.inputExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob);
if (sp_tdb.outputExpr_)
(void) sp_tdb.outputExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob);
if (sp_tdb.predExpr_)
(void) sp_tdb.predExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob);
}
ExStoredProcTcb::~ExStoredProcTcb()
{
delete qparent_.up;
delete qparent_.down;
if (!spTdb().getUseExistingArkcmp()) {
delete arkcmp_;
}
freeResources();
}
void ExStoredProcTcb::freeResources()
{
delete pool_;
pool_ = 0;
}
void ExStoredProcTcb::registerSubtasks()
{
ex_tcb::registerSubtasks();
// register a non-queue event for the IPC with the send top node
ioSubtask_ =
getGlobals()->getScheduler()->registerNonQueueSubtask(sWork,this);
}
short ExStoredProcTcb::work()
{
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ExMasterStmtGlobals *master_glob = getGlobals()->castToExExeStmtGlobals()
->castToExMasterStmtGlobals();
// If there is no arkcmp, return
if (!arkcmp_ && spTdb().getUseExistingArkcmp()) {
// make sure we have space in the up queue
if (qparent_.up->isFull()) {
return WORK_OK;
}
queue_index pindex = qparent_.down->getHeadIndex();
ex_queue_entry *pentry_down = qparent_.down->getQueueEntry(pindex);
ex_queue_entry *up_entry = qparent_.up->getTailEntry();
up_entry->upState.status = ex_queue::Q_NO_DATA;
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
ExStoredProcPrivateState & pstate
= *((ExStoredProcPrivateState *) pentry_down->pstate);
up_entry->upState.setMatchNo(pstate.matchCount_);
// insert into parent
qparent_.up->insert();
pstate.matchCount_ = 0;
qparent_.down->removeHead();
numInputRequests_--;
return WORK_OK;
}
while (1)
{
switch (step_)
{
case BUILD_INPUT_BUFFER_:
{
queue_index pindex = qparent_.down->getHeadIndex();
NABoolean done = FALSE;
while (NOT done)
{
if (pindex == qparent_.down->getTailIndex())
done = TRUE;
else
{
ex_queue_entry *pentry_down =
qparent_.down->getQueueEntry(pindex);
tupp_descriptor * dataDesc;
if (inputBuffer_->
moveInSendOrReplyData(TRUE, // send(input) data
FALSE,
TRUE, // move data
&(pentry_down->downState),
sizeof(ControlInfo),
0,
spTdb().inputRowlen_,
&dataDesc,
0, NULL) == SqlBuffer::BUFFER_FULL)
{
// no more space in input Buffer.
done = TRUE;
}
else
{
numInputRequests_++;
// move input data
if (spTdb().inputExpr_)
{
workAtp_->getTupp(spTdb().workAtpIndex_)
= dataDesc;
spTdb().inputExpr_->eval(pentry_down->getAtp(), workAtp_);
workAtp_->getTupp(spTdb().workAtpIndex_).release();
}
// move to the next down entry
pindex++;
} // move input data
} // more entries in down queue
} // while not done
//CmpCommon::getDefault(PROCESS_ISP_LOCALLY) == DF_ON
if (spTdb().isExecuteInLocalProcess())
step_ = PROCESS_REQUEST_;
else
// Ship the request to ARKCMP.
step_ = SEND_INPUT_BUFFER_;
}
break;
case PROCESS_REQUEST_:
{
//construct request
ULng32 sz = inputBuffer_->get_buffer_size();
inputBuffer_->drivePack();
inputBuffer_->setBufferStatus(SqlBuffer::IN_USE);
// The request is dynamically allocated here instead of using stack variable,
// because it is owned by a CmpStatementISP inside compileDirect,
// and will be deleted in destructor of CmpStatementISP.
// refer to ExCmpMessage::actOnReceive()
CmpMessageISPRequest* ispRequest = new (getHeap()) CmpMessageISPRequest((char *)spTdb().spName_,
spTdb().extractInputExpr_,
spTdb().extractInputExpr_->getLength(),
spTdb().moveOutputExpr_,
spTdb().moveOutputExpr_->getLength(),
0, 0,
(void *)inputBuffer_, sz,
spTdb().outputRowlen_, sz,
getHeap(),
master_glob->getStatement()->getUniqueStmtId(),
master_glob->getStatement()->getUniqueStmtIdLen());
//save requestId in case there may be getNext
requestId_ = ispRequest->id();
//allocate returnedBuffer
returnedBuffer_ = pool_->get_free_buffer(0);
if (returnedBuffer_ == NULL)
return WORK_POOL_BLOCKED;
ULng32 returnedBuflen = returnedBuffer_->get_buffer_size();
UInt32 dummyDatalen = 0; //not used here
Int32 dummyCharSet = 0; //not used here
Int32 cpStatus;
ComDiagsArea *cpDiagsArea = ComDiagsArea::allocate(getHeap());
//pass request and returnedBuffer_ to compileDirect
cpStatus = CmpCommon::context()->compileDirect(
(char*)ispRequest, dummyDatalen,
master_glob->getStatement()->getContext()->exHeap(),
dummyCharSet,
CmpMessageObj::INTERNALSP_REQUEST,
(char* &)returnedBuffer_, returnedBuflen,
master_glob->getStatement()->getContext()->getSqlParserFlags(),
NULL, 0, cpDiagsArea);
if(cpStatus != 0)//FAILURE
{
//replied length exceeds the return buffer length, this will abort an dump the core
if(returnedBuflen > returnedBuffer_->get_buffer_size())
ex_assert(FALSE, "Replied length exceeds receiving buffer length.");
if(cpDiagsArea->mainSQLCODE() != 0) {
ComDiagsArea *mainDiags = getGlobals()->castToExExeStmtGlobals()->getDiagsArea();
if (mainDiags == NULL) {
mainDiags = ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
getGlobals()->castToExExeStmtGlobals()->setGlobDiagsArea(mainDiags);
mainDiags->decrRefCount();
}
mainDiags->mergeAfter(*cpDiagsArea);
}
cpDiagsArea->decrRefCount();
return WORK_BAD_ERROR;
}
cpDiagsArea->decrRefCount();
returnedBuffer_->driveUnpack();
//on success, returned rows are copied to returnedBuffer_ in compileDirect
step_ = RETURN_ROWS_;
}
break;
case PROCESS_GETNEXT_:
{
//getNext request can be allocated in stack, because it's not own by CmpStatementISP
//refer to ExCmpMessage::actOnReceive()
CmpMessageISPGetNext ispRequestGetNext(returnedBuffer_->get_buffer_size(), requestId_, 0, 0);
returnedBuffer_ = pool_->get_free_buffer(0);
if (returnedBuffer_ == NULL)
return WORK_POOL_BLOCKED;
ULng32 returnedBuflen = returnedBuffer_->get_buffer_size();
UInt32 dummyDatalen = 0; //not used here
Int32 dummyCharSet = 0; //not used here
Int32 cpStatus;
ComDiagsArea *cpDiagsArea = ComDiagsArea::allocate(getHeap());
cpStatus = CmpCommon::context()->compileDirect(
(char*)&ispRequestGetNext, dummyDatalen,
master_glob->getStatement()->getContext()->exHeap(),
dummyCharSet,
CmpMessageObj::INTERNALSP_GETNEXT,
(char* &)returnedBuffer_, returnedBuflen,
master_glob->getStatement()->getContext()->getSqlParserFlags(),
NULL, 0, cpDiagsArea);
if(cpStatus != 0)//FAILURE
{
//replied length exceeds the return buffer length, this will abort an dump the core
if(returnedBuflen > returnedBuffer_->get_buffer_size())
ex_assert(FALSE, "Replied length exceeds receiving buffer length.");
if(cpDiagsArea->mainSQLCODE() != 0) {
ComDiagsArea *mainDiags = getGlobals()->castToExExeStmtGlobals()->getDiagsArea();
if (mainDiags == NULL) {
mainDiags = ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
getGlobals()->castToExExeStmtGlobals()->setGlobDiagsArea(mainDiags);
mainDiags->decrRefCount();
}
mainDiags->mergeAfter(*cpDiagsArea);
}
cpDiagsArea->decrRefCount();
return WORK_BAD_ERROR;
}
cpDiagsArea->decrRefCount();
returnedBuffer_->driveUnpack();
//on success, returned rows are copied to returnedBuffer_ in compileDirect
step_ = RETURN_ROWS_;
}
break;
case SEND_INPUT_BUFFER_:
{
ULng32 sz = inputBuffer_->get_buffer_size();
inputBuffer_->drivePack();
inputBuffer_->setBufferStatus(SqlBuffer::IN_USE);
// Send request to arkcomp and wait for reply.
if (arkcmp_->sendRequest(
spTdb().spName_,
spTdb().extractInputExpr_,
spTdb().extractInputExpr_->getLength(),
spTdb().moveOutputExpr_,
spTdb().moveOutputExpr_->getLength(),
0, 0,
(void *)inputBuffer_,
sz,
spTdb().outputRowlen_,
sz,
spTdb().getUseExistingArkcmp(),
/* If true, then use a waited call.
Use no-waited otherwise*/
&requestId_,
master_glob->getStatement()->getUniqueStmtId(),
master_glob->getStatement()->getUniqueStmtIdLen()) !=
ExSqlComp::SUCCESS)
{
if (arkcmp_->breakReceived())
{
// Break signal was received. Return as quickly as possible
// to the scheduler, after setting a flag in the CLI globals
// that is used by mxci to determine whether a message
// should be returned to indicate that RECOVER needs to
// be run.
master_glob->getCliGlobals()->setSPBreakReceived(TRUE);
return WORK_BAD_ERROR;
}
else
master_glob->getCliGlobals()->setSPBreakReceived(FALSE);
if (arkcmp_->badConnection())
{
// Error reporting in the GET_REPLY_BUFFER_ step
// will be used, since the error is in the IPC
// connection.
}
else
{
// Get error from the arkcmp_.
ex_assert(
arkcmp_->getDiags(),
"ExSqlComp::getReply returns error, but no diagnostics");
ComDiagsArea *mainDiags = getGlobals()->
castToExExeStmtGlobals()->getDiagsArea();
if (mainDiags == NULL)
{
mainDiags = ComDiagsArea::allocate(
getGlobals()->getDefaultHeap());
getGlobals()->castToExExeStmtGlobals()->
setGlobDiagsArea(mainDiags);
mainDiags->decrRefCount();
}
mainDiags->mergeAfter(*arkcmp_->getDiags());
return WORK_BAD_ERROR;
}
} // end if !SUCCESS
step_ = GET_REPLY_BUFFER_;
}
break;
case GET_REPLY_BUFFER_:
{
if (arkcmp_->breakReceived())
{
// Break signal was received. Return as quickly as possible
// to the scheduler, after setting a flag in the CLI globals
// that is used by mxci to determine whether a message
// should be returned to indicate that RECOVER needs to
// be run.
master_glob->getCliGlobals()->setSPBreakReceived(TRUE);
return WORK_BAD_ERROR;
}
else
master_glob->getCliGlobals()->setSPBreakReceived(FALSE);
if (arkcmp_->badConnection())
{
ComDiagsArea* da = getGlobals()->
castToExExeStmtGlobals()->getDiagsArea();
NABoolean alreadyHadGlobalDA = da ? TRUE : FALSE;
arkcmp_->getServer()->getControlConnection()->
populateDiagsArea( da, getHeap());
if (alreadyHadGlobalDA)
{
// Preceding populateDiagsArea call has
// added a ComCondition to existing DA, so
// there's nothing more to do.
}
else
{
// Preceding populateDiagsArea call has
// created a DiagsArea, added a ComCondition,
// and side-effected da so make this da
// the global DA.
getGlobals()->castToExExeStmtGlobals()->
setGlobDiagsArea(da);
da->decrRefCount();
}
return WORK_BAD_ERROR;
}
if (arkcmp_->status() != ExSqlComp::FINISHED)
{
if (spTdb().getUseExistingArkcmp()) {
return WORK_CALL_AGAIN;
}
return WORK_OK; // come back later.
}
// find an empty buffer where data will be returned.
returnedBuffer_ = pool_->get_free_buffer(0);
if (returnedBuffer_ == NULL)
return WORK_POOL_BLOCKED;
ULng32 returnedBuflen_ = returnedBuffer_->get_buffer_size();
char * rb = (char *)returnedBuffer_;
ExSqlComp::ReturnStatus status =
arkcmp_->getReply(rb,
returnedBuflen_,
returnedBuflen_,
requestId_);
if (status == ExSqlComp::MOREDATA)
{
// issue a getNext so arkcmp_ can prepare for
// the next buffer
// waited if sharing arkcmp (last argument)
status = arkcmp_->getNext(returnedBuflen_, requestId_, spTdb().getUseExistingArkcmp());
}
if (status == ExSqlComp::ERROR)
{
ex_assert(
arkcmp_->getDiags(),
"ExSqlComp::getReply returns error, but no diagnostics");
ComDiagsArea *mainDiags = getGlobals()->
castToExExeStmtGlobals()->getDiagsArea();
if (mainDiags == NULL)
{
mainDiags = ComDiagsArea::allocate(
getGlobals()->getDefaultHeap());
getGlobals()->castToExExeStmtGlobals()->
setGlobDiagsArea(mainDiags);
mainDiags->decrRefCount();
}
mainDiags->mergeAfter(*arkcmp_->getDiags());
return WORK_BAD_ERROR;
}
returnedBuffer_->driveUnpack();
step_ = RETURN_ROWS_;
}
break;
case RETURN_ROWS_:
{
// make sure we have space in the up queue
if(qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
tupp p;
ComDiagsArea* diags;
if (returnedBuffer_->moveOutSendOrReplyData(FALSE, // reply data
&(up_entry->upState),
p, NULL, &diags)
== TRUE) // no more rows in this buffer
{
// when we get a buffer from arkcmp, all tupp_descriptors in the
// buffer have a refcount of 0. To make sure that the buffer is not
// being reused it's status was IN_USE. Now we have put all
// the tupps in the buffer into the up queue, i.e., they have a
// refcount > 0.
// Mark the buffer as being FULL.
// It will be reused later after all tupp_descriptors have been
// consumed and reference count of all have become 0.
returnedBuffer_->bufferFull();
if (numInputRequests_ > 0)
{
// still more input requests. Get the
// next buffer from arkcmp.
if (spTdb().isExecuteInLocalProcess())
step_ = PROCESS_GETNEXT_;
else
step_ = GET_REPLY_BUFFER_;
}
else
{
// done with all the inputs.
inputBuffer_->init(inputBuffer_->get_buffer_size());
step_ = BUILD_INPUT_BUFFER_;
}
return WORK_CALL_AGAIN;
}
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ExStoredProcPrivateState & pstate
= *((ExStoredProcPrivateState *) pentry_down->pstate);
ex_queue::up_status status = up_entry->upState.status;
switch(status)
{
case ex_queue::Q_OK_MMORE:
{
if (pstate.errorHappened_)
{
// just ignore this row -- we've already returned an error.
}
else
{
// Copy the pointers to the input data
up_entry->copyAtp(pentry_down);
// set the new pointer
up_entry->getTupp(spTdb().criDescUp_->noTuples()-1) =p;
// Is there a selection predicate?
if (spTdb().predExpr_) {
ex_expr::exp_return_type retCode =
(spTdb().predExpr_->eval(up_entry->getAtp(), 0));
if (retCode == ex_expr::EXPR_FALSE) {
break;
}
if (retCode == ex_expr::EXPR_ERROR) {
up_entry->upState.status = ex_queue::Q_SQLERROR;
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.downIndex = qparent_.down->getHeadIndex();
up_entry->upState.setMatchNo(pstate.matchCount_);
qparent_.up->insert();
pstate.errorHappened_ = TRUE;
break;
}
}
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.downIndex = qparent_.down->getHeadIndex();
pstate.matchCount_++;
up_entry->upState.setMatchNo(pstate.matchCount_);
qparent_.up->insert();
}
}
break;
case ex_queue::Q_NO_DATA:
{
if (diags)
{
ComDiagsArea * da =
ComDiagsArea::allocate(
getGlobals()->getDefaultHeap());
da->unpackObj(diags->getType(),
diags->getVersion(),
TRUE, // sameEndianness,
0, // dummy for now...
(IpcConstMessageBufferPtr) diags);
up_entry->setDiagsArea(da);
}
// done with this input request.
up_entry->upState.status = ex_queue::Q_NO_DATA;
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(pstate.matchCount_);
// insert into parent
qparent_.up->insert();
pstate.matchCount_ = 0;
pstate.errorHappened_ = FALSE;
qparent_.down->removeHead();
numInputRequests_--;
}
break;
case ex_queue::Q_SQLERROR:
{
if (pstate.errorHappened_)
{
// just ignore this row -- we've already returned an error.
}
else
{
ComDiagsArea * da =
ComDiagsArea::allocate(
getGlobals()->getDefaultHeap());
da->unpackObj(diags->getType(),
diags->getVersion(),
TRUE, // sameEndianness,
0, // dummy for now...
(IpcConstMessageBufferPtr) diags);
up_entry->setDiagsArea(da);
// insert into parent
qparent_.up->insert();
}
}
break;
case ex_queue::Q_INVALID:
ex_assert(0,
"ExStoredProcTcb::work() Invalid state returned by arkcmp");
break;
} // switch status
}
break;
} // switch step_
} // while
return WORK_OK;
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for private state.
/////////////////////////////////////////////////////////////////////////////
ExStoredProcPrivateState::ExStoredProcPrivateState(const ExStoredProcTcb *
/*tcb*/)
{
matchCount_ = 0;
errorHappened_ = FALSE;
}
ExStoredProcPrivateState::~ExStoredProcPrivateState()
{
};
ex_tcb_private_state * ExStoredProcPrivateState::allocate_new(const ex_tcb *tcb)
{
return new(((ex_tcb *)tcb)->getSpace()) ExStoredProcPrivateState((ExStoredProcTcb *) tcb);
};
/////////////////////////////////////////////////
// class ExSPInputOutput
/////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
// Moves data value from fieldNum'th position in inputRow to 'data.
// fieldNum is zero based, that is, the first field is number 0.
// Returns -1 in case of error. diagsArea contains the error number
// in this case (well, almost always).
////////////////////////////////////////////////////////////////////
short ExSPInputOutput::inputValue(ULng32 fieldNum, char * inputRow,
char * data, ULng32 datalen, NABoolean casting,
ComDiagsArea * diagsArea)
{
Attributes * attr = tupleDesc_->getAttr((short)fieldNum);
short varcharlen = 2;
// diagsArea must be present. And fieldNum must be within range.
if ((diagsArea == NULL) ||
(fieldNum < 0) ||
(fieldNum >= tupleDesc_->numAttrs()))
{
// return error
return -1;
}
if (casting == FALSE)
convDoIt(&inputRow[attr->getOffset()],
attr->getLength(&inputRow[attr->getVCLenIndOffset()]),
attr->getDatatype(),
attr->getPrecision(),
attr->getScale(),
// Pass in pointer to target data.
// if varchar, then data points to varchar length and
// 'data + varcharIndLen' points to the real data.
&data[((attr->getVCIndicatorLength() <= 0) ? 0 :
attr->getVCIndicatorLength())],
attr->getLength(),
attr->getDatatype(),
attr->getPrecision(),
attr->getScale(),
(attr->getVCIndicatorLength() > 0 ? data : NULL),
attr->getVCIndicatorLength(),
NULL, /*CollHeap *heap,*/
&diagsArea,
CONV_UNKNOWN_LEFTPAD);
else
{
// TBD:
// if (datalen < (displayLength(attr) +
// ((attr->getVCIndicatorLength() > 0) ?
// attr->getVCIndicatorLength() : 0))
// ; // return error.
// target has to be formatted as a varchar. The first two bytes
// of target will contain the actual target length, followed
// by data bytes.
convDoIt(&inputRow[attr->getOffset()],
attr->getLength(&inputRow[attr->getVCLenIndOffset()]),
attr->getDatatype(),
attr->getPrecision(),
attr->getScale(),
&data[varcharlen],
datalen - varcharlen,
REC_BYTE_V_ASCII,
0,
0,
data,
varcharlen,
NULL, /*CollHeap *heap,*/
&diagsArea,
CONV_UNKNOWN);
}
return 0;
}
////////////////////////////////////////////////////////////////////
// Moves data value from 'data' to fieldNum'th position in
// outputRow.
// fieldNum is zero based, that is, the first field is number 0.
// Returns -1 in case of error. diagsArea contains the error number
// in this case (well, almost always).
////////////////////////////////////////////////////////////////////
short ExSPInputOutput::outputValue(ULng32 fieldNum,
char * outputRow,
char * data,
ULng32 datalen,
NABoolean casting,
CollHeap * heap,
ComDiagsArea * diagsArea)
{
Attributes * attr = tupleDesc_->getAttr((short)fieldNum);
ex_expr::exp_return_type status = ex_expr::EXPR_OK;
short nullIndicatorLen = attr->getNullIndicatorLength(),
varcharIndLen = attr->getVCIndicatorLength(),
scale = attr->getScale(),
dataType = attr->getDatatype();
Lng32 nullIndOffset = attr->getNullIndOffset(),
varcharIndOffset = attr->getVCLenIndOffset(),
precision = attr->getPrecision();
// diagsArea must be present. And fieldNum must be within range.
if ((diagsArea == NULL) ||
(fieldNum < 0) ||
(fieldNum >= tupleDesc_->numAttrs()))
{
// return error
ExRaiseSqlError(heap, &diagsArea, EXE_FIELD_NUM_OVERFLOW);
return ex_expr::EXPR_ERROR;
}
if (data == NULL)
{
// a null value has been passed in. Move in a null value.
if (attr->getNullFlag())
{
// make the result null.
str_pad(&outputRow[nullIndOffset],
nullIndicatorLen,
'\377');
return status;
}
else
{
// return error
ExRaiseSqlError(heap, &diagsArea, EXE_ASSIGNING_NULL_TO_NOT_NULL);
return ex_expr::EXPR_ERROR;
}
} // null value
if (casting == FALSE)
status = convDoIt(
// Pass in pointer to real data.
// if varchar, then data points to varchar length and
// 'data + varcharIndLen' points to the real data.
&data[((varcharIndLen <= 0) ? 0 : varcharIndLen)],
attr->getLength(data),
dataType,
precision,
scale,
&outputRow[attr->getOffset()],
attr->getLength(),
dataType,
precision,
scale,
&outputRow[varcharIndOffset],
varcharIndLen,
heap,
&diagsArea,
CONV_UNKNOWN_LEFTPAD);
else
status = convDoIt(&data[0],
datalen,
REC_BYTE_F_ASCII,
precision,
scale,
&outputRow[attr->getOffset()],
attr->getLength(),
dataType,
precision,
scale,
(varcharIndLen > 0 ?
&outputRow[varcharIndOffset] : NULL),
varcharIndLen,
heap,
&diagsArea,
(getCaseIndexArray() ? getCaseIndexArray()[fieldNum]
: CONV_UNKNOWN_LEFTPAD));
if ((status == ex_expr::EXPR_OK) && attr->getNullFlag())
{
// make the result not null.
str_pad(&outputRow[nullIndOffset],
nullIndicatorLen,
'\0');
}
return status;
}
// Moved to cli/StoredProcInterface.cpp