| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| #ifndef EX_HDFS_SCAN_H |
| #define EX_HDFS_SCAN_H |
| |
| // |
| // Task Definition Block |
| // |
| #include "ComTdbHdfsScan.h" |
| #include "ExSimpleSqlBuffer.h" |
| #include "ExStats.h" |
| #include "sql_buffer.h" |
| #include "ex_queue.h" |
| #include <time.h> |
| #include "ExHbaseAccess.h" |
| #include "ExpHbaseInterface.h" |
| |
| #define HIVE_MODE_DOSFORMAT 1 |
| #define HIVE_MODE_CONV_ERROR_TO_NULL 2 |
| |
| // ----------------------------------------------------------------------- |
| // Classes defined in this file |
| // ----------------------------------------------------------------------- |
| class ExHdfsScanTdb; |
| class ExHdfsScanTcb; |
| class HdfsScan; |
| class HdfsClient; |
| |
| // ----------------------------------------------------------------------- |
| // Classes referenced in this file |
| // ----------------------------------------------------------------------- |
| class ex_tcb; |
| class ExHdfsScanStats; |
| class SequenceFileReader; |
| class ExpORCinterface; |
| |
| // ----------------------------------------------------------------------- |
| // ExHdfsScanTdb |
| // ----------------------------------------------------------------------- |
| class ExHdfsScanTdb : public ComTdbHdfsScan |
| { |
| friend class ExOrcScanTdb; |
| |
| public: |
| |
| // --------------------------------------------------------------------- |
| // Constructor is only called to instantiate an object used for |
| // retrieval of the virtual table function pointer of the class while |
| // unpacking. An empty constructor is enough. |
| // --------------------------------------------------------------------- |
| ExHdfsScanTdb() |
| {} |
| |
| virtual ~ExHdfsScanTdb() |
| {} |
| |
| // --------------------------------------------------------------------- |
| // Build a TCB for this TDB. Redefined in the Executor project. |
| // --------------------------------------------------------------------- |
| virtual ex_tcb *build(ex_globals *globals); |
| |
| private: |
| // --------------------------------------------------------------------- |
| // !!!!!!! IMPORTANT -- NO DATA MEMBERS ALLOWED IN EXECUTOR TDB !!!!!!!! |
| // ********************************************************************* |
| // The Executor TDB's are only used for the sole purpose of providing a |
| // way to supplement the Compiler TDB's (in comexe) with methods whose |
| // implementation depends on Executor objects. This is done so as to |
| // decouple the Compiler from linking in Executor objects unnecessarily. |
| // |
| // When a Compiler generated TDB arrives at the Executor, the same data |
| // image is "cast" as an Executor TDB after unpacking. Therefore, it is |
| // a requirement that a Compiler TDB has the same object layout as its |
| // corresponding Executor TDB. As a result of this, all Executor TDB's |
| // must have absolutely NO data members, but only member functions. So, |
| // if you reach here with an intention to add data members to a TDB, ask |
| // yourself two questions: |
| // |
| // 1. Are those data members Compiler-generated? |
| // If yes, put them in the appropriate ComTdb subclass instead. |
| // If no, they should probably belong to someplace else (like TCB). |
| // |
| // 2. Are the classes those data members belong defined in the executor |
| // project? |
| // If your answer to both questions is yes, you might need to move |
| // the classes to the comexe project. |
| // --------------------------------------------------------------------- |
| }; |
| |
| /* |
| USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes |
| org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient |
| Steps involved: |
| 1. Create a new HdfsScan object and set the scan ranges of the fragment instance in it |
| The scan range involves the following and it is determined either at runtime or compile time |
| a) filename |
| b) offset |
| c) len |
| Java layer always reads more than the len by rangeTailIOSize_ to accommdate the record split |
| 2. Two ByteBuffer objects are also passsed to HdfsScan object. These ByteBuffers are backed up by |
| 2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the |
| data is always read after the head room. |
| 3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, isEOF and schedules either |
| the remaining bytes to be read or the next range using ByteBuffers alternatively. |
| 4. HdfsScan returns null array when there is no more data to be read. |
| 5. When the data is processed in one ByteBuffer in the native thread, the data is fetched into the other ByteBuffer by |
| another Java thread. |
| 6. Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the |
| other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting |
| from the copied partial row. |
| */ |
| |
| class ExHdfsScanTcb : public ex_tcb |
| { |
| |
| public: |
| enum |
| /* |
| USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes |
| org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient |
| Steps involved: |
| 1. Create a new HdfsScan object and set the scan ranges of the fragment instance in it |
| The scan range involves the following and it is determined either at runtime or compile time |
| a) filename |
| b) offset |
| c) len |
| Java layer always reads more than the len by rangeTailIOSize_ to accommdate the record split |
| 2. Two ByteBuffer objects are also passsed to HdfsScan object. These ByteBuffers are backed up by |
| 2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the |
| data is always read after the head room. |
| 3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, isEOF and schedules either |
| the remaining bytes to be read or the next range using ByteBuffers alternatively. |
| 4. HdfsScan returns null array when there is no more data to be read. |
| 5. When the data is processed in one ByteBuffer in the native thread, the data is fetched into the other ByteBuffer by |
| another Java thread. |
| 6. Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the |
| other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting |
| from the copied incomplete row. |
| */ |
| |
| { |
| BYTES_COMPLETED, |
| BUF_NO, |
| RANGE_NO, |
| IS_EOF |
| } retArrayIndices_; |
| |
| struct HDFS_SCAN_BUF |
| { |
| BYTE *headRoom_; |
| BYTE *buf_; |
| }; |
| ExHdfsScanTcb( const ComTdbHdfsScan &tdb, |
| ex_globals *glob ); |
| |
| ~ExHdfsScanTcb(); |
| |
| void freeResources(); |
| virtual void registerSubtasks(); // register work procedures with scheduler |
| virtual Int32 fixup(); |
| |
| virtual ExWorkProcRetcode work(); |
| |
| // The static work procs for scheduler. |
| static ExWorkProcRetcode sWorkUp(ex_tcb *tcb) |
| { return ((ExHdfsScanTcb *) tcb)->work(); } |
| |
| ex_queue_pair getParentQueue() const { return qparent_;} |
| virtual Int32 numChildren() const { return 0; } |
| virtual const ex_tcb* getChild(Int32 /*pos*/) const { return NULL; } |
| virtual NABoolean needStatsEntry(); |
| virtual ExOperStats *doAllocateStatsEntry(CollHeap *heap, |
| ComTdb *tdb); |
| virtual ex_tcb_private_state *allocatePstates( |
| Lng32 &numElems, |
| Lng32 &pstateLength); |
| |
| ExHdfsScanStats *getHfdsScanStats() |
| { |
| if (getStatsEntry()) |
| return getStatsEntry()->castToExHdfsScanStats(); |
| else |
| return NULL; |
| } |
| |
| protected: |
| enum { |
| NOT_STARTED |
| , INIT_HDFS_CURSOR |
| , OPEN_HDFS_CURSOR |
| , CHECK_FOR_DATA_MOD |
| , CHECK_FOR_DATA_MOD_AND_DONE |
| , ASSIGN_RANGES_AT_RUNTIME |
| , GET_HDFS_DATA |
| , CLOSE_HDFS_CURSOR |
| , PROCESS_HDFS_ROW |
| , RETURN_ROW |
| , REPOS_HDFS_DATA |
| , CLOSE_FILE |
| , ERROR_CLOSE_FILE |
| , COLLECT_STATS |
| , HANDLE_ERROR |
| , HANDLE_EXCEPTION |
| , DONE |
| , HANDLE_ERROR_WITH_CLOSE |
| , HANDLE_ERROR_AND_DONE |
| , SETUP_HDFS_SCAN |
| , TRAF_HDFS_READ |
| , COPY_TAIL_TO_HEAD |
| , STOP_HDFS_SCAN |
| } step_,nextStep_; |
| |
| ///////////////////////////////////////////////////// |
| // Private methods. |
| ///////////////////////////////////////////////////// |
| |
| inline ExHdfsScanTdb &hdfsScanTdb() const |
| { return (ExHdfsScanTdb &) tdb; } |
| |
| inline ex_expr *selectPred() const |
| { return hdfsScanTdb().selectPred_; } |
| |
| inline ex_expr *moveExpr() const |
| { return hdfsScanTdb().moveExpr_; } |
| |
| inline ex_expr *convertExpr() const |
| { return hdfsScanTdb().convertExpr_; } |
| |
| inline ex_expr *moveColsConvertExpr() const |
| { return hdfsScanTdb().moveColsConvertExpr_; } |
| |
| inline bool isSequenceFile() const |
| {return hdfsScanTdb().isSequenceFile(); } |
| |
| // returns ptr to start of next row, if any. Returning NULL |
| // indicates that no complete row was found. Beware of boundary |
| // conditions. Could be an incomplete row in a buffer returned by |
| // hdfsRead. Or it could be the eof (in which case there is a good |
| // row still waiting to be processed). |
| char * extractAndTransformAsciiSourceToSqlRow(int &err, |
| ComDiagsArea * &diagsArea, int mode); |
| |
| void computeRangesAtRuntime(); |
| void deallocateRuntimeRanges(); |
| HdfsFileInfo *getRange(Int32 r) { return getHdfsFileInfoListAsArray().at(r); } |
| |
| short moveRowToUpQueue(const char * row, Lng32 len, |
| short * rc, NABoolean isVarchar); |
| |
| short handleError(short &rc); |
| short handleDone(ExWorkProcRetcode &rc); |
| |
| void handleException(NAHeap *heap, |
| char *loggingDdata, |
| Lng32 loggingDataLen, |
| ComCondition *errorCond); |
| |
| short setupError(Lng32 exeError, Lng32 retcode, |
| const char * str, const char * str2, const char * str3); |
| |
| // Get the array representation of the HdfsFileInfoList to aid in |
| // o(1) access of an entry given an index. |
| HdfsFileInfoArray& getHdfsFileInfoListAsArray() {return hdfsFileInfoListAsArray_;} |
| |
| ///////////////////////////////////////////////////// |
| // Private data. |
| ///////////////////////////////////////////////////// |
| |
| ex_queue_pair qparent_; |
| Int64 matches_; |
| Int64 matchBrkPoint_; |
| atp_struct * workAtp_; |
| Int64 bytesLeft_; |
| hdfsFile hfdsFileHandle_; |
| char * hdfsScanBuffer_; |
| char * hdfsBufNextRow_; |
| |
| char * debugPrevRow_; // Pointer to help with debugging. |
| Int64 debugtrailingPrevRead_; |
| char *debugPenultimatePrevRow_; |
| |
| ExSimpleSQLBuffer *hdfsSqlBuffer_; // this buffer for one row, converted |
| // from ascii to SQL for select pred. |
| tupp hdfsSqlTupp_; // tupp for this one row. |
| char *hdfsSqlData_; // this one row. |
| |
| ExSimpleSQLBuffer *moveExprColsBuffer_; // this buffer for one row, converted |
| // from ascii to SQL for move expr only. |
| tupp moveExprColsTupp_; // tupp for this one row. |
| char *moveExprColsData_; // this one row. |
| |
| // this is where delimited columns that are read from hdfs rows will be moved to. |
| // They will become source for convertExpr which will convert |
| // them to sql row that will be returned by the scan operator. |
| ExSimpleSQLBuffer *hdfsAsciiSourceBuffer_; |
| tupp hdfsAsciiSourceTupp_; |
| char * hdfsAsciiSourceData_; |
| |
| sql_buffer_pool * pool_; // row images after selection pred, |
| // with only the required columns. |
| hdfsFile hdfsFp_; |
| |
| ExLobGlobals *lobGlob_; |
| |
| Int64 requestTag_; |
| Int64 hdfsScanBufMaxSize_; |
| Int64 bytesRead_; |
| Int64 trailingPrevRead_; // for trailing bytes at end of buffer. |
| Int64 numBytesProcessedInRange_; // count bytes of complete records. |
| bool firstBufOfFile_; |
| ExHdfsScanStats * hdfsStats_; |
| HdfsFileInfo *hdfo_; |
| Int64 hdfsOffset_; |
| Lng32 myInstNum_; |
| Lng32 beginRangeNum_; |
| Lng32 numRanges_; |
| Lng32 currRangeNum_; |
| char *endOfRequestedRange_ ; // helps rows span ranges. |
| char * hdfsFileName_; |
| SequenceFileReader* sequenceFileReader_; |
| Int64 stopOffset_; |
| bool seqScanAgain_; |
| char cursorId_[8]; |
| |
| char *loggingFileName_; |
| NABoolean loggingFileCreated_ ; |
| char * hdfsLoggingRow_; |
| char * hdfsLoggingRowEnd_; |
| tupp_descriptor * defragTd_; |
| |
| ExpHbaseInterface * ehi_; |
| |
| NABoolean exception_; |
| ComCondition * lastErrorCnd_; |
| NABoolean checkRangeDelimiter_; |
| |
| NABoolean dataModCheckDone_; |
| ComDiagsArea * loggingErrorDiags_; |
| |
| // this array is populated from the info list stored as Queue. |
| HdfsFileInfoArray hdfsFileInfoListAsArray_; |
| |
| HdfsClient *logFileHdfsClient_; |
| HdfsClient *hdfsClient_; |
| HdfsScan *hdfsScan_; |
| NABoolean useLibhdfsScan_; |
| BYTE *hdfsScanBufBacking_[2]; |
| HDFS_SCAN_BUF hdfsScanBuf_[2]; |
| int retArray_[4]; |
| BYTE *bufBegin_; |
| BYTE *bufEnd_; |
| BYTE *bufLogicalEnd_; |
| long currRangeBytesRead_; |
| int headRoomCopied_; |
| int headRoom_; |
| int prevRangeNum_; |
| int extraBytesRead_; |
| NABoolean recordSkip_; |
| int numFiles_; |
| }; |
| |
| class ExOrcScanTcb : public ExHdfsScanTcb |
| { |
| friend class ExOrcFastAggrTcb; |
| |
| public: |
| ExOrcScanTcb( const ComTdbHdfsScan &tdb, |
| ex_globals *glob ); |
| |
| ~ExOrcScanTcb(); |
| |
| virtual ExWorkProcRetcode work(); |
| |
| protected: |
| enum { |
| NOT_STARTED |
| , INIT_ORC_CURSOR |
| , OPEN_ORC_CURSOR |
| , GET_ORC_ROW |
| , PROCESS_ORC_ROW |
| , CLOSE_ORC_CURSOR |
| , RETURN_ROW |
| , CLOSE_FILE |
| , ERROR_CLOSE_FILE |
| , COLLECT_STATS |
| , HANDLE_ERROR |
| , DONE |
| } step_; |
| |
| virtual Int32 fixup(); |
| ///////////////////////////////////////////////////// |
| // Private methods. |
| ///////////////////////////////////////////////////// |
| private: |
| short extractAndTransformOrcSourceToSqlRow( |
| char * orcRow, |
| Int64 orcRowLen, |
| Lng32 numOrcCols, |
| ComDiagsArea* &diagsArea); |
| |
| ///////////////////////////////////////////////////// |
| // Private data. |
| ///////////////////////////////////////////////////// |
| |
| ExpORCinterface * orci_; |
| |
| Int64 orcStartRowNum_; |
| Int64 orcNumRows_; |
| Int64 orcStopRowNum_; |
| |
| // returned row from orc scanFetch |
| char * orcRow_; |
| Int64 orcRowLen_; |
| Int64 orcRowNum_; |
| Lng32 numOrcCols_; |
| }; |
| |
| // ----------------------------------------------------------------------- |
| // ExOrcFastAggrTdb |
| // ----------------------------------------------------------------------- |
| class ExOrcFastAggrTdb : public ComTdbOrcFastAggr |
| { |
| public: |
| |
| // --------------------------------------------------------------------- |
| // Constructor is only called to instantiate an object used for |
| // retrieval of the virtual table function pointer of the class while |
| // unpacking. An empty constructor is enough. |
| // --------------------------------------------------------------------- |
| ExOrcFastAggrTdb() |
| {} |
| |
| virtual ~ExOrcFastAggrTdb() |
| {} |
| |
| // --------------------------------------------------------------------- |
| // Build a TCB for this TDB. Redefined in the Executor project. |
| // --------------------------------------------------------------------- |
| virtual ex_tcb *build(ex_globals *globals); |
| |
| private: |
| // --------------------------------------------------------------------- |
| // !!!!!!! IMPORTANT -- NO DATA MEMBERS ALLOWED IN EXECUTOR TDB !!!!!!!! |
| // ********************************************************************* |
| // The Executor TDB's are only used for the sole purpose of providing a |
| // way to supplement the Compiler TDB's (in comexe) with methods whose |
| // implementation depends on Executor objects. This is done so as to |
| // decouple the Compiler from linking in Executor objects unnecessarily. |
| // |
| // When a Compiler generated TDB arrives at the Executor, the same data |
| // image is "cast" as an Executor TDB after unpacking. Therefore, it is |
| // a requirement that a Compiler TDB has the same object layout as its |
| // corresponding Executor TDB. As a result of this, all Executor TDB's |
| // must have absolutely NO data members, but only member functions. So, |
| // if you reach here with an intention to add data members to a TDB, ask |
| // yourself two questions: |
| // |
| // 1. Are those data members Compiler-generated? |
| // If yes, put them in the appropriate ComTdb subclass instead. |
| // If no, they should probably belong to someplace else (like TCB). |
| // |
| // 2. Are the classes those data members belong defined in the executor |
| // project? |
| // If your answer to both questions is yes, you might need to move |
| // the classes to the comexe project. |
| // --------------------------------------------------------------------- |
| }; |
| |
| // class ExOrcFastAggrTcb |
| class ExOrcFastAggrTcb : public ExOrcScanTcb |
| { |
| public: |
| ExOrcFastAggrTcb( const ComTdbOrcFastAggr &tdb, |
| ex_globals *glob ); |
| |
| ~ExOrcFastAggrTcb(); |
| |
| virtual ExWorkProcRetcode work(); |
| |
| inline ExOrcFastAggrTdb &orcAggrTdb() const |
| { return (ExOrcFastAggrTdb &) tdb; } |
| |
| protected: |
| enum { |
| NOT_STARTED |
| , ORC_AGGR_INIT |
| , ORC_AGGR_EVAL |
| , ORC_AGGR_PROJECT |
| , ORC_AGGR_RETURN |
| , CLOSE_FILE |
| , ERROR_CLOSE_FILE |
| , COLLECT_STATS |
| , HANDLE_ERROR |
| , DONE |
| } step_; |
| |
| ///////////////////////////////////////////////////// |
| // Private data. |
| ///////////////////////////////////////////////////// |
| private: |
| Int64 rowCount_; |
| char * aggrRow_; |
| }; |
| |
| #define RANGE_DELIMITER '\002' |
| |
| inline char *hdfs_strchr(char *s, int c, const char *end, NABoolean checkRangeDelimiter, int mode , int *changedLen) |
| { |
| char *curr = (char *)s; |
| int count=0; |
| //changedLen is lenght of \r which removed by this function |
| *changedLen = 0; |
| if( (mode & HIVE_MODE_DOSFORMAT ) == 0) |
| { |
| while (curr < end) { |
| if (*curr == c) |
| { |
| return curr; |
| } |
| if (checkRangeDelimiter &&*curr == RANGE_DELIMITER) |
| return NULL; |
| curr++; |
| } |
| } |
| else |
| { |
| while (curr < end) { |
| if (*curr == c) |
| { |
| if(count>0 && c == '\n') |
| { |
| if(s[count-1] == '\r') |
| *changedLen = 1; |
| } |
| return curr - *changedLen; |
| } |
| if (checkRangeDelimiter &&*curr == RANGE_DELIMITER) |
| return NULL; |
| curr++; |
| count++; |
| } |
| } |
| return NULL; |
| } |
| |
| |
| inline char *hdfs_strchr(char *s, int rd, int cd, const char *end, NABoolean checkRangeDelimiter, NABoolean *rdSeen, int mode, int* changedLen) |
| { |
| char *curr = (char *)s; |
| int count = 0; |
| //changedLen is lenght of \r which removed by this function |
| *changedLen = 0; |
| if( (mode & HIVE_MODE_DOSFORMAT)>0 ) //check outside the while loop to make it faster |
| { |
| while (curr < end) { |
| if (*curr == rd) { |
| if(count>0 && rd == '\n') |
| { |
| if(s[count-1] == '\r') |
| *changedLen = 1; |
| } |
| *rdSeen = TRUE; |
| return curr - *changedLen; |
| } |
| else |
| if (*curr == cd) { |
| *rdSeen = FALSE; |
| return curr; |
| } |
| else |
| if (checkRangeDelimiter && *curr == RANGE_DELIMITER) { |
| *rdSeen = TRUE; |
| return NULL; |
| } |
| curr++; |
| count++; |
| } |
| } |
| else |
| { |
| while (curr < end) { |
| if (*curr == rd) { |
| *rdSeen = TRUE; |
| return curr; |
| } |
| else |
| if (*curr == cd) { |
| *rdSeen = FALSE; |
| return curr; |
| } |
| else |
| if (checkRangeDelimiter && *curr == RANGE_DELIMITER) { |
| *rdSeen = TRUE; |
| return NULL; |
| } |
| curr++; |
| } |
| } |
| *rdSeen = FALSE; |
| return NULL; |
| } |
| |
| |
| |
| #endif |