| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| |
| #include "Platform.h" |
| |
| #include <stdint.h> |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <sys/time.h> |
| #include <poll.h> |
| |
| #include <iostream> |
| |
| #include "ex_stdh.h" |
| #include "ComTdb.h" |
| #include "ex_tcb.h" |
| #include "ExHdfsScan.h" |
| #include "ex_exe_stmt_globals.h" |
| #include "ExpLOBinterface.h" |
| #include "SequenceFileReader.h" |
| #include "Hbase_types.h" |
| #include "stringBuf.h" |
| #include "NLSConversion.h" |
| #include "Context.h" |
| |
| #include "ExpORCinterface.h" |
| #include "ComSmallDefs.h" |
| #include "HdfsClient_JNI.h" |
| |
| ex_tcb * ExHdfsScanTdb::build(ex_globals * glob) |
| { |
| ExExeStmtGlobals * exe_glob = glob->castToExExeStmtGlobals(); |
| |
| ex_assert(exe_glob,"This operator cannot be in DP2"); |
| |
| ExHdfsScanTcb *tcb = NULL; |
| |
| if ((isTextFile()) || (isSequenceFile())) |
| { |
| tcb = new(exe_glob->getSpace()) |
| ExHdfsScanTcb( |
| *this, |
| exe_glob); |
| } |
| else if (isOrcFile()) |
| { |
| tcb = new(exe_glob->getSpace()) |
| ExOrcScanTcb( |
| *this, |
| exe_glob); |
| } |
| |
| ex_assert(tcb, "Error building ExHdfsScanTcb."); |
| |
| return (tcb); |
| } |
| |
| ex_tcb * ExOrcFastAggrTdb::build(ex_globals * glob) |
| { |
| ExHdfsScanTcb *tcb = NULL; |
| tcb = new(glob->getSpace()) |
| ExOrcFastAggrTcb( |
| *this, |
| glob); |
| |
| ex_assert(tcb, "Error building ExHdfsScanTcb."); |
| |
| return (tcb); |
| } |
| |
| //////////////////////////////////////////////////////////////// |
| // Constructor and initialization. |
| //////////////////////////////////////////////////////////////// |
| |
| ExHdfsScanTcb::ExHdfsScanTcb( |
| const ComTdbHdfsScan &hdfsScanTdb, |
| ex_globals * glob ) : |
| ex_tcb( hdfsScanTdb, 1, glob) |
| , workAtp_(NULL) |
| , bytesLeft_(0) |
| , hdfsScanBuffer_(NULL) |
| , hdfsBufNextRow_(NULL) |
| , hdfsLoggingRow_(NULL) |
| , hdfsLoggingRowEnd_(NULL) |
| , debugPrevRow_(NULL) |
| , hdfsSqlBuffer_(NULL) |
| , hdfsSqlData_(NULL) |
| , pool_(NULL) |
| , step_(NOT_STARTED) |
| , matches_(0) |
| , matchBrkPoint_(0) |
| , endOfRequestedRange_(NULL) |
| , sequenceFileReader_(NULL) |
| , seqScanAgain_(false) |
| , hdfo_(NULL) |
| , numBytesProcessedInRange_(0) |
| , exception_(FALSE) |
| , checkRangeDelimiter_(FALSE) |
| , dataModCheckDone_(FALSE) |
| , loggingErrorDiags_(NULL) |
| , loggingFileName_(NULL) |
| , logFileHdfsClient_(NULL) |
| , hdfsClient_(NULL) |
| , hdfsScan_(NULL) |
| , hdfsStats_(NULL) |
| , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries()) |
| |
| { |
| Space * space = (glob ? glob->getSpace() : 0); |
| CollHeap * heap = (glob ? glob->getDefaultHeap() : 0); |
| useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan(); |
| if (isSequenceFile() || hdfsScanTdb.isCompressedFile()) |
| useLibhdfsScan_ = TRUE; |
| lobGlob_ = NULL; |
| hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_; |
| headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_; |
| |
| if (useLibhdfsScan_) { |
| hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ]; |
| hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0'; |
| } else { |
| hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)]; |
| hdfsScanBufBacking_[1] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)]; |
| for (int i=0; i < 2; i++) { |
| BYTE *hdfsScanBufBacking = hdfsScanBufBacking_[i]; |
| hdfsScanBuf_[i].headRoom_ = hdfsScanBufBacking; |
| hdfsScanBuf_[i].buf_ = hdfsScanBufBacking + headRoom_; |
| } |
| bufBegin_ = NULL; |
| bufEnd_ = NULL; |
| bufLogicalEnd_ = NULL; |
| headRoomCopied_ = 0; |
| prevRangeNum_ = -1; |
| currRangeBytesRead_ = 0; |
| recordSkip_ = FALSE; |
| } |
| moveExprColsBuffer_ = new(space) ExSimpleSQLBuffer( 1, // one row |
| (Int32)hdfsScanTdb.moveExprColsRowLength_, |
| space); |
| short error = moveExprColsBuffer_->getFreeTuple(moveExprColsTupp_); |
| ex_assert((error == 0), "get_free_tuple cannot hold a row."); |
| moveExprColsData_ = moveExprColsTupp_.getDataPointer(); |
| |
| |
| hdfsSqlBuffer_ = new(space) ExSimpleSQLBuffer( 1, // one row |
| (Int32)hdfsScanTdb.hdfsSqlMaxRecLen_, |
| space); |
| error = hdfsSqlBuffer_->getFreeTuple(hdfsSqlTupp_); |
| ex_assert((error == 0), "get_free_tuple cannot hold a row."); |
| hdfsSqlData_ = hdfsSqlTupp_.getDataPointer(); |
| |
| hdfsAsciiSourceBuffer_ = new(space) ExSimpleSQLBuffer( 1, // one row |
| (Int32)hdfsScanTdb.asciiRowLen_ * 2, // just in case |
| space); |
| error = hdfsAsciiSourceBuffer_->getFreeTuple(hdfsAsciiSourceTupp_); |
| ex_assert((error == 0), "get_free_tuple cannot hold a row."); |
| hdfsAsciiSourceData_ = hdfsAsciiSourceTupp_.getDataPointer(); |
| |
| pool_ = new(space) |
| sql_buffer_pool(hdfsScanTdb.numBuffers_, |
| hdfsScanTdb.bufferSize_, |
| space, |
| ((ExHdfsScanTdb &)hdfsScanTdb).denseBuffers() ? |
| SqlBufferBase::DENSE_ : SqlBufferBase::NORMAL_); |
| |
| |
| pool_->setStaticMode(TRUE); |
| |
| |
| defragTd_ = NULL; |
| // removing the cast produce a compile error |
| if (((ExHdfsScanTdb &)hdfsScanTdb).useCifDefrag()) |
| { |
| defragTd_ = pool_->addDefragTuppDescriptor(hdfsScanTdb.outputRowLength_); |
| } |
| // Allocate the queue to communicate with parent |
| allocateParentQueues(qparent_); |
| |
| workAtp_ = allocateAtp(hdfsScanTdb.workCriDesc_, space); |
| |
| // fixup expressions |
| if (selectPred()) |
| selectPred()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob); |
| if (moveExpr()) |
| moveExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob); |
| if (convertExpr()) |
| convertExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob); |
| if (moveColsConvertExpr()) |
| moveColsConvertExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob); |
| |
| |
| // Register subtasks with the scheduler |
| registerSubtasks(); |
| registerResizeSubtasks(); |
| |
| Lng32 fileNum = getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(); |
| ExHbaseAccessTcb::buildLoggingFileName((NAHeap *)getHeap(), ((ExHdfsScanTdb &)hdfsScanTdb).getLoggingLocation(), |
| ((ExHdfsScanTdb &)hdfsScanTdb).tableName(), |
| "hive_scan_err", |
| fileNum, |
| loggingFileName_); |
| loggingFileCreated_ = FALSE; |
| |
| |
| //shoud be move to work method |
| int jniDebugPort = 0; |
| int jniDebugTimeout = 0; |
| ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(), |
| (char*)"", //Later replace with server cqd |
| (char*)""); |
| ex_assert(ehi_ != NULL, "Internal error: ehi_ is null in ExHdfsScan"); |
| HDFS_Client_RetCode hdfsClientRetcode; |
| hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode); |
| ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); |
| // Populate the hdfsInfo list into an array to gain o(1) lookup access |
| Queue* hdfsInfoList = hdfsScanTdb.getHdfsFileInfoList(); |
| if ( hdfsInfoList && hdfsInfoList->numEntries() > 0 ) |
| { |
| hdfsInfoList->position(); |
| int i = 0; |
| HdfsFileInfo* fInfo = NULL; |
| while ((fInfo = (HdfsFileInfo*)hdfsInfoList->getNext()) != NULL) |
| { |
| hdfsFileInfoListAsArray_.insertAt(i, fInfo); |
| i++; |
| } |
| } |
| } |
| |
| ExHdfsScanTcb::~ExHdfsScanTcb() |
| { |
| freeResources(); |
| } |
| |
| void ExHdfsScanTcb::freeResources() |
| { |
| if (loggingFileName_ != NULL) { |
| NADELETEBASIC(loggingFileName_, getHeap()); |
| loggingFileName_ = NULL; |
| } |
| if (workAtp_) |
| { |
| workAtp_->release(); |
| deallocateAtp(workAtp_, getSpace()); |
| workAtp_ = NULL; |
| } |
| if (hdfsScanBuffer_ ) |
| { |
| NADELETEBASIC(hdfsScanBuffer_, getHeap()); |
| hdfsScanBuffer_ = NULL; |
| } |
| if (hdfsAsciiSourceBuffer_) |
| { |
| NADELETEBASIC(hdfsAsciiSourceBuffer_, getSpace()); |
| hdfsAsciiSourceBuffer_ = NULL; |
| } |
| if(sequenceFileReader_) |
| { |
| NADELETE(sequenceFileReader_,SequenceFileReader, getHeap()); |
| sequenceFileReader_ = NULL; |
| } |
| |
| // hdfsSqlTupp_.release() ; // ??? |
| if (hdfsSqlBuffer_) |
| { |
| delete hdfsSqlBuffer_; |
| hdfsSqlBuffer_ = NULL; |
| } |
| if (moveExprColsBuffer_) |
| { |
| delete moveExprColsBuffer_; |
| moveExprColsBuffer_ = NULL; |
| } |
| if (pool_) |
| { |
| delete pool_; |
| pool_ = NULL; |
| } |
| if (qparent_.up) |
| { |
| delete qparent_.up; |
| qparent_.up = NULL; |
| } |
| if (qparent_.down) |
| { |
| delete qparent_.down; |
| qparent_.down = NULL; |
| } |
| deallocateRuntimeRanges(); |
| if (lobGlob_) { |
| ExpLOBinterfaceCleanup(lobGlob_); |
| lobGlob_ = NULL; |
| } |
| if (hdfsClient_ != NULL) |
| NADELETE(hdfsClient_, HdfsClient, getHeap()); |
| if (logFileHdfsClient_ != NULL) |
| NADELETE(logFileHdfsClient_, HdfsClient, getHeap()); |
| if (hdfsScan_ != NULL) |
| NADELETE(hdfsScan_, HdfsScan, getHeap()); |
| } |
| NABoolean ExHdfsScanTcb::needStatsEntry() |
| { |
| // stats are collected for ALL and OPERATOR options. |
| if ((getGlobals()->getStatsArea()->getCollectStatsType() == |
| ComTdb::ALL_STATS) || |
| (getGlobals()->getStatsArea()->getCollectStatsType() == |
| ComTdb::OPERATOR_STATS)) |
| return TRUE; |
| else if ( getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::PERTABLE_STATS) |
| return TRUE; |
| else |
| return FALSE; |
| } |
| |
| ExOperStats * ExHdfsScanTcb::doAllocateStatsEntry(CollHeap *heap, |
| ComTdb *tdb) |
| { |
| ExEspStmtGlobals *espGlobals = getGlobals()->castToExExeStmtGlobals()->castToExEspStmtGlobals(); |
| StmtStats *ss; |
| if (espGlobals != NULL) |
| ss = espGlobals->getStmtStats(); |
| else |
| ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats(); |
| |
| hdfsStats_ = new(heap) ExHdfsScanStats(heap, |
| this, |
| tdb); |
| if (ss != NULL) |
| hdfsStats_->setQueryId(ss->getQueryId(), ss->getQueryIdLen()); |
| return hdfsStats_; |
| } |
| |
| void ExHdfsScanTcb::registerSubtasks() |
| { |
| ExScheduler *sched = getGlobals()->getScheduler(); |
| |
| sched->registerInsertSubtask(sWork, this, qparent_.down,"PD"); |
| sched->registerUnblockSubtask(sWork, this, qparent_.up, "PU"); |
| sched->registerCancelSubtask(sWork, this, qparent_.down,"CN"); |
| |
| } |
| |
| ex_tcb_private_state *ExHdfsScanTcb::allocatePstates( |
| Lng32 &numElems, // inout, desired/actual elements |
| Lng32 &pstateLength) // out, length of one element |
| { |
| PstateAllocator<ex_tcb_private_state> pa; |
| return pa.allocatePstates(this, numElems, pstateLength); |
| } |
| |
| Int32 ExHdfsScanTcb::fixup() |
| { |
| lobGlob_ = NULL; |
| |
| ExpLOBinterfaceInit |
| (lobGlob_, (NAHeap *)getGlobals()->getDefaultHeap(),getGlobals()->castToExExeStmtGlobals()->getContext(),TRUE, hdfsScanTdb().hostName_,hdfsScanTdb().port_); |
| |
| return 0; |
| } |
| |
| void brkpoint() |
| {} |
| |
| short ExHdfsScanTcb::setupError(Lng32 exeError, Lng32 retcode, |
| const char * str, const char * str2, const char * str3) |
| { |
| // Make sure retcode is positive. |
| if (retcode < 0) |
| retcode = -retcode; |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| |
| Lng32 intParam1 = retcode; |
| Lng32 intParam2 = 0; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(exeError), NULL, &intParam1, |
| &intParam2, NULL, |
| (str ? (char*)str : (char*)" "), |
| (str2 ? (char*)str2 : (char*)" "), |
| (str3 ? (char*)str3 : (char*)" ")); |
| |
| pentry_down->setDiagsArea(diagsArea); |
| return -1; |
| } |
| |
| ExWorkProcRetcode ExHdfsScanTcb::work() |
| { |
| Lng32 retcode = 0; |
| SFR_RetCode sfrRetCode = SFR_OK; |
| char *errorDesc = NULL; |
| char cursorId[8]; |
| HdfsFileInfo *hdfo = NULL; |
| Lng32 openType = 0; |
| int changedLen = 0; |
| ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext(); |
| Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call. |
| HDFS_Scan_RetCode hdfsScanRetCode; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE && step_ != DONE) |
| { |
| if (! useLibhdfsScan_) |
| step_ = STOP_HDFS_SCAN; |
| } |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| beginRangeNum_ = -1; |
| numRanges_ = -1; |
| hdfsOffset_ = 0; |
| checkRangeDelimiter_ = FALSE; |
| if (getStatsEntry()) |
| hdfsStats_ = getStatsEntry()->castToExHdfsScanStats(); |
| dataModCheckDone_ = FALSE; |
| myInstNum_ = getGlobals()->getMyInstanceNumber(); |
| hdfsScanBufMaxSize_ = hdfsScanTdb().hdfsBufSize_; |
| |
| if (hdfsScanTdb().getAssignRangesAtRuntime()) |
| { |
| step_ = ASSIGN_RANGES_AT_RUNTIME; |
| break; |
| } |
| else if (getHdfsFileInfoListAsArray().isEmpty()) |
| { |
| step_ = CHECK_FOR_DATA_MOD_AND_DONE; |
| break; |
| } |
| |
| beginRangeNum_ = |
| *(Lng32*)hdfsScanTdb().getHdfsFileRangeBeginList()->get(myInstNum_); |
| |
| numRanges_ = |
| *(Lng32*)hdfsScanTdb().getHdfsFileRangeNumList()->get(myInstNum_); |
| |
| currRangeNum_ = beginRangeNum_; |
| |
| if (numRanges_ > 0) |
| step_ = CHECK_FOR_DATA_MOD; |
| else |
| step_ = CHECK_FOR_DATA_MOD_AND_DONE; |
| } |
| break; |
| |
| case ASSIGN_RANGES_AT_RUNTIME: |
| computeRangesAtRuntime(); |
| currRangeNum_ = beginRangeNum_; |
| if (numRanges_ > 0) { |
| if (useLibhdfsScan_) |
| step_ = INIT_HDFS_CURSOR; |
| else |
| step_ = SETUP_HDFS_SCAN; |
| } |
| else |
| step_ = DONE; |
| break; |
| |
| case CHECK_FOR_DATA_MOD: |
| case CHECK_FOR_DATA_MOD_AND_DONE: |
| { |
| char * dirPath = hdfsScanTdb().hdfsRootDir_; |
| Int64 modTS = hdfsScanTdb().modTSforDir_; |
| if ((dirPath == NULL) || (modTS == -1)) |
| dataModCheckDone_ = TRUE; |
| |
| if (NOT dataModCheckDone_) |
| { |
| dataModCheckDone_ = TRUE; |
| |
| Lng32 numOfPartLevels = hdfsScanTdb().numOfPartCols_; |
| |
| if (hdfsScanTdb().hdfsDirsToCheck()) |
| { |
| // TBD |
| } |
| |
| Int64 failedModTS = -1; |
| Lng32 failedLocBufLen = 1000; |
| char failedLocBuf[failedLocBufLen]; |
| retcode = ExpLOBinterfaceDataModCheck |
| (lobGlob_, |
| dirPath, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_, |
| modTS, |
| numOfPartLevels, |
| failedModTS, |
| failedLocBuf, failedLocBufLen); |
| |
| if (retcode < 0) |
| { |
| Lng32 cliError = 0; |
| |
| Lng32 intParam1 = -retcode; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), |
| NULL, &intParam1, |
| &cliError, |
| NULL, |
| "HDFS", |
| (char*)"ExpLOBInterfaceDataModCheck", |
| getLobErrStr(intParam1)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| |
| if (retcode == 1) // check failed |
| { |
| char errStr[200]; |
| str_sprintf(errStr, "genModTS = %ld, failedModTS = %ld", |
| modTS, failedModTS); |
| |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_HIVE_DATA_MOD_CHECK_ERROR), NULL, |
| NULL, NULL, NULL, |
| errStr); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| } |
| |
| if (step_ == CHECK_FOR_DATA_MOD_AND_DONE) |
| step_ = DONE; |
| else { |
| if (useLibhdfsScan_) |
| step_ = INIT_HDFS_CURSOR; |
| else |
| step_ = SETUP_HDFS_SCAN; |
| } |
| } |
| break; |
| case SETUP_HDFS_SCAN: |
| { |
| if (hdfsScan_ != NULL) |
| NADELETE(hdfsScan_, HdfsScan, getHeap()); |
| if (hdfsFileInfoListAsArray_.entries() == 0) { |
| step_ = DONE; |
| break; |
| } |
| hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, |
| &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, |
| hdfsStats_, hdfsScanRetCode); |
| if (hdfsScanRetCode != HDFS_SCAN_OK) { |
| setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", |
| GetCliGlobals()->getJniErrorStr(), NULL); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| bufBegin_ = NULL; |
| bufEnd_ = NULL; |
| bufLogicalEnd_ = NULL; |
| headRoomCopied_ = 0; |
| prevRangeNum_ = -1; |
| currRangeBytesRead_ = 0; |
| recordSkip_ = FALSE; |
| extraBytesRead_ = 0; |
| step_ = TRAF_HDFS_READ; |
| } |
| break; |
| case TRAF_HDFS_READ: |
| { |
| hdfsScanRetCode = hdfsScan_->trafHdfsRead(retArray_, sizeof(retArray_)/sizeof(int)); |
| if (hdfsScanRetCode == HDFS_SCAN_EOR) { |
| step_ = DONE; |
| break; |
| } |
| else if (hdfsScanRetCode != HDFS_SCAN_OK) { |
| setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", |
| GetCliGlobals()->getJniErrorStr(), NULL); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]); |
| bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]; |
| if (retArray_[RANGE_NO] != prevRangeNum_) { |
| currRangeBytesRead_ = retArray_[BYTES_COMPLETED]; |
| bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_; |
| if (hdfo->getStartOffset() == 0) |
| recordSkip_ = FALSE; |
| else |
| recordSkip_ = TRUE; |
| } else { |
| // Throw away the rest of the data when done with the current range |
| if (currRangeBytesRead_ > hdfo->getBytesToRead()) { |
| step_ = TRAF_HDFS_READ; |
| break; |
| } |
| currRangeBytesRead_ += retArray_[BYTES_COMPLETED]; |
| bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_; |
| recordSkip_ = FALSE; |
| } |
| if (currRangeBytesRead_ > hdfo->getBytesToRead()) |
| extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); |
| else |
| extraBytesRead_ = 0; |
| // headRoom_ is the number of extra bytes to be read (rangeTailIOSize) |
| // If EOF is reached while reading the range and the extraBytes read |
| // is less than headRoom_ then process all the data till EOF |
| // TODO: If the whole range fits in one buffer, it is need too to process rows till EOF for the last range alone |
| // No easy way to identify that last range read, but can identify that it is not the first range. |
| // The rows could be read more than once if there are more than 2 ranges. |
| // Fix optimizer not to have more than 2 ranges in that case |
| if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && hdfo->getStartOffset() != 0) |
| extraBytesRead_ = 0; |
| bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_; |
| prevRangeNum_ = retArray_[RANGE_NO]; |
| headRoomCopied_ = 0; |
| if (recordSkip_) { |
| hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_, |
| hdfsScanTdb().recordDelimiter_, |
| (char *)bufEnd_, |
| checkRangeDelimiter_, |
| hdfsScanTdb().getHiveScanMode(), &changedLen); |
| if (hdfsBufNextRow_ == NULL) { |
| setupError(8446, 0, "No record delimiter found in buffer from hdfsRead", |
| NULL, NULL); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| //add changedLen since hdfs_strchr will remove the pointer ahead to remove the \r |
| hdfsBufNextRow_ += 1 + changedLen; // point past record delimiter. |
| } |
| else |
| hdfsBufNextRow_ = (char *)bufBegin_; |
| step_ = PROCESS_HDFS_ROW; |
| } |
| break; |
| case COPY_TAIL_TO_HEAD: |
| { |
| BYTE *headRoomStartAddr; |
| headRoomCopied_ = bufLogicalEnd_ - (BYTE *)hdfsBufNextRow_; |
| if (retArray_[BUF_NO] == 0) |
| headRoomStartAddr = hdfsScanBuf_[1].buf_ - headRoomCopied_; |
| else |
| headRoomStartAddr = hdfsScanBuf_[0].buf_ - headRoomCopied_; |
| memcpy(headRoomStartAddr, hdfsBufNextRow_, headRoomCopied_); |
| step_ = TRAF_HDFS_READ; |
| } |
| break; |
| case STOP_HDFS_SCAN: |
| { |
| hdfsScanRetCode = hdfsScan_->stop(); |
| if (hdfsScanRetCode != HDFS_SCAN_OK) { |
| setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "HdfsScan::stop", |
| GetCliGlobals()->getJniErrorStr(), NULL); |
| step_ = HANDLE_ERROR_AND_DONE; |
| } |
| step_ = DONE; |
| } |
| break; |
| case INIT_HDFS_CURSOR: |
| { |
| hdfo_ = getRange(currRangeNum_); |
| if ((hdfo_->getBytesToRead() == 0) && |
| (beginRangeNum_ == currRangeNum_) && (numRanges_ > 1)) |
| { |
| // skip the first range if it has 0 bytes to read |
| // doing this for subsequent ranges is more complex |
| // since the file may neeed to be closed. The first |
| // range being 0 is common with sqoop generated files |
| currRangeNum_++; |
| hdfo_ = getRange(currRangeNum_); |
| } |
| |
| hdfsOffset_ = hdfo_->getStartOffset(); |
| bytesLeft_ = hdfo_->getBytesToRead(); |
| |
| hdfsFileName_ = hdfo_->fileName(); |
| sprintf(cursorId_, "%d", currRangeNum_); |
| stopOffset_ = hdfsOffset_ + hdfo_->getBytesToRead(); |
| |
| |
| step_ = OPEN_HDFS_CURSOR; |
| } |
| |
| break; |
| |
| case OPEN_HDFS_CURSOR: |
| { |
| retcode = 0; |
| if (isSequenceFile() && !sequenceFileReader_) |
| { |
| sequenceFileReader_ = new(getHeap()) |
| SequenceFileReader((NAHeap *)getHeap()); |
| sfrRetCode = sequenceFileReader_->init(); |
| |
| if (sfrRetCode != JNI_OK) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL, |
| NULL, NULL, NULL, sequenceFileReader_->getErrorText(sfrRetCode), NULL); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| |
| if (isSequenceFile()) |
| { |
| sfrRetCode = sequenceFileReader_->open(hdfsFileName_); |
| |
| if (sfrRetCode == JNI_OK) |
| { |
| // Seek to start offset |
| sfrRetCode = sequenceFileReader_->seeknSync(hdfsOffset_); |
| } |
| |
| if (sfrRetCode != JNI_OK) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL, |
| NULL, NULL, NULL, sequenceFileReader_->getErrorText(sfrRetCode), NULL); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| else |
| { |
| Int64 rangeTail = hdfo_->fileIsSplitEnd() ? |
| hdfsScanTdb().rangeTailIOSize_ : 0; |
| openType = 2; // must open |
| retcode = ExpLOBInterfaceSelectCursor |
| (lobGlob_, |
| hdfsFileName_, //hdfsScanTdb().hdfsFileName_, |
| NULL, //(char*)"", |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_, |
| 0, NULL, // handle not valid for non lob access |
| bytesLeft_ + rangeTail, // max bytes |
| cursorId_, |
| |
| requestTag_, Lob_Memory, |
| 0, // not check status |
| (NOT hdfsScanTdb().hdfsPrefetch()), //1, // waited op |
| |
| hdfsOffset_, |
| hdfsScanBufMaxSize_, |
| bytesRead_, |
| NULL, |
| 1, // open |
| openType, // |
| &hdfsErrorDetail |
| ); |
| |
| if ((retcode < 0) && |
| ((hdfsErrorDetail == ENOENT) || (hdfsErrorDetail == EAGAIN))) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| if (hdfsErrorDetail == ENOENT) |
| { |
| char errBuf[strlen(hdfsScanTdb().tableName()) + |
| strlen(hdfsFileName_) + 100]; |
| snprintf(errBuf, sizeof(errBuf),"%s (fileLoc: %s)", |
| hdfsScanTdb().tableName(), hdfsFileName_); |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_TABLE_NOT_FOUND), NULL, |
| NULL, NULL, NULL, |
| errBuf); |
| } |
| else |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_HIVE_DATA_MOD_CHECK_ERROR)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_AND_DONE; |
| break; |
| } |
| |
| // preopen next range. |
| if ( (currRangeNum_ + 1) < (beginRangeNum_ + numRanges_) ) |
| { |
| hdfo = getRange(currRangeNum_ + 1); |
| |
| hdfsFileName_ = hdfo->fileName(); |
| sprintf(cursorId, "%d", currRangeNum_ + 1); |
| rangeTail = hdfo->fileIsSplitEnd() ? |
| hdfsScanTdb().rangeTailIOSize_ : 0; |
| |
| openType = 1; // preOpen |
| |
| retcode = ExpLOBInterfaceSelectCursor |
| (lobGlob_, |
| hdfsFileName_, //hdfsScanTdb().hdfsFileName_, |
| NULL, //(char*)"", |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_, |
| 0, NULL,//handle not relevant for non lob access |
| hdfo->getBytesToRead() + rangeTail, // max bytes |
| cursorId, |
| |
| requestTag_, Lob_Memory, |
| 0, // not check status |
| (NOT hdfsScanTdb().hdfsPrefetch()), //1, // waited op |
| |
| hdfo->getStartOffset(), |
| hdfsScanBufMaxSize_, |
| bytesRead_, |
| NULL, |
| 1,// open |
| openType, |
| &hdfsErrorDetail |
| ); |
| |
| hdfsFileName_ = hdfo_->fileName(); |
| } |
| } |
| |
| if (retcode < 0) |
| { |
| Lng32 cliError = 0; |
| |
| Lng32 intParam1 = -retcode; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), NULL, |
| &intParam1, |
| &hdfsErrorDetail, |
| NULL, |
| "HDFS", |
| (char*)"ExpLOBInterfaceSelectCursor/open", |
| getLobErrStr(intParam1)); |
| |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| trailingPrevRead_ = 0; |
| firstBufOfFile_ = true; |
| numBytesProcessedInRange_ = 0; |
| |
| step_ = GET_HDFS_DATA; |
| } |
| break; |
| |
| case GET_HDFS_DATA: |
| { |
| Int64 bytesToRead = hdfsScanBufMaxSize_ - trailingPrevRead_; |
| ex_assert(bytesToRead >= 0, "bytesToRead less than zero."); |
| if (hdfo_->fileIsSplitEnd() && !isSequenceFile()) |
| { |
| if (bytesLeft_ > 0) |
| bytesToRead = min(bytesToRead, |
| (bytesLeft_ + hdfsScanTdb().rangeTailIOSize_)); |
| else |
| bytesToRead = hdfsScanTdb().rangeTailIOSize_; |
| } |
| else |
| { |
| ex_assert(bytesLeft_ >= 0, "Bad assumption at e-o-f"); |
| if (bytesToRead > bytesLeft_ + |
| 1 // plus one for end-of-range files with no |
| // record delimiter at eof. |
| ) |
| bytesToRead = bytesLeft_ + 1; |
| } |
| |
| ex_assert(bytesToRead + trailingPrevRead_ <= hdfsScanBufMaxSize_, |
| "too many bites."); |
| |
| if (hdfsStats_) |
| hdfsStats_->getHdfsTimer().start(); |
| |
| retcode = 0; |
| |
| if (isSequenceFile()) |
| { |
| sfrRetCode = sequenceFileReader_->fetchRowsIntoBuffer(stopOffset_, |
| hdfsScanBuffer_, |
| hdfsScanBufMaxSize_, //bytesToRead, |
| bytesRead_, |
| hdfsScanTdb().recordDelimiter_); |
| |
| if (sfrRetCode != JNI_OK && sfrRetCode != SFR_NOMORE) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL, |
| NULL, NULL, NULL, sequenceFileReader_->getErrorText(sfrRetCode), NULL); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| else |
| { |
| seqScanAgain_ = (sfrRetCode != SFR_NOMORE); |
| } |
| } |
| else |
| { |
| Int32 hdfsErrorDetail = 0;///this is the errno returned from the underlying hdfs call. |
| retcode = ExpLOBInterfaceSelectCursor |
| (lobGlob_, |
| hdfsFileName_, |
| NULL, |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_, |
| 0, NULL, |
| 0, cursorId_, |
| |
| requestTag_, Lob_Memory, |
| 0, // not check status |
| (NOT hdfsScanTdb().hdfsPrefetch()), //1, // waited op |
| |
| hdfsOffset_, |
| bytesToRead, |
| bytesRead_, |
| hdfsScanBuffer_ + trailingPrevRead_, |
| 2, // read |
| 0 // openType, not applicable for read |
| &hdfsErrorDetail |
| ); |
| |
| if (hdfsStats_) |
| { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| if (retcode < 0) |
| { |
| Lng32 cliError = 0; |
| |
| Lng32 intParam1 = -retcode; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), NULL, |
| &intParam1, |
| &hdfsErrorDetail, |
| NULL, |
| "HDFS", |
| (char*)"ExpLOBInterfaceSelectCursor/read", |
| getLobErrStr(intParam1)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| } |
| |
| if (bytesRead_ <= 0) |
| { |
| // Finished with this file. Unexpected? Warning/event? |
| step_ = CLOSE_HDFS_CURSOR; |
| break; |
| } |
| else |
| { |
| char * lastByteRead = hdfsScanBuffer_ + |
| trailingPrevRead_ + bytesRead_ - 1; |
| if ((bytesRead_ < bytesToRead) && |
| (*lastByteRead != hdfsScanTdb().recordDelimiter_)) |
| { |
| // Some files end without a record delimiter but |
| // hive treats the end-of-file as a record delimiter. |
| lastByteRead[1] = hdfsScanTdb().recordDelimiter_; |
| bytesRead_++; |
| } |
| if (bytesRead_ > bytesLeft_) |
| { |
| if (isSequenceFile()) |
| endOfRequestedRange_ = hdfsScanBuffer_ + bytesRead_; |
| else |
| endOfRequestedRange_ = hdfsScanBuffer_ + |
| trailingPrevRead_ + bytesLeft_; |
| } |
| else |
| endOfRequestedRange_ = NULL; |
| |
| if (isSequenceFile()) |
| { |
| // If the file is compressed, we don't know the real value |
| // of bytesLeft_, but it doesn't really matter. |
| if (seqScanAgain_ == false) |
| bytesLeft_ = 0; |
| } |
| else |
| bytesLeft_ -= bytesRead_; |
| } |
| |
| if (hdfsStats_) |
| hdfsStats_->incBytesRead(bytesRead_); |
| |
| if (firstBufOfFile_ && hdfo_->fileIsSplitBegin() && !isSequenceFile()) |
| { |
| // Position in the hdfsScanBuffer_ to the |
| // first record delimiter. |
| hdfsBufNextRow_ = |
| hdfs_strchr(hdfsScanBuffer_, |
| hdfsScanTdb().recordDelimiter_, |
| hdfsScanBuffer_+trailingPrevRead_+ |
| min(bytesRead_, hdfo_->bytesToRead_), |
| checkRangeDelimiter_, |
| hdfsScanTdb().getHiveScanMode(), &changedLen); |
| // May be that the record is too long? Or data isn't ascii? |
| // Or delimiter is incorrect. |
| if (! hdfsBufNextRow_) |
| { |
| if (hdfo_->bytesToRead_ < hdfsScanTdb().rangeTailIOSize_) |
| { |
| // for wide rows it is not an error if a whole range |
| // does not include a record delimiter. RangeTaileIOSize |
| // is set to max row size in generator by default. |
| // It is also checked in the compiler that rowsize |
| // is less than buffer size. |
| step_ = CLOSE_HDFS_CURSOR; |
| } |
| else |
| { |
| ComDiagsArea *diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(8446), NULL, |
| NULL, NULL, NULL, |
| (char*)"No record delimiter found in buffer from hdfsRead.", |
| NULL); |
| // no need to log errors in this case (bulk load) since |
| // this is a major issue and needs to be corrected |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| } |
| break; |
| } |
| |
| hdfsBufNextRow_ += 1 + changedLen; // point past record delimiter. |
| //add changedLen since hdfs_strchr will remove the pointer ahead to remove the \r |
| } |
| else |
| hdfsBufNextRow_ = hdfsScanBuffer_; |
| |
| debugPrevRow_ = hdfsScanBuffer_; // By convention, at |
| // beginning of scan, the |
| // prev is set to next. |
| debugtrailingPrevRead_ = 0; |
| debugPenultimatePrevRow_ = NULL; |
| firstBufOfFile_ = false; |
| |
| hdfsOffset_ += bytesRead_; |
| |
| step_ = PROCESS_HDFS_ROW; |
| } |
| break; |
| |
| case PROCESS_HDFS_ROW: |
| { |
| if (!useLibhdfsScan_ && hdfsBufNextRow_ == NULL) { |
| step_ = TRAF_HDFS_READ; |
| break; |
| } |
| exception_ = FALSE; |
| nextStep_ = NOT_STARTED; |
| debugPenultimatePrevRow_ = debugPrevRow_; |
| debugPrevRow_ = hdfsBufNextRow_; |
| |
| int formattedRowLength = 0; |
| ComDiagsArea *transformDiags = NULL; |
| int err = 0; |
| char *startOfNextRow = |
| extractAndTransformAsciiSourceToSqlRow(err, transformDiags, hdfsScanTdb().getHiveScanMode()); |
| |
| bool rowWillBeSelected = true; |
| lastErrorCnd_ = NULL; |
| if(err) |
| { |
| if (hdfsScanTdb().continueOnError()) |
| { |
| Lng32 errorCount = workAtp_->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount>0) |
| lastErrorCnd_ = workAtp_->getDiagsArea()->getErrorEntry(errorCount); |
| exception_ = TRUE; |
| rowWillBeSelected = false; |
| } |
| else |
| { |
| if (transformDiags) |
| pentry_down->setDiagsArea(transformDiags); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| } |
| |
| if (startOfNextRow == NULL) |
| { |
| if (useLibhdfsScan_) |
| step_ = REPOS_HDFS_DATA; |
| else { |
| if (retArray_[IS_EOF]) { |
| headRoomCopied_ = 0; |
| step_ = TRAF_HDFS_READ; |
| } |
| else |
| step_ = COPY_TAIL_TO_HEAD; |
| } |
| // Looks like we can break always |
| if (!exception_) |
| break; |
| } |
| else |
| { |
| if (useLibhdfsScan_) { |
| numBytesProcessedInRange_ += |
| startOfNextRow - hdfsBufNextRow_; |
| hdfsBufNextRow_ = startOfNextRow; |
| } |
| else { |
| if ((BYTE *)startOfNextRow > bufLogicalEnd_) { |
| headRoomCopied_ = 0; |
| hdfsBufNextRow_ = NULL; |
| } else |
| hdfsBufNextRow_ = startOfNextRow; |
| } |
| } |
| |
| if (exception_) |
| { |
| nextStep_ = step_; |
| step_ = HANDLE_EXCEPTION; |
| break; |
| } |
| if (hdfsStats_) |
| hdfsStats_->incAccessedRows(); |
| |
| workAtp_->getTupp(hdfsScanTdb().workAtpIndex_) = |
| hdfsSqlTupp_; |
| |
| if ((rowWillBeSelected) && (selectPred())) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| selectPred()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_FALSE) |
| rowWillBeSelected = false; |
| else if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| if (hdfsScanTdb().continueOnError()) |
| { |
| if (pentry_down->getDiagsArea() || workAtp_->getDiagsArea()) |
| { |
| Lng32 errorCount = 0; |
| if (pentry_down->getDiagsArea()) |
| { |
| errorCount = pentry_down->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = pentry_down->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| else |
| { |
| errorCount = workAtp_->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = workAtp_->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| } |
| exception_ = TRUE; |
| nextStep_ = step_; |
| step_ = HANDLE_EXCEPTION; |
| |
| rowWillBeSelected = false; |
| |
| break; |
| } |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| else |
| ex_assert(evalRetCode == ex_expr::EXPR_TRUE, |
| "invalid return code from expr eval"); |
| } |
| |
| if (rowWillBeSelected) |
| { |
| if (moveColsConvertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| moveColsConvertExpr()->eval(workAtp_, workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| if (hdfsScanTdb().continueOnError()) |
| { |
| if ( workAtp_->getDiagsArea()) |
| { |
| Lng32 errorCount = 0; |
| errorCount = workAtp_->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = workAtp_->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| exception_ = TRUE; |
| nextStep_ = step_; |
| step_ = HANDLE_EXCEPTION; |
| break; |
| } |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| } |
| if (hdfsStats_) |
| hdfsStats_->incUsedRows(); |
| |
| step_ = RETURN_ROW; |
| |
| break; |
| } |
| |
| break; |
| } |
| case RETURN_ROW: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| lastErrorCnd_ = NULL; |
| |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| queue_index saveParentIndex = up_entry->upState.parentIndex; |
| queue_index saveDownIndex = up_entry->upState.downIndex; |
| |
| up_entry->copyAtp(pentry_down); |
| |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.status = ex_queue::Q_OK_MMORE; |
| |
| |
| if (moveExpr()) |
| { |
| UInt32 maxRowLen = hdfsScanTdb().outputRowLength_; |
| UInt32 rowLen = maxRowLen; |
| |
| if (hdfsScanTdb().useCifDefrag() && |
| !pool_->currentBufferHasEnoughSpace((Lng32)hdfsScanTdb().outputRowLength_)) |
| { |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_) = defragTd_; |
| defragTd_->setReferenceCount(1); |
| ex_expr::exp_return_type evalRetCode = |
| moveExpr()->eval(up_entry->getAtp(), workAtp_,0,-1,&rowLen); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| if (hdfsScanTdb().continueOnError()) |
| { |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == matches_)) { |
| if (useLibhdfsScan_) |
| step_ = CLOSE_HDFS_CURSOR; |
| else |
| step_ = DONE; |
| } |
| else |
| step_ = PROCESS_HDFS_ROW; |
| |
| up_entry->upState.parentIndex =saveParentIndex ; |
| up_entry->upState.downIndex = saveDownIndex ; |
| if (up_entry->getDiagsArea() || workAtp_->getDiagsArea()) |
| { |
| Lng32 errorCount = 0; |
| if (up_entry->getDiagsArea()) |
| { |
| errorCount = up_entry->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = up_entry->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| else |
| { |
| errorCount = workAtp_->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = workAtp_->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| } |
| exception_ = TRUE; |
| nextStep_ = step_; |
| step_ = HANDLE_EXCEPTION; |
| break; |
| } |
| else |
| { |
| // Get diags from up_entry onto pentry_down, which |
| // is where the HANDLE_ERROR step expects it. |
| ComDiagsArea *diagsArea = pentry_down->getDiagsArea(); |
| if (diagsArea == NULL) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| pentry_down->setDiagsArea (diagsArea); |
| } |
| pentry_down->getDiagsArea()-> |
| mergeAfter(*up_entry->getDiagsArea()); |
| up_entry->setDiagsArea(NULL); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| if (pool_->get_free_tuple( |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_), |
| rowLen)) |
| return WORK_POOL_BLOCKED; |
| str_cpy_all(up_entry->getTupp(hdfsScanTdb().tuppIndex_).getDataPointer(), |
| defragTd_->getTupleAddress(), |
| rowLen); |
| } |
| } |
| else |
| { |
| if (pool_->get_free_tuple( |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_), |
| (Lng32)hdfsScanTdb().outputRowLength_)) |
| return WORK_POOL_BLOCKED; |
| ex_expr::exp_return_type evalRetCode = |
| moveExpr()->eval(up_entry->getAtp(), workAtp_,0,-1,&rowLen); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| if (hdfsScanTdb().continueOnError()) |
| { |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == matches_)) { |
| if (useLibhdfsScan_) |
| step_ = CLOSE_HDFS_CURSOR; |
| else |
| step_ = DONE; |
| } |
| else |
| step_ = PROCESS_HDFS_ROW; |
| |
| if (up_entry->getDiagsArea() || workAtp_->getDiagsArea()) |
| { |
| Lng32 errorCount = 0; |
| if (up_entry->getDiagsArea()) |
| { |
| errorCount = up_entry->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = up_entry->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| else |
| { |
| errorCount = workAtp_->getDiagsArea()->getNumber(DgSqlCode::ERROR_); |
| if (errorCount > 0) |
| lastErrorCnd_ = workAtp_->getDiagsArea()->getErrorEntry(errorCount); |
| } |
| } |
| up_entry->upState.parentIndex =saveParentIndex ; |
| up_entry->upState.downIndex = saveDownIndex ; |
| exception_ = TRUE; |
| nextStep_ = step_; |
| step_ = HANDLE_EXCEPTION; |
| break; |
| } |
| else |
| { |
| // Get diags from up_entry onto pentry_down, which |
| // is where the HANDLE_ERROR step expects it. |
| ComDiagsArea *diagsArea = pentry_down->getDiagsArea(); |
| if (diagsArea == NULL) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| pentry_down->setDiagsArea (diagsArea); |
| } |
| pentry_down->getDiagsArea()-> |
| mergeAfter(*up_entry->getDiagsArea()); |
| up_entry->setDiagsArea(NULL); |
| if (useLibhdfsScan_) |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| else |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| if (hdfsScanTdb().useCif() && rowLen != maxRowLen) |
| { |
| pool_->resizeLastTuple(rowLen, |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_).getDataPointer()); |
| } |
| } |
| } |
| up_entry->upState.setMatchNo(++matches_); |
| if (matches_ == matchBrkPoint_) |
| brkpoint(); |
| qparent_.up->insert(); |
| |
| // use ExOperStats now, to cover OPERATOR stats as well as |
| // ALL stats. |
| if (getStatsEntry()) |
| getStatsEntry()->incActualRowsReturned(); |
| |
| workAtp_->setDiagsArea(NULL); // get rid of warnings. |
| if (((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == matches_)) || |
| (pentry_down->downState.request == ex_queue::GET_NOMORE)) { |
| if (useLibhdfsScan_) |
| step_ = CLOSE_HDFS_CURSOR; |
| else |
| step_ = DONE; |
| } |
| else |
| step_ = PROCESS_HDFS_ROW; |
| break; |
| } |
| case REPOS_HDFS_DATA: |
| { |
| bool scanAgain = false; |
| if (isSequenceFile()) |
| scanAgain = seqScanAgain_; |
| else |
| { |
| if (hdfo_->fileIsSplitEnd()) |
| { |
| if (numBytesProcessedInRange_ < hdfo_->getBytesToRead()) |
| scanAgain = true; |
| } |
| else |
| if (bytesLeft_ > 0) |
| scanAgain = true; |
| } |
| |
| if (scanAgain) |
| { |
| // Get ready for another gulp of hdfs data. |
| debugtrailingPrevRead_ = trailingPrevRead_; |
| trailingPrevRead_ = bytesRead_ - |
| (hdfsBufNextRow_ - |
| (hdfsScanBuffer_ + trailingPrevRead_)); |
| |
| // Move trailing data from the end of buffer to the front. |
| // The GET_HDFS_DATA step will use trailingPrevRead_ to |
| // adjust the read buffer ptr so that the next read happens |
| // contiguously to the final byte of the prev read. It will |
| // also use trailingPrevRead_ to to adjust the size of |
| // the next read so that fixed size buffer is not overrun. |
| // Finally, trailingPrevRead_ is used in the |
| // extractSourceFields method to keep from processing |
| // bytes left in the buffer from the previous read. |
| if ((trailingPrevRead_ > 0) && |
| (hdfsBufNextRow_[0] == RANGE_DELIMITER)) |
| { |
| checkRangeDelimiter_ = FALSE; |
| step_ = CLOSE_HDFS_CURSOR; |
| break; |
| } |
| memmove(hdfsScanBuffer_, hdfsBufNextRow_, |
| (size_t)trailingPrevRead_); |
| step_ = GET_HDFS_DATA; |
| } |
| else |
| { |
| trailingPrevRead_ = 0; |
| step_ = CLOSE_HDFS_CURSOR; |
| } |
| break; |
| } |
| case CLOSE_HDFS_CURSOR: |
| { |
| retcode = 0; |
| if (isSequenceFile()) |
| { |
| sfrRetCode = sequenceFileReader_->close(); |
| if (sfrRetCode != JNI_OK) |
| { |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL, |
| NULL, NULL, NULL, sequenceFileReader_->getErrorText(sfrRetCode), NULL); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| else |
| { |
| retcode = ExpLOBInterfaceSelectCursor |
| (lobGlob_, |
| hdfsFileName_, |
| NULL, |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_, |
| 0,NULL, //handle not relevant for non lob access |
| 0, cursorId_, |
| |
| requestTag_, Lob_Memory, |
| 0, // not check status |
| (NOT hdfsScanTdb().hdfsPrefetch()), //1, // waited op |
| |
| 0, |
| hdfsScanBufMaxSize_, |
| bytesRead_, |
| hdfsScanBuffer_, |
| 3, // close |
| 0); // openType, not applicable for close |
| |
| if (retcode < 0) |
| { |
| Lng32 cliError = 0; |
| |
| Lng32 intParam1 = -retcode; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), NULL, |
| &intParam1, |
| &errno, |
| NULL, |
| "HDFS", |
| (char*)"ExpLOBInterfaceSelectCursor/close", |
| getLobErrStr(intParam1)); |
| pentry_down->setDiagsArea(diagsArea); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| step_ = CLOSE_FILE; |
| } |
| break; |
| case HANDLE_EXCEPTION: |
| { |
| step_ = nextStep_; |
| exception_ = FALSE; |
| Int64 exceptionCount = 0; |
| ExHbaseAccessTcb::incrErrorCount( ehi_,exceptionCount, |
| hdfsScanTdb().getErrCountTable(),hdfsScanTdb().getErrCountRowId()); |
| if (hdfsScanTdb().getMaxErrorRows() > 0) |
| { |
| if (exceptionCount > hdfsScanTdb().getMaxErrorRows()) |
| { |
| if (pentry_down->getDiagsArea()) |
| pentry_down->getDiagsArea()->clear(); |
| if (workAtp_->getDiagsArea()) |
| workAtp_->getDiagsArea()->clear(); |
| |
| ComDiagsArea *da = workAtp_->getDiagsArea(); |
| if(!da) |
| { |
| da = ComDiagsArea::allocate(getHeap()); |
| workAtp_->setDiagsArea(da); |
| } |
| *da << DgSqlCode(-EXE_MAX_ERROR_ROWS_EXCEEDED); |
| step_ = HANDLE_ERROR_WITH_CLOSE; |
| break; |
| } |
| } |
| if (hdfsScanTdb().getLogErrorRows()) |
| { |
| int loggingRowLen = hdfsLoggingRowEnd_ - hdfsLoggingRow_ +1; |
| handleException((NAHeap *)getHeap(), hdfsLoggingRow_, |
| loggingRowLen, lastErrorCnd_ ); |
| |
| |
| } |
| |
| if (pentry_down->getDiagsArea()) |
| pentry_down->getDiagsArea()->clear(); |
| if (workAtp_->getDiagsArea()) |
| workAtp_->getDiagsArea()->clear(); |
| } |
| break; |
| |
| case HANDLE_ERROR_WITH_CLOSE: |
| case HANDLE_ERROR: |
| case HANDLE_ERROR_AND_DONE: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| if (workAtp_->getDiagsArea()) |
| { |
| ComDiagsArea *diagsArea = up_entry->getDiagsArea(); |
| |
| if (diagsArea == NULL) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| |
| up_entry->setDiagsArea (diagsArea); |
| } |
| |
| up_entry->getDiagsArea()->mergeAfter(*workAtp_->getDiagsArea()); |
| workAtp_->setDiagsArea(NULL); |
| } |
| up_entry->upState.status = ex_queue::Q_SQLERROR; |
| qparent_.up->insert(); |
| |
| if (useLibhdfsScan_) { |
| if (step_ == HANDLE_ERROR_WITH_CLOSE) |
| step_ = CLOSE_HDFS_CURSOR; |
| else if (step_ == HANDLE_ERROR_AND_DONE) |
| step_ = DONE; |
| else |
| step_ = ERROR_CLOSE_FILE; |
| } else |
| step_ = DONE; |
| break; |
| } |
| |
| case CLOSE_FILE: |
| case ERROR_CLOSE_FILE: |
| { |
| if (getStatsEntry()) |
| { |
| ExHdfsScanStats * stats = |
| getStatsEntry()->castToExHdfsScanStats(); |
| |
| if (stats) |
| { |
| ExLobStats s; |
| s.init(); |
| |
| retcode = ExpLOBinterfaceStats |
| (lobGlob_, |
| &s, |
| hdfsFileName_, //hdfsScanTdb().hdfsFileName_, |
| NULL, //(char*)"", |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_); |
| |
| *stats->lobStats() = *stats->lobStats() + s; |
| } |
| } |
| |
| // if next file is not same as current file, then close the current file. |
| bool closeFile = true; |
| |
| if ( (step_ == CLOSE_FILE) && |
| ((currRangeNum_ + 1) < (beginRangeNum_ + numRanges_))) |
| |
| { |
| hdfo = getRange(currRangeNum_ + 1); |
| if (strcmp(hdfsFileName_, hdfo->fileName()) == 0) |
| closeFile = false; |
| } |
| |
| if (closeFile) |
| { |
| retcode = ExpLOBinterfaceCloseFile |
| (lobGlob_, |
| hdfsFileName_, |
| NULL, |
| (Lng32)Lob_External_HDFS_File, |
| hdfsScanTdb().hostName_, |
| hdfsScanTdb().port_); |
| |
| if ((step_ == CLOSE_FILE) && |
| (retcode < 0)) |
| { |
| Lng32 cliError = 0; |
| |
| Lng32 intParam1 = -retcode; |
| ComDiagsArea * diagsArea = NULL; |
| ExRaiseSqlError(getHeap(), &diagsArea, |
| (ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE), NULL, |
| &intParam1, |
| &cliError, |
| NULL, |
| "HDFS", |
| (char*)"ExpLOBinterfaceCloseFile", |
| getLobErrStr(intParam1)); |
| pentry_down->setDiagsArea(diagsArea); |
| } |
| } |
| if (step_ == CLOSE_FILE) |
| { |
| currRangeNum_++; |
| if (currRangeNum_ < (beginRangeNum_ + numRanges_)) { |
| if (((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == matches_)) || |
| (pentry_down->downState.request == ex_queue::GET_NOMORE)) |
| step_ = DONE; |
| else |
| // move to the next file. |
| step_ = INIT_HDFS_CURSOR; |
| break; |
| } |
| } |
| step_ = DONE; |
| } |
| |
| |
| break; |
| |
| case DONE: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| if (logFileHdfsClient_ != NULL) |
| retcode = logFileHdfsClient_->hdfsClose(); |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.status = ex_queue::Q_NO_DATA; |
| up_entry->upState.setMatchNo(matches_); |
| if (loggingErrorDiags_ != NULL) |
| { |
| ComDiagsArea * diagsArea = up_entry->getDiagsArea(); |
| if (!diagsArea) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| up_entry->setDiagsArea(diagsArea); |
| } |
| diagsArea->mergeAfter(*loggingErrorDiags_); |
| loggingErrorDiags_->clear(); |
| } |
| qparent_.up->insert(); |
| |
| qparent_.down->removeHead(); |
| step_ = NOT_STARTED; |
| break; |
| } |
| |
| default: |
| { |
| break; |
| } |
| |
| } // switch |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, |
| ComDiagsArea* &diagsArea, int mode) |
| { |
| err = 0; |
| char *sourceData = hdfsBufNextRow_; |
| char *sourceRowEnd = NULL; |
| char *sourceColEnd = NULL; |
| int changedLen = 0; |
| NABoolean isTrailingMissingColumn = FALSE; |
| ExpTupleDesc * asciiSourceTD = |
| hdfsScanTdb().workCriDesc_->getTupleDescriptor(hdfsScanTdb().asciiTuppIndex_); |
| |
| ExpTupleDesc * origSourceTD = |
| hdfsScanTdb().workCriDesc_->getTupleDescriptor(hdfsScanTdb().origTuppIndex_); |
| |
| const char cd = hdfsScanTdb().columnDelimiter_; |
| const char rd = hdfsScanTdb().recordDelimiter_; |
| const char *sourceDataEnd; |
| const char *endOfRequestedRange; |
| if (useLibhdfsScan_) { |
| sourceDataEnd = hdfsScanBuffer_+trailingPrevRead_+ bytesRead_; |
| endOfRequestedRange = endOfRequestedRange_; |
| } |
| else { |
| sourceDataEnd = (const char *)bufEnd_; |
| endOfRequestedRange = NULL; |
| } |
| hdfsLoggingRow_ = hdfsBufNextRow_; |
| if (asciiSourceTD->numAttrs() == 0) |
| { |
| sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_, mode, &changedLen); |
| hdfsLoggingRowEnd_ = sourceRowEnd + changedLen; |
| |
| if (sourceRowEnd == NULL) |
| return NULL; |
| if (useLibhdfsScan_) { |
| if ((endOfRequestedRange) && |
| (sourceRowEnd >= endOfRequestedRange)) { |
| checkRangeDelimiter_ = TRUE; |
| *(sourceRowEnd +1)= RANGE_DELIMITER; |
| } |
| } |
| // no columns need to be converted. For e.g. count(*) with no predicate |
| return sourceRowEnd+1; |
| } |
| |
| Lng32 neededColIndex = 0; |
| Attributes * attr = NULL; |
| Attributes * tgtAttr = NULL; |
| NABoolean rdSeen = FALSE; |
| |
| for (Lng32 i = 0; i < hdfsScanTdb().convertSkipListSize_; i++) |
| { |
| // all remainin columns wil be skip columns, don't bother |
| // finding their column delimiters |
| if (neededColIndex == asciiSourceTD->numAttrs()) |
| continue; |
| |
| tgtAttr = NULL; |
| if (hdfsScanTdb().convertSkipList_[i] > 0) |
| { |
| attr = asciiSourceTD->getAttr(neededColIndex); |
| tgtAttr = origSourceTD->getAttr(neededColIndex); |
| neededColIndex++; |
| } |
| else |
| attr = NULL; |
| |
| if (!isTrailingMissingColumn) { |
| sourceColEnd = hdfs_strchr(sourceData, rd, cd, sourceDataEnd, checkRangeDelimiter_, &rdSeen,mode, &changedLen); |
| if (sourceColEnd == NULL) { |
| if (rdSeen || (sourceRowEnd == NULL)) |
| return NULL; |
| else |
| return sourceRowEnd+1; |
| } |
| Int32 len = 0; |
| len = (Int64)sourceColEnd - (Int64)sourceData; |
| if (rdSeen) { |
| sourceRowEnd = sourceColEnd + changedLen; |
| hdfsLoggingRowEnd_ = sourceRowEnd; |
| if ((endOfRequestedRange) && |
| (sourceRowEnd >= endOfRequestedRange)) { |
| checkRangeDelimiter_ = TRUE; |
| *(sourceRowEnd +1)= RANGE_DELIMITER; |
| } |
| if (i != hdfsScanTdb().convertSkipListSize_ - 1) |
| isTrailingMissingColumn = TRUE; |
| } |
| |
| if (attr) // this is a needed column. We need to convert |
| { |
| if (attr->getVCIndicatorLength() == sizeof(short)) |
| *(short*)&hdfsAsciiSourceData_[attr->getVCLenIndOffset()] |
| = (short)len; |
| else |
| *(Int32*)&hdfsAsciiSourceData_[attr->getVCLenIndOffset()] |
| = len; |
| |
| if (attr->getNullFlag()) |
| { |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = 0; |
| if (hdfsScanTdb().getNullFormat()) // null format specified by user |
| { |
| if (((len == 0) && (strlen(hdfsScanTdb().getNullFormat()) == 0)) || |
| ((len > 0) && (memcmp(sourceData, hdfsScanTdb().getNullFormat(), len) == 0))) |
| { |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = -1; |
| } |
| } // if |
| else // null format not specified by user |
| { |
| // Use default null format. |
| // for non-varchar, length of zero indicates a null value. |
| // For all datatypes, HIVE_DEFAULT_NULL_STRING('\N') indicates a null value. |
| if (((len == 0) && (tgtAttr && (NOT DFS2REC::isSQLVarChar(tgtAttr->getDatatype())))) || |
| ((len == strlen(HIVE_DEFAULT_NULL_STRING)) && (memcmp(sourceData, HIVE_DEFAULT_NULL_STRING, len) == 0))) |
| { |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = -1; |
| } |
| } // else |
| } // if nullable attr |
| |
| if (len > 0) |
| { |
| // move address of data into the source operand. |
| // convertExpr will dereference this addr and get to the actual |
| // data. |
| *(Int64*)&hdfsAsciiSourceData_[attr->getOffset()] = |
| (Int64)sourceData; |
| } |
| else |
| { |
| *(Int64*)&hdfsAsciiSourceData_[attr->getOffset()] = |
| (Int64)0; |
| |
| } |
| } // if(attr) |
| } // if (!trailingMissingColumn) |
| else |
| { |
| // A delimiter was found, but not enough columns. |
| // Treat the rest of the columns as NULL. |
| if (attr && attr->getNullFlag()) |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = -1; |
| } |
| sourceData = sourceColEnd + 1 ; |
| } |
| // It is possible that the above loop came out before |
| // rowDelimiter is encountered |
| // So try to find the record delimiter |
| if (sourceRowEnd == NULL) { |
| sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_,mode, &changedLen); |
| if (sourceRowEnd) { |
| hdfsLoggingRowEnd_ = sourceRowEnd + changedLen; //changedLen is when hdfs_strchr move the return pointer to remove the extra \r |
| if ((endOfRequestedRange) && |
| (sourceRowEnd >= endOfRequestedRange )) { |
| checkRangeDelimiter_ = TRUE; |
| *(sourceRowEnd +1)= RANGE_DELIMITER; |
| } |
| } |
| } |
| |
| workAtp_->getTupp(hdfsScanTdb().workAtpIndex_) = hdfsSqlTupp_; |
| workAtp_->getTupp(hdfsScanTdb().asciiTuppIndex_) = hdfsAsciiSourceTupp_; |
| // for later |
| workAtp_->getTupp(hdfsScanTdb().moveExprColsTuppIndex_) = moveExprColsTupp_; |
| |
| if (convertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(workAtp_, workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| err = -1; |
| else |
| err = 0; |
| } |
| if (sourceRowEnd) |
| return sourceRowEnd+1; |
| return NULL; |
| } |
| |
| void ExHdfsScanTcb::computeRangesAtRuntime() |
| { |
| int numFiles = 0; |
| Int64 totalSize = 0; |
| Int64 myShare = 0; |
| Int64 runningSum = 0; |
| Int64 myStartPositionInBytes = 0; |
| Int64 myEndPositionInBytes = 0; |
| Int64 firstFileStartingOffset = 0; |
| Int64 lastFileBytesToRead = -1; |
| Int32 numParallelInstances = MAXOF(getGlobals()->getNumOfInstances(),1); |
| |
| HDFS_FileInfo *fileInfos; |
| HDFS_Client_RetCode hdfsClientRetcode; |
| |
| hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles); |
| ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error:hdfsClient->hdfsListDirectory returned an error") |
| |
| deallocateRuntimeRanges(); |
| |
| // in a first round, count the total number of bytes |
| for (int f=0; f<numFiles; f++) |
| { |
| ex_assert(fileInfos[f].mKind == HDFS_FILE_KIND, |
| "subdirectories not supported with runtime HDFS ranges"); |
| totalSize += (Int64) fileInfos[f].mSize; |
| } |
| |
| // compute my share, in bytes |
| // (the last of the ESPs may read a bit more) |
| myShare = totalSize / numParallelInstances; |
| myStartPositionInBytes = myInstNum_ * myShare; |
| beginRangeNum_ = -1; |
| numRanges_ = 0; |
| |
| if (totalSize > 0) |
| { |
| if (myInstNum_ < numParallelInstances-1) |
| // read "myShare" bytes |
| myEndPositionInBytes = myStartPositionInBytes + myShare; |
| else |
| // the last ESP reads whatever is remaining |
| myEndPositionInBytes = totalSize; |
| |
| // second round, find out the range of files I need to read |
| for (int g=0; |
| g < numFiles && runningSum < myEndPositionInBytes; |
| g++) |
| { |
| Int64 prevSum = runningSum; |
| |
| runningSum += (Int64) fileInfos[g].mSize; |
| |
| if (runningSum >= myStartPositionInBytes) |
| { |
| if (beginRangeNum_ < 0) |
| { |
| // I have reached the first file that I need to read |
| beginRangeNum_ = g; |
| firstFileStartingOffset = |
| myStartPositionInBytes - prevSum; |
| } |
| |
| numRanges_++; |
| |
| if (runningSum > myEndPositionInBytes) |
| // I don't need to read all the way to the end of this file |
| lastFileBytesToRead = myEndPositionInBytes - prevSum; |
| |
| } // file is at or beyond my starting file |
| } // loop over files, determining ranges |
| } // total size > 0 |
| else |
| beginRangeNum_ = 0; |
| |
| // third round, populate the ranges that this ESP needs to read |
| for (int h=beginRangeNum_; h<beginRangeNum_+numRanges_; h++) |
| { |
| HdfsFileInfo *e = new(getHeap()) HdfsFileInfo; |
| const char *fileName = fileInfos[h].mName; |
| Int32 fileNameLen = strlen(fileName) + 1; |
| |
| e->entryNum_ = h; |
| e->flags_ = 0; |
| e->fileName_ = new(getHeap()) char[fileNameLen]; |
| str_cpy_all(e->fileName_, fileName, fileNameLen); |
| if (h == beginRangeNum_ && |
| firstFileStartingOffset > 0) |
| { |
| e->startOffset_ = firstFileStartingOffset; |
| e->setFileIsSplitBegin(TRUE); |
| } |
| else |
| e->startOffset_ = 0; |
| |
| |
| if (h == beginRangeNum_+numRanges_-1 && lastFileBytesToRead > 0) |
| { |
| e->bytesToRead_ = lastFileBytesToRead; |
| e->setFileIsSplitEnd(TRUE); |
| } |
| else |
| e->bytesToRead_ = (Int64) fileInfos[h].mSize; |
| |
| hdfsFileInfoListAsArray_.insertAt(h, e); |
| } |
| } |
| |
| void ExHdfsScanTcb::deallocateRuntimeRanges() |
| { |
| if (hdfsScanTdb().getAssignRangesAtRuntime() && |
| hdfsFileInfoListAsArray_.entries() > 0) |
| { |
| for (int i=0; i<hdfsFileInfoListAsArray_.getUsedLength(); i++) |
| if (hdfsFileInfoListAsArray_.used(i)) |
| { |
| NADELETEBASIC(hdfsFileInfoListAsArray_[i]->fileName_.getPointer(), getHeap()); |
| NADELETEBASIC(hdfsFileInfoListAsArray_[i], getHeap()); |
| } |
| hdfsFileInfoListAsArray_.clear(); |
| } |
| } |
| |
| short ExHdfsScanTcb::moveRowToUpQueue(const char * row, Lng32 len, |
| short * rc, NABoolean isVarchar) |
| { |
| if (qparent_.up->isFull()) |
| { |
| if (rc) |
| *rc = WORK_OK; |
| return -1; |
| } |
| |
| Lng32 length; |
| if (len <= 0) |
| length = strlen(row); |
| else |
| length = len; |
| |
| tupp p; |
| if (pool_->get_free_tuple(p, (Lng32) |
| ((isVarchar ? SQL_VARCHAR_HDR_SIZE : 0) |
| + length))) |
| { |
| if (rc) |
| *rc = WORK_POOL_BLOCKED; |
| return -1; |
| } |
| |
| char * dp = p.getDataPointer(); |
| if (isVarchar) |
| { |
| *(short*)dp = (short)length; |
| str_cpy_all(&dp[SQL_VARCHAR_HDR_SIZE], row, length); |
| } |
| else |
| { |
| str_cpy_all(dp, row, length); |
| } |
| |
| ex_queue_entry * pentry_down = qparent_.down->getHeadEntry(); |
| ex_queue_entry * up_entry = qparent_.up->getTailEntry(); |
| |
| up_entry->copyAtp(pentry_down); |
| up_entry->getAtp()->getTupp((Lng32)hdfsScanTdb().tuppIndex_) = p; |
| |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| |
| up_entry->upState.setMatchNo(++matches_); |
| up_entry->upState.status = ex_queue::Q_OK_MMORE; |
| |
| // insert into parent |
| qparent_.up->insert(); |
| |
| return 0; |
| } |
| |
| short ExHdfsScanTcb::handleError(short &rc) |
| { |
| if (qparent_.up->isFull()) |
| { |
| rc = WORK_OK; |
| return -1; |
| } |
| |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.status = ex_queue::Q_SQLERROR; |
| qparent_.up->insert(); |
| |
| return 0; |
| } |
| |
| short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc) |
| { |
| if (qparent_.up->isFull()) |
| { |
| rc = WORK_OK; |
| return -1; |
| } |
| |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.status = ex_queue::Q_NO_DATA; |
| up_entry->upState.setMatchNo(matches_); |
| qparent_.up->insert(); |
| |
| qparent_.down->removeHead(); |
| |
| return 0; |
| } |
| |
| void ExHdfsScanTcb::handleException(NAHeap *heap, |
| char *logErrorRow, |
| Lng32 logErrorRowLen, |
| ComCondition *errorCond) |
| { |
| Lng32 errorMsgLen = 0; |
| charBuf *cBuf = NULL; |
| char *errorMsg; |
| HDFS_Client_RetCode hdfsClientRetcode; |
| |
| if (loggingErrorDiags_ != NULL) |
| return; |
| |
| if (!loggingFileCreated_) { |
| logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode); |
| if (hdfsClientRetcode == HDFS_CLIENT_OK) |
| hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, TRUE, FALSE); |
| if (hdfsClientRetcode == HDFS_CLIENT_OK) |
| loggingFileCreated_ = TRUE; |
| else |
| goto logErrorReturn; |
| } |
| logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen, hdfsClientRetcode); |
| if (hdfsClientRetcode != HDFS_CLIENT_OK) |
| goto logErrorReturn; |
| if (errorCond != NULL) { |
| errorMsgLen = errorCond->getMessageLength(); |
| const NAWcharBuf wBuf((NAWchar*)errorCond->getMessageText(), errorMsgLen, heap); |
| cBuf = unicodeToISO88591(wBuf, heap, cBuf); |
| errorMsg = (char *)cBuf->data(); |
| errorMsgLen = cBuf -> getStrLen(); |
| errorMsg[errorMsgLen]='\n'; |
| errorMsgLen++; |
| } |
| else { |
| errorMsg = (char *)"[UNKNOWN EXCEPTION]\n"; |
| errorMsgLen = strlen(errorMsg); |
| } |
| logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen, hdfsClientRetcode); |
| logErrorReturn: |
| if (hdfsClientRetcode != HDFS_CLIENT_OK) { |
| loggingErrorDiags_ = ComDiagsArea::allocate(heap); |
| *loggingErrorDiags_ << DgSqlCode(EXE_ERROR_WHILE_LOGGING) |
| << DgString0(loggingFileName_) |
| << DgString1((char *)GetCliGlobals()->getJniErrorStr()); |
| } |
| } |
| |
| |
| //////////////////////////////////////////////////////////////////////// |
| // ORC files |
| //////////////////////////////////////////////////////////////////////// |
| ExOrcScanTcb::ExOrcScanTcb( |
| const ComTdbHdfsScan &orcScanTdb, |
| ex_globals * glob ) : |
| ExHdfsScanTcb( orcScanTdb, glob), |
| step_(NOT_STARTED) |
| { |
| orci_ = ExpORCinterface::newInstance(glob->getDefaultHeap(), |
| (char*)orcScanTdb.hostName_, |
| orcScanTdb.port_); |
| } |
| |
| ExOrcScanTcb::~ExOrcScanTcb() |
| { |
| } |
| |
| Int32 ExOrcScanTcb::fixup() |
| { |
| lobGlob_ = NULL; |
| |
| return 0; |
| } |
| |
| |
| |
| short ExOrcScanTcb::extractAndTransformOrcSourceToSqlRow( |
| char * orcRow, |
| Int64 orcRowLen, |
| Lng32 numOrcCols, |
| ComDiagsArea* &diagsArea) |
| { |
| short err = 0; |
| |
| if ((!orcRow) || (orcRowLen <= 0)) |
| return -1; |
| |
| char *sourceData = orcRow; |
| |
| ExpTupleDesc * asciiSourceTD = |
| hdfsScanTdb().workCriDesc_->getTupleDescriptor(hdfsScanTdb().asciiTuppIndex_); |
| if (asciiSourceTD->numAttrs() == 0) |
| { |
| // no columns need to be converted. For e.g. count(*) with no predicate |
| return 0; |
| } |
| |
| Lng32 neededColIndex = 0; |
| Attributes * attr = NULL; |
| |
| Lng32 numCurrCols = 0; |
| Lng32 currColLen; |
| for (Lng32 i = 0; i < hdfsScanTdb().convertSkipListSize_; i++) |
| { |
| if (hdfsScanTdb().convertSkipList_[i] > 0) |
| { |
| attr = asciiSourceTD->getAttr(neededColIndex); |
| neededColIndex++; |
| } |
| else |
| attr = NULL; |
| |
| currColLen = *(Lng32*)sourceData; |
| sourceData += sizeof(currColLen); |
| |
| if (attr) // this is a needed column. We need to convert |
| { |
| *(short*)&hdfsAsciiSourceData_[attr->getVCLenIndOffset()] = currColLen; |
| if (attr->getNullFlag()) |
| { |
| if (currColLen == 0) |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = -1; |
| else if (memcmp(sourceData, HIVE_DEFAULT_NULL_STRING, currColLen) == 0) |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = -1; |
| else |
| *(short *)&hdfsAsciiSourceData_[attr->getNullIndOffset()] = 0; |
| } |
| |
| if (currColLen > 0) |
| { |
| // move address of data into the source operand. |
| // convertExpr will dereference this addr and get to the actual |
| // data. |
| *(Int64*)&hdfsAsciiSourceData_[attr->getOffset()] = |
| (Int64)sourceData; |
| } |
| |
| } // if(attr) |
| |
| numCurrCols++; |
| sourceData += currColLen; |
| } |
| |
| if (numCurrCols != numOrcCols) |
| { |
| return -1; |
| } |
| |
| workAtp_->getTupp(hdfsScanTdb().workAtpIndex_) = hdfsSqlTupp_; |
| workAtp_->getTupp(hdfsScanTdb().asciiTuppIndex_) = hdfsAsciiSourceTupp_; |
| // for later |
| workAtp_->getTupp(hdfsScanTdb().moveExprColsTuppIndex_) = moveExprColsTupp_; |
| |
| err = 0; |
| if (convertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| convertExpr()->eval(workAtp_, workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| err = -1; |
| else |
| err = 0; |
| } |
| |
| return err; |
| } |
| |
| ExWorkProcRetcode ExOrcScanTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = DONE; |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| |
| beginRangeNum_ = -1; |
| numRanges_ = -1; |
| |
| if (hdfsScanTdb().getHdfsFileInfoList()->isEmpty()) |
| { |
| step_ = DONE; |
| break; |
| } |
| |
| myInstNum_ = getGlobals()->getMyInstanceNumber(); |
| |
| beginRangeNum_ = |
| *(Lng32*)hdfsScanTdb().getHdfsFileRangeBeginList()->get(myInstNum_); |
| |
| numRanges_ = |
| *(Lng32*)hdfsScanTdb().getHdfsFileRangeNumList()->get(myInstNum_); |
| |
| currRangeNum_ = beginRangeNum_; |
| |
| if (numRanges_ > 0) |
| step_ = INIT_ORC_CURSOR; |
| else |
| step_ = DONE; |
| } |
| break; |
| |
| case INIT_ORC_CURSOR: |
| { |
| /* orci_ = ExpORCinterface::newInstance(getHeap(), |
| (char*)hdfsScanTdb().hostName_, |
| |
| */ |
| |
| hdfo_ = (HdfsFileInfo*) |
| hdfsScanTdb().getHdfsFileInfoList()->get(currRangeNum_); |
| |
| orcStartRowNum_ = hdfo_->getStartRow(); |
| orcNumRows_ = hdfo_->getNumRows(); |
| |
| hdfsFileName_ = hdfo_->fileName(); |
| sprintf(cursorId_, "%d", currRangeNum_); |
| |
| if (orcNumRows_ == -1) // select all rows |
| orcStopRowNum_ = -1; |
| else |
| orcStopRowNum_ = orcStartRowNum_ + orcNumRows_ - 1; |
| |
| step_ = OPEN_ORC_CURSOR; |
| } |
| break; |
| |
| case OPEN_ORC_CURSOR: |
| { |
| retcode = orci_->scanOpen(hdfsFileName_, |
| orcStartRowNum_, orcStopRowNum_); |
| if (retcode < 0) |
| { |
| setupError(EXE_ERROR_FROM_LOB_INTERFACE, retcode, "ORC", "scanOpen", |
| orci_->getErrorText(-retcode)); |
| |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| step_ = GET_ORC_ROW; |
| } |
| break; |
| |
| case GET_ORC_ROW: |
| { |
| orcRow_ = hdfsScanBuffer_; |
| orcRowLen_ = hdfsScanTdb().hdfsBufSize_; |
| retcode = orci_->scanFetch(orcRow_, orcRowLen_, orcRowNum_, |
| numOrcCols_); |
| if (retcode < 0) |
| { |
| setupError(EXE_ERROR_FROM_LOB_INTERFACE, retcode, "ORC", "scanFetch", |
| orci_->getErrorText(-retcode)); |
| |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (retcode == 100) |
| { |
| step_ = CLOSE_ORC_CURSOR; |
| break; |
| } |
| |
| step_ = PROCESS_ORC_ROW; |
| } |
| break; |
| |
| case PROCESS_ORC_ROW: |
| { |
| int formattedRowLength = 0; |
| ComDiagsArea *transformDiags = NULL; |
| short err = |
| extractAndTransformOrcSourceToSqlRow(orcRow_, orcRowLen_, |
| numOrcCols_, transformDiags); |
| |
| if (err) |
| { |
| if (transformDiags) |
| pentry_down->setDiagsArea(transformDiags); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (hdfsStats_) |
| hdfsStats_->incAccessedRows(); |
| |
| workAtp_->getTupp(hdfsScanTdb().workAtpIndex_) = |
| hdfsSqlTupp_; |
| |
| bool rowWillBeSelected = true; |
| if (selectPred()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| selectPred()->eval(pentry_down->getAtp(), workAtp_); |
| if (evalRetCode == ex_expr::EXPR_FALSE) |
| rowWillBeSelected = false; |
| else if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| else |
| ex_assert(evalRetCode == ex_expr::EXPR_TRUE, |
| "invalid return code from expr eval"); |
| } |
| |
| if (rowWillBeSelected) |
| { |
| if (moveColsConvertExpr()) |
| { |
| ex_expr::exp_return_type evalRetCode = |
| moveColsConvertExpr()->eval(workAtp_, workAtp_); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| } |
| if (hdfsStats_) |
| hdfsStats_->incUsedRows(); |
| |
| step_ = RETURN_ROW; |
| break; |
| } |
| |
| step_ = GET_ORC_ROW; |
| } |
| break; |
| |
| case RETURN_ROW: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| ex_queue_entry *up_entry = qparent_.up->getTailEntry(); |
| up_entry->copyAtp(pentry_down); |
| up_entry->upState.parentIndex = |
| pentry_down->downState.parentIndex; |
| up_entry->upState.downIndex = qparent_.down->getHeadIndex(); |
| up_entry->upState.status = ex_queue::Q_OK_MMORE; |
| |
| if (moveExpr()) |
| { |
| UInt32 maxRowLen = hdfsScanTdb().outputRowLength_; |
| UInt32 rowLen = maxRowLen; |
| |
| if (hdfsScanTdb().useCifDefrag() && |
| !pool_->currentBufferHasEnoughSpace((Lng32)hdfsScanTdb().outputRowLength_)) |
| { |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_) = defragTd_; |
| defragTd_->setReferenceCount(1); |
| ex_expr::exp_return_type evalRetCode = |
| moveExpr()->eval(up_entry->getAtp(), workAtp_,0,-1,&rowLen); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| // Get diags from up_entry onto pentry_down, which |
| // is where the HANDLE_ERROR step expects it. |
| ComDiagsArea *diagsArea = pentry_down->getDiagsArea(); |
| if (diagsArea == NULL) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| pentry_down->setDiagsArea (diagsArea); |
| } |
| pentry_down->getDiagsArea()-> |
| mergeAfter(*up_entry->getDiagsArea()); |
| up_entry->setDiagsArea(NULL); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (pool_->get_free_tuple( |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_), |
| rowLen)) |
| return WORK_POOL_BLOCKED; |
| str_cpy_all(up_entry->getTupp(hdfsScanTdb().tuppIndex_).getDataPointer(), |
| defragTd_->getTupleAddress(), |
| rowLen); |
| |
| } |
| else |
| { |
| if (pool_->get_free_tuple( |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_), |
| (Lng32)hdfsScanTdb().outputRowLength_)) |
| return WORK_POOL_BLOCKED; |
| ex_expr::exp_return_type evalRetCode = |
| moveExpr()->eval(up_entry->getAtp(), workAtp_,0,-1,&rowLen); |
| if (evalRetCode == ex_expr::EXPR_ERROR) |
| { |
| // Get diags from up_entry onto pentry_down, which |
| // is where the HANDLE_ERROR step expects it. |
| ComDiagsArea *diagsArea = pentry_down->getDiagsArea(); |
| if (diagsArea == NULL) |
| { |
| diagsArea = |
| ComDiagsArea::allocate(getGlobals()->getDefaultHeap()); |
| pentry_down->setDiagsArea (diagsArea); |
| } |
| pentry_down->getDiagsArea()-> |
| mergeAfter(*up_entry->getDiagsArea()); |
| up_entry->setDiagsArea(NULL); |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| if (hdfsScanTdb().useCif() && rowLen != maxRowLen) |
| { |
| pool_->resizeLastTuple(rowLen, |
| up_entry->getTupp(hdfsScanTdb().tuppIndex_).getDataPointer()); |
| } |
| } |
| } |
| |
| up_entry->upState.setMatchNo(++matches_); |
| if (matches_ == matchBrkPoint_) |
| brkpoint(); |
| qparent_.up->insert(); |
| |
| // use ExOperStats now, to cover OPERATOR stats as well as |
| // ALL stats. |
| if (getStatsEntry()) |
| getStatsEntry()->incActualRowsReturned(); |
| |
| workAtp_->setDiagsArea(NULL); // get rid of warnings. |
| |
| if ((pentry_down->downState.request == ex_queue::GET_N) && |
| (pentry_down->downState.requestValue == matches_)) |
| step_ = CLOSE_ORC_CURSOR; |
| else |
| step_ = GET_ORC_ROW; |
| break; |
| } |
| |
| case CLOSE_ORC_CURSOR: |
| { |
| retcode = orci_->scanClose(); |
| if (retcode < 0) |
| { |
| setupError(EXE_ERROR_FROM_LOB_INTERFACE, retcode, "ORC", "scanClose", |
| orci_->getErrorText(-retcode)); |
| |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| currRangeNum_++; |
| |
| if (currRangeNum_ < (beginRangeNum_ + numRanges_)) |
| { |
| // move to the next file. |
| step_ = INIT_ORC_CURSOR; |
| break; |
| } |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case DONE: |
| { |
| if (handleDone(rc)) |
| return rc; |
| |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| default: |
| { |
| break; |
| } |
| } // switch |
| |
| } // while |
| |
| return WORK_OK; |
| } |
| |
| ExOrcFastAggrTcb::ExOrcFastAggrTcb( |
| const ComTdbOrcFastAggr &orcAggrTdb, |
| ex_globals * glob ) : |
| ExOrcScanTcb(orcAggrTdb, glob), |
| step_(NOT_STARTED) |
| { |
| if (orcAggrTdb.outputRowLength_ > 0) |
| aggrRow_ = new(glob->getDefaultHeap()) char[orcAggrTdb.outputRowLength_]; |
| } |
| |
| ExOrcFastAggrTcb::~ExOrcFastAggrTcb() |
| { |
| } |
| |
| ExWorkProcRetcode ExOrcFastAggrTcb::work() |
| { |
| Lng32 retcode = 0; |
| short rc = 0; |
| |
| while (!qparent_.down->isEmpty()) |
| { |
| ex_queue_entry *pentry_down = qparent_.down->getHeadEntry(); |
| if (pentry_down->downState.request == ex_queue::GET_NOMORE) |
| step_ = DONE; |
| |
| switch (step_) |
| { |
| case NOT_STARTED: |
| { |
| matches_ = 0; |
| |
| orcAggrTdb().getHdfsFileInfoList()->position(); |
| |
| rowCount_ = 0; |
| |
| step_ = ORC_AGGR_INIT; |
| } |
| break; |
| |
| case ORC_AGGR_INIT: |
| { |
| if (orcAggrTdb().getHdfsFileInfoList()->atEnd()) |
| { |
| step_ = ORC_AGGR_PROJECT; |
| break; |
| } |
| |
| hdfo_ = (HdfsFileInfo*)orcAggrTdb().getHdfsFileInfoList()->getNext(); |
| |
| hdfsFileName_ = hdfo_->fileName(); |
| |
| step_ = ORC_AGGR_EVAL; |
| } |
| break; |
| |
| case ORC_AGGR_EVAL: |
| { |
| Int64 currRowCount = 0; |
| retcode = orci_->getRowCount(hdfsFileName_, currRowCount); |
| if (retcode < 0) |
| { |
| setupError(EXE_ERROR_FROM_LOB_INTERFACE, retcode, "ORC", "getRowCount", |
| orci_->getErrorText(-retcode)); |
| |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| rowCount_ += currRowCount; |
| |
| step_ = ORC_AGGR_INIT; |
| } |
| break; |
| |
| case ORC_AGGR_PROJECT: |
| { |
| ExpTupleDesc * projTuppTD = |
| orcAggrTdb().workCriDesc_->getTupleDescriptor |
| (orcAggrTdb().workAtpIndex_); |
| |
| Attributes * attr = projTuppTD->getAttr(0); |
| if (! attr) |
| { |
| step_ = HANDLE_ERROR; |
| break; |
| } |
| |
| if (attr->getNullFlag()) |
| { |
| *(short*)&aggrRow_[attr->getNullIndOffset()] = 0; |
| } |
| |
| str_cpy_all(&aggrRow_[attr->getOffset()], (char*)&rowCount_, sizeof(rowCount_)); |
| |
| step_ = ORC_AGGR_RETURN; |
| } |
| break; |
| |
| case ORC_AGGR_RETURN: |
| { |
| if (qparent_.up->isFull()) |
| return WORK_OK; |
| |
| short rc = 0; |
| if (moveRowToUpQueue(aggrRow_, orcAggrTdb().outputRowLength_, |
| &rc, FALSE)) |
| return rc; |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case HANDLE_ERROR: |
| { |
| if (handleError(rc)) |
| return rc; |
| |
| step_ = DONE; |
| } |
| break; |
| |
| case DONE: |
| { |
| if (handleDone(rc)) |
| return rc; |
| |
| step_ = NOT_STARTED; |
| } |
| break; |
| |
| } // switch |
| } // while |
| |
| return WORK_OK; |
| } |
| |