| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: <file> |
| * Description: |
| * |
| * |
| * Created: 7/10/95 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ***************************************************************************** |
| */ |
| |
| #include "ex_stdh.h" |
| #include "ComTdb.h" |
| #include "ex_tcb.h" |
| #include "ex_tuple_flow.h" |
| #include "ex_exe_stmt_globals.h" |
| #include "ex_expr.h" |
| #include "str.h" |
| #include "ExStats.h" |
| #include "ExpError.h" |
| #ifndef __EID |
| #include "cli_stdh.h" |
| #endif |
| |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // |
| // TDB procedures |
| // |
| ///////////////////////////////////////////////////////////////////////// |
| |
| ex_tcb * ExTupleFlowTdb::build(ex_globals * glob) |
| { |
| // first build the children |
| ex_tcb * src_tcb = tdbSrc_->build(glob); |
| ex_tcb * tgt_tcb = tdbTgt_->build(glob); |
| |
| ExTupleFlowTcb * tuple_flow_tcb = |
| new(glob->getSpace()) ExTupleFlowTcb(*this, |
| *src_tcb, *tgt_tcb, glob); |
| |
| // add the tuple_flow_tcb to the schedule |
| tuple_flow_tcb->registerSubtasks(); |
| |
| return (tuple_flow_tcb); |
| } |
| |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // |
| // TCB procedures |
| // |
| ///////////////////////////////////////////////////////////////////////// |
| ExTupleFlowTcb::ExTupleFlowTcb(const ExTupleFlowTdb & tuple_flow_tdb, |
| const ex_tcb & src_tcb, // src queue pair |
| const ex_tcb & tgt_tcb, // tgt queue pair |
| ex_globals *glob |
| ) : ex_tcb( tuple_flow_tdb, 1, glob) |
| { |
| CollHeap * space = glob->getSpace(); |
| |
| // LCOV_EXCL_START |
| // Allocate the buffer pool, if tgtExpr_ is present |
| if (tuple_flow_tdb.tgtExpr_) |
| #pragma nowarn(1506) // warning elimination |
| pool_ = new(glob->getSpace()) sql_buffer_pool(tuple_flow_tdb.numBuffers_, |
| tuple_flow_tdb.bufferSize_, |
| glob->getSpace()); |
| #pragma warn(1506) // warning elimination |
| // LCOV_EXCL_STOP |
| |
| tcbSrc_ = &src_tcb; |
| tcbTgt_ = &tgt_tcb; |
| |
| // get the queues that src and tgt use to communicate with me |
| qSrc_ = src_tcb.getParentQueue(); |
| qTgt_ = tgt_tcb.getParentQueue(); |
| |
| ex_cri_desc * from_parent_cri = tuple_flow_tdb.criDescDown_; |
| ex_cri_desc * to_parent_cri = tuple_flow_tdb.criDescUp_; |
| |
| // Allocate the queue to communicate with parent |
| qParent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE, |
| tuple_flow_tdb.queueSizeDown_, |
| from_parent_cri, |
| space); |
| |
| // source atps will be copied to target atps. Allocate |
| // target atps. |
| qTgt_.down->allocateAtps(space); |
| |
| // Allocate the private state in each entry of the down queue |
| ExTupleFlowPrivateState *p = |
| new(space) ExTupleFlowPrivateState(this); |
| qParent_.down->allocatePstate(p, this); |
| delete p; |
| |
| qParent_.up = new(space) ex_queue(ex_queue::UP_QUEUE, |
| tuple_flow_tdb.queueSizeUp_, |
| to_parent_cri, |
| space); |
| |
| // LCOV_EXCL_START |
| // fixup expressions |
| if (tflowTdb().tgtExpr_) |
| (void) tflowTdb().tgtExpr_->fixup(0, getExpressionMode(), this, |
| glob->getSpace(), glob->getDefaultHeap(), FALSE, glob); |
| // LCOV_EXCL_STOP |
| |
| } |
| |
| ExTupleFlowTcb::~ExTupleFlowTcb() |
| { |
| delete qParent_.up; |
| delete qParent_.down; |
| freeResources(); |
| } |
| |
| void ExTupleFlowTcb::freeResources() |
| { |
| delete pool_; |
| pool_ = 0; |
| } |
| |
| short ExTupleFlowTcb::work() |
| { |
| // This is some sort of a hack to fix the problems with the number of rows |
| // inserted returned to the user for packed tables. For these tables, rows |
| // are packed (by the Pack node which is the left child of this tuple flow) |
| // before they are sent off to DP2. DP2 has no idea that it's actually |
| // inserting multiple logical rows (as perceived by the user). However, |
| // there is actually a hidden count of logical rows stored as the first 4 |
| // bytes of the packed row. This counter is supposed to keep track of a sum |
| // of this number in each packed row it gets from the left. When all |
| // insertions are done, this sum is used to override what's stored by the |
| // PA node in the executor global area the number of rows inserted. This is |
| // not a very good place to have this fix, but since this is a low-priority |
| // problem at this time, here we are. |
| // |
| // |
| // NB: The code introduced for this fix |
| // could be safely removed if desired. Also, all changes are within |
| // this file. |
| // |
| |
| if (qParent_.down->isEmpty()) |
| return WORK_OK; |
| |
| ex_queue_entry * pentry_down = qParent_.down->getHeadEntry(); |
| ExTupleFlowPrivateState & pstate = |
| *((ExTupleFlowPrivateState*) pentry_down->pstate); |
| |
| if ((tflowTdb().userSidetreeInsert()) && |
| (pentry_down->downState.request == ex_queue::GET_EOD) && |
| (NOT pstate.parentEOD_)) |
| { |
| pstate.step_ = MOVE_EOD_TO_TGT_; |
| } |
| else if ((pstate.step_ != DONE_) && |
| (pstate.step_ != CANCELLED_) && |
| (pentry_down->downState.request == ex_queue::GET_NOMORE)) |
| { |
| if (pstate.step_ == EMPTY_) |
| pstate.step_ = DONE_; |
| else |
| pstate.step_ = CANCELLED_; |
| } |
| |
| while (1) |
| { |
| switch (pstate.step_) |
| { |
| case EMPTY_: |
| { |
| if (qSrc_.down->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * src_entry = qSrc_.down->getTailEntry(); |
| |
| src_entry->downState.request = pentry_down->downState.request; |
| src_entry->downState.requestValue = |
| pentry_down->downState.requestValue; |
| |
| if ((tflowTdb().firstNRows() >= 0) && |
| (pentry_down->downState.request != ex_queue::GET_N)) |
| { |
| src_entry->downState.request = ex_queue::GET_N; |
| src_entry->downState.requestValue = tflowTdb().firstNRows(); |
| } |
| |
| src_entry->downState.parentIndex = |
| qParent_.down->getHeadIndex(); |
| |
| src_entry->passAtp(pentry_down); |
| |
| qSrc_.down->insert(); |
| |
| // just checking to make sure we got a diags area from the CLI if we are |
| // executing a non-tomic insert. This is done now so that we don't have to do it in multiple |
| // places later. |
| if (tflowTdb().isNonFatalErrorTolerated()) { |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| ex_assert(cliDiagsArea, "In Tupleflow : Non-Atomic insert received no diags area from the CLI"); |
| } |
| |
| pstate.parentEOD_ = FALSE; |
| pstate.srcEOD_ = FALSE; |
| pstate.matchCount_ = 0; |
| pstate.tgtRequests_ = 0; |
| pstate.tgtRowsSent_ = FALSE; |
| pstate.noOfUnPackedRows_ = 0; |
| pstate.srcRequestCount_ = -1; |
| pstate.nonFatalErrorSeen_ = FALSE; |
| // Set startRightIndex_ so that CancelReques doesn't do anything. |
| pstate.startRightIndex_ = pstate.srcRequestCount_; |
| pstate.step_ = MOVE_SRC_TO_TGT_; |
| } |
| break; |
| |
| case MOVE_SRC_TO_TGT_: |
| { |
| // if there are some rows in source up queue, move them to target. |
| while ((! qSrc_.up->isEmpty()) && (! qTgt_.down->isFull()) |
| && (pstate.step_ != HANDLE_ERROR_)) |
| { |
| ex_queue_entry * src_entry = qSrc_.up->getHeadEntry(); |
| ex_queue_entry * tgt_entry = qTgt_.down->getTailEntry(); |
| |
| switch (src_entry->upState.status) |
| { |
| case ex_queue::Q_OK_MMORE: |
| { |
| // move this source row to target. |
| |
| // LCOV_EXCL_START |
| // BEGIN: - Read note at beginning of work(). |
| // |
| if (tcbSrc_->getNodeType() == ComTdb::ex_PACKROWS) |
| { |
| char* packTuppPtr = |
| src_entry->getTupp(src_entry->numTuples()-1) |
| .getDataPointer(); |
| Int32 noOfRows = *((Int32 *)packTuppPtr); |
| pstate.noOfUnPackedRows_ += (noOfRows - 1); |
| |
| } |
| // |
| // END:- Read note at beginning of work(). |
| // LCOV_EXCL_STOP |
| |
| pstate.srcRequestCount_++; |
| tgt_entry->downState.request = |
| pentry_down->downState.request; |
| tgt_entry->downState.requestValue = |
| pentry_down->downState.requestValue; |
| tgt_entry->downState.parentIndex = |
| (Lng32) pstate.srcRequestCount_; |
| tgt_entry->copyAtp(src_entry); |
| qTgt_.down->insert(); |
| pstate.tgtRequests_++; |
| pstate.tgtRowsSent_ = TRUE; |
| qSrc_.up->removeHead(); |
| } |
| break; |
| |
| case ex_queue::Q_NO_DATA: |
| { |
| if ((tflowTdb().vsbbInsertOn()) && |
| (pstate.tgtRowsSent_ == TRUE)) |
| { |
| if (tflowTdb().userSidetreeInsert()) |
| { |
| tgt_entry->downState.request = |
| ex_queue::GET_EOD_NO_ST_COMMIT; |
| } |
| else |
| { |
| tgt_entry->downState.request = |
| ex_queue::GET_EOD; |
| } |
| |
| tgt_entry->downState.requestValue = |
| pentry_down->downState.requestValue; |
| tgt_entry->downState.parentIndex = |
| (Lng32) pstate.srcRequestCount_; |
| tgt_entry->copyAtp(src_entry); |
| |
| qTgt_.down->insert(); |
| pstate.tgtRequests_++; |
| } |
| |
| // LCOV_EXCL_START |
| if ((pstate.tgtRowsSent_ == FALSE) && |
| (src_entry->getDiagsArea())) |
| { |
| // a warning is returned with EOD and |
| // nothing else was returned from source. |
| // Move warning to parent's up queue. |
| if (qParent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * up_entry = |
| qParent_.up->getTailEntry(); |
| up_entry->setDiagsArea(src_entry->getDiagsArea()); |
| } |
| // LCOV_EXCL_STOP |
| |
| qSrc_.up->removeHead(); |
| |
| pstate.srcEOD_ = TRUE; |
| |
| // LCOV_EXCL_START |
| if (tflowTdb().sendEODtoTgt()) |
| pstate.step_ = MOVE_EOD_TO_TGT_; |
| // LCOV_EXCL_STOP |
| } |
| break; |
| |
| case ex_queue::Q_SQLERROR: |
| { |
| if (qParent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| ComDiagsArea * da = src_entry->getDiagsArea(); |
| ex_assert(da, "We have a Q_SQLERROR in Tupleflow but no diags area"); |
| |
| if (tflowTdb().isNonFatalErrorTolerated() && |
| (da->getNextRowNumber(ComCondition::NONFATAL_ERROR) == |
| ComCondition::NONFATAL_ERROR)) |
| { |
| pstate.nonFatalErrorSeen_ = TRUE; |
| } |
| else |
| { |
| pstate.step_ = HANDLE_ERROR_; |
| pstate.nonFatalErrorSeen_ = FALSE; |
| } |
| |
| pstate.srcRequestCount_++; |
| if(tflowTdb().isRowsetIterator()) |
| da->setAllRowNumber((Lng32) pstate.srcRequestCount_); |
| |
| ComDiagsArea *accumulatedDiagsArea = pentry->getDiagsArea(); |
| if (accumulatedDiagsArea) |
| { |
| accumulatedDiagsArea->mergeAfter(*da); |
| if (!(accumulatedDiagsArea->canAcceptMoreErrors()) && |
| tflowTdb().isNonFatalErrorTolerated()) |
| { |
| pstate.nonFatalErrorSeen_ = FALSE; |
| pstate.step_ = HANDLE_ERROR_; |
| } |
| } |
| else |
| { |
| pentry->setDiagsArea(da); |
| da->incrRefCount(); |
| accumulatedDiagsArea = da ; |
| if (tflowTdb().isNonFatalErrorTolerated()) |
| { |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| da->setLengthLimit(cliDiagsArea->getLengthLimit()); |
| } |
| } |
| |
| // For Non-Fatal errors we will remove this Q_SQLERROR reply from the |
| // left child right below as we will continue to stay in this state (MOVE_SRC_TO_TGT_). |
| // For fatal errors this Q_SQLERROR reply is removed in HANDLE_ERROR step to which |
| // we will transition immediately. |
| if (pstate.nonFatalErrorSeen_ == TRUE) |
| qSrc_.up->removeHead(); |
| } |
| break; |
| |
| case ex_queue::Q_REC_SKIPPED: |
| { |
| pstate.srcRequestCount_++; |
| ComDiagsArea * da = src_entry->getDiagsArea(); |
| if (da) |
| pstate.nonFatalErrorSeen_ = TRUE; |
| qSrc_.up->removeHead(); |
| } |
| break; |
| |
| default: |
| { |
| ex_assert(0, "ExTupleFlowTcb::work() Error returned from src"); // LCOV_EXCL_LINE |
| } |
| break; |
| } // switch |
| |
| } // while |
| |
| // if the child reply is not an Q_SQLERROR, then process target |
| if ((pstate.step_ != HANDLE_ERROR_) && |
| (pstate.step_ != MOVE_EOD_TO_TGT_)) |
| pstate.step_ = PROCESS_TGT_; |
| } // MOVE_SRC_TO_TGT |
| break; |
| |
| case MOVE_EOD_TO_TGT_: |
| { |
| pstate.parentEOD_ = TRUE; |
| |
| if (qTgt_.down->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * tgt_entry = qTgt_.down->getTailEntry(); |
| tgt_entry->downState.request = ex_queue::GET_EOD; |
| |
| tgt_entry->downState.requestValue = |
| pentry_down->downState.requestValue; |
| tgt_entry->downState.parentIndex = |
| qParent_.down->getHeadIndex(); |
| //tgt_entry->passAtp(pentry_down); |
| |
| qTgt_.down->insert(); |
| |
| pstate.tgtRequests_++; |
| |
| // LCOV_EXCL_START |
| if (tflowTdb().sendEODtoTgt()) |
| pstate.srcEOD_ = TRUE; |
| // LCOV_EXCL_STOP |
| else |
| pstate.srcEOD_ = FALSE; |
| |
| pstate.step_ = PROCESS_TGT_; |
| } |
| break; |
| |
| case PROCESS_TGT_: |
| { |
| while (! qTgt_.up->isEmpty() && pstate.step_ != HANDLE_ERROR_) |
| { |
| ex_queue_entry * tgt_entry = qTgt_.up->getHeadEntry(); |
| switch (tgt_entry->upState.status) |
| { |
| case ex_queue::Q_OK_MMORE: |
| { |
| if (!tflowTdb().isNonFatalErrorTolerated()) |
| { |
| // ex_assert(0, "ExTupleFlowTcb::work() OK_MMORE from tgt"); |
| if (qParent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| |
| pentry->upState.status = ex_queue::Q_OK_MMORE; |
| pentry->upState.downIndex = qParent_.down->getHeadIndex(); |
| pentry->upState.parentIndex = pentry_down->downState.parentIndex; |
| pentry->upState.setMatchNo(pstate.matchCount_); |
| |
| // copy input tupps from parent request |
| pentry->copyAtp(pentry_down); |
| |
| // copy child's atp to |
| // the output atp (to parent's up queue) |
| pentry->copyAtp(tgt_entry); |
| |
| // insert into parent up queue |
| qParent_.up->insert(); |
| } |
| else |
| { |
| ComDiagsArea * da = tgt_entry->getDiagsArea(); |
| ex_assert(da, "We have a Q_OK_MMORE in Tupleflow but no diags area"); |
| if (da->mainSQLCODE() != 0) { |
| // Non-atomic Rowsets sends OK_MMORE with non-empty diags from child |
| // empty diags (mainsqlcode == 0) implies OK_MMORE sent by ignoreDupKey code |
| // when NAR is on, for -8102 error. Just consume the OK_MMORE. |
| |
| if(tflowTdb().isRowsetIterator()) |
| { |
| da->setAllRowNumber(Lng32 (tgt_entry->upState.parentIndex)); |
| } |
| |
| pstate.nonFatalErrorSeen_ = TRUE; |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| ComDiagsArea *accumulatedDiagsArea = pentry->getDiagsArea(); |
| if (accumulatedDiagsArea) |
| { |
| accumulatedDiagsArea->mergeAfter(*da); |
| if (!(accumulatedDiagsArea->canAcceptMoreErrors()) && |
| tflowTdb().isNonFatalErrorTolerated()) |
| { |
| pstate.nonFatalErrorSeen_ = FALSE; |
| pstate.step_ = HANDLE_ERROR_; |
| } |
| } |
| else |
| { |
| pentry->setDiagsArea(da); |
| da->incrRefCount(); |
| if (tflowTdb().isNonFatalErrorTolerated()) { |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| da->setLengthLimit(cliDiagsArea->getLengthLimit()); |
| } |
| } |
| } |
| } |
| |
| qTgt_.up->removeHead(); |
| } |
| break; |
| |
| case ex_queue::Q_NO_DATA: |
| { |
| ComDiagsArea * da = tgt_entry->getDiagsArea(); |
| if (da) |
| { |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| ComDiagsArea *accumulatedDiagsArea = pentry->getDiagsArea(); |
| if (accumulatedDiagsArea) |
| accumulatedDiagsArea->mergeAfter(*da); |
| else |
| { |
| pentry->setDiagsArea(da); |
| da->incrRefCount(); |
| if (tflowTdb().isNonFatalErrorTolerated()) { |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| da->setLengthLimit(cliDiagsArea->getLengthLimit()); |
| } |
| } |
| } |
| pstate.matchCount_ += tgt_entry->upState.getMatchNo(); |
| qTgt_.up->removeHead(); |
| pstate.tgtRequests_--; |
| pstate.startRightIndex_++; |
| } |
| break; |
| |
| case ex_queue::Q_SQLERROR: |
| { |
| if (qParent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| pentry->copyAtp(tgt_entry); |
| pstate.nonFatalErrorSeen_ = FALSE; |
| pstate.step_ = HANDLE_ERROR_; |
| |
| if(tflowTdb().isRowsetIterator()) |
| { |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| ComDiagsArea *da = pentry->getDiagsArea(); |
| ex_assert(da, "To set RowNumber, an error condition must be present in the diags area"); |
| da->setAllRowNumber(Lng32 (tgt_entry->upState.parentIndex)); |
| } |
| |
| } |
| break; |
| |
| default: |
| { |
| ex_assert(0, "ExTupleFlowTcb::work() Error returned from tgt"); // LCOV_EXCL_LINE |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| if (pstate.step_ == HANDLE_ERROR_) |
| break; |
| |
| // if source has returned EOD, |
| // and there are no pending requests in target's down |
| // queue, then we are done with this parent request. |
| if (((pstate.srcEOD_ == TRUE) || |
| (pstate.parentEOD_ == TRUE)) && |
| (qTgt_.down->isEmpty())) |
| pstate.step_ = DONE_; |
| else |
| { |
| if (NOT pstate.parentEOD_) |
| pstate.step_ = MOVE_SRC_TO_TGT_; |
| |
| if (qSrc_.up->isEmpty() || qTgt_.down->isFull()) |
| return WORK_OK; |
| else |
| return WORK_CALL_AGAIN; |
| } |
| } |
| break; |
| |
| case HANDLE_ERROR_: |
| { |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| |
| pentry->upState.status = ex_queue::Q_SQLERROR; |
| pentry->upState.downIndex = qParent_.down->getHeadIndex(); |
| pentry->upState.parentIndex = pentry_down->downState.parentIndex; |
| pentry->upState.setMatchNo(pstate.matchCount_); |
| |
| ComDiagsArea *da = pentry->getDiagsArea(); |
| if (tflowTdb().isNonFatalErrorTolerated() && |
| !(da->canAcceptMoreErrors())) { |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| da->removeLastErrorCondition(); |
| *da << DgSqlCode(-EXE_NONATOMIC_FAILURE_LIMIT_EXCEEDED) |
| << DgInt0(cliDiagsArea->getLengthLimit()); |
| } |
| |
| // insert into parent up queue |
| qParent_.up->insert(); |
| |
| pstate.step_ = CANCELLED_; |
| } |
| break; |
| |
| case CANCELLED_: |
| { |
| qSrc_.down->cancelRequestWithParentIndex(qParent_.down->getHeadIndex()); |
| |
| // Cancel all the outstanding requests that have been sent to the target. |
| // Cancel all requests within given range (inclusive) |
| qTgt_.down->cancelRequestWithParentIndexRange((queue_index)pstate.startRightIndex_+1, |
| (queue_index)pstate.srcRequestCount_); |
| pstate.startRightIndex_ = pstate.srcRequestCount_; |
| |
| //ignore all rows from source child, till Q_NO_DATA is reached |
| while ((pstate.srcEOD_ != TRUE) && (!qSrc_.up->isEmpty())) |
| { |
| ex_queue_entry * src_entry = qSrc_.up->getHeadEntry(); |
| switch(src_entry->upState.status) |
| { |
| case ex_queue::Q_OK_MMORE: |
| case ex_queue::Q_SQLERROR: |
| case ex_queue::Q_REC_SKIPPED: |
| { |
| qSrc_.up->removeHead(); |
| } |
| break; |
| |
| case ex_queue::Q_NO_DATA: |
| { |
| pstate.srcEOD_ = TRUE; |
| qSrc_.up->removeHead(); |
| } |
| break; |
| |
| default: |
| { |
| ex_assert(0, "ExTupleFlowTcb::work() Error returned from src"); // LCOV_EXCL_LINE |
| } |
| break; |
| } |
| } |
| |
| //ignore all rows from target child, till Q_NO_DATA is reached |
| while (pstate.tgtRequests_ && !qTgt_.up->isEmpty()) |
| { |
| ex_queue_entry * tgt_entry = qTgt_.up->getHeadEntry(); |
| switch(tgt_entry->upState.status) |
| { |
| case ex_queue::Q_OK_MMORE: |
| case ex_queue::Q_SQLERROR: |
| { |
| qTgt_.up->removeHead(); |
| } |
| break; |
| |
| case ex_queue::Q_NO_DATA: |
| { |
| qTgt_.up->removeHead(); |
| pstate.tgtRequests_--; |
| } |
| break; |
| |
| default: |
| { |
| ex_assert(0, "ExTupleFlowTcb::work() Error returned from tgt"); // LCOV_EXCL_LINE |
| } |
| break; |
| } |
| } |
| |
| // if both source and target returned all the rows, |
| // insert Q_SQLERROR into the parent up queue |
| if ((pstate.srcEOD_ == TRUE) && !pstate.tgtRequests_) |
| { |
| pstate.step_ = DONE_; |
| } |
| else |
| return WORK_OK; |
| } |
| break; |
| |
| case DONE_: |
| { |
| if (qParent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry * pentry = qParent_.up->getTailEntry(); |
| |
| if (pstate.nonFatalErrorSeen_) { |
| ComDiagsArea *da = pentry->getDiagsArea(); |
| ComDiagsArea *cliDiagsArea = pentry_down->getDiagsArea(); |
| ex_assert((da || cliDiagsArea), "We have non-fatal errors in Tupleflow but no diags area"); |
| if (cliDiagsArea) { |
| if (da) |
| da->mergeAfter(*cliDiagsArea); |
| else |
| { |
| pentry->setDiagsArea(cliDiagsArea); |
| cliDiagsArea->incrRefCount(); |
| } |
| } |
| |
| if (cliDiagsArea->canAcceptMoreErrors()) { |
| ComDiagsArea *mergedDiagsArea = pentry->getDiagsArea(); |
| // used to make mainSQLCODE() return 30022 or 30035. |
| mergedDiagsArea->setNonFatalErrorSeen(TRUE); |
| NABoolean anyRowsAffected = FALSE; |
| |
| |
| // This tupleflow should be in the master for |
| // non-atomic rowsets. |
| ExMasterStmtGlobals *g = getGlobals()-> |
| castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| ex_assert(g, "Rowset insert has a flow node that is not in the master executor"); |
| if (g->getRowsAffected() > 0) |
| anyRowsAffected = TRUE; |
| |
| if (anyRowsAffected) |
| *mergedDiagsArea << DgSqlCode(EXE_NONFATAL_ERROR_SEEN); |
| else |
| *mergedDiagsArea << DgSqlCode(EXE_NONFATAL_ERROR_ON_ALL_ROWS); |
| |
| } // we exceeded the Nonfatal error limit when merging with the CLI diags area |
| else { |
| pstate.step_ = HANDLE_ERROR_; |
| // will prevent us from merging the diags areas again |
| pstate.nonFatalErrorSeen_ = FALSE ; |
| break ; |
| } |
| } |
| |
| pentry->upState.status = ex_queue::Q_NO_DATA; |
| pentry->upState.downIndex = qParent_.down->getHeadIndex(); |
| pentry->upState.parentIndex = pentry_down->downState.parentIndex; |
| pentry->upState.setMatchNo(pstate.matchCount_); |
| |
| // LCOV_EXCL_START |
| // BEGIN: Read note at beginning of work(). |
| // |
| if(pstate.noOfUnPackedRows_ != 0) |
| { |
| ComDiagsArea *da = pentry->getDiagsArea(); |
| if (da == NULL) |
| { |
| da = ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| pentry->setDiagsArea(da); |
| } |
| da->addRowCount(pstate.noOfUnPackedRows_); |
| pstate.noOfUnPackedRows_ = 0; |
| } |
| // |
| // END: - Read note at beginning of work(). |
| // LCOV_EXCL_STOP |
| |
| // if stats are to be collected, collect them. |
| if (getStatsEntry()) |
| { |
| // nothing yet returned from right child or returned |
| // to parent. |
| getStatsEntry()->setActualRowsReturned(0); |
| } |
| |
| // insert into parent up queue |
| qParent_.up->insert(); |
| |
| pstate.step_ = EMPTY_; |
| qParent_.down->removeHead(); |
| |
| return WORK_CALL_AGAIN; // check for more down requests |
| |
| } |
| break; |
| |
| } // switch pstate.step_ |
| } // while |
| |
| #pragma nowarn(203) // warning elimination |
| return 0; |
| #pragma warn(203) // warning elimination |
| } |
| |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // |
| // Private state procedures |
| // |
| ///////////////////////////////////////////////////////////////////////// |
| |
| ExTupleFlowPrivateState::ExTupleFlowPrivateState(const ExTupleFlowTcb * /*tcb*/) |
| { |
| init(); |
| } |
| |
| void ExTupleFlowPrivateState::init() |
| { |
| matchCount_ = 0; |
| tgtRequests_ = 0; |
| tgtRowsSent_ = FALSE; |
| srcEOD_ = FALSE; |
| parentEOD_ = FALSE; |
| noOfUnPackedRows_ = 0; |
| step_ = ExTupleFlowTcb::EMPTY_; |
| srcRequestCount_ = -1; |
| nonFatalErrorSeen_ = FALSE; |
| // parent index for target Q entries are eqaul to value of srcRequestCount_. |
| // therefore startRightIndex_ is initialized to the same value as srcRequestCount_ |
| startRightIndex_ = -1; |
| } |
| |
| ExTupleFlowPrivateState::~ExTupleFlowPrivateState() |
| { |
| } |
| |
| ex_tcb_private_state * ExTupleFlowPrivateState::allocate_new(const ex_tcb *tcb) |
| { |
| return new(((ex_tcb *)tcb)->getSpace()) ExTupleFlowPrivateState((ExTupleFlowTcb *) tcb); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |