| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: ex_stored_proc.cpp |
| * Description: |
| * |
| * |
| * Created: 3/15/1997 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ***************************************************************************** |
| */ |
| |
| #include "cli_stdh.h" |
| #include "ex_stdh.h" |
| #include "ComTdb.h" |
| #include "ex_tcb.h" |
| #include "ex_stored_proc.h" |
| #include "ex_exe_stmt_globals.h" |
| #include "ex_io_control.h" |
| #include "ex_expr.h" |
| #include "exp_tuple_desc.h" |
| #include "exp_attrs.h" |
| #include "exp_clause_derived.h" |
| #include "ExSqlComp.h" |
| |
| #include "ErrorMessage.h" |
| #include "CmpStatement.h" |
| #include "CmpStoredProc.h" |
| |
| ex_tcb * ExStoredProcTdb::build(ex_globals * glob) |
| { |
| ExStoredProcTcb * spTcb = new(glob->getSpace()) |
| ExStoredProcTcb(*this, glob); |
| |
| // add the stored_proc_tcb to the schedule |
| spTcb->registerSubtasks(); |
| |
| return (spTcb); |
| } |
| |
| |
| ExStoredProcTcb::ExStoredProcTcb(const ExStoredProcTdb & sp_tdb, |
| ex_globals * glob |
| ) |
| : ex_tcb( sp_tdb, 1, glob) |
| { |
| Space * space = (glob ? glob->getSpace() : 0); |
| |
| // Allocate the buffer pool |
| pool_ = new(space) sql_buffer_pool(sp_tdb.numBuffers_, |
| sp_tdb.bufferSize_, |
| space); |
| |
| // Allocate the queue to communicate with parent |
| qparent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE, |
| sp_tdb.queueSizeDown_, |
| sp_tdb.criDescDown_, |
| space); |
| |
| // Allocate the private state in each entry of the down queue |
| ExStoredProcPrivateState *p = new(space) ExStoredProcPrivateState(this); |
| qparent_.down->allocatePstate(p, this); |
| delete p; |
| |
| |
| qparent_.up = new(space) ex_queue(ex_queue::UP_QUEUE, |
| sp_tdb.queueSizeUp_, |
| sp_tdb.criDescUp_, |
| space); |
| |
| inputBuffer_ = pool_->get_free_buffer(0); |
| returnedBuffer_ = NULL; |
| |
| step_ = BUILD_INPUT_BUFFER_; |
| |
| workAtp_ = allocateAtp(sp_tdb.workCriDesc_, space); |
| |
| // create the arkcmp that will be used to process this |
| // SP request. |
| ExMasterStmtGlobals *master_glob = getGlobals()->castToExExeStmtGlobals() |
| ->castToExMasterStmtGlobals(); |
| ex_assert((master_glob != NULL), "Invalid reference in ExStoredProcTcb."); |
| |
| // Get the arkcmp_ that contains the query cache |
| if (spTdb().getUseExistingArkcmp()) { |
| arkcmp_ = master_glob->getCliGlobals()->getArkcmp(); |
| } |
| else { |
| arkcmp_ = |
| new(glob->getDefaultHeap()) ExSqlComp(0, |
| glob->getDefaultHeap(), |
| master_glob->getCliGlobals(), |
| this, |
| COM_VERS_COMPILER_VERSION, |
| NULL, |
| master_glob->getStatement()-> |
| getContext()->getEnvironment()); |
| } |
| |
| ioSubtask_ = NULL; |
| numInputRequests_ = 0; |
| |
| if (sp_tdb.inputExpr_) |
| (void) sp_tdb.inputExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob); |
| |
| if (sp_tdb.outputExpr_) |
| (void) sp_tdb.outputExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob); |
| |
| if (sp_tdb.predExpr_) |
| (void) sp_tdb.predExpr_->fixup(0, getExpressionMode(), this, space, glob->getDefaultHeap(), FALSE, glob); |
| |
| } |
| |
| ExStoredProcTcb::~ExStoredProcTcb() |
| { |
| delete qparent_.up; |
| delete qparent_.down; |
| |
| if (!spTdb().getUseExistingArkcmp()) { |
| delete arkcmp_; |
| } |
| |
| freeResources(); |
| } |
| |
| void ExStoredProcTcb::freeResources() |
| { |
| delete pool_; |
| pool_ = 0; |
| } |
| |
| void ExStoredProcTcb::registerSubtasks() |
| { |
| ex_tcb::registerSubtasks(); |
| |
| // register a non-queue event for the IPC with the send top node |
| ioSubtask_ = |
| getGlobals()->getScheduler()->registerNonQueueSubtask(sWork,this); |
| } |
| |
| short ExStoredProcTcb::work() |
| { |
| // if no parent request, return |
| if (qparent_.down->isEmpty()) |
| return WORK_OK; |
| |
| ExMasterStmtGlobals *master_glob = getGlobals()->castToExExeStmtGlobals() |
| ->castToExMasterStmtGlobals(); |
| |
| // If there is no arkcmp, return |
| if (!arkcmp_ && spTdb().getUseExistingArkcmp()) { |
| |
| // make sure we have space in the up queue |
| if (qparent_.up->isFull()) { |
| return WORK_OK; |
| } |
| |
| queue_index pindex = qparent_.down->getHeadIndex(); |
| ex_queue_entry *pentry_down = qparent_.down->getQueueEntry(pindex); |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->upState.status = ex_queue::Q_NO_DATA; |
| up_entry->upState.parentIndex = pentry_down->downState.parentIndex; |
| ExStoredProcPrivateState & pstate |
| = *((ExStoredProcPrivateState *) pentry_down->pstate); |
| up_entry->upState.setMatchNo(pstate.matchCount_); |
| |
| // insert into parent |
| qparent_.up->insert(); |
| |
| pstate.matchCount_ = 0; |
| qparent_.down->removeHead(); |
| numInputRequests_--; |
| |
| return WORK_OK; |
| } |
| |
| while (1) |
| { |
| switch (step_) |
| { |
| case BUILD_INPUT_BUFFER_: |
| { |
| queue_index pindex = qparent_.down->getHeadIndex(); |
| NABoolean done = FALSE; |
| |
| while (NOT done) |
| { |
| if (pindex == qparent_.down->getTailIndex()) |
| done = TRUE; |
| else |
| { |
| ex_queue_entry *pentry_down = |
| qparent_.down->getQueueEntry(pindex); |
| |
| tupp_descriptor * dataDesc; |
| |
| if (inputBuffer_-> |
| moveInSendOrReplyData(TRUE, // send(input) data |
| FALSE, |
| TRUE, // move data |
| &(pentry_down->downState), |
| sizeof(ControlInfo), |
| 0, |
| spTdb().inputRowlen_, |
| &dataDesc, |
| 0, NULL) == SqlBuffer::BUFFER_FULL) |
| { |
| // no more space in input Buffer. |
| done = TRUE; |
| } |
| else |
| { |
| numInputRequests_++; |
| |
| // move input data |
| if (spTdb().inputExpr_) |
| { |
| workAtp_->getTupp(spTdb().workAtpIndex_) |
| = dataDesc; |
| spTdb().inputExpr_->eval(pentry_down->getAtp(), workAtp_); |
| workAtp_->getTupp(spTdb().workAtpIndex_).release(); |
| } |
| |
| // move to the next down entry |
| pindex++; |
| |
| } // move input data |
| } // more entries in down queue |
| } // while not done |
| //CmpCommon::getDefault(PROCESS_ISP_LOCALLY) == DF_ON |
| if (spTdb().isExecuteInLocalProcess()) |
| step_ = PROCESS_REQUEST_; |
| else |
| // Ship the request to ARKCMP. |
| step_ = SEND_INPUT_BUFFER_; |
| } |
| break; |
| case PROCESS_REQUEST_: |
| { |
| //construct request |
| ULng32 sz = inputBuffer_->get_buffer_size(); |
| inputBuffer_->drivePack(); |
| inputBuffer_->setBufferStatus(SqlBuffer::IN_USE); |
| // The request is dynamically allocated here instead of using stack variable, |
| // because it is owned by a CmpStatementISP inside compileDirect, |
| // and will be deleted in destructor of CmpStatementISP. |
| // refer to ExCmpMessage::actOnReceive() |
| CmpMessageISPRequest* ispRequest = new (getHeap()) CmpMessageISPRequest((char *)spTdb().spName_, |
| spTdb().extractInputExpr_, |
| spTdb().extractInputExpr_->getLength(), |
| spTdb().moveOutputExpr_, |
| spTdb().moveOutputExpr_->getLength(), |
| 0, 0, |
| (void *)inputBuffer_, sz, |
| spTdb().outputRowlen_, sz, |
| getHeap(), |
| master_glob->getStatement()->getUniqueStmtId(), |
| master_glob->getStatement()->getUniqueStmtIdLen()); |
| //save requestId in case there may be getNext |
| requestId_ = ispRequest->id(); |
| //allocate returnedBuffer |
| returnedBuffer_ = pool_->get_free_buffer(0); |
| if (returnedBuffer_ == NULL) |
| return WORK_POOL_BLOCKED; |
| ULng32 returnedBuflen = returnedBuffer_->get_buffer_size(); |
| UInt32 dummyDatalen = 0; //not used here |
| Int32 dummyCharSet = 0; //not used here |
| Int32 cpStatus; |
| ComDiagsArea *cpDiagsArea = ComDiagsArea::allocate(getHeap()); |
| //pass request and returnedBuffer_ to compileDirect |
| cpStatus = CmpCommon::context()->compileDirect( |
| (char*)ispRequest, dummyDatalen, |
| master_glob->getStatement()->getContext()->exHeap(), |
| dummyCharSet, |
| CmpMessageObj::INTERNALSP_REQUEST, |
| (char* &)returnedBuffer_, returnedBuflen, |
| master_glob->getStatement()->getContext()->getSqlParserFlags(), |
| |
| NULL, 0, cpDiagsArea); |
| if(cpStatus != 0)//FAILURE |
| { |
| //replied length exceeds the return buffer length, this will abort an dump the core |
| if(returnedBuflen > returnedBuffer_->get_buffer_size()) |
| ex_assert(FALSE, "Replied length exceeds receiving buffer length."); |
| |
| if(cpDiagsArea->mainSQLCODE() != 0) { |
| ComDiagsArea *mainDiags = getGlobals()->castToExExeStmtGlobals()->getDiagsArea(); |
| if (mainDiags == NULL) { |
| mainDiags = ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| getGlobals()->castToExExeStmtGlobals()->setGlobDiagsArea(mainDiags); |
| mainDiags->decrRefCount(); |
| } |
| mainDiags->mergeAfter(*cpDiagsArea); |
| } |
| cpDiagsArea->decrRefCount(); |
| return WORK_BAD_ERROR; |
| } |
| cpDiagsArea->decrRefCount(); |
| returnedBuffer_->driveUnpack(); |
| //on success, returned rows are copied to returnedBuffer_ in compileDirect |
| step_ = RETURN_ROWS_; |
| } |
| break; |
| case PROCESS_GETNEXT_: |
| { |
| //getNext request can be allocated in stack, because it's not own by CmpStatementISP |
| //refer to ExCmpMessage::actOnReceive() |
| CmpMessageISPGetNext ispRequestGetNext(returnedBuffer_->get_buffer_size(), requestId_, 0, 0); |
| returnedBuffer_ = pool_->get_free_buffer(0); |
| if (returnedBuffer_ == NULL) |
| return WORK_POOL_BLOCKED; |
| ULng32 returnedBuflen = returnedBuffer_->get_buffer_size(); |
| UInt32 dummyDatalen = 0; //not used here |
| Int32 dummyCharSet = 0; //not used here |
| Int32 cpStatus; |
| ComDiagsArea *cpDiagsArea = ComDiagsArea::allocate(getHeap()); |
| cpStatus = CmpCommon::context()->compileDirect( |
| (char*)&ispRequestGetNext, dummyDatalen, |
| master_glob->getStatement()->getContext()->exHeap(), |
| dummyCharSet, |
| CmpMessageObj::INTERNALSP_GETNEXT, |
| (char* &)returnedBuffer_, returnedBuflen, |
| master_glob->getStatement()->getContext()->getSqlParserFlags(), |
| NULL, 0, cpDiagsArea); |
| |
| if(cpStatus != 0)//FAILURE |
| { |
| //replied length exceeds the return buffer length, this will abort an dump the core |
| if(returnedBuflen > returnedBuffer_->get_buffer_size()) |
| ex_assert(FALSE, "Replied length exceeds receiving buffer length."); |
| |
| if(cpDiagsArea->mainSQLCODE() != 0) { |
| ComDiagsArea *mainDiags = getGlobals()->castToExExeStmtGlobals()->getDiagsArea(); |
| if (mainDiags == NULL) { |
| mainDiags = ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| getGlobals()->castToExExeStmtGlobals()->setGlobDiagsArea(mainDiags); |
| mainDiags->decrRefCount(); |
| } |
| mainDiags->mergeAfter(*cpDiagsArea); |
| } |
| cpDiagsArea->decrRefCount(); |
| return WORK_BAD_ERROR; |
| } |
| |
| cpDiagsArea->decrRefCount(); |
| returnedBuffer_->driveUnpack(); |
| //on success, returned rows are copied to returnedBuffer_ in compileDirect |
| step_ = RETURN_ROWS_; |
| } |
| break; |
| |
| case SEND_INPUT_BUFFER_: |
| { |
| ULng32 sz = inputBuffer_->get_buffer_size(); |
| inputBuffer_->drivePack(); |
| inputBuffer_->setBufferStatus(SqlBuffer::IN_USE); |
| // Send request to arkcomp and wait for reply. |
| if (arkcmp_->sendRequest( |
| spTdb().spName_, |
| spTdb().extractInputExpr_, |
| spTdb().extractInputExpr_->getLength(), |
| spTdb().moveOutputExpr_, |
| spTdb().moveOutputExpr_->getLength(), |
| 0, 0, |
| (void *)inputBuffer_, |
| sz, |
| spTdb().outputRowlen_, |
| sz, |
| spTdb().getUseExistingArkcmp(), |
| /* If true, then use a waited call. |
| Use no-waited otherwise*/ |
| &requestId_, |
| master_glob->getStatement()->getUniqueStmtId(), |
| master_glob->getStatement()->getUniqueStmtIdLen()) != |
| ExSqlComp::SUCCESS) |
| { |
| |
| if (arkcmp_->breakReceived()) |
| { |
| // Break signal was received. Return as quickly as possible |
| // to the scheduler, after setting a flag in the CLI globals |
| // that is used by mxci to determine whether a message |
| // should be returned to indicate that RECOVER needs to |
| // be run. |
| master_glob->getCliGlobals()->setSPBreakReceived(TRUE); |
| return WORK_BAD_ERROR; |
| } |
| else |
| master_glob->getCliGlobals()->setSPBreakReceived(FALSE); |
| |
| if (arkcmp_->badConnection()) |
| { |
| // Error reporting in the GET_REPLY_BUFFER_ step |
| // will be used, since the error is in the IPC |
| // connection. |
| } |
| else |
| { |
| // Get error from the arkcmp_. |
| ex_assert( |
| arkcmp_->getDiags(), |
| "ExSqlComp::getReply returns error, but no diagnostics"); |
| |
| ComDiagsArea *mainDiags = getGlobals()-> |
| castToExExeStmtGlobals()->getDiagsArea(); |
| |
| if (mainDiags == NULL) |
| { |
| mainDiags = ComDiagsArea::allocate( |
| getGlobals()->getDefaultHeap()); |
| getGlobals()->castToExExeStmtGlobals()-> |
| setGlobDiagsArea(mainDiags); |
| mainDiags->decrRefCount(); |
| } |
| |
| mainDiags->mergeAfter(*arkcmp_->getDiags()); |
| |
| return WORK_BAD_ERROR; |
| } |
| |
| } // end if !SUCCESS |
| |
| step_ = GET_REPLY_BUFFER_; |
| } |
| break; |
| |
| case GET_REPLY_BUFFER_: |
| { |
| if (arkcmp_->breakReceived()) |
| { |
| // Break signal was received. Return as quickly as possible |
| // to the scheduler, after setting a flag in the CLI globals |
| // that is used by mxci to determine whether a message |
| // should be returned to indicate that RECOVER needs to |
| // be run. |
| master_glob->getCliGlobals()->setSPBreakReceived(TRUE); |
| return WORK_BAD_ERROR; |
| } |
| else |
| master_glob->getCliGlobals()->setSPBreakReceived(FALSE); |
| |
| if (arkcmp_->badConnection()) |
| { |
| ComDiagsArea* da = getGlobals()-> |
| castToExExeStmtGlobals()->getDiagsArea(); |
| NABoolean alreadyHadGlobalDA = da ? TRUE : FALSE; |
| |
| arkcmp_->getServer()->getControlConnection()-> |
| populateDiagsArea( da, getHeap()); |
| |
| if (alreadyHadGlobalDA) |
| { |
| // Preceding populateDiagsArea call has |
| // added a ComCondition to existing DA, so |
| // there's nothing more to do. |
| } |
| else |
| { |
| // Preceding populateDiagsArea call has |
| // created a DiagsArea, added a ComCondition, |
| // and side-effected da so make this da |
| // the global DA. |
| getGlobals()->castToExExeStmtGlobals()-> |
| setGlobDiagsArea(da); |
| da->decrRefCount(); |
| } |
| return WORK_BAD_ERROR; |
| } |
| |
| if (arkcmp_->status() != ExSqlComp::FINISHED) |
| { |
| if (spTdb().getUseExistingArkcmp()) { |
| return WORK_CALL_AGAIN; |
| } |
| return WORK_OK; // come back later. |
| } |
| |
| // find an empty buffer where data will be returned. |
| returnedBuffer_ = pool_->get_free_buffer(0); |
| if (returnedBuffer_ == NULL) |
| return WORK_POOL_BLOCKED; |
| |
| ULng32 returnedBuflen_ = returnedBuffer_->get_buffer_size(); |
| char * rb = (char *)returnedBuffer_; |
| ExSqlComp::ReturnStatus status = |
| arkcmp_->getReply(rb, |
| returnedBuflen_, |
| returnedBuflen_, |
| requestId_); |
| if (status == ExSqlComp::MOREDATA) |
| { |
| // issue a getNext so arkcmp_ can prepare for |
| // the next buffer |
| // waited if sharing arkcmp (last argument) |
| status = arkcmp_->getNext(returnedBuflen_, requestId_, spTdb().getUseExistingArkcmp()); |
| } |
| |
| if (status == ExSqlComp::ERROR) |
| { |
| ex_assert( |
| arkcmp_->getDiags(), |
| "ExSqlComp::getReply returns error, but no diagnostics"); |
| |
| ComDiagsArea *mainDiags = getGlobals()-> |
| castToExExeStmtGlobals()->getDiagsArea(); |
| |
| if (mainDiags == NULL) |
| { |
| mainDiags = ComDiagsArea::allocate( |
| getGlobals()->getDefaultHeap()); |
| getGlobals()->castToExExeStmtGlobals()-> |
| setGlobDiagsArea(mainDiags); |
| mainDiags->decrRefCount(); |
| } |
| |
| mainDiags->mergeAfter(*arkcmp_->getDiags()); |
| |
| return WORK_BAD_ERROR; |
| } |
| |
| returnedBuffer_->driveUnpack(); |
| |
| step_ = RETURN_ROWS_; |
| } |
| break; |
| |
| case RETURN_ROWS_: |
| { |
| // make sure we have space in the up queue |
| if(qparent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| |
| tupp p; |
| ComDiagsArea* diags; |
| if (returnedBuffer_->moveOutSendOrReplyData(FALSE, // reply data |
| &(up_entry->upState), |
| p, NULL, &diags) |
| == TRUE) // no more rows in this buffer |
| { |
| // when we get a buffer from arkcmp, all tupp_descriptors in the |
| // buffer have a refcount of 0. To make sure that the buffer is not |
| // being reused it's status was IN_USE. Now we have put all |
| // the tupps in the buffer into the up queue, i.e., they have a |
| // refcount > 0. |
| // Mark the buffer as being FULL. |
| // It will be reused later after all tupp_descriptors have been |
| // consumed and reference count of all have become 0. |
| returnedBuffer_->bufferFull(); |
| |
| if (numInputRequests_ > 0) |
| { |
| // still more input requests. Get the |
| // next buffer from arkcmp. |
| if (spTdb().isExecuteInLocalProcess()) |
| step_ = PROCESS_GETNEXT_; |
| else |
| step_ = GET_REPLY_BUFFER_; |
| } |
| else |
| { |
| // done with all the inputs. |
| inputBuffer_->init(inputBuffer_->get_buffer_size()); |
| step_ = BUILD_INPUT_BUFFER_; |
| } |
| return WORK_CALL_AGAIN; |
| } |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| ExStoredProcPrivateState & pstate |
| = *((ExStoredProcPrivateState *) pentry_down->pstate); |
| |
| ex_queue::up_status status = up_entry->upState.status; |
| |
| switch(status) |
| { |
| case ex_queue::Q_OK_MMORE: |
| { |
| if (pstate.errorHappened_) |
| { |
| // just ignore this row -- we've already returned an error. |
| } |
| else |
| { |
| // Copy the pointers to the input data |
| up_entry->copyAtp(pentry_down); |
| |
| // set the new pointer |
| up_entry->getTupp(spTdb().criDescUp_->noTuples()-1) =p; |
| |
| // Is there a selection predicate? |
| if (spTdb().predExpr_) { |
| ex_expr::exp_return_type retCode = |
| (spTdb().predExpr_->eval(up_entry->getAtp(), 0)); |
| if (retCode == ex_expr::EXPR_FALSE) { |
| break; |
| } |
| if (retCode == ex_expr::EXPR_ERROR) { |
| up_entry->upState.status = ex_queue::Q_SQLERROR; |
| up_entry->upState.parentIndex = pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.setMatchNo(pstate.matchCount_); |
| qparent_.up->insert(); |
| pstate.errorHappened_ = TRUE; |
| break; |
| } |
| } |
| up_entry->upState.parentIndex = pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| pstate.matchCount_++; |
| up_entry->upState.setMatchNo(pstate.matchCount_); |
| qparent_.up->insert(); |
| } |
| } |
| break; |
| |
| case ex_queue::Q_NO_DATA: |
| { |
| if (diags) |
| { |
| ComDiagsArea * da = |
| ComDiagsArea::allocate( |
| getGlobals()->getDefaultHeap()); |
| da->unpackObj(diags->getType(), |
| diags->getVersion(), |
| TRUE, // sameEndianness, |
| 0, // dummy for now... |
| (IpcConstMessageBufferPtr) diags); |
| up_entry->setDiagsArea(da); |
| } |
| |
| |
| // done with this input request. |
| up_entry->upState.status = ex_queue::Q_NO_DATA; |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.setMatchNo(pstate.matchCount_); |
| |
| // insert into parent |
| qparent_.up->insert(); |
| |
| pstate.matchCount_ = 0; |
| pstate.errorHappened_ = FALSE; |
| |
| qparent_.down->removeHead(); |
| |
| numInputRequests_--; |
| } |
| break; |
| |
| case ex_queue::Q_SQLERROR: |
| { |
| if (pstate.errorHappened_) |
| { |
| // just ignore this row -- we've already returned an error. |
| } |
| else |
| { |
| ComDiagsArea * da = |
| ComDiagsArea::allocate( |
| getGlobals()->getDefaultHeap()); |
| da->unpackObj(diags->getType(), |
| diags->getVersion(), |
| TRUE, // sameEndianness, |
| 0, // dummy for now... |
| (IpcConstMessageBufferPtr) diags); |
| up_entry->setDiagsArea(da); |
| |
| // insert into parent |
| qparent_.up->insert(); |
| } |
| } |
| break; |
| |
| case ex_queue::Q_INVALID: |
| ex_assert(0, |
| "ExStoredProcTcb::work() Invalid state returned by arkcmp"); |
| break; |
| } // switch status |
| } |
| break; |
| |
| } // switch step_ |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| // Constructor and destructor for private state. |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| ExStoredProcPrivateState::ExStoredProcPrivateState(const ExStoredProcTcb * |
| /*tcb*/) |
| { |
| matchCount_ = 0; |
| errorHappened_ = FALSE; |
| } |
| |
| ExStoredProcPrivateState::~ExStoredProcPrivateState() |
| { |
| }; |
| |
| ex_tcb_private_state * ExStoredProcPrivateState::allocate_new(const ex_tcb *tcb) |
| { |
| return new(((ex_tcb *)tcb)->getSpace()) ExStoredProcPrivateState((ExStoredProcTcb *) tcb); |
| }; |
| |
| ///////////////////////////////////////////////// |
| // class ExSPInputOutput |
| ///////////////////////////////////////////////// |
| |
| //////////////////////////////////////////////////////////////////// |
| // Moves data value from fieldNum'th position in inputRow to 'data. |
| // fieldNum is zero based, that is, the first field is number 0. |
| // Returns -1 in case of error. diagsArea contains the error number |
| // in this case (well, almost always). |
| //////////////////////////////////////////////////////////////////// |
| short ExSPInputOutput::inputValue(ULng32 fieldNum, char * inputRow, |
| char * data, ULng32 datalen, NABoolean casting, |
| ComDiagsArea * diagsArea) |
| { |
| Attributes * attr = tupleDesc_->getAttr((short)fieldNum); |
| short varcharlen = 2; |
| |
| // diagsArea must be present. And fieldNum must be within range. |
| if ((diagsArea == NULL) || |
| (fieldNum < 0) || |
| (fieldNum >= tupleDesc_->numAttrs())) |
| { |
| // return error |
| return -1; |
| } |
| |
| if (casting == FALSE) |
| convDoIt(&inputRow[attr->getOffset()], |
| attr->getLength(&inputRow[attr->getVCLenIndOffset()]), |
| attr->getDatatype(), |
| attr->getPrecision(), |
| attr->getScale(), |
| |
| // Pass in pointer to target data. |
| // if varchar, then data points to varchar length and |
| // 'data + varcharIndLen' points to the real data. |
| &data[((attr->getVCIndicatorLength() <= 0) ? 0 : |
| attr->getVCIndicatorLength())], |
| attr->getLength(), |
| attr->getDatatype(), |
| attr->getPrecision(), |
| attr->getScale(), |
| (attr->getVCIndicatorLength() > 0 ? data : NULL), |
| attr->getVCIndicatorLength(), |
| NULL, /*CollHeap *heap,*/ |
| &diagsArea, |
| CONV_UNKNOWN_LEFTPAD); |
| else |
| { |
| // TBD: |
| // if (datalen < (displayLength(attr) + |
| // ((attr->getVCIndicatorLength() > 0) ? |
| // attr->getVCIndicatorLength() : 0)) |
| // ; // return error. |
| |
| // target has to be formatted as a varchar. The first two bytes |
| // of target will contain the actual target length, followed |
| // by data bytes. |
| convDoIt(&inputRow[attr->getOffset()], |
| attr->getLength(&inputRow[attr->getVCLenIndOffset()]), |
| attr->getDatatype(), |
| attr->getPrecision(), |
| attr->getScale(), |
| &data[varcharlen], |
| datalen - varcharlen, |
| REC_BYTE_V_ASCII, |
| 0, |
| 0, |
| data, |
| varcharlen, |
| NULL, /*CollHeap *heap,*/ |
| &diagsArea, |
| CONV_UNKNOWN); |
| } |
| |
| return 0; |
| } |
| |
| //////////////////////////////////////////////////////////////////// |
| // Moves data value from 'data' to fieldNum'th position in |
| // outputRow. |
| // fieldNum is zero based, that is, the first field is number 0. |
| // Returns -1 in case of error. diagsArea contains the error number |
| // in this case (well, almost always). |
| //////////////////////////////////////////////////////////////////// |
| short ExSPInputOutput::outputValue(ULng32 fieldNum, |
| char * outputRow, |
| char * data, |
| ULng32 datalen, |
| NABoolean casting, |
| CollHeap * heap, |
| ComDiagsArea * diagsArea) |
| { |
| Attributes * attr = tupleDesc_->getAttr((short)fieldNum); |
| ex_expr::exp_return_type status = ex_expr::EXPR_OK; |
| |
| short nullIndicatorLen = attr->getNullIndicatorLength(), |
| varcharIndLen = attr->getVCIndicatorLength(), |
| scale = attr->getScale(), |
| dataType = attr->getDatatype(); |
| |
| Lng32 nullIndOffset = attr->getNullIndOffset(), |
| varcharIndOffset = attr->getVCLenIndOffset(), |
| precision = attr->getPrecision(); |
| |
| // diagsArea must be present. And fieldNum must be within range. |
| if ((diagsArea == NULL) || |
| (fieldNum < 0) || |
| (fieldNum >= tupleDesc_->numAttrs())) |
| { |
| // return error |
| ExRaiseSqlError(heap, &diagsArea, EXE_FIELD_NUM_OVERFLOW); |
| return ex_expr::EXPR_ERROR; |
| } |
| |
| if (data == NULL) |
| { |
| // a null value has been passed in. Move in a null value. |
| |
| if (attr->getNullFlag()) |
| { |
| // make the result null. |
| str_pad(&outputRow[nullIndOffset], |
| nullIndicatorLen, |
| '\377'); |
| return status; |
| } |
| else |
| { |
| // return error |
| ExRaiseSqlError(heap, &diagsArea, EXE_ASSIGNING_NULL_TO_NOT_NULL); |
| return ex_expr::EXPR_ERROR; |
| } |
| } // null value |
| |
| if (casting == FALSE) |
| status = convDoIt( |
| // Pass in pointer to real data. |
| // if varchar, then data points to varchar length and |
| // 'data + varcharIndLen' points to the real data. |
| &data[((varcharIndLen <= 0) ? 0 : varcharIndLen)], |
| attr->getLength(data), |
| dataType, |
| precision, |
| scale, |
| &outputRow[attr->getOffset()], |
| attr->getLength(), |
| dataType, |
| precision, |
| scale, |
| &outputRow[varcharIndOffset], |
| varcharIndLen, |
| heap, |
| &diagsArea, |
| CONV_UNKNOWN_LEFTPAD); |
| else |
| status = convDoIt(&data[0], |
| datalen, |
| REC_BYTE_F_ASCII, |
| precision, |
| scale, |
| &outputRow[attr->getOffset()], |
| attr->getLength(), |
| dataType, |
| precision, |
| scale, |
| (varcharIndLen > 0 ? |
| &outputRow[varcharIndOffset] : NULL), |
| varcharIndLen, |
| heap, |
| &diagsArea, |
| (getCaseIndexArray() ? getCaseIndexArray()[fieldNum] |
| : CONV_UNKNOWN_LEFTPAD)); |
| |
| if ((status == ex_expr::EXPR_OK) && attr->getNullFlag()) |
| { |
| // make the result not null. |
| str_pad(&outputRow[nullIndOffset], |
| nullIndicatorLen, |
| '\0'); |
| } |
| return status; |
| } |
| |
| // Moved to cli/StoredProcInterface.cpp |
| |