| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| |
| #include "Platform.h" |
| |
| #include "ex_stdh.h" |
| #include "ComTdb.h" |
| #include "ex_tcb.h" |
| #include "ExHbaseAccess.h" |
| #include "ex_exe_stmt_globals.h" |
| #include "ExpHbaseInterface.h" |
| #include "hs_util.h" |
| #include "NLSConversion.h" |
| #include "ExHdfsScan.h" |
| #include "Context.h" |
| #include "HdfsClient_JNI.h" |
| |
| ExHbaseAccessInsertTcb::ExHbaseAccessInsertTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessTcb( hbaseAccessTdb, glob), |
| step_(NOT_STARTED) |
| { |
| insertRowlen_ = 0; |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessInsertTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = CLOSE_AND_DONE; |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| |
| step_ = INSERT_INIT; |
| } |
| break; |
| |
| case INSERT_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| step_ = SETUP_INSERT; |
| } |
| break; |
| |
| case SETUP_INSERT: |
| { |
| step_ = EVAL_INSERT_EXPR; |
| } |
| break; |
| |
| case EVAL_INSERT_EXPR: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_) |
| .setDataPointer(convertRow_); |
| |
| if (convertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| ExpTupleDesc * convertRowTD = |
| hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (hbaseAccessTdb().convertTuppIndex_); |
| |
| for (Lng32 i = 0; i < convertRowTD->numAttrs(); i++) |
| { |
| Attributes * attr = convertRowTD->getAttr(i); |
| Lng32 len = 0; |
| if (attr) |
| { |
| if (attr->getVCIndicatorLength() == sizeof(short)) |
| len = *(short*)&convertRow_[attr->getVCLenIndOffset()]; |
| else |
| len = *(Lng32*)&convertRow_[attr->getVCLenIndOffset()]; |
| |
| switch (i) |
| { |
| case HBASE_ROW_ID_INDEX: |
| { |
| insRowId_.assign(&convertRow_[attr->getOffset()], len); |
| } |
| break; |
| |
| case HBASE_COL_FAMILY_INDEX: |
| { |
| insColFam_.assign(&convertRow_[attr->getOffset()], len); |
| } |
| break; |
| |
| case HBASE_COL_NAME_INDEX: |
| { |
| insColNam_.assign(&convertRow_[attr->getOffset()], len); |
| } |
| break; |
| |
| case HBASE_COL_VALUE_INDEX: |
| { |
| insColVal_.assign(&convertRow_[attr->getOffset()], len); |
| } |
| break; |
| |
| case HBASE_COL_TS_INDEX: |
| { |
| insColTS_ = (Int64*)&convertRow_[attr->getOffset()]; |
| } |
| break; |
| |
| } // switch |
| } // if attr |
| } // convertExpr |
| |
| step_ = PROCESS_INSERT; |
| } |
| break; |
| |
| case PROCESS_INSERT: |
| { |
| createDirectRowBuffer(insColFam_, insColNam_, insColVal_); |
| HbaseStr rowID; |
| rowID.val = (char *)insRowId_.data(); |
| rowID.len = insRowId_.size(); |
| retcode = ehi_->insertRow(table_, |
| rowID, |
| row_, |
| hbaseAccessTdb().useHbaseXn(), |
| hbaseAccessTdb().useRegionXn(), |
| *insColTS_, |
| FALSE); // AsyncOperations is always FALSE for native HBase |
| |
| if (setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (getHbaseAccessStats()) |
| getHbaseAccessStats()->incUsedRows(); |
| |
| matches_++; |
| |
| step_ = INSERT_CLOSE; |
| } |
| break; |
| |
| case INSERT_CLOSE: |
| { |
| retcode = ehi_->close(); |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| step_ = CLOSE_AND_DONE; |
| } |
| break; |
| |
| case DONE: |
| case CLOSE_AND_DONE: |
| { |
| if (step_ == CLOSE_AND_DONE) |
| ehi_->close(); |
| if (handleDone(rc, matches_)) |
| return rc; |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExHbaseAccessInsertRowwiseTcb::ExHbaseAccessInsertRowwiseTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessInsertTcb( hbaseAccessTdb, glob) |
| { |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessInsertRowwiseTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = CLOSE_AND_DONE; |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| |
| step_ = INSERT_INIT; |
| } |
| break; |
| |
| case INSERT_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| step_ = SETUP_INSERT; |
| } |
| break; |
| |
| case SETUP_INSERT: |
| { |
| step_ = EVAL_INSERT_EXPR; |
| } |
| break; |
| |
| case EVAL_INSERT_EXPR: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_) |
| .setDataPointer(convertRow_); |
| |
| if (convertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| ExpTupleDesc * convertRowTD = |
| hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (hbaseAccessTdb().convertTuppIndex_); |
| |
| for (Lng32 i = 0; i < convertRowTD->numAttrs(); i++) |
| { |
| Attributes * attr = convertRowTD->getAttr(i); |
| short len = 0; |
| if (attr) |
| { |
| len = *(short*)&convertRow_[attr->getVCLenIndOffset()]; |
| |
| switch (i) |
| { |
| case HBASE_ROW_ID_INDEX: |
| { |
| insRowId_.assign(&convertRow_[attr->getOffset()], len); |
| } |
| break; |
| |
| case HBASE_COL_DETAILS_INDEX: |
| { |
| char * convRow = &convertRow_[attr->getOffset()]; |
| |
| retcode = createDirectRowwiseBuffer(convRow); |
| } |
| break; |
| |
| } // switch |
| } // if attr |
| } // for |
| |
| step_ = PROCESS_INSERT; |
| } |
| break; |
| |
| case PROCESS_INSERT: |
| { |
| if (numColsInDirectBuffer() > 0) |
| { |
| HbaseStr rowID; |
| rowID.val = (char *)insRowId_.data(); |
| rowID.len = insRowId_.size(); |
| retcode = ehi_->insertRow(table_, |
| rowID, |
| row_, |
| hbaseAccessTdb().useHbaseXn(), |
| hbaseAccessTdb().useRegionXn(), |
| -1, //*insColTS_ |
| FALSE); // AsyncOperations is always FALSE for native HBase |
| |
| if (setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (getHbaseAccessStats()) |
| getHbaseAccessStats()->incUsedRows(); |
| |
| matches_++; |
| } |
| |
| step_ = INSERT_CLOSE; |
| } |
| break; |
| |
| case INSERT_CLOSE: |
| { |
| retcode = ehi_->close(); |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| step_ = CLOSE_AND_DONE; |
| } |
| break; |
| |
| case DONE: |
| case CLOSE_AND_DONE: |
| { |
| if (step_ == CLOSE_AND_DONE) |
| ehi_->close(); |
| if (handleDone(rc, matches_)) |
| return rc; |
| |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExHbaseAccessInsertSQTcb::ExHbaseAccessInsertSQTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessInsertTcb( hbaseAccessTdb, glob) |
| { |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = CLOSE_AND_DONE; |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| asyncCompleteRetryCount_ = 0; |
| asyncOperationTimeout_ = 1; |
| asyncOperation_ = hbaseAccessTdb().asyncOperations() && getTransactionIDFromContext(); |
| step_ = INSERT_INIT; |
| } |
| break; |
| |
| case INSERT_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| step_ = SETUP_INSERT; |
| } |
| break; |
| |
| case SETUP_INSERT: |
| { |
| rc = evalInsDelPreCondExpr(); |
| if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else if (rc == 0) |
| step_ = INSERT_CLOSE; |
| else // expr is true or does not exist |
| step_ = EVAL_INSERT_EXPR; |
| } |
| break; |
| |
| case EVAL_INSERT_EXPR: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_) |
| .setDataPointer(convertRow_); |
| |
| if (convertExpr()) |
| { |
| insertRowlen_ = hbaseAccessTdb().convertRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(pentry_down->getAtp(), workAtp_, |
| NULL, -1, &insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| genAndAssignSyskey(hbaseAccessTdb().convertTuppIndex_, convertRow_); |
| |
| step_ = EVAL_CONSTRAINT; |
| } |
| break; |
| |
| case EVAL_CONSTRAINT: |
| { |
| rc = evalConstraintExpr(insConstraintExpr()); |
| if (rc == 1) |
| step_ = CREATE_MUTATIONS; |
| else if (rc == 0) |
| step_ = INSERT_CLOSE; |
| else |
| step_ = HANDLE_ERROR; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| retcode = createDirectRowBuffer( hbaseAccessTdb().convertTuppIndex_, |
| convertRow_, |
| hbaseAccessTdb().listOfUpdatedColNames(), |
| hbaseAccessTdb().listOfOmittedColNames(), |
| (hbaseAccessTdb().hbaseSqlIUD() ? FALSE : TRUE)); |
| |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| insColTSval_ = -1; |
| |
| step_ = EVAL_ROWID_EXPR; |
| } |
| break; |
| |
| case EVAL_ROWID_EXPR: |
| { |
| NABoolean isVC = hbaseAccessTdb().keyInVCformat(); |
| if (evalRowIdExpr(isVC) == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| insRowId_.assign(rowId_.val, rowId_.len); |
| |
| if (hbaseAccessTdb().hbaseSqlIUD()) |
| step_ = CHECK_AND_INSERT; |
| else |
| step_ = PROCESS_INSERT; |
| } |
| break; |
| |
| case CHECK_AND_INSERT: |
| { |
| HbaseStr rowID; |
| rowID.val = (char *)insRowId_.data(); |
| rowID.len = insRowId_.size(); |
| retcode = ehi_->checkAndInsertRow(table_, |
| rowID, |
| row_, |
| hbaseAccessTdb().useHbaseXn(), |
| hbaseAccessTdb().useRegionXn(), |
| insColTSval_, |
| asyncOperation_, |
| hbaseAccessTdb().getColIndexOfPK1()); |
| |
| if (retcode == HBASE_DUP_ROW_ERROR) // row exists, return error |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8102)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (setupError(retcode, "ExpHbaseInterface::checkAndInsertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (getHbaseAccessStats()) |
| getHbaseAccessStats()->incUsedRows(); |
| |
| if (hbaseAccessTdb().returnRow()) { |
| step_ = RETURN_ROW; |
| break; |
| } |
| matches_++; |
| if (asyncOperation_) { |
| step_ = COMPLETE_ASYNC_INSERT; |
| return WORK_CALL_AGAIN; |
| } |
| else { |
| step_ = INSERT_CLOSE; |
| } |
| } |
| break; |
| case COMPLETE_ASYNC_INSERT: |
| { |
| if (resultArray_ == NULL) |
| resultArray_ = new (getHeap()) NABoolean[1]; |
| Int32 timeout; |
| if (asyncCompleteRetryCount_ < 10) |
| timeout = -1; |
| else { |
| asyncOperationTimeout_ = asyncOperationTimeout_ * 2; |
| timeout = asyncOperationTimeout_; |
| } |
| retcode = ehi_->completeAsyncOperation(timeout, resultArray_, 1); |
| if (retcode == HBASE_RETRY_AGAIN) { |
| asyncCompleteRetryCount_++; |
| return WORK_CALL_AGAIN; |
| } |
| asyncCompleteRetryCount_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::completeAsyncOperation")) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (resultArray_[0] == FALSE) { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8102)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = INSERT_CLOSE; |
| } |
| break; |
| case PROCESS_INSERT: |
| { |
| HbaseStr rowID; |
| rowID.val = (char *)insRowId_.data(); |
| rowID.len = insRowId_.size(); |
| retcode = ehi_->insertRow(table_, |
| rowID, |
| row_, |
| hbaseAccessTdb().useHbaseXn(), |
| hbaseAccessTdb().useRegionXn(), |
| insColTSval_, |
| asyncOperation_); |
| |
| if (setupError(retcode, "ExpHbaseInterface::insertRow")) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (getHbaseAccessStats()) |
| getHbaseAccessStats()->incUsedRows(); |
| if (hbaseAccessTdb().returnRow()) { |
| step_ = RETURN_ROW; |
| break; |
| } |
| matches_++; |
| if (asyncOperation_) { |
| step_ = COMPLETE_ASYNC_INSERT; |
| return WORK_CALL_AGAIN; |
| } |
| else { |
| step_ = INSERT_CLOSE; |
| } |
| } |
| break; |
| |
| case RETURN_ROW: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| if (returnUpdateExpr()) |
| { |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| |
| // allocate tupps where returned rows will be created |
| if (allocateUpEntryTupps( |
| -1, |
| 0, |
| hbaseAccessTdb().returnedTuppIndex_, |
| hbaseAccessTdb().returnUpdatedRowLen_, |
| FALSE, |
| &rc)) |
| return 1; |
| |
| ex_expr::exp_return_type exprRetCode = |
| returnUpdateExpr()->eval(up_entry->getAtp(), workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(&rc)) |
| return 1; |
| } |
| else |
| { |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return 1; |
| } |
| if (asyncOperation_) { |
| step_ = COMPLETE_ASYNC_INSERT; |
| return WORK_CALL_AGAIN; |
| } |
| else |
| step_ = INSERT_CLOSE; |
| } |
| break; |
| |
| case INSERT_CLOSE: |
| { |
| retcode = ehi_->close(); |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| step_ = CLOSE_AND_DONE; |
| } |
| break; |
| |
| case DONE: |
| case CLOSE_AND_DONE: |
| { |
| if (step_ == CLOSE_AND_DONE) |
| ehi_->close(); |
| if (NOT hbaseAccessTdb().computeRowsAffected()) |
| matches_ = 0; |
| |
| if (handleDone(rc, matches_)) |
| return rc; |
| |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExHbaseAccessUpsertVsbbSQTcb::ExHbaseAccessUpsertVsbbSQTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessInsertTcb( hbaseAccessTdb, glob) |
| { |
| prevTailIndex_ = 0; |
| |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| |
| numRetries_ = 0; |
| rowsInserted_ = 0; |
| lastHandledStep_ = NOT_STARTED; |
| numRowsInVsbbBuffer_ = 0; |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| ExMasterStmtGlobals *g = getGlobals()-> |
| castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if ((step_ == HANDLE_ERROR) || |
| (step_ == CLOSE_AND_DONE)) |
| { |
| // move down to error/close case. |
| } |
| else if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = CLOSE_AND_DONE; |
| else if (pentry_down->downState.request == ex_queue::GET_EOD) |
| { |
| if (currRowNum_ > rowsInserted_) |
| { |
| step_ = PROCESS_INSERT_AND_CLOSE; |
| } |
| else |
| { |
| if (lastHandledStep_ == ALL_DONE) |
| matches_=0; |
| step_ = ALL_DONE; |
| } |
| } |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| currRowNum_ = 0; |
| numRetries_ = 0; |
| |
| prevTailIndex_ = 0; |
| lastHandledStep_ = NOT_STARTED; |
| |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| |
| rowsInserted_ = 0; |
| asyncCompleteRetryCount_ = 0; |
| asyncOperationTimeout_ = 1; |
| asyncOperation_ = hbaseAccessTdb().asyncOperations() && getTransactionIDFromContext(); |
| numRowsInVsbbBuffer_ = 0; |
| step_ = INSERT_INIT; |
| } |
| break; |
| |
| case INSERT_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| ExpTupleDesc * rowTD = |
| hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (hbaseAccessTdb().convertTuppIndex_); |
| allocateDirectRowBufferForJNI(rowTD->numAttrs(), hbaseAccessTdb().getHbaseRowsetVsbbSize()); |
| allocateDirectRowIDBufferForJNI(hbaseAccessTdb().getHbaseRowsetVsbbSize()); |
| if (hbaseAccessTdb().getCanAdjustTrafParams()) |
| { |
| if (hbaseAccessTdb().getWBSize() > 0) |
| { |
| retcode = ehi_->setWriteBufferSize(table_, |
| hbaseAccessTdb().getWBSize()); |
| if (setupError(retcode, "ExpHbaseInterface::setWriteBufferSize")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| retcode = ehi_->setWriteToWAL(table_, |
| hbaseAccessTdb().getTrafWriteToWAL()); |
| if (setupError(retcode, "ExpHbaseInterface::setWriteToWAL")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| step_ = SETUP_INSERT; |
| } |
| break; |
| |
| case SETUP_INSERT: |
| { |
| step_ = EVAL_INSERT_EXPR; |
| } |
| break; |
| |
| case EVAL_INSERT_EXPR: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_) |
| .setDataPointer(convertRow_); |
| |
| if (convertExpr()) |
| { |
| insertRowlen_ = hbaseAccessTdb().convertRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(pentry_down->getAtp(), workAtp_, |
| NULL, -1, &insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| genAndAssignSyskey(hbaseAccessTdb().convertTuppIndex_, convertRow_); |
| step_ = EVAL_CONSTRAINT; |
| } |
| break; |
| |
| case EVAL_CONSTRAINT: |
| { |
| rc = evalConstraintExpr(insConstraintExpr()); |
| if (rc == 1) |
| step_ = CREATE_MUTATIONS; |
| else if (rc == 0) |
| step_ = INSERT_CLOSE; |
| else |
| step_ = HANDLE_ERROR; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| retcode = createDirectRowBuffer( |
| hbaseAccessTdb().convertTuppIndex_, |
| convertRow_, |
| hbaseAccessTdb().listOfUpdatedColNames(), |
| hbaseAccessTdb().listOfOmittedColNames(), |
| TRUE); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| insColTSval_ = -1; |
| step_ = EVAL_ROWID_EXPR; |
| } |
| break; |
| |
| case EVAL_ROWID_EXPR: |
| { |
| NABoolean isVC = hbaseAccessTdb().keyInVCformat(); |
| if (evalRowIdExpr(isVC) == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| copyRowIDToDirectBuffer(rowId_); |
| |
| currRowNum_++; |
| |
| |
| if (!hbaseAccessTdb().returnRow()) |
| matches_++; |
| // if we are returning a row moveRowToUpQueue will increment matches_ |
| else |
| { |
| step_ = RETURN_ROW; |
| break; |
| } |
| |
| if (currRowNum_ < hbaseAccessTdb().getHbaseRowsetVsbbSize()) |
| { |
| step_ = DONE; |
| break; |
| } |
| |
| step_ = PROCESS_INSERT_AND_CLOSE; |
| } |
| break; |
| |
| case RETURN_ROW: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| if (returnUpdateExpr()) |
| { |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| |
| // allocate tupps where returned rows will be created |
| if (allocateUpEntryTupps( |
| -1, |
| 0, |
| hbaseAccessTdb().returnedTuppIndex_, |
| hbaseAccessTdb().returnUpdatedRowLen_, |
| FALSE, |
| &rc)) |
| return rc; |
| |
| ex_expr::exp_return_type exprRetCode = |
| returnUpdateExpr()->eval(up_entry->getAtp(), workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(&rc)) |
| return rc; |
| } |
| else |
| { |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return rc; |
| } |
| |
| if (currRowNum_ < hbaseAccessTdb().getHbaseRowsetVsbbSize()) |
| step_ = DONE; |
| else |
| step_ = PROCESS_INSERT_AND_CLOSE; |
| |
| break; |
| |
| |
| |
| } |
| break; |
| case PROCESS_INSERT_AND_CLOSE: |
| { |
| numRowsInVsbbBuffer_ = patchDirectRowBuffers(); |
| |
| retcode = ehi_->insertRows(table_, |
| hbaseAccessTdb().getRowIDLen(), |
| rowIDs_, |
| rows_, |
| hbaseAccessTdb().useHbaseXn(), |
| insColTSval_, |
| asyncOperation_); |
| |
| if (setupError(retcode, "ExpHbaseInterface::insertRows")) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (getHbaseAccessStats()) { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_); |
| } |
| rowsInserted_ += numRowsInVsbbBuffer_; |
| if (asyncOperation_) { |
| lastHandledStep_ = step_; |
| step_ = COMPLETE_ASYNC_INSERT; |
| } |
| else |
| step_ = INSERT_CLOSE; |
| } |
| break; |
| case COMPLETE_ASYNC_INSERT: |
| { |
| if (resultArray_ == NULL) |
| resultArray_ = new (getHeap()) NABoolean[hbaseAccessTdb().getHbaseRowsetVsbbSize()]; |
| Int32 timeout; |
| if (asyncCompleteRetryCount_ < 10) |
| timeout = -1; |
| else { |
| asyncOperationTimeout_ = asyncOperationTimeout_ * 2; |
| timeout = asyncOperationTimeout_; |
| } |
| retcode = ehi_->completeAsyncOperation(timeout, resultArray_, numRowsInVsbbBuffer_); |
| if (retcode == HBASE_RETRY_AGAIN) { |
| asyncCompleteRetryCount_++; |
| return WORK_CALL_AGAIN; |
| } |
| asyncCompleteRetryCount_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::completeAsyncOperation")) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| for (int i = 0 ; i < numRowsInVsbbBuffer_; i++) { |
| if (resultArray_[i] == FALSE) { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8102)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| if (step_ == HANDLE_ERROR) |
| break; |
| if (lastHandledStep_ == PROCESS_INSERT_AND_CLOSE) |
| step_ = INSERT_CLOSE; |
| else |
| step_ = ALL_DONE; |
| } |
| break; |
| case INSERT_CLOSE: |
| { |
| retcode = ehi_->close(); |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = ALL_DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| step_ = CLOSE_AND_DONE; |
| } |
| break; |
| |
| case DONE: |
| case CLOSE_AND_DONE: |
| case ALL_DONE: |
| { |
| if (step_ == CLOSE_AND_DONE) |
| ehi_->close(); |
| if (NOT hbaseAccessTdb().computeRowsAffected()) |
| matches_ = 0; |
| |
| if ((step_ == DONE) && |
| (qparent_.down->getLength() == 1)) |
| { |
| |
| // only one row in the down queue. |
| |
| // Before we send input buffer to hbase, give parent |
| // another chance in case there is more input data. |
| // If parent doesn't input any more data on second (or |
| // later) chances, then process the request. |
| if (numRetries_ == 3) |
| { |
| numRetries_ = 0; |
| |
| // Insert the current batch and then done. |
| step_ = PROCESS_INSERT_AND_CLOSE; |
| break; |
| } |
| |
| numRetries_++; |
| return WORK_CALL_AGAIN; |
| |
| break; |
| } |
| |
| if (handleDone(rc, (step_ == ALL_DONE ? matches_ : 0))) |
| return rc; |
| lastHandledStep_ = step_; |
| |
| if (step_ == DONE) |
| step_ = SETUP_INSERT; |
| else |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessUpsertVsbbSQTcb( hbaseAccessTdb, glob), |
| prevRowId_ (NULL), |
| lastErrorCnd_(NULL), |
| sampleFileHdfsClient_(NULL) |
| { |
| hFileParamsInitialized_ = false; |
| //sortedListOfColNames_ = NULL; |
| posVec_.clear(); |
| |
| Lng32 fileNum = getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(); |
| |
| |
| ExHbaseAccessTcb::buildLoggingFileName((NAHeap *)getHeap(), ((ExHbaseAccessTdb &)hbaseAccessTdb).getLoggingLocation(), |
| // (char *)((ExHbaseAccessTdb &)hbaseAccessTdb).getErrCountRowId(), |
| ((ExHbaseAccessTdb &)hbaseAccessTdb).getTableName(), |
| "traf_upsert_err", |
| fileNum, |
| loggingFileName_); |
| loggingRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.updateRowLen_]; |
| } |
| |
| ExHbaseAccessBulkLoadPrepSQTcb::~ExHbaseAccessBulkLoadPrepSQTcb() |
| { |
| if (sampleFileHdfsClient_ != NULL) |
| NADELETE(sampleFileHdfsClient_, HdfsClient, getHeap()); |
| } |
| |
| // Given the type information available via the argument, return the name of |
| // the Hive type we use to represent it in the Hive sample table created by |
| // the bulk load utility. |
| static const char* TrafToHiveType(Attributes* attrs) |
| { |
| Int64 maxValue = 0; |
| Int16 precision = 0; |
| Int16 scale = 0; |
| Int16 datatype = attrs->getDatatype(); |
| |
| if (DFS2REC::isInterval(datatype)) |
| { |
| precision = dynamic_cast<SimpleType*>(attrs)->getPrecision(); |
| scale = dynamic_cast<SimpleType*>(attrs)->getScale(); |
| } |
| |
| switch (datatype) |
| { |
| case REC_BIN8_SIGNED: |
| case REC_BIN8_UNSIGNED: |
| return "tinyint"; |
| |
| case REC_BIN16_SIGNED: |
| case REC_BIN16_UNSIGNED: |
| case REC_BPINT_UNSIGNED: |
| return "smallint"; |
| |
| case REC_BIN32_SIGNED: |
| case REC_BIN32_UNSIGNED: |
| return "int"; |
| |
| case REC_BIN64_SIGNED: |
| return "bigint"; |
| |
| case REC_IEEE_FLOAT32: |
| return "float"; |
| |
| case REC_IEEE_FLOAT64: |
| return "double"; |
| |
| case REC_DECIMAL_UNSIGNED: |
| case REC_DECIMAL_LS: |
| case REC_DECIMAL_LSE: |
| maxValue = (Int64)pow(10, dynamic_cast<SimpleType*>(attrs)->getPrecision()); |
| break; |
| |
| //case REC_NUM_BIG_UNSIGNED: return extFormat? (char *)"NUMERIC":(char *)"REC_NUM_BIG_UNSIGNED"; |
| //case REC_NUM_BIG_SIGNED: return extFormat? (char *)"NUMERIC":(char *)"REC_NUM_BIG_SIGNED"; |
| |
| case REC_BYTE_F_ASCII: |
| case REC_NCHAR_F_UNICODE: |
| case REC_BYTE_V_ASCII: |
| case REC_NCHAR_V_UNICODE: |
| case REC_BYTE_V_ASCII_LONG: |
| case REC_BYTE_V_ANSI: |
| case REC_BYTE_V_ANSI_DOUBLE: |
| case REC_SBYTE_LOCALE_F: |
| case REC_MBYTE_LOCALE_F: |
| case REC_MBYTE_F_SJIS: |
| case REC_MBYTE_V_SJIS: |
| return "string"; |
| |
| case REC_DATETIME: |
| return "timestamp"; |
| |
| case REC_INT_YEAR: |
| case REC_INT_MONTH: |
| case REC_INT_DAY: |
| case REC_INT_HOUR: |
| case REC_INT_MINUTE: |
| maxValue = (Int64)pow(10, precision); |
| break; |
| |
| case REC_INT_SECOND: |
| maxValue = (Int64)pow(10, precision + scale); |
| break; |
| |
| case REC_INT_YEAR_MONTH: |
| maxValue = 12 * (Int64)pow(10, precision); |
| break; |
| |
| case REC_INT_DAY_HOUR: |
| maxValue = 24 * (Int64)pow(10, precision); |
| break; |
| |
| case REC_INT_HOUR_MINUTE: |
| maxValue = 60 * (Int64)pow(10, precision); |
| break; |
| |
| case REC_INT_DAY_MINUTE: |
| maxValue = 24 * 60 * (Int64)pow(10, precision); |
| break; |
| |
| case REC_INT_MINUTE_SECOND: |
| maxValue = (Int64)pow(10, precision + 2 + scale); |
| break; |
| |
| case REC_INT_HOUR_SECOND: |
| maxValue = (Int64)pow(10, precision + 4 + scale); |
| break; |
| |
| case REC_INT_DAY_SECOND: |
| maxValue = (Int64)pow(10, precision + 5 + scale); |
| break; |
| |
| default: |
| |
| break; |
| } // switch |
| |
| //assert(maxValue > 0); |
| if (maxValue < SHRT_MAX) |
| return "smallint"; |
| else if (maxValue <= INT_MAX) |
| return "int"; |
| else |
| return "bigint"; |
| } |
| |
| // Return in ddlText the Hive statement to create the Hive external table that |
| // that will hold a sample for the Trafodion table being loaded. The files |
| // containing the sample data are written independently to HDFS and linked to |
| // the Hive table by the location clause in the generated Hive DDL. |
| void ExHbaseAccessBulkLoadPrepSQTcb::getHiveCreateTableDDL(NAString& hiveSampleTblNm, NAString& ddlText) |
| { |
| ExHbaseAccessTdb& hbaTdb = ((ExHbaseAccessTdb&)hbaseAccessTdb()); |
| ddlText = "create external table "; |
| ddlText.append(hiveSampleTblNm).append("("); |
| |
| ExpTupleDesc* td = hbaTdb.workCriDesc_->getTupleDescriptor(hbaTdb.convertTuppIndex_); |
| hbaTdb.listOfUpdatedColNames()->position(); |
| Attributes* attrs; |
| char colNumBuf[12]; |
| for (UInt32 i = 0; i < td->numAttrs(); i++) |
| { |
| attrs = td->getAttr(i); |
| sprintf(colNumBuf, "%d", *(UInt32*) hbaTdb.listOfUpdatedColNames()->getCurr()); |
| ddlText.append("col").append(colNumBuf).append(" "); |
| ddlText.append(TrafToHiveType(attrs)); |
| if (i < td->numAttrs() - 1) |
| ddlText.append(", "); |
| else |
| ddlText.append(") row format delimited fields terminated by '|' location '") |
| .append(hbaTdb.getSampleLocation()) |
| .append((const char*)hbaTdb.getTableName()) |
| .append("/'"); |
| hbaTdb.listOfUpdatedColNames()->advance(); |
| } |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| Int64 exceptionCount; |
| Lng32 errorRowCount; |
| |
| ExMasterStmtGlobals *g = getGlobals()-> |
| castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| |
| NABoolean eodSeen = false; |
| |
| // Get the percentage of rows to include in the ustat sample table. A value of |
| // 0 indicates that no sample table is to be created. |
| static NABoolean displayed = FALSE; |
| double samplingRate = ((ExHbaseAccessTdb&)hbaseAccessTdb()).getSamplingRate(); |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = ALL_DONE; |
| else if (pentry_down->downState.request == ex_queue::GET_EOD && |
| step_ != HANDLE_ERROR && lastHandledStep_ != HANDLE_ERROR) { |
| eodSeen = true; |
| if (currRowNum_ > rowsInserted_) |
| step_ = PROCESS_INSERT; |
| else |
| { |
| if (lastHandledStep_ == ALL_DONE) |
| matches_ = 0; |
| step_ = ALL_DONE; |
| } |
| } |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| currRowNum_ = 0; |
| numRetries_ = 0; |
| prevTailIndex_ = 0; |
| lastHandledStep_ = NOT_STARTED; |
| |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| rowsInserted_ = 0; |
| step_ = INSERT_INIT; |
| } |
| break; |
| |
| case INSERT_INIT: |
| { |
| retcode = ehi_->initHBLC(getHbaseAccessStats()); |
| |
| if (setupError(retcode, "ExpHbaseInterface::initHBLC")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| short numCols = 0; |
| |
| if (!hFileParamsInitialized_) |
| { |
| importLocation_= std::string(((ExHbaseAccessTdb&)hbaseAccessTdb()).getLoadPrepLocation()) + |
| ((ExHbaseAccessTdb&)hbaseAccessTdb()).getTableName() ; |
| familyLocation_ = std::string(importLocation_ + "/#1"); |
| Lng32 fileNum = getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(); |
| hFileName_ = std::string("hfile"); |
| char hFileName[50]; |
| snprintf(hFileName, 50, "hfile%d", fileNum); |
| hFileName_ = hFileName; |
| |
| NAString hiveDDL; |
| NAString hiveSampleTblNm; |
| if (samplingRate > 0 && fileNum == 0) // master exec creates hive sample table |
| { |
| hiveSampleTblNm = ((ExHbaseAccessTdb&)hbaseAccessTdb()).getTableName(); |
| TrafToHiveSampleTableName(hiveSampleTblNm); |
| getHiveCreateTableDDL(hiveSampleTblNm, hiveDDL); |
| } |
| retcode = ehi_->initHFileParams(table_, familyLocation_, hFileName_, |
| hbaseAccessTdb().getMaxHFileSize(), |
| hiveSampleTblNm.data(), hiveDDL.data()); |
| hFileParamsInitialized_ = true; |
| |
| if (samplingRate > 0) |
| { |
| // Seed random number generator (used to select rows to write to sample table). |
| srand(time(0)); |
| |
| // Set up HDFS file for sample table. |
| |
| ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); |
| Text samplePath = std::string(((ExHbaseAccessTdb&)hbaseAccessTdb()).getSampleLocation()) + |
| ((ExHbaseAccessTdb&)hbaseAccessTdb()).getTableName() ; |
| char filePart[10]; |
| sprintf(filePart, "/%d", fileNum); |
| HDFS_Client_RetCode hdfsClientRetcode; |
| samplePath.append(filePart); |
| if (sampleFileHdfsClient_ == NULL) |
| sampleFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode); |
| if (hdfsClientRetcode == HDFS_CLIENT_OK) { |
| hdfsClientRetcode = sampleFileHdfsClient_->hdfsOpen(samplePath.data(), FALSE); |
| if (hdfsClientRetcode != HDFS_CLIENT_OK) { |
| NADELETE(sampleFileHdfsClient_, HdfsClient, getHeap()); |
| sampleFileHdfsClient_ = NULL; |
| } |
| } |
| if (hdfsClientRetcode != HDFS_CLIENT_OK) { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8110)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| posVec_.clear(); |
| hbaseAccessTdb().listOfUpdatedColNames()->position(); |
| while (NOT hbaseAccessTdb().listOfUpdatedColNames()->atEnd()) |
| { |
| UInt32 pos = *(UInt32*) hbaseAccessTdb().listOfUpdatedColNames()->getCurr(); |
| posVec_.push_back(pos); |
| hbaseAccessTdb().listOfUpdatedColNames()->advance(); |
| numCols++; |
| } |
| } |
| if (setupError(retcode, "ExpHbaseInterface::createHFile")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| allocateDirectRowBufferForJNI( |
| numCols, |
| hbaseAccessTdb().getTrafLoadFlushSize()); |
| allocateDirectRowIDBufferForJNI(hbaseAccessTdb().getTrafLoadFlushSize()); |
| step_ = SETUP_INSERT; |
| } |
| break; |
| |
| case SETUP_INSERT: |
| { |
| step_ = EVAL_INSERT_EXPR; |
| } |
| break; |
| |
| case EVAL_INSERT_EXPR: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_) |
| .setDataPointer(convertRow_); |
| lastErrorCnd_ = NULL; |
| if (convertExpr()) |
| { |
| insertRowlen_ = hbaseAccessTdb().convertRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(pentry_down->getAtp(), workAtp_, |
| NULL, -1, &insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) { |
| if (hbaseAccessTdb().getContinueOnError()) { |
| if (pentry_down->getDiagsArea()) { |
| Lng32 errorCount = pentry_down->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| lastErrorCnd_ = pentry_down->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| step_= HANDLE_EXCEPTION; |
| break; |
| } |
| else |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| } |
| |
| genAndAssignSyskey(hbaseAccessTdb().convertTuppIndex_, convertRow_); |
| |
| step_ = EVAL_ROWID_EXPR; |
| } |
| |
| break; |
| |
| case EVAL_ROWID_EXPR: |
| { |
| NABoolean isVC = hbaseAccessTdb().keyInVCformat(); |
| if (evalRowIdExpr(isVC) == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| lastErrorCnd_ = NULL; |
| // duplicates (same rowid) are not allowed in Hfiles. adding duplicates causes Hfiles to generate |
| // errors |
| if (prevRowId_ == NULL) |
| { |
| prevRowId_ = new char[rowId_.len + 1]; |
| memmove(prevRowId_, rowId_.val, rowId_.len); |
| } |
| else |
| { |
| // rows are supposed to sorted by rowId and to detect duplicates |
| // compare the current rowId to the previous one |
| if (memcmp(prevRowId_, rowId_.val, rowId_.len) == 0) |
| { |
| if (((ExHbaseAccessTdb&) hbaseAccessTdb()).getNoDuplicates() || |
| ((NOT ((ExHbaseAccessTdb&) hbaseAccessTdb()).getNoDuplicates()) && |
| hbaseAccessTdb().getContinueOnError())) { |
| //8110 Duplicate rows detected. |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8110)); |
| pentry_down->setDiagsArea(diagsArea); |
| if (hbaseAccessTdb().getContinueOnError()) { |
| if (pentry_down->getDiagsArea()) { |
| Lng32 errorCount = pentry_down->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| lastErrorCnd_ = pentry_down->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| step_= HANDLE_EXCEPTION; |
| break; |
| } |
| else { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| else |
| { |
| //skip duplicate |
| step_ = DONE; |
| break; |
| } |
| } |
| memmove(prevRowId_, rowId_.val, rowId_.len); |
| } |
| step_ = CREATE_MUTATIONS; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| retcode = createDirectRowBuffer( |
| hbaseAccessTdb().convertTuppIndex_, |
| convertRow_, |
| hbaseAccessTdb().listOfUpdatedColNames(), |
| hbaseAccessTdb().listOfOmittedColNames(), |
| FALSE, //TRUE, |
| &posVec_, |
| samplingRate); |
| if (retcode == -1) |
| { |
| //need to re-verify error handling |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| copyRowIDToDirectBuffer( rowId_); |
| currRowNum_++; |
| if (!hbaseAccessTdb().returnRow()) { |
| matches_++; // if we are returning a row moveRowToUpQueue |
| //will increment matches_ |
| } |
| else { |
| step_ = RETURN_ROW; |
| break ; |
| } |
| |
| if (currRowNum_ < hbaseAccessTdb().getTrafLoadFlushSize()) |
| { |
| step_ = DONE; |
| break; |
| } |
| step_ = PROCESS_INSERT; // currRowNum_ == rowset size && we are not returning a row |
| } |
| break; |
| |
| case PROCESS_INSERT: |
| { |
| numRowsInVsbbBuffer_ = patchDirectRowBuffers(); |
| retcode = ehi_->addToHFile(hbaseAccessTdb().getRowIDLen(), |
| rowIDs_, |
| rows_); |
| |
| if (setupError(retcode, "ExpHbaseInterface::addToHFile")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| rowsInserted_ += numRowsInVsbbBuffer_; |
| |
| if (getHbaseAccessStats()) |
| { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_); |
| } |
| |
| step_ = ALL_DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| |
| lastHandledStep_ =HANDLE_ERROR; |
| eodSeen = true; |
| matches_ = 0; |
| step_ = ALL_DONE; |
| } |
| break; |
| |
| case HANDLE_EXCEPTION: |
| { |
| exceptionCount = 0; |
| ExHbaseAccessTcb::incrErrorCount( ehi_,exceptionCount, hbaseAccessTdb().getErrCountTab(), |
| hbaseAccessTdb().getErrCountRowId()); |
| if (hbaseAccessTdb().getMaxErrorRows() > 0) |
| { |
| if (exceptionCount > hbaseAccessTdb().getMaxErrorRows()) |
| { |
| if (pentry_down->getDiagsArea()) |
| pentry_down->getDiagsArea()->clear(); |
| if (workAtp_->getDiagsArea()) |
| workAtp_->getDiagsArea()->clear(); |
| errorRowCount = (Lng32) exceptionCount; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlWarning(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_ROWS_FOUND), NULL, &errorRowCount); |
| //8113 max number of error rows exceeded. |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_MAX_ERROR_ROWS_EXCEEDED)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| if (hbaseAccessTdb().getLogErrorRows()) |
| { |
| workAtp_->getTupp(hbaseAccessTdb().updateTuppIndex_).setDataPointer(updateRow_); |
| |
| if (updateExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| updateExpr()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| int loggingRowLen = 0; |
| Lng32 errorMsgLen = 0; |
| createLoggingRow( hbaseAccessTdb().updateTuppIndex_, updateRow_, |
| loggingRow_ , loggingRowLen); |
| ExHbaseAccessTcb::handleException((NAHeap *)getHeap(), loggingRow_, loggingRowLen, |
| lastErrorCnd_); |
| } |
| if (pentry_down->getDiagsArea()) |
| pentry_down->getDiagsArea()->clear(); |
| if (workAtp_->getDiagsArea()) |
| workAtp_->getDiagsArea()->clear(); |
| |
| step_ = DONE; |
| } |
| break; |
| case RETURN_ROW: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| if (returnUpdateExpr()) |
| { |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| // allocate tupps where returned rows will be created |
| if (allocateUpEntryTupps( |
| -1, |
| 0, |
| hbaseAccessTdb().returnedTuppIndex_, |
| hbaseAccessTdb().returnUpdatedRowLen_, |
| FALSE, |
| &rc)) |
| return rc; |
| ex_expr::exp_return_type exprRetCode = |
| returnUpdateExpr()->eval(up_entry->getAtp(), workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(&rc)) |
| return rc; |
| } |
| else |
| { |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return rc; |
| } |
| if (currRowNum_ < hbaseAccessTdb().getHbaseRowsetVsbbSize()) |
| step_ = DONE; |
| else |
| step_ = PROCESS_INSERT; |
| |
| break; |
| } |
| case DONE: |
| case ALL_DONE: |
| { |
| if (step_ == ALL_DONE && eodSeen) { |
| exceptionCount = 0; |
| ExHbaseAccessTcb::getErrorCount( ehi_,exceptionCount, hbaseAccessTdb().getErrCountTab(), |
| hbaseAccessTdb().getErrCountRowId()); |
| errorRowCount = (Lng32) exceptionCount; |
| if (errorRowCount != 0 || loggingErrorDiags_ != NULL) { |
| ex_queue_entry * down_entry = qparent_.down->getHeadEntry(); |
| ComDiagsArea * diagsArea = down_entry->getDiagsArea(); |
| if (!diagsArea) { |
| diagsArea = ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| down_entry->setDiagsArea(diagsArea); |
| } |
| if (loggingErrorDiags_ != NULL) { |
| diagsArea->mergeAfter(*loggingErrorDiags_); |
| loggingErrorDiags_->clear(); |
| } |
| if (errorRowCount > 0) |
| ExRaiseSqlWarning((NAMemory *)getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_ROWS_FOUND), (ComCondition **)NULL, &errorRowCount); |
| } |
| } |
| |
| if (handleDone(rc, (step_ == ALL_DONE ? matches_ : 0))) |
| return rc; |
| lastHandledStep_ = step_; |
| |
| if (step_ == DONE) |
| step_ = SETUP_INSERT; |
| else |
| { |
| step_ = NOT_STARTED; |
| if (eodSeen) |
| { |
| ehi_->closeHFile(table_); |
| if (logFileHdfsClient_ != NULL) |
| logFileHdfsClient_->hdfsClose(); |
| if (sampleFileHdfsClient_ != NULL) |
| sampleFileHdfsClient_->hdfsClose(); |
| hFileParamsInitialized_ = false; |
| retcode = ehi_->close(); |
| } |
| } |
| } |
| break; |
| |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| short ExHbaseAccessBulkLoadPrepSQTcb::createLoggingRow( UInt16 tuppIndex, char * tuppRow, char * targetRow, int &targetRowLen) |
| { |
| |
| ExpTupleDesc * rowTD = |
| hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tuppIndex); |
| |
| short colNameLen; |
| char * colName; |
| short nullVal = 0; |
| short nullValLen = 0; |
| short colValLen; |
| char *colVal; |
| |
| Attributes * attr; |
| short *numColsPtr; |
| |
| char * tmpTargetRow = targetRow; |
| for (Lng32 i = 0; i < rowTD->numAttrs(); i++) |
| { |
| Attributes * attr = rowTD->getAttr(i); |
| |
| if (attr) |
| { |
| colVal = &tuppRow[attr->getOffset()]; |
| nullVal = 0; |
| if (attr->getNullFlag() && |
| (*(short*)&tuppRow[attr->getNullIndOffset()])) |
| { |
| targetRow[0] = '|'; |
| targetRow++; |
| } |
| else |
| { |
| colValLen = attr->getLength(&tuppRow[attr->getVCLenIndOffset()]); |
| memcpy(targetRow,colVal, colValLen); |
| targetRow +=colValLen; |
| if (i != rowTD->numAttrs() -1) |
| targetRow[0] = '|'; |
| else |
| targetRow[0] = '\n'; |
| targetRow++; |
| } |
| } |
| else |
| { |
| ex_assert(false, "Unable to obtain column descriptor"); |
| } |
| |
| } // for |
| |
| targetRowLen= targetRow - tmpTargetRow; |
| |
| return 0; |
| } |
| |
| // UMD (unique UpdMergeDel on Trafodion tables) |
| ExHbaseUMDtrafUniqueTaskTcb::ExHbaseUMDtrafUniqueTaskTcb |
| (ExHbaseAccessUMDTcb * tcb) |
| : ExHbaseTaskTcb(tcb) |
| , step_(NOT_STARTED) |
| { |
| latestRowTimestamp_ = -1; |
| columnToCheck_.val = (char *)(new (tcb->getHeap()) BYTE[MAX_COLNAME_LEN]); |
| columnToCheck_.len = MAX_COLNAME_LEN; |
| colValToCheck_.val = (char *)(new (tcb->getHeap()) BYTE[tcb->hbaseAccessTdb().getRowIDLen()]); |
| colValToCheck_.len = tcb->hbaseAccessTdb().getRowIDLen(); |
| } |
| |
| void ExHbaseUMDtrafUniqueTaskTcb::init() |
| { |
| step_ = NOT_STARTED; |
| } |
| |
| ExWorkProcRetcode ExHbaseUMDtrafUniqueTaskTcb::work(short &rc) |
| { |
| Lng32 retcode = 0; |
| rc = 0; |
| |
| while (1) |
| { |
| ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry(); |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| rowUpdated_ = FALSE; |
| latestRowTimestamp_ = -1; |
| |
| step_ = SETUP_UMD; |
| } |
| break; |
| |
| case SETUP_UMD: |
| { |
| tcb_->currRowidIdx_ = 0; |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case GET_NEXT_ROWID: |
| { |
| if (tcb_->currRowidIdx_ == tcb_->rowIds_.entries()) |
| { |
| step_ = GET_CLOSE; |
| break; |
| } |
| |
| if ((tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) && |
| (tcb_->hbaseAccessTdb().canDoCheckAndUpdel())) |
| { |
| if (tcb_->hbaseAccessTdb().hbaseSqlIUD()) |
| step_ = CHECK_AND_DELETE_ROW; |
| else |
| step_ = DELETE_ROW; |
| break; |
| } |
| else if ((tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) && |
| (tcb_->hbaseAccessTdb().canDoCheckAndUpdel())) |
| { |
| step_ = CREATE_UPDATED_ROW; |
| break; |
| } |
| |
| retcode = tcb_->ehi_->getRowOpen( tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| tcb_->columns_, -1); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case NEXT_ROW: |
| { |
| retcode = tcb_->ehi_->nextRow(); |
| if ( (retcode == HBASE_ACCESS_EOD) || (retcode == HBASE_ACCESS_EOR) ) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::MERGE_) |
| { |
| // didn't find the row, cannot update. |
| // evaluate the mergeInsert expr and insert the row. |
| step_ = CREATE_MERGE_INSERTED_ROW; |
| break; |
| } |
| |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::nextRow")) |
| step_ = HANDLE_ERROR; |
| else if ((tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) && |
| (! tcb_->scanExpr()) && |
| (! tcb_->lobDelExpr()) && |
| (NOT tcb_->hbaseAccessTdb().returnRow())) |
| step_ = DELETE_ROW; |
| else |
| step_ = CREATE_FETCHED_ROW; |
| } |
| break; |
| |
| case CREATE_FETCHED_ROW: |
| { |
| retcode = tcb_->createSQRowDirect(&latestRowTimestamp_); |
| if (retcode == HBASE_ACCESS_NO_ROW) |
| { |
| step_ = NEXT_ROW; |
| break; |
| } |
| if (retcode < 0) |
| { |
| rc = (short)retcode; |
| tcb_->setupError(rc, "createSQRowDirect"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (retcode != HBASE_ACCESS_SUCCESS) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = APPLY_PRED; |
| } |
| break; |
| |
| case APPLY_PRED: |
| { |
| rc = tcb_->applyPred(tcb_->scanExpr()); |
| if (rc == 1) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = DELETE_ROW; |
| else if ((tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::MERGE_) && |
| (tcb_->mergeUpdScanExpr())) |
| step_ = APPLY_MERGE_UPD_SCAN_PRED; |
| else |
| step_ = CREATE_UPDATED_ROW; |
| } |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::MERGE_) |
| { |
| // didn't find the row, cannot update. |
| // evaluate the mergeInsert expr and insert the row. |
| step_ = CREATE_MERGE_INSERTED_ROW; |
| break; |
| } |
| |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| } |
| } |
| break; |
| |
| case APPLY_MERGE_UPD_SCAN_PRED: |
| { |
| rc = tcb_->applyPred(tcb_->mergeUpdScanExpr()); |
| if (rc == 1) |
| { |
| step_ = CREATE_UPDATED_ROW; |
| } |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| { |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| } |
| } |
| break; |
| |
| case CREATE_UPDATED_ROW: |
| { |
| if (! tcb_->updateExpr()) |
| { |
| tcb_->currRowidIdx_++; |
| |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().updateTuppIndex_) |
| .setDataPointer(tcb_->updateRow_); |
| |
| if (tcb_->updateExpr()) |
| { |
| tcb_->insertRowlen_ = tcb_->hbaseAccessTdb().updateRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->updateExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_, |
| NULL, -1, &tcb_->insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| step_ = EVAL_UPD_CONSTRAINT; |
| } |
| break; |
| |
| case EVAL_UPD_CONSTRAINT: |
| { |
| rc = tcb_->evalConstraintExpr(tcb_->updConstraintExpr(), tcb_->hbaseAccessTdb().updateTuppIndex_, |
| tcb_->updateRow_); |
| if (rc == 1) |
| step_ = CREATE_MUTATIONS; |
| else if (rc == 0) |
| step_ = GET_CLOSE; |
| else |
| step_ = HANDLE_ERROR; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| rowUpdated_ = TRUE; |
| // Merge can result in inserting rows. |
| // Use Number of columns in insert rather number |
| // of columns in update if an insert is involved in this tcb |
| if (tcb_->hbaseAccessTdb().getAccessType() |
| == ComTdbHbaseAccess::MERGE_) |
| { |
| ExpTupleDesc * rowTD = NULL; |
| if (tcb_->mergeInsertExpr()) |
| { |
| rowTD = tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().mergeInsertTuppIndex_); |
| } |
| else |
| { |
| rowTD = tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().updateTuppIndex_); |
| } |
| |
| if (rowTD->numAttrs() > 0) |
| tcb_->allocateDirectRowBufferForJNI(rowTD->numAttrs()); |
| } |
| |
| retcode = tcb_->createDirectRowBuffer( tcb_->hbaseAccessTdb().updateTuppIndex_, |
| tcb_->updateRow_, |
| tcb_->hbaseAccessTdb().listOfUpdatedColNames(), |
| tcb_->hbaseAccessTdb().listOfOmittedColNames(), |
| TRUE); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->hbaseAccessTdb().canDoCheckAndUpdel()) |
| step_ = CHECK_AND_UPDATE_ROW; |
| else |
| step_ = UPDATE_ROW; |
| } |
| break; |
| |
| case CREATE_MERGE_INSERTED_ROW: |
| { |
| if (! tcb_->mergeInsertExpr()) |
| { |
| tcb_->currRowidIdx_++; |
| |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().mergeInsertTuppIndex_) |
| .setDataPointer(tcb_->mergeInsertRow_); |
| |
| if (tcb_->mergeInsertExpr()) |
| { |
| tcb_->insertRowlen_ = tcb_->hbaseAccessTdb().mergeInsertRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->mergeInsertExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_, |
| NULL, -1, &tcb_->insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::MERGE_) |
| rowUpdated_ = FALSE; |
| step_ = EVAL_INS_CONSTRAINT; |
| } |
| break; |
| case EVAL_INS_CONSTRAINT: |
| { |
| rc = tcb_->evalConstraintExpr(tcb_->insConstraintExpr()); |
| if (rc == 0) { |
| step_ = GET_CLOSE; |
| break; |
| } |
| else if (rc != 1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| retcode = tcb_->createDirectRowBuffer( tcb_->hbaseAccessTdb().mergeInsertTuppIndex_, |
| tcb_->mergeInsertRow_, |
| tcb_->hbaseAccessTdb().listOfMergedColNames(), |
| tcb_->hbaseAccessTdb().listOfOmittedColNames()); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::MERGE_) |
| step_ = CHECK_AND_INSERT_ROW; |
| else |
| step_ = UPDATE_ROW; |
| } |
| break; |
| |
| case UPDATE_ROW: |
| { |
| retcode = tcb_->ehi_->insertRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| tcb_->row_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, //colTS_ |
| tcb_->asyncOperation_); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| // matches will get incremented during return row. |
| if (NOT tcb_->hbaseAccessTdb().returnRow()) |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW_AFTER_UPDATE; |
| } |
| break; |
| |
| case CHECK_AND_UPDATE_ROW: |
| { |
| rc = tcb_->evalKeyColValExpr(columnToCheck_, colValToCheck_); |
| if (rc == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| retcode = tcb_->ehi_->checkAndUpdateRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| tcb_->row_, |
| columnToCheck_, |
| colValToCheck_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, //colTS_ |
| tcb_->asyncOperation_); |
| |
| if (retcode == HBASE_ROW_NOTFOUND_ERROR) |
| { |
| step_ = NEXT_ROW_AFTER_UPDATE; |
| break; |
| } |
| |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::checkAndUpdateRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| // matches will get incremented during return row. |
| if (NOT tcb_->hbaseAccessTdb().returnRow()) |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW_AFTER_UPDATE; |
| } |
| break; |
| |
| case CHECK_AND_INSERT_ROW: |
| { |
| Text rowIdRow; |
| if (tcb_->mergeInsertRowIdExpr()) |
| { |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().mergeInsertRowIdTuppIndex_) |
| .setDataPointer(tcb_->rowIdRow_); |
| |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->mergeInsertRowIdExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| rowIdRow.assign(tcb_->rowIdRow_, tcb_->hbaseAccessTdb().getRowIDLen()); |
| } |
| HbaseStr rowID; |
| if (tcb_->mergeInsertRowIdExpr()) |
| { |
| rowID.val = (char *)rowIdRow.data(); |
| rowID.len = rowIdRow.size(); |
| } |
| else |
| { |
| rowID.val = (char *)tcb_->rowIds_[tcb_->currRowidIdx_].val; |
| rowID.len = tcb_->rowIds_[tcb_->currRowidIdx_].len; |
| } |
| retcode = tcb_->ehi_->checkAndInsertRow(tcb_->table_, |
| rowID, |
| tcb_->row_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, // colTS |
| tcb_->asyncOperation_, |
| tcb_->hbaseAccessTdb().getColIndexOfPK1()); |
| |
| if (retcode == HBASE_DUP_ROW_ERROR) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(tcb_->getHeap(), &diagsArea, |
| (ExeErrorCode)(8102)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| else if (tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| // matches will get incremented during return row. |
| if (NOT tcb_->hbaseAccessTdb().returnRow()) |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW_AFTER_UPDATE; |
| } |
| break; |
| |
| case NEXT_ROW_AFTER_UPDATE: |
| { |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->hbaseAccessTdb().returnRow()) |
| { |
| step_ = EVAL_RETURN_ROW_EXPRS; |
| break; |
| } |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case DELETE_ROW: |
| { |
| rc = tcb_->evalInsDelPreCondExpr(); |
| if (rc == -1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (rc == 0) { // No need to delete |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| retcode = tcb_->ehi_->deleteRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| NULL, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| latestRowTimestamp_, |
| tcb_->asyncOperation_); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::deleteRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| // delete entries from LOB desc table, if needed |
| if (tcb_->lobDelExpr()) |
| { |
| ex_expr::exp_return_type exprRetCode = |
| tcb_->lobDelExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->hbaseAccessTdb().returnRow()) |
| { |
| step_ = RETURN_ROW; |
| break; |
| } |
| |
| tcb_->matches_++; |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case CHECK_AND_DELETE_ROW: |
| { |
| rc = tcb_->evalInsDelPreCondExpr(); |
| if (rc == -1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (rc == 0) { // donot delete |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| rc = tcb_->evalKeyColValExpr(columnToCheck_, colValToCheck_); |
| if (rc == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| retcode = tcb_->ehi_->checkAndDeleteRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| columnToCheck_, |
| colValToCheck_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1 //colTS_ |
| ); |
| |
| if (retcode == HBASE_ROW_NOTFOUND_ERROR) |
| { |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::checkAndDeleteRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| if (tcb_->hbaseAccessTdb().returnRow()) |
| { |
| step_ = RETURN_ROW; |
| break; |
| } |
| |
| tcb_->matches_++; |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case RETURN_ROW: |
| { |
| if (tcb_->qparent_.up->isFull()) |
| { |
| rc = WORK_OK; |
| return 1; |
| } |
| |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return 1; |
| |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == tcb_->matches_)) |
| { |
| step_ = GET_CLOSE; |
| break; |
| } |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case EVAL_RETURN_ROW_EXPRS: |
| { |
| ex_queue_entry * up_entry = tcb_->qparent_.up->getTailEntry(); |
| |
| rc = 0; |
| |
| // allocate tupps where returned rows will be created |
| if (tcb_->allocateUpEntryTupps( |
| tcb_->hbaseAccessTdb().returnedFetchedTuppIndex_, |
| tcb_->hbaseAccessTdb().returnFetchedRowLen_, |
| tcb_->hbaseAccessTdb().returnedUpdatedTuppIndex_, |
| tcb_->hbaseAccessTdb().returnUpdatedRowLen_, |
| FALSE, |
| &rc)) |
| return 1; |
| |
| ex_expr::exp_return_type exprRetCode; |
| |
| char * fetchedDataPtr = NULL; |
| char * updatedDataPtr = NULL; |
| char * mergeIUDIndicatorDataPtr = NULL; |
| if (tcb_->returnFetchExpr()) |
| { |
| exprRetCode = |
| tcb_->returnFetchExpr()->eval(up_entry->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| fetchedDataPtr = up_entry->getAtp()->getTupp(tcb_->hbaseAccessTdb().returnedFetchedTuppIndex_).getDataPointer(); |
| |
| } |
| if (tcb_->hbaseAccessTdb().mergeIUDIndicatorTuppIndex_ > 0) |
| mergeIUDIndicatorDataPtr = |
| tcb_->workAtp_-> |
| getTupp(tcb_->hbaseAccessTdb().mergeIUDIndicatorTuppIndex_). |
| getDataPointer(); |
| |
| if (rowUpdated_) |
| { |
| if (tcb_->returnUpdateExpr()) |
| { |
| if (mergeIUDIndicatorDataPtr) |
| *mergeIUDIndicatorDataPtr = 'U'; |
| exprRetCode = |
| tcb_->returnUpdateExpr()->eval(up_entry->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| updatedDataPtr = |
| up_entry->getAtp()->getTupp(tcb_->hbaseAccessTdb().returnedUpdatedTuppIndex_).getDataPointer(); |
| } |
| } |
| else |
| { |
| if (mergeIUDIndicatorDataPtr) |
| *mergeIUDIndicatorDataPtr = 'I'; |
| if (tcb_->returnMergeInsertExpr()) |
| { |
| exprRetCode = |
| tcb_->returnMergeInsertExpr()->eval(up_entry->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| updatedDataPtr = |
| up_entry->getAtp()->getTupp(tcb_->hbaseAccessTdb().returnedUpdatedTuppIndex_).getDataPointer(); |
| } |
| } |
| |
| step_ = RETURN_UPDATED_ROWS; |
| } |
| break; |
| |
| case RETURN_UPDATED_ROWS: |
| { |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (tcb_->moveRowToUpQueue(&rc)) |
| return 1; |
| |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == tcb_->matches_)) |
| { |
| step_ = GET_CLOSE; |
| break; |
| } |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case GET_CLOSE: |
| { |
| retcode = tcb_->ehi_->getClose(); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::getClose")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| step_ = NOT_STARTED; |
| return -1; |
| } |
| break; |
| |
| case DONE: |
| { |
| step_ = NOT_STARTED; |
| return 0; |
| } |
| break; |
| |
| }// switch |
| |
| } // while |
| |
| } |
| |
| // UMD (unique UpdMergeDel on hbase tables. Well, Merge not supported yet) |
| ExHbaseUMDnativeUniqueTaskTcb::ExHbaseUMDnativeUniqueTaskTcb |
| (ExHbaseAccessUMDTcb * tcb) |
| : ExHbaseUMDtrafUniqueTaskTcb(tcb) |
| , step_(NOT_STARTED) |
| { |
| } |
| |
| void ExHbaseUMDnativeUniqueTaskTcb::init() |
| { |
| step_ = NOT_STARTED; |
| } |
| |
| ExWorkProcRetcode ExHbaseUMDnativeUniqueTaskTcb::work(short &rc) |
| { |
| Lng32 retcode = 0; |
| rc = 0; |
| |
| while (1) |
| { |
| ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry(); |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| rowUpdated_ = FALSE; |
| |
| step_ = SETUP_UMD; |
| } |
| break; |
| |
| case SETUP_UMD: |
| { |
| tcb_->currRowidIdx_ = 0; |
| |
| tcb_->setupListOfColNames(tcb_->hbaseAccessTdb().listOfDeletedColNames(), |
| tcb_->deletedColumns_); |
| |
| tcb_->setupListOfColNames(tcb_->hbaseAccessTdb().listOfFetchedColNames(), |
| tcb_->columns_); |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case GET_NEXT_ROWID: |
| { |
| if (tcb_->currRowidIdx_ == tcb_->rowIds_.entries()) |
| { |
| step_ = GET_CLOSE; |
| break; |
| } |
| |
| // retrieve columns to be deleted. If none of the columns exist, then |
| // this row cannot be deleted. |
| // But if there is a scan expr, then we need to also retrieve the columns used |
| // in the pred. Add those. |
| LIST(HbaseStr) columns(tcb_->getHeap()); |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| { |
| columns = tcb_->deletedColumns_; |
| if (tcb_->scanExpr()) |
| { |
| // retrieve all columns if none is specified. |
| if (tcb_->columns_.entries() == 0) |
| columns.clear(); |
| else |
| // append retrieved columns to deleted columns. |
| columns.insert(tcb_->columns_); |
| } |
| } |
| |
| retcode = tcb_->ehi_->getRowOpen( tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| columns, -1); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case NEXT_ROW: |
| { |
| retcode = tcb_->ehi_->nextRow(); |
| if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR) |
| { |
| step_ = GET_CLOSE; |
| break; |
| } |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_CELL; |
| } |
| break; |
| |
| case NEXT_CELL: |
| { |
| if (tcb_->colVal_.val == NULL) |
| tcb_->colVal_.val = new (tcb_->getHeap()) |
| char[tcb_->hbaseAccessTdb().convertRowLen()]; |
| tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen(); |
| retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_, |
| tcb_->colName_, tcb_->colVal_, tcb_->colTS_); |
| if (retcode == HBASE_ACCESS_EOD) |
| { |
| if ((tcb_->hbaseAccessTdb().getAccessType() |
| == ComTdbHbaseAccess::DELETE_) && (! tcb_->scanExpr())) |
| step_ = DELETE_ROW; |
| else |
| step_ = CREATE_FETCHED_ROW; |
| break; |
| } |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = APPEND_CELL_TO_ROW; |
| } |
| break; |
| |
| case APPEND_CELL_TO_ROW: |
| { |
| tcb_->copyCell(); |
| step_ = NEXT_CELL; |
| } |
| break; |
| |
| case CREATE_FETCHED_ROW: |
| { |
| rc = tcb_->createRowwiseRow(); |
| if (rc < 0) |
| { |
| if (rc != -1) |
| tcb_->setupError(rc, "createRowwiseRow"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = APPLY_PRED; |
| } |
| break; |
| |
| case APPLY_PRED: |
| { |
| rc = tcb_->applyPred(tcb_->scanExpr()); |
| if (rc == 1) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = DELETE_ROW; |
| else |
| step_ = CREATE_UPDATED_ROW; |
| } |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| { |
| tcb_->currRowidIdx_++; |
| step_ = GET_NEXT_ROWID; |
| } |
| } |
| break; |
| |
| case CREATE_UPDATED_ROW: |
| { |
| if (! tcb_->updateExpr()) |
| { |
| tcb_->currRowidIdx_++; |
| |
| step_ = GET_NEXT_ROWID; |
| break; |
| } |
| |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().updateTuppIndex_) |
| .setDataPointer(tcb_->updateRow_); |
| |
| if (tcb_->updateExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->updateExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_, |
| NULL, -1, &tcb_->insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| ExpTupleDesc * rowTD = |
| tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().updateTuppIndex_); |
| |
| Attributes * attr = rowTD->getAttr(0); |
| |
| rowUpdated_ = TRUE; |
| retcode = tcb_->createDirectRowwiseBuffer( |
| &tcb_->updateRow_[attr->getOffset()]); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = UPDATE_ROW; |
| } |
| break; |
| |
| case DELETE_ROW: |
| { |
| retcode = tcb_->ehi_->deleteRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| &tcb_->deletedColumns_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1 , |
| tcb_->asyncOperation_); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::deleteRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->matches_++; |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case UPDATE_ROW: |
| { |
| if (tcb_->numColsInDirectBuffer() > 0) |
| { |
| retcode = tcb_->ehi_->insertRow(tcb_->table_, |
| tcb_->rowIds_[tcb_->currRowidIdx_], |
| tcb_->row_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, // colTS_ |
| tcb_->asyncOperation_); |
| |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->matches_++; |
| } |
| tcb_->currRowidIdx_++; |
| |
| step_ = GET_NEXT_ROWID; |
| } |
| break; |
| |
| case GET_CLOSE: |
| { |
| retcode = tcb_->ehi_->getClose(); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::getClose")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| step_ = NOT_STARTED; |
| return -1; |
| } |
| break; |
| |
| case DONE: |
| { |
| step_ = NOT_STARTED; |
| return 0; |
| } |
| break; |
| |
| }// switch |
| |
| } // while |
| |
| } |
| |
| ExHbaseUMDtrafSubsetTaskTcb::ExHbaseUMDtrafSubsetTaskTcb |
| (ExHbaseAccessUMDTcb * tcb) |
| : ExHbaseTaskTcb(tcb) |
| , step_(NOT_STARTED) |
| { |
| } |
| |
| void ExHbaseUMDtrafSubsetTaskTcb::init() |
| { |
| step_ = NOT_STARTED; |
| } |
| |
| ExWorkProcRetcode ExHbaseUMDtrafSubsetTaskTcb::work(short &rc) |
| { |
| Lng32 retcode = 0; |
| HbaseStr rowID; |
| rc = 0; |
| Lng32 remainingInBatch = batchSize_; |
| |
| while (1) |
| { |
| ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry(); |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| step_ = SCAN_OPEN; |
| } |
| break; |
| |
| case SCAN_OPEN: |
| { |
| // Pre-fetch is disabled because it interfers with |
| // Delete operations |
| retcode = tcb_->ehi_->scanOpen(tcb_->table_, |
| tcb_->beginRowId_, tcb_->endRowId_, |
| tcb_->columns_, -1, |
| tcb_->hbaseAccessTdb().readUncommittedScan(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(), |
| FALSE, NULL, NULL, NULL, |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner()); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case NEXT_ROW: |
| { |
| if (--remainingInBatch <= 0) |
| { |
| rc = WORK_CALL_AGAIN; |
| return 1; |
| } |
| |
| retcode = tcb_->ehi_->nextRow(); |
| if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR) |
| { |
| step_ = SCAN_CLOSE; |
| break; |
| } |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| { |
| if ((! tcb_->scanExpr()) && |
| (NOT tcb_->hbaseAccessTdb().returnRow())) |
| { |
| step_ = DELETE_ROW; |
| break; |
| } |
| } |
| |
| step_ = CREATE_FETCHED_ROW; |
| } |
| break; |
| |
| case CREATE_FETCHED_ROW: |
| { |
| retcode = tcb_->createSQRowDirect(); |
| if (retcode == HBASE_ACCESS_NO_ROW) |
| { |
| step_ = NEXT_ROW; |
| break; |
| } |
| if (retcode < 0) |
| { |
| rc = (short)retcode; |
| tcb_->setupError(rc, "createSQRowDirect"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (retcode != HBASE_ACCESS_SUCCESS) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = APPLY_PRED; |
| } |
| break; |
| |
| case APPLY_PRED: |
| { |
| rc = tcb_->applyPred(tcb_->scanExpr()); |
| if (rc == 1) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = DELETE_ROW; |
| else |
| step_ = CREATE_UPDATED_ROW; |
| } |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case CREATE_UPDATED_ROW: |
| { |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().updateTuppIndex_) |
| .setDataPointer(tcb_->updateRow_); |
| |
| if (tcb_->updateExpr()) |
| { |
| tcb_->insertRowlen_ = tcb_->hbaseAccessTdb().updateRowLen_; |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->updateExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_, |
| NULL, -1, &tcb_->insertRowlen_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| step_ = EVAL_UPD_CONSTRAINT; |
| } |
| break; |
| |
| case EVAL_UPD_CONSTRAINT: |
| { |
| rc = tcb_->evalConstraintExpr(tcb_->updConstraintExpr(), tcb_->hbaseAccessTdb().updateTuppIndex_, |
| tcb_->updateRow_); |
| if (rc == 1) |
| step_ = CREATE_MUTATIONS; |
| else if (rc == 0) |
| step_ = SCAN_CLOSE; |
| else // error |
| step_ = HANDLE_ERROR; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| // Merge can result in inserting rows |
| // Use Number of columns in insert rather number |
| // of columns in update if an insert is involved in this tcb |
| if (tcb_->hbaseAccessTdb().getAccessType() |
| == ComTdbHbaseAccess::MERGE_) |
| { |
| ExpTupleDesc * rowTD = NULL; |
| if (tcb_->mergeInsertExpr()) |
| { |
| rowTD = tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().mergeInsertTuppIndex_); |
| } |
| else |
| { |
| rowTD = tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().updateTuppIndex_); |
| } |
| |
| if (rowTD->numAttrs() > 0) |
| tcb_->allocateDirectRowBufferForJNI(rowTD->numAttrs()); |
| } |
| |
| retcode = tcb_->createDirectRowBuffer( |
| tcb_->hbaseAccessTdb().updateTuppIndex_, |
| tcb_->updateRow_, |
| tcb_->hbaseAccessTdb().listOfUpdatedColNames(), |
| tcb_->hbaseAccessTdb().listOfOmittedColNames(), |
| TRUE); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = UPDATE_ROW; |
| } |
| break; |
| |
| case UPDATE_ROW: |
| { |
| retcode = tcb_->ehi_->getRowID(rowID); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| retcode = tcb_->ehi_->insertRow(tcb_->table_, |
| rowID, |
| tcb_->row_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, // colTS_ |
| tcb_->asyncOperation_); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->hbaseAccessTdb().returnRow()) |
| { |
| step_ = EVAL_RETURN_ROW_EXPRS; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case DELETE_ROW: |
| { |
| retcode = tcb_->ehi_->getRowID(rowID); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| retcode = tcb_->ehi_->deleteRow(tcb_->table_, |
| rowID, |
| NULL, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, |
| tcb_->asyncOperation_); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::deleteRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| // delete entries from LOB desc table, if needed |
| if (tcb_->lobDelExpr()) |
| { |
| ex_expr::exp_return_type exprRetCode = |
| tcb_->lobDelExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->hbaseAccessTdb().returnRow()) |
| { |
| step_ = RETURN_ROW; |
| break; |
| } |
| |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case RETURN_ROW: |
| { |
| if (tcb_->qparent_.up->isFull()) |
| { |
| rc = WORK_OK; |
| return 1; |
| } |
| |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return 1; |
| |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == tcb_->matches_)) |
| { |
| step_ = SCAN_CLOSE; |
| break; |
| } |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case EVAL_RETURN_ROW_EXPRS: |
| { |
| ex_queue_entry * up_entry = tcb_->qparent_.up->getTailEntry(); |
| |
| rc = 0; |
| |
| // allocate tupps where returned rows will be created |
| if (tcb_->allocateUpEntryTupps( |
| tcb_->hbaseAccessTdb().returnedFetchedTuppIndex_, |
| tcb_->hbaseAccessTdb().returnFetchedRowLen_, |
| tcb_->hbaseAccessTdb().returnedUpdatedTuppIndex_, |
| tcb_->hbaseAccessTdb().returnUpdatedRowLen_, |
| FALSE, |
| &rc)) |
| return 1; |
| |
| ex_expr::exp_return_type exprRetCode; |
| |
| char * fetchedDataPtr = NULL; |
| char * updatedDataPtr = NULL; |
| if (tcb_->returnFetchExpr()) |
| { |
| exprRetCode = |
| tcb_->returnFetchExpr()->eval(up_entry->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| fetchedDataPtr = up_entry->getAtp()->getTupp(tcb_->hbaseAccessTdb().returnedFetchedTuppIndex_).getDataPointer(); |
| |
| } |
| |
| if (tcb_->returnUpdateExpr()) |
| { |
| exprRetCode = |
| tcb_->returnUpdateExpr()->eval(up_entry->getAtp(), tcb_->workAtp_); |
| if (exprRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| updatedDataPtr = up_entry->getAtp()->getTupp(tcb_->hbaseAccessTdb().returnedUpdatedTuppIndex_).getDataPointer(); |
| } |
| |
| step_ = RETURN_UPDATED_ROWS; |
| } |
| break; |
| |
| case RETURN_UPDATED_ROWS: |
| { |
| rc = 0; |
| // moveRowToUpQueue also increments matches_ |
| if (tcb_->moveRowToUpQueue(&rc)) |
| return 1; |
| |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == tcb_->matches_)) |
| { |
| step_ = SCAN_CLOSE; |
| break; |
| } |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case SCAN_CLOSE: |
| { |
| retcode = tcb_->ehi_->scanClose(); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| step_ = NOT_STARTED; |
| return -1; |
| } |
| break; |
| |
| case DONE: |
| { |
| step_ = NOT_STARTED; |
| return 0; |
| } |
| break; |
| |
| }// switch |
| |
| } // while |
| |
| } |
| |
| ExHbaseUMDnativeSubsetTaskTcb::ExHbaseUMDnativeSubsetTaskTcb |
| (ExHbaseAccessUMDTcb * tcb) |
| : ExHbaseUMDtrafSubsetTaskTcb(tcb) |
| , step_(NOT_STARTED) |
| { |
| } |
| |
| void ExHbaseUMDnativeSubsetTaskTcb::init() |
| { |
| step_ = NOT_STARTED; |
| } |
| |
| ExWorkProcRetcode ExHbaseUMDnativeSubsetTaskTcb::work(short &rc) |
| { |
| Lng32 retcode = 0; |
| rc = 0; |
| Lng32 remainingInBatch = batchSize_; |
| |
| while (1) |
| { |
| ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry(); |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| tcb_->setupListOfColNames(tcb_->hbaseAccessTdb().listOfDeletedColNames(), |
| tcb_->deletedColumns_); |
| |
| tcb_->setupListOfColNames(tcb_->hbaseAccessTdb().listOfFetchedColNames(), |
| tcb_->columns_); |
| |
| step_ = SCAN_OPEN; |
| } |
| break; |
| |
| case SCAN_OPEN: |
| { |
| // retrieve columns to be deleted. If the column doesn't exist, then |
| // this row cannot be deleted. |
| // But if there is a scan expr, then we need to also retrieve the columns used |
| // in the pred. Add those. |
| LIST(HbaseStr) columns(tcb_->getHeap()); |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| { |
| columns = tcb_->deletedColumns_; |
| if (tcb_->scanExpr()) |
| { |
| // retrieve all columns if none is specified. |
| if (tcb_->columns_.entries() == 0) |
| columns.clear(); |
| else |
| // append retrieved columns to deleted columns. |
| columns.insert(tcb_->columns_); |
| } |
| } |
| |
| retcode = tcb_->ehi_->scanOpen(tcb_->table_, |
| tcb_->beginRowId_, tcb_->endRowId_, |
| columns, -1, |
| tcb_->hbaseAccessTdb().readUncommittedScan(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(), |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(), |
| FALSE, NULL, NULL, NULL, |
| tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner()); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen")) |
| step_ = HANDLE_ERROR; |
| else |
| { |
| step_ = NEXT_ROW; |
| tcb_->isEOD_ = FALSE; |
| } |
| } |
| break; |
| |
| case NEXT_ROW: |
| { |
| if (--remainingInBatch <= 0) |
| { |
| rc = WORK_CALL_AGAIN; |
| return 1; |
| } |
| retcode = tcb_->ehi_->nextRow(); |
| if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR) |
| { |
| tcb_->isEOD_ = TRUE; |
| step_ = SCAN_CLOSE; |
| break; |
| } |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_CELL; |
| } |
| break; |
| |
| case NEXT_CELL: |
| { |
| if (tcb_->colVal_.val == NULL) |
| tcb_->colVal_.val = new (tcb_->getHeap()) |
| char[tcb_->hbaseAccessTdb().convertRowLen()]; |
| tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen(); |
| retcode = tcb_->ehi_->nextCell( tcb_->rowId_, tcb_->colFamName_, |
| tcb_->colName_, tcb_->colVal_, |
| tcb_->colTS_); |
| if (retcode == HBASE_ACCESS_EOD) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() |
| == ComTdbHbaseAccess::DELETE_) |
| { |
| if (! tcb_->scanExpr()) |
| { |
| step_ = DELETE_ROW; |
| break; |
| } |
| } |
| step_ = CREATE_FETCHED_ROWWISE_ROW; |
| break; |
| } |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = APPEND_CELL_TO_ROW; |
| } |
| break; |
| |
| case APPEND_CELL_TO_ROW: |
| { |
| tcb_->copyCell(); |
| step_ = NEXT_CELL; |
| } |
| break; |
| |
| case CREATE_FETCHED_ROWWISE_ROW: |
| { |
| rc = tcb_->createRowwiseRow(); |
| if (rc < 0) |
| { |
| if (rc != -1) |
| tcb_->setupError(rc, "createRowwiseRow"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = APPLY_PRED; |
| } |
| break; |
| |
| case APPLY_PRED: |
| { |
| rc = tcb_->applyPred(tcb_->scanExpr()); |
| if (rc == 1) |
| { |
| if (tcb_->hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = DELETE_ROW; |
| else |
| step_ = CREATE_UPDATED_ROWWISE_ROW; |
| } |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case CREATE_UPDATED_ROWWISE_ROW: |
| { |
| tcb_->workAtp_->getTupp(tcb_->hbaseAccessTdb().updateTuppIndex_) |
| .setDataPointer(tcb_->updateRow_); |
| |
| if (tcb_->updateExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| tcb_->updateExpr()->eval(pentry_down->getAtp(), tcb_->workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| step_ = CREATE_MUTATIONS; |
| } |
| break; |
| |
| case CREATE_MUTATIONS: |
| { |
| ExpTupleDesc * rowTD = |
| tcb_->hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (tcb_->hbaseAccessTdb().updateTuppIndex_); |
| |
| Attributes * attr = rowTD->getAttr(0); |
| |
| retcode = tcb_->createDirectRowwiseBuffer( |
| &tcb_->updateRow_[attr->getOffset()]); |
| |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = UPDATE_ROW; |
| } |
| break; |
| |
| case UPDATE_ROW: |
| { |
| if (tcb_->numColsInDirectBuffer() > 0) |
| { |
| retcode = tcb_->ehi_->insertRow(tcb_->table_, |
| tcb_->rowId_, |
| tcb_->row_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1,// colTS_ |
| tcb_->asyncOperation_); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::insertRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->matches_++; |
| } |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case DELETE_ROW: |
| { |
| retcode = tcb_->ehi_->deleteRow(tcb_->table_, |
| tcb_->rowId_, |
| &tcb_->deletedColumns_, |
| tcb_->hbaseAccessTdb().useHbaseXn(), |
| tcb_->hbaseAccessTdb().useRegionXn(), |
| -1, |
| tcb_->asyncOperation_); |
| if ( tcb_->setupError(retcode, "ExpHbaseInterface::deleteRow")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| tcb_->currRowidIdx_++; |
| |
| if (tcb_->getHbaseAccessStats()) |
| tcb_->getHbaseAccessStats()->incUsedRows(); |
| |
| tcb_->matches_++; |
| |
| step_ = NEXT_ROW; |
| } |
| break; |
| |
| case SCAN_CLOSE: |
| { |
| retcode = tcb_->ehi_->scanClose(); |
| if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| step_ = NOT_STARTED; |
| return -1; |
| } |
| break; |
| |
| case DONE: |
| { |
| step_ = NOT_STARTED; |
| return 0; |
| } |
| break; |
| |
| }// switch |
| |
| } // while |
| } |
| |
| ExHbaseAccessUMDTcb::ExHbaseAccessUMDTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessTcb(hbaseAccessTdb, glob), |
| step_(NOT_STARTED) |
| { |
| umdSQSubsetTaskTcb_ = NULL; |
| umdSQUniqueTaskTcb_ = NULL; |
| |
| for (Lng32 i = 0; i < UMD_MAX_TASKS; i++) |
| { |
| tasks_[i] = FALSE; |
| } |
| |
| ExHbaseAccessTdb &hbaseTdb = (ExHbaseAccessTdb&)hbaseAccessTdb; |
| |
| if (hbaseTdb.listOfScanRows()) |
| { |
| tasks_[UMD_SUBSET_TASK] = TRUE; |
| |
| if (hbaseTdb.sqHbaseTable()) |
| umdSQSubsetTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDtrafSubsetTaskTcb(this); |
| else |
| umdSQSubsetTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDnativeSubsetTaskTcb(this); |
| } |
| |
| if ((hbaseTdb.keySubsetGen()) && |
| (NOT hbaseTdb.uniqueKeyInfo())) |
| { |
| tasks_[UMD_SUBSET_KEY_TASK] = TRUE; |
| |
| if (hbaseTdb.sqHbaseTable()) |
| umdSQSubsetTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDtrafSubsetTaskTcb(this); |
| else |
| umdSQSubsetTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDnativeSubsetTaskTcb(this); |
| } |
| |
| if (hbaseTdb.listOfGetRows()) |
| { |
| tasks_[UMD_UNIQUE_TASK] = TRUE; |
| |
| if (hbaseTdb.sqHbaseTable()) |
| umdSQUniqueTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDtrafUniqueTaskTcb(this); |
| else |
| umdSQUniqueTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDnativeUniqueTaskTcb(this); |
| } |
| |
| if ((hbaseTdb.keySubsetGen()) && |
| (hbaseTdb.uniqueKeyInfo())) |
| { |
| tasks_[UMD_UNIQUE_KEY_TASK] = TRUE; |
| |
| if (hbaseTdb.sqHbaseTable()) |
| umdSQUniqueTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDtrafUniqueTaskTcb(this); |
| else |
| umdSQUniqueTaskTcb_ = |
| new(getGlobals()->getDefaultHeap()) ExHbaseUMDnativeUniqueTaskTcb(this); |
| } |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessUMDTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| ExMasterStmtGlobals *g = getGlobals()-> |
| castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if ((pentry_down->downState.request == ex_queue::GET_NOMORE) && |
| (step_ != DONE)) |
| { |
| step_ = UMD_CLOSE_NO_ERROR; //DONE; |
| } |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| |
| step_ = UMD_INIT; |
| } |
| break; |
| |
| case UMD_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (hbaseAccessTdb().listOfScanRows()) |
| hbaseAccessTdb().listOfScanRows()->position(); |
| |
| if (hbaseAccessTdb().listOfGetRows()) |
| { |
| if (! rowIdExpr()) |
| { |
| setupError(-HBASE_OPEN_ERROR, "", "RowId Expr is empty"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| hbaseAccessTdb().listOfGetRows()->position(); |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| if (umdSQSubsetTaskTcb_) |
| umdSQSubsetTaskTcb_->init(); |
| |
| if (umdSQUniqueTaskTcb_) |
| umdSQUniqueTaskTcb_->init(); |
| |
| step_ = SETUP_SUBSET; |
| } |
| break; |
| |
| case SETUP_SUBSET: |
| { |
| if (NOT tasks_[UMD_SUBSET_TASK]) |
| { |
| step_ = SETUP_UNIQUE; |
| break; |
| } |
| |
| hsr_ = |
| (ComTdbHbaseAccess::HbaseScanRows*)hbaseAccessTdb().listOfScanRows() |
| ->getCurr(); |
| |
| retcode = setupSubsetRowIdsAndCols(hsr_); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = PROCESS_SUBSET; |
| } |
| break; |
| |
| case PROCESS_SUBSET: |
| { |
| rc = 0; |
| retcode = umdSQSubsetTaskTcb_->work(rc); |
| if (retcode == 1) |
| return rc; |
| else if (retcode < 0) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_SUBSET; |
| } |
| break; |
| |
| case NEXT_SUBSET: |
| { |
| hbaseAccessTdb().listOfScanRows()->advance(); |
| |
| if (! hbaseAccessTdb().listOfScanRows()->atEnd()) |
| { |
| step_ = SETUP_SUBSET; |
| break; |
| } |
| |
| step_ = SETUP_UNIQUE; |
| } |
| break; |
| |
| case SETUP_UNIQUE: |
| { |
| if (NOT tasks_[UMD_UNIQUE_TASK]) |
| { |
| step_ = SETUP_SUBSET_KEY; |
| break; |
| } |
| |
| hgr_ = |
| (ComTdbHbaseAccess::HbaseGetRows*)hbaseAccessTdb().listOfGetRows() |
| ->getCurr(); |
| |
| retcode = setupUniqueRowIdsAndCols(hgr_); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = PROCESS_UNIQUE; |
| } |
| break; |
| |
| case PROCESS_UNIQUE: |
| { |
| rc = 0; |
| retcode = umdSQUniqueTaskTcb_->work(rc); |
| if (retcode == 1) |
| return rc; |
| else if (retcode < 0) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = NEXT_UNIQUE; |
| } |
| break; |
| |
| case NEXT_UNIQUE: |
| { |
| hbaseAccessTdb().listOfGetRows()->advance(); |
| |
| if (! hbaseAccessTdb().listOfGetRows()->atEnd()) |
| { |
| step_ = SETUP_UNIQUE; |
| break; |
| } |
| |
| step_ = SETUP_SUBSET_KEY; |
| } |
| break; |
| |
| case SETUP_SUBSET_KEY: |
| { |
| if (NOT tasks_[UMD_SUBSET_KEY_TASK]) |
| { |
| step_ = SETUP_UNIQUE_KEY; |
| break; |
| } |
| |
| retcode = setupSubsetKeysAndCols(); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = PROCESS_SUBSET_KEY; |
| } |
| break; |
| |
| case PROCESS_SUBSET_KEY: |
| { |
| rc = 0; |
| retcode = umdSQSubsetTaskTcb_->work(rc); |
| if (retcode == 1) |
| return rc; |
| else if (retcode < 0) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = SETUP_UNIQUE_KEY; |
| } |
| break; |
| |
| case SETUP_UNIQUE_KEY: |
| { |
| if (NOT tasks_[UMD_UNIQUE_KEY_TASK]) |
| { |
| step_ = UMD_CLOSE; |
| break; |
| } |
| |
| retcode = setupUniqueKeyAndCols(TRUE); |
| if (retcode == -1) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = PROCESS_UNIQUE_KEY; |
| } |
| break; |
| |
| case PROCESS_UNIQUE_KEY: |
| { |
| rc = 0; |
| retcode = umdSQUniqueTaskTcb_->work(rc); |
| if (retcode == 1) |
| return rc; |
| else if (retcode < 0) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = UMD_CLOSE; |
| } |
| break; |
| |
| case UMD_CLOSE: |
| case UMD_CLOSE_NO_ERROR: |
| { |
| retcode = ehi_->close(); |
| if (step_ == UMD_CLOSE) |
| { |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| |
| retcode = ehi_->close(); |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case DONE: |
| { |
| if (NOT hbaseAccessTdb().computeRowsAffected()) |
| matches_ = 0; |
| |
| if (handleDone(rc, matches_)) |
| return rc; |
| |
| if (umdSQSubsetTaskTcb_) |
| umdSQSubsetTaskTcb_->init(); |
| |
| if (umdSQUniqueTaskTcb_) |
| umdSQUniqueTaskTcb_->init(); |
| |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExHbaseAccessSQRowsetTcb::ExHbaseAccessSQRowsetTcb( |
| const ExHbaseAccessTdb &hbaseAccessTdb, |
| ex_globals * glob ) : |
| ExHbaseAccessTcb( hbaseAccessTdb, glob) |
| , step_(NOT_STARTED) |
| { |
| prevTailIndex_ = 0; |
| |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| |
| numRetries_ = 0; |
| lastHandledStep_ = NOT_STARTED; |
| numRowsInVsbbBuffer_ = 0; |
| } |
| |
| Lng32 ExHbaseAccessSQRowsetTcb::setupUniqueKey() |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getQueueEntry(nextRequest_); |
| |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE |
| || pentry_down->downState.request == ex_queue::GET_EOD) |
| return 1; |
| |
| ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK; |
| keyRangeEx::getNextKeyRangeReturnType keyRangeStatus; |
| |
| initNextKeyRange(pool_, pentry_down->getAtp()); |
| |
| keyRangeStatus = |
| keySubsetExeExpr_->getNextKeyRange(pentry_down->getAtp(), FALSE, TRUE); |
| |
| if (keyRangeStatus == keyRangeEx::EXPRESSION_ERROR) |
| return -1; |
| |
| tupp &keyData = keySubsetExeExpr_->getBkData(); |
| char * beginKeyRow = keyData.getDataPointer(); |
| HbaseStr rowIdRowText; |
| |
| if ((NOT hbaseAccessTdb().sqHbaseTable()) || |
| (hbaseAccessTdb().keyInVCformat())) { |
| // Key is in varchar format. |
| short keyLen = *(short*)beginKeyRow; |
| rowIdRowText.val = beginKeyRow + sizeof(short); |
| rowIdRowText.len = keyLen; |
| } |
| else { |
| rowIdRowText.val = beginKeyRow; |
| rowIdRowText.len = hbaseAccessTdb().keyLen_; |
| } |
| |
| if (keyRangeStatus == keyRangeEx::NO_MORE_RANGES) |
| { |
| // To ensure no row is found, add extra byte with "0" value |
| rowIdRowText.val[rowIdRowText.len] = '\0'; |
| rowIdRowText.len += 1; |
| } |
| copyRowIDToDirectBuffer(rowIdRowText); |
| return 0; |
| } |
| |
| |
| Lng32 ExHbaseAccessSQRowsetTcb::setupRowIds() |
| { |
| Lng32 retcode; |
| UInt16 rowsetMaxRows = hbaseAccessTdb().getHbaseRowsetVsbbSize(); |
| |
| queue_index tlindex = qparent_.down->getTailIndex(); |
| while (nextRequest_ != tlindex) { |
| retcode = setupUniqueKey(); |
| if (retcode != 0) |
| return retcode; |
| nextRequest_++; |
| // Don't buffer more than HBASE_ROWSET_VSBB_SIZE |
| if (numRowsInDirectBuffer() >= rowsetMaxRows) |
| return 1; |
| } |
| return 0; |
| } |
| |
| ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| ExMasterStmtGlobals *g = getGlobals()-> |
| castToExExeStmtGlobals()->castToExMasterStmtGlobals(); |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = CLOSE_AND_DONE; |
| else if (pentry_down->downState.request == ex_queue::GET_EOD) { |
| if (numRowsInDirectBuffer() > 0) { |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) |
| step_ = PROCESS_UPDATE_AND_CLOSE; |
| else if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = PROCESS_DELETE_AND_CLOSE; |
| else |
| ex_assert(0, "EOD and Select is not handled here"); |
| } |
| else |
| step_ = ALL_DONE; |
| } |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| currRowNum_ = 0; |
| numRetries_ = 0; |
| |
| prevTailIndex_ = 0; |
| asyncCompleteRetryCount_ = 0; |
| asyncOperationTimeout_ = 1; |
| asyncOperation_ = hbaseAccessTdb().asyncOperations() && getTransactionIDFromContext(); |
| numRowsInVsbbBuffer_ = 0; |
| lastHandledStep_ = NOT_STARTED; |
| |
| nextRequest_ = qparent_.down->getHeadIndex(); |
| step_ = RS_INIT; |
| } |
| break; |
| |
| case RS_INIT: |
| { |
| retcode = ehi_->init(getHbaseAccessStats()); |
| if (setupError(retcode, "ExpHbaseInterface::init")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| table_.val = hbaseAccessTdb().getTableName(); |
| table_.len = strlen(hbaseAccessTdb().getTableName()); |
| |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) |
| { |
| ExpTupleDesc * rowTD = |
| hbaseAccessTdb().workCriDesc_->getTupleDescriptor |
| (hbaseAccessTdb().updateTuppIndex_); |
| allocateDirectRowBufferForJNI(rowTD->numAttrs(), |
| hbaseAccessTdb().getHbaseRowsetVsbbSize()); |
| } |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_ |
| || hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_ |
| || hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| allocateDirectRowIDBufferForJNI(hbaseAccessTdb().getHbaseRowsetVsbbSize()); |
| |
| setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(), |
| columns_); |
| |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_) |
| step_ = SETUP_SELECT; |
| else |
| step_ = SETUP_UMD; |
| } |
| break; |
| case SETUP_SELECT: |
| { |
| retcode = setupRowIds(); |
| switch (retcode) { |
| case 0: |
| if (qparent_.down->getLength() == 1) { |
| // only one row in the down queue. |
| // Before we send input buffer to hbase, give parent |
| // another chance in case there is more input data. |
| // If parent doesn't input any more data on second (or |
| // later) chances, then process the request. |
| if (numRetries_ == 3) { |
| numRetries_ = 0; |
| step_ = PROCESS_SELECT; |
| } else { |
| numRetries_++; |
| return WORK_CALL_AGAIN; |
| } |
| } |
| else |
| step_ = PROCESS_SELECT; |
| break; |
| case 1: |
| // Reached the max. number of rowIds |
| // Process the rowIds in the buffer |
| step_ = PROCESS_SELECT; |
| break; |
| default: |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| break; |
| case SETUP_UMD: |
| { |
| rc = evalInsDelPreCondExpr(); |
| if (rc == -1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (rc == 0) { // No need to delete |
| step_ = NEXT_ROW; |
| break; |
| } |
| rowIds_.clear(); |
| retcode = setupUniqueKeyAndCols(FALSE); |
| if (retcode == -1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| copyRowIDToDirectBuffer(rowIds_[0]); |
| |
| if ((hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) || |
| (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_)) |
| step_ = NEXT_ROW; |
| else if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) |
| step_ = CREATE_UPDATED_ROW; |
| else |
| step_ = HANDLE_ERROR; |
| |
| } |
| break; |
| |
| case NEXT_ROW: |
| { |
| currRowNum_++; |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_) { |
| // matches_ is set to 1 when the row is projected by moveRowToUpQueue |
| // to denote that there is a matching entry |
| matches_ = 0; |
| retcode = ehi_->nextRow(); |
| // EOR is end of result set for the current Rowset |
| // EOD is no data for the current row |
| // But EOD is never returned, instead HBASE_ACCESS_NO_ROW is returned |
| // when no row is found in CREATE_ROW step |
| if (retcode == HBASE_ACCESS_EOR) { |
| step_ = RS_CLOSE; |
| break; |
| } |
| if (retcode == HBASE_ACCESS_EOD) { |
| step_ = ROW_DONE; |
| break; |
| } |
| if (setupError(retcode, "ExpHbaseInterface::nextRow")) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = CREATE_ROW; |
| break; |
| } |
| matches_++; |
| if (numRowsInDirectBuffer() < hbaseAccessTdb().getHbaseRowsetVsbbSize()) { |
| step_ = DONE; |
| break; |
| } |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = PROCESS_DELETE_AND_CLOSE; |
| else if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) |
| step_ = PROCESS_UPDATE_AND_CLOSE; |
| else |
| step_ = HANDLE_ERROR; |
| } |
| break; |
| case CREATE_ROW: |
| { |
| retcode = createSQRowDirect(); |
| if (retcode == HBASE_ACCESS_NO_ROW) { |
| step_ = ROW_DONE; |
| break; |
| } |
| if (retcode < 0) |
| { |
| rc = (short)retcode; |
| setupError(rc, "createSQRowDirect"); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (retcode != HBASE_ACCESS_SUCCESS) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = APPLY_PRED; |
| } |
| break; |
| case APPLY_PRED: |
| { |
| rc = applyPred(scanExpr()); |
| if (rc == 1) |
| step_ = RETURN_ROW; |
| else if (rc == -1) |
| step_ = HANDLE_ERROR; |
| else |
| step_ = ROW_DONE; |
| } |
| break; |
| case RETURN_ROW: |
| { |
| rc = 0; |
| if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(), |
| &rc, FALSE)) |
| return rc; |
| if (getHbaseAccessStats()) |
| getHbaseAccessStats()->incUsedRows(); |
| step_ = ROW_DONE; |
| } |
| break; |
| case PROCESS_DELETE_AND_CLOSE: |
| { |
| numRowsInVsbbBuffer_ = patchDirectRowIDBuffers(); |
| retcode = ehi_->deleteRows(table_, |
| hbaseAccessTdb().getRowIDLen(), |
| rowIDs_, |
| hbaseAccessTdb().useHbaseXn(), |
| -1, |
| asyncOperation_); |
| currRowNum_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::deleteRows")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (asyncOperation_) { |
| lastHandledStep_ = step_; |
| step_ = COMPLETE_ASYNC_OPERATION; |
| break; |
| } |
| if (getHbaseAccessStats()) { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_); |
| } |
| step_ = RS_CLOSE; |
| } |
| break; |
| |
| case PROCESS_SELECT: |
| { |
| if (numRowsInDirectBuffer() > 0) { |
| numRowsInVsbbBuffer_ = patchDirectRowIDBuffers(); |
| retcode = ehi_->getRowsOpen( |
| table_, |
| hbaseAccessTdb().getRowIDLen(), |
| rowIDs_, |
| columns_); |
| currRowNum_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::getRowsOpen")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = NEXT_ROW; |
| |
| if (getHbaseAccessStats()) |
| { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| } |
| } |
| else |
| step_ = SETUP_SELECT; |
| } |
| break; |
| |
| case CREATE_UPDATED_ROW: |
| { |
| workAtp_->getTupp(hbaseAccessTdb().updateTuppIndex_) |
| .setDataPointer(updateRow_); |
| |
| if (updateExpr()) { |
| ex_expr::exp_return_type evalRetCode = |
| updateExpr()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| step_ = EVAL_CONSTRAINT; |
| } |
| break; |
| case EVAL_CONSTRAINT: |
| { |
| rc = evalConstraintExpr(updConstraintExpr(), hbaseAccessTdb().updateTuppIndex_,updateRow_); |
| if (rc == 0) { |
| step_ = RS_CLOSE; |
| break; |
| } |
| else if (rc != 1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| retcode = createDirectRowBuffer( |
| hbaseAccessTdb().updateTuppIndex_, |
| updateRow_, |
| hbaseAccessTdb().listOfUpdatedColNames(), |
| hbaseAccessTdb().listOfOmittedColNames(), |
| TRUE); |
| if (retcode == -1) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| step_ = NEXT_ROW; |
| } |
| break; |
| case PROCESS_UPDATE_AND_CLOSE: |
| { |
| numRowsInVsbbBuffer_ = patchDirectRowBuffers(); |
| |
| retcode = ehi_->insertRows(table_, |
| hbaseAccessTdb().getRowIDLen(), |
| rowIDs_, |
| rows_, |
| hbaseAccessTdb().useHbaseXn(), |
| -1, |
| asyncOperation_); |
| currRowNum_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::insertRows")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (asyncOperation_) { |
| lastHandledStep_ = step_; |
| step_ = COMPLETE_ASYNC_OPERATION; |
| break; |
| } |
| if (getHbaseAccessStats()) { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_); |
| } |
| step_ = RS_CLOSE; |
| } |
| break; |
| case COMPLETE_ASYNC_OPERATION: |
| { |
| if (resultArray_ == NULL) |
| resultArray_ = new (getHeap()) NABoolean[hbaseAccessTdb().getHbaseRowsetVsbbSize()]; |
| Int32 timeout; |
| if (asyncCompleteRetryCount_ < 10) |
| timeout = -1; |
| else { |
| asyncOperationTimeout_ = asyncOperationTimeout_ * 2; |
| timeout = asyncOperationTimeout_; |
| } |
| retcode = ehi_->completeAsyncOperation(timeout, resultArray_, numRowsInVsbbBuffer_); |
| if (retcode == HBASE_RETRY_AGAIN) { |
| asyncCompleteRetryCount_++; |
| return WORK_CALL_AGAIN; |
| } |
| asyncCompleteRetryCount_ = 0; |
| if (setupError(retcode, "ExpHbaseInterface::completeAsyncOperation")) { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| for (int i = 0 ; i < numRowsInVsbbBuffer_; i++) { |
| if (resultArray_[i] == FALSE) { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8102)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| if (step_ == HANDLE_ERROR) |
| break; |
| if (getHbaseAccessStats()) { |
| getHbaseAccessStats()->lobStats()->numReadReqs++; |
| getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_); |
| } |
| step_ = RS_CLOSE; |
| } |
| break; |
| case RS_CLOSE: |
| { |
| retcode = ehi_->close(); |
| if (setupError(retcode, "ExpHbaseInterface::close")) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_) |
| step_ = NOT_STARTED; |
| else |
| step_ = ALL_DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| step_ = CLOSE_AND_DONE; |
| } |
| break; |
| case ROW_DONE: |
| { |
| if (handleDone(rc, 0)) |
| return rc; |
| step_ = NEXT_ROW; |
| } |
| break; |
| case DONE: |
| case CLOSE_AND_DONE: |
| case ALL_DONE: |
| { |
| if (step_ == CLOSE_AND_DONE) |
| ehi_->close(); |
| if (NOT hbaseAccessTdb().computeRowsAffected()) |
| matches_ = 0; |
| |
| if ((step_ == DONE) && |
| (qparent_.down->getLength() == 1)) |
| { |
| // only one row in the down queue. |
| // Before we send input buffer to hbase, give parent |
| // another chance in case there is more input data. |
| // If parent doesn't input any more data on second (or |
| // later) chances, then process the request. |
| if (numRetries_ == 3 || numRowsInDirectBuffer() > 1) |
| { |
| numRetries_ = 0; |
| |
| // Delete/update the current batch and then done. |
| if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::DELETE_) |
| step_ = PROCESS_DELETE_AND_CLOSE; |
| else if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_) |
| step_ = PROCESS_UPDATE_AND_CLOSE; |
| else |
| { |
| ex_assert(false, "DONE state is invalid in Rowset SELECT"); |
| } |
| break; |
| } |
| numRetries_++; |
| return WORK_CALL_AGAIN; |
| } |
| |
| if (handleDone(rc, (step_ == ALL_DONE ? matches_ : 0))) |
| return rc; |
| |
| if (step_ == DONE) |
| step_ = SETUP_UMD; |
| else |
| step_ = NOT_STARTED; |
| } |
| break; |
| } // switch |
| |
| } // while |
| if (qparent_.down->isEmpty() |
| && (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::SELECT_)) { |
| ehi_->close(); |
| step_ = NOT_STARTED; |
| } |
| |
| return WORK_OK; |
| } |
| |