| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: ex_lob.h |
| * Description: header file for LOB storage and retrieval |
| * |
| * |
| * Created: 10/29/2012 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ***************************************************************************** |
| */ |
| #ifndef EXP_LOB_ACCESS_H |
| #define EXP_LOB_ACCESS_H |
| |
| #include <string> |
| #include <map> |
| #include <list> |
| |
| #include "ExpLOBenums.h" |
| #include "ExpLOBstats.h" |
| #include "ExpLOBinterface.h" |
| #include "ComSmallDefs.h" |
| #include "Globals.h" |
| #include <seabed/ms.h> |
| #include <seabed/fs.h> |
| #include "ex_stdh.h" |
| #include "ExStats.h" |
| #ifdef LOB_DEBUG_STANDALONE |
| #define Int64 long long |
| #define Lng32 long |
| #else |
| #endif |
| |
| #define SQ_USE_HDFS 1 |
| #include "hdfs.h" |
| using namespace std; |
| |
| |
| class ExLobGlobals; |
| // This class defines the request used to construct the message to send over |
| // to the mxlobsrvr process. It's currently not used. All lob functionailty is |
| // now in te master process. We are retaining it here for furture use. |
| class ExLobRequest |
| { |
| public: |
| ExLobRequest(); |
| ~ExLobRequest(); |
| Ex_Lob_Error send(); |
| |
| void setValues(char *descFileName, Int64 descNumIn, Int64 handleInLen, |
| char *handleIn, LobsStorage storage, Int64 transId, |
| SB_Transid_Type transIdBig, |
| SB_Transseq_Type transStartId, |
| char *blackBox, Int64 blackBoxLen); |
| void getValues(Int64 &descNumOut, Int64 &handleOutLen, |
| char *handleOut, Ex_Lob_Error &requestStatus, |
| Int64 &cliError, |
| char *blackBox, Int64 &blackBoxLen); |
| |
| Int64 getDescNumIn() { return descNumIn_; } |
| void setDescNumIn(Int64 descNum) { descNumIn_ = descNum; } |
| Int64 getDescNumOut() { return descNumOut_; } |
| void setDescNumOut(Int64 descNum) { descNumOut_ = descNum; } |
| Int64 getDataOffset() { return dataOffset_; } |
| void setDataOffset(int dataOffset) { dataOffset_ = dataOffset; } |
| LobsRequest getType() { return type_; } |
| void setType(LobsRequest type) { type_ = type; } |
| char *getDescFileName() { return descFileName_; } |
| Int64 getOperLen() { return operLen_; } |
| void setOperLen(Int64 len) { operLen_ = len; } |
| void log(); |
| void setError(Ex_Lob_Error err) { error_ = err; } |
| Ex_Lob_Error getError() { return error_; } |
| Ex_Lob_Error getStatus() { return status_; } |
| char *getHandleIn() { return handleIn_; } |
| Int64 getHandleInLen() { return handleInLen_; } |
| char *getHandleOut() { return handleOut_; } |
| void setHandleOutLen(Lng32 len) { handleOutLen_ = len; } |
| void setCliError(int cliErr) { cliError_ = cliErr; } |
| int getCliError() { return (int)cliError_; } |
| Int64 getTransId() { return transId_; } |
| SB_Transid_Type getTransIdBig() { return transIdBig_; } |
| SB_Transseq_Type getTransStartId() { return transStartId_; } |
| Int64 getBlackBoxLen() { return blackBoxLen_; }; |
| void setBlackBoxLen(Int64 len) { blackBoxLen_ = len; } |
| char *getBlackBox() { return blackBox_; } |
| |
| void incrReqNum() { reqNum_++; } |
| Int64 getReqNum() { return reqNum_; } |
| |
| private: |
| Int64 reqNum_; |
| Int64 descNumIn_; |
| Int64 descNumOut_; |
| char handleIn_[LOB_HANDLE_LEN]; |
| Int64 handleInLen_; |
| char handleOut_[LOB_HANDLE_LEN]; |
| Int64 handleOutLen_; |
| Int64 dataOffset_; |
| LobsRequest type_; |
| LobsStorage storage_; |
| Int64 operLen_; |
| Ex_Lob_Error error_; |
| Int64 cliError_; |
| Ex_Lob_Error status_; |
| Int64 transId_; |
| SB_Transid_Type transIdBig_; |
| SB_Transseq_Type transStartId_; |
| char descFileName_[MAX_LOB_FILE_NAME_LEN]; // TODO: If we ever use this again, change to use string or NAString |
| char blackBox_[MAX_BLACK_BOX_LEN]; |
| Int64 blackBoxLen_; |
| }; |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // THE SQL LOB API |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| Ex_Lob_Error ExLobsOper ( |
| char *lobName, // lob name |
| ExHdfsScanStats *hdfsAccessStats, // Statistics for Lob access |
| char *handleIn, // input handle (for cli calls) |
| Int32 handleInLen, // input handle len |
| char *hdfsServer, // server where hdfs fs resides |
| Int64 hdfsPort, // port number to access hdfs server |
| char *handleOut, // output handle (for cli calls) |
| Int32 &handleOutLen, // output handle len |
| Int64 descNumIn, // input desc Num (for flat files only) |
| Int64 &descNumOut, // output desc Num (for flat files only) |
| Int64 &retOperLen, // length of data involved in this operation |
| Int64 requestTagIn, // only for checking status |
| Int64 &requestTagOut, // returned with every request other than check status |
| Ex_Lob_Error &requestStatus, // returned req status |
| Int64 &cliError, // err returned by cli call |
| char *dir, // directory in the storage |
| LobsStorage storage, // storage type |
| char *source, // source (memory addr, filename, foreign lob etc) |
| Int64 sourceLen, // source len (memory len, foreign desc offset etc) |
| Int64 cursorBytes, |
| char *cursorId, |
| LobsOper operation, // LOB operation |
| LobsSubOper subOperation, // LOB sub operation |
| Int64 waited, // waited or nowaited |
| ExLobGlobals *&globPtr, // ptr to the Lob objects. |
| Int64 transId, |
| void *blackBox, // black box to be sent to cli |
| Int32 blackBoxLen, // length of black box |
| Int64 lobMaxSize = 0, // max size of lob. |
| Int64 lobMaxChunkMemSize = 0 ,//max length of intermediate mem buffer used to do i/o. |
| Int64 lobGCLimit = 0, // size at which GC must be triggered |
| int bufferSize =0, |
| short replication =0, |
| int blocksize=0, |
| Lng32 openType=0 |
| |
| ); |
| |
| class ExLobLock |
| { |
| public: |
| ExLobLock(); |
| ~ExLobLock(); |
| |
| void lock(); |
| void unlock(); |
| void wakeOne(); |
| void wakeAll(); |
| void wait(); |
| Int64 waiters() { return waiters_; } |
| |
| private: |
| pthread_mutex_t mutex_; |
| pthread_cond_t workBell_; |
| Int64 waiters_; |
| bool bellRang_; |
| }; |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ExLobDescHeader |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| class ExLobDescHeader |
| { |
| public: |
| |
| ExLobDescHeader(unsigned int size); |
| ExLobDescHeader() { }; |
| ~ExLobDescHeader(); |
| |
| int getFreeDesc() { return freeDesc_; } |
| void incFreeDesc() { freeDesc_++; } |
| unsigned int getAvailSize() { return availSize_; } |
| void decAvailSize(int size) { availSize_ -= size; } |
| int getDataOffset() { return dataOffset_; } |
| void incDataOffset(int size) { dataOffset_ += size; } |
| |
| |
| |
| private: |
| |
| int freeDesc_; |
| int delDesc_; |
| int dataOffset_; |
| unsigned int availSize_; |
| }; |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ExLobDesc |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| class ExLobDesc |
| { |
| public: |
| ExLobDesc(int offset, int size, int tail); |
| ExLobDesc() : |
| dataOffset_(0), |
| dataSize_(0), |
| dataState_(0), |
| tail_(0), |
| next_(0), |
| prev_(0), |
| nextFree_(0){}; |
| ~ExLobDesc(); |
| |
| int getTailDescNum() { return tail_; } |
| void setTailDescNum(int tail) { tail_ = tail; } |
| int getNextDescNum() { return next_; } |
| void setNextDescNum(int next) { next_ = next; } |
| void setPrevDescNum(int prev) { prev_ = prev; } |
| int getPrevDescNum() { return prev_; } |
| int getSize() { return dataSize_; } |
| void setSize(int size) { dataSize_ = size; } |
| int getOffset() { return dataOffset_; } |
| void setOffset(int offset) { dataOffset_ = offset; } |
| void setDataState(int dataState) { dataState_ = dataState; } |
| |
| |
| private: |
| |
| int dataOffset_; |
| int dataSize_; |
| int dataState_; |
| int tail_; |
| int next_; |
| int prev_; |
| int nextFree_; |
| }; |
| |
| class ExLobInMemoryDescChunksEntry |
| { |
| public: |
| ExLobInMemoryDescChunksEntry(): |
| currentOffset_ (-1), |
| newOffset_( -1), |
| descPartnKey_(-1), |
| descSyskey_(-1), |
| chunkLen_(-1), |
| chunkNum_(-1) |
| {} |
| ExLobInMemoryDescChunksEntry(Int64 co,Int64 partnKey,Int64 syskey,Int64 chunkLen,Int32 chunkNum) |
| { |
| currentOffset_ = co; newOffset_ = -1;descPartnKey_ = partnKey; descSyskey_ = syskey; chunkLen_ = chunkLen; chunkNum_ =chunkNum; |
| } |
| |
| void setCurrentOffset(Int64 co) { currentOffset_ = co;} |
| Int64 getCurrentOffset(){return currentOffset_;} |
| |
| void setNewOffset(Int64 no) { newOffset_ = no;} |
| Int64 getNewOffset(){return newOffset_;} |
| |
| void setDescPartnKey(Int64 dk) { descPartnKey_ = dk;} |
| Int64 getDescPartnKey(){return descPartnKey_;} |
| |
| void setSyskey(Int64 sk) { descSyskey_ = sk;} |
| Int64 getSyskey(){return descSyskey_;} |
| |
| void setChunkLen(Int64 cl) { chunkLen_ = cl;} |
| Int64 getChunkLen(){return chunkLen_;} |
| |
| void setChunkNum(Int32 cn) { chunkNum_ = cn;} |
| Int32 getChunkNum(){return chunkNum_;} |
| private: |
| Int64 currentOffset_; |
| Int64 newOffset_; |
| Int64 descPartnKey_; |
| Int64 descSyskey_; |
| Int64 chunkLen_; |
| Int32 chunkNum_; |
| }; |
| |
| class ExLobCursorBuffer |
| { |
| public: |
| ExLobCursorBuffer() : |
| data_(NULL), |
| bytesRemaining_(-1), |
| bytesUsed_(-1) |
| { |
| } |
| |
| ~ExLobCursorBuffer() |
| { |
| // do nothing here as this object is copied between lists. |
| } |
| |
| public: |
| char *data_; |
| Int64 bytesRemaining_; |
| Int64 bytesUsed_; |
| }; |
| |
| class ExLobCursor |
| { |
| public: |
| |
| ExLobCursor() { } |
| ~ExLobCursor(){ }; |
| void emptyPrefetchList(ExLobGlobals *lobGlobals); |
| |
| public: |
| void *cliInterface_; // passed back and forth for cursors. |
| Int64 bytesRead_; // bytes already read |
| Int64 descOffset_; // from this desc offset |
| Int64 descSize_; // which is of this max size. |
| bool eod_; // end of file, error 100 from cli. |
| bool eor_; // end of range |
| bool eol_; // end of life |
| LobsCursorType type_; // simple or multiple |
| Int64 bufMaxSize_; // max size of buffer |
| Int64 maxBytes_; // bytesLeft to prefetch |
| Int64 prefetch_; // prefetch or not to prefetch |
| string name_; // TODO: change to NAString when ExLobCursor is allocated off of an NAHeap |
| |
| Lng32 currentRange_; // current index of file for multi cursor |
| Lng32 endRange_; // end index of file for multi cursor |
| struct hdfsFile_internal* currentFd_; // file pointed by currentRange_ |
| bool currentEod_; // eod of currentFd_ |
| Int64 currentStartOffset_; // start offset in currentFd_ |
| Int64 currentBytesToRead_; // bytes to read in currentFd_ |
| Int64 currentBytesRead_; // bytes read in currentFd_ |
| |
| ExLobLock lock_; // lock for this cursor |
| typedef list<ExLobCursorBuffer *> bufferList_t; |
| bufferList_t prefetchBufList_; |
| |
| struct timespec openTime_; |
| struct timespec closeTime_; |
| |
| Int64 bufferHits_; |
| Int64 bufferMisses_; |
| }; |
| |
| typedef class ExLobCursor cursor_t; |
| |
| typedef map<string, cursor_t> lobCursors_t; // handle, offset |
| typedef map<string, cursor_t>::iterator lobCursors_it; |
| void cleanupLOBDataDescFiles(const char *hdfsServer, int hdfsPort, const char *hdfsLoc); |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ExLob |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| class ExLob : public NABasicObject |
| { |
| public: |
| |
| ExLob(NAHeap * heap, ExHdfsScanStats *hdfsAccessStats); // default constructor |
| virtual ~ExLob(); // default desctructor |
| |
| void setUseLibHdfs(NABoolean useLibHdfs) |
| { useLibHdfs_ = useLibHdfs; } |
| |
| NABoolean useLibHdfs() { return useLibHdfs_; } |
| Ex_Lob_Error initialize(const char *lobFile, Ex_Lob_Mode mode, char *dir, |
| LobsStorage storage, char *hdfsServer, Int64 hdfsPort, |
| char *lobLocation, |
| int bufferSize = 0, short replication =0, |
| int blocksize=0, Int64 lobMaxSize = 0, |
| ExLobGlobals *lobGlobals = NULL); |
| |
| Ex_Lob_Error writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOperation, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,Int64 lobGCLimit, char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals); |
| Ex_Lob_Error writeLobData(char *source, Int64 sourceLen, |
| LobsSubOper subOperation, |
| Int64 tgtOffset,Int64 &operLen, |
| Int64 lobMaxMemChunkLen); |
| Ex_Lob_Error writeDataSimple(char *data, Int64 size, LobsSubOper subOperation, Int64 &operLen, |
| int bufferSize = 0, short replication =0, int blocksize=0); |
| Ex_Lob_Error readToMem(char *memAddr, Int64 size, Int64 &operLen,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId); |
| Ex_Lob_Error readToFile(char *fileName, Int64 tgtLen,Int64 &operLen,Int64 lobMaxChunkMemLen, Int32 fileflags, char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId); |
| Ex_Lob_Error readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int32 handleInLen, Int64 &operLen,Int64 transId); |
| Ex_Lob_Error readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen,char *handleIn, Int32 handeLenIn,Int64 transId); |
| Ex_Lob_Error readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen); |
| Ex_Lob_Error readDataCursorSimple(const char *fileName, char *tgt, Int64 tgtSize, Int64 &operLen, ExLobGlobals *lobGlobals); |
| bool hasNoOpenCursors() { return lobCursors_.empty(); } |
| Ex_Lob_Error openCursor(char *handleIn, Int32 handleInLen,Int64 transId); |
| Ex_Lob_Error openDataCursor(const char *fileName, LobsCursorType type, Int64 range, |
| Int64 bytesLeft, Int64 bufMaxSize, Int64 prefetch, ExLobGlobals *lobGlobals, Int32 *hdfsDetailError = NULL); |
| Ex_Lob_Error deleteCursor(const char *cursorName, ExLobGlobals *lobGlobals); |
| Ex_Lob_Error fetchCursor(char *handleIn, Int32 handleLenIn, Int64 &outOffset, Int64 &outSize,NABoolean &isEOD,Int64 transId); |
| Ex_Lob_Error insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals); |
| Ex_Lob_Error insertSelect(ExLob *srcLobPtr,char *handleIn,Int32 handleInLen, char *source, Int64 sourceLen, Int64 &operLen,Int64 lobMaxSize, Int64 lobMaxChunkMemLen,Int64 lobGCLimit, char *blackBox, Int32 blackBoxLen, char *handleOut, Int32 &handleOutLen,LobsSubOper so,Int64 xnId,void *lobGlobals); |
| Ex_Lob_Error append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemLen,Int64 lobGCLimit, char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals); |
| Ex_Lob_Error update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemLen,Int64 lobGCLimit,char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals); |
| Ex_Lob_Error readSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); |
| Ex_Lob_Error readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); |
| Ex_Lob_Error readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); |
| Ex_Lob_Error readExternalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset); |
| Ex_Lob_Error statSourceFile(char *srcfile, Int64 &sourceEOF); |
| Ex_Lob_Error delDesc(char *handleIn, Int32 handleInLen, Int64 transId); |
| Ex_Lob_Error purgeLob(); |
| Ex_Lob_Error closeFile(); |
| LobInputOutputFileType fileType(char *ioFileName); |
| Ex_Lob_Error closeCursor(char *handleIn, Int32 handleInLen,Int64 transId); |
| Ex_Lob_Error closeDataCursorSimple(const char *fileName, ExLobGlobals *lobGlobals); |
| |
| Ex_Lob_Error doSanityChecks(char *dir, LobsStorage storage, |
| Int32 handleInLen, Int32 handleOutLen, |
| Int32 blackBoxLen); |
| Ex_Lob_Error allocateDesc(unsigned int size, Int64 &descNum, Int64 &dataOffset,Int64 lobMaxSize,Int64 lobMaxChunkMemSize, char *handleIn, Int32 handleInLen,Int64 lobGCLimit, void *lobGlobals); |
| Ex_Lob_Error readStats(char *buffer); |
| Ex_Lob_Error initStats(); |
| |
| Ex_Lob_Error insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,Int64 xnId,void *lobGlobals) ; |
| |
| Ex_Lob_Error lockDesc(); |
| Ex_Lob_Error unlockDesc(); |
| const char *getDataFileName() { return lobDataFile_.data(); } |
| |
| int getErrNo(); |
| |
| |
| Ex_Lob_Error getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId); |
| |
| Ex_Lob_Error writeData(Int64 offset, char *data, Int32 size, Int64 &operLen); |
| Ex_Lob_Error readDataToMem(char *memAddr, Int64 offset, Int64 size, |
| Int64 &operLen,char *handleIn, Int32 handleLenIn, |
| NABoolean multipleChunks, Int64 transId); |
| |
| Ex_Lob_Error readDataToLocalFile(char *fileName, Int64 offset, Int64 size,Int64 &operLen,Int64 lobMaxChunkMemLen ,Int32 fileFlags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); |
| Ex_Lob_Error readDataToHdfsFile(char *fileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); |
| Ex_Lob_Error readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId); |
| Ex_Lob_Error readDataFromFile(char *memAddr, Int64 len, Int64 &operLen); |
| Ex_Lob_Error compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries); |
| Ex_Lob_Error restoreLobDataFile(); |
| Ex_Lob_Error purgeBackupLobDataFile(); |
| |
| // dirPath: path to needed directory (includes directory name) |
| // modTS is the latest timestamp on any file/dir under dirPath. |
| // This method validates that current modTS is not greater then input modTS. |
| // On return, failedModTS contains current timestamp that caused mismatch. |
| // Return: LOB_OPER_OK, if passes. LOB_DATA_MOD_CHECK_ERROR, if fails. |
| Ex_Lob_Error dataModCheck( |
| char * dirPath, |
| Int64 modTS, |
| Lng32 numOfPartLevels, |
| ExLobGlobals *lobGlobals, |
| Int64 &failedModTS, |
| char *failedLocBuf, |
| Int32 *failedLocBufLen); |
| |
| Ex_Lob_Error emptyDirectory(char* dirPath, ExLobGlobals* lobGlobals); |
| |
| ExHdfsScanStats *getStats() { return stats_; } |
| NAHeap *getLobGlobalHeap() { return lobGlobalHeap_;} |
| Ex_Lob_Error getLength(char *handleIn, Int32 handleInLen,Int64 &outLobLen,LobsSubOper so, Int64 transId); |
| Ex_Lob_Error getOffset(char *handleIn, Int32 handleInLen,Int64 &outOffset,LobsSubOper so, Int64 transId); |
| Ex_Lob_Error getFileName(char *handleIn, Int32 handleInLen, char *outFileName, Int32 &outFileLen, LobsSubOper so, Int64 transId); |
| // ExLobRequest *getRequest() { return &request_; } |
| |
| //The next 2 functions are not active at this point. They serve as an example |
| //on how to send requests across to the mxlobsrvr process from the master |
| //process |
| Ex_Lob_Error getDesc(ExLobRequest *request); |
| Ex_Lob_Error sendReqToLobServer() ; |
| |
| public: |
| |
| NAString lobDataFile_; |
| lobCursors_t lobCursors_; |
| ExLobLock lobCursorLock_; |
| LobsStorage storage_; |
| string lobStorageLocation_; // lob data directory |
| char *hdfsServer_; |
| Int64 hdfsPort_; |
| hdfsFS fs_; |
| hdfsFile fdData_; |
| int openFlags_; |
| ExHdfsScanStats *stats_; |
| bool prefetchQueued_; |
| NAHeap *lobGlobalHeap_; |
| NABoolean lobTrace_; |
| NABoolean useLibHdfs_; |
| HdfsClient *hdfsClient_; |
| }; |
| |
| typedef map<string, ExLob *> lobMap_t; |
| typedef map<string, ExLob *>::iterator lobMap_it; |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // ExLobHdfsRequest |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| class ExLobHdfsRequest |
| { |
| public: |
| ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLobCursor *cursor); |
| ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLob *lobPtr, ExLobCursor *cursor); |
| ExLobHdfsRequest(LobsHdfsRequestType reqType); |
| ~ExLobHdfsRequest(); |
| |
| bool isShutDown() { return reqType_ == Lob_Hdfs_Shutdown; } |
| |
| public : |
| LobsHdfsRequestType reqType_; |
| ExLob *lobPtr_; |
| ExLobCursor *cursor_; |
| char *buffer_; |
| int size_; |
| Ex_Lob_Error error_; |
| }; |
| |
| |
| class ExLobPreOpen : public NABasicObject |
| { |
| public : |
| |
| ExLobPreOpen(ExLob *lobPtr, const char *cursorName, Int64 range, |
| Int64 bufMaxSize, Int64 maxBytes, Int64 waited, |
| NAHeap * heap) |
| : lobPtr_(lobPtr), |
| cursorName_(cursorName,heap), |
| range_(range), |
| bufMaxSize_(bufMaxSize), |
| maxBytes_(maxBytes), |
| waited_(waited) |
| { |
| // nothing else to do |
| } |
| |
| virtual ~ExLobPreOpen() |
| { |
| // nothing else to do |
| }; |
| |
| public : |
| ExLob *lobPtr_; |
| NAString cursorName_; |
| Int64 range_; |
| Int64 bufMaxSize_; |
| Int64 maxBytes_; |
| Int64 waited_; |
| }; |
| void lobDebugInfo(const char *logMessage,Int32 errorcode, |
| Int32 line, NABoolean lobTrace); |
| typedef list<ExLobPreOpen *> preOpenList_t; |
| typedef list<ExLobPreOpen *>::iterator preOpenList_it; |
| |
| class ExLobGlobals |
| { |
| public : |
| |
| ExLobGlobals(NAHeap *lobHeap); |
| ~ExLobGlobals(); |
| |
| Ex_Lob_Error initialize(); |
| lobMap_t * getLobMap() { return lobMap_; } |
| Ex_Lob_Error getLobPtr(char *lobName, ExLob *& lobPtr); |
| Ex_Lob_Error delLobPtr(char *lobName); |
| hdfsFS getHdfsFs() { return fs_; } |
| void setHdfsFs(hdfsFS fs) { fs_ = fs; } |
| Ex_Lob_Error startWorkerThreads(); |
| void doWorkInThread(); |
| ExLobHdfsRequest* getHdfsRequest(); |
| Ex_Lob_Error enqueueRequest(ExLobHdfsRequest *request); |
| Ex_Lob_Error enqueuePrefetchRequest(ExLob *lobPtr, ExLobCursor *cursor); |
| Ex_Lob_Error enqueueShutdownRequest(); |
| Ex_Lob_Error performRequest(ExLobHdfsRequest *request); |
| Ex_Lob_Error addToPreOpenList(ExLobPreOpen *preOpenObj); |
| Ex_Lob_Error processPreOpens(); |
| NABoolean isCliInitialized() |
| { |
| return isCliInitialized_; |
| } |
| |
| void setCliInitialized() |
| { |
| isCliInitialized_ = TRUE; |
| } |
| |
| void setHeap(void * heap) |
| { |
| heap_ = (NAHeap *) heap; |
| } |
| NAHeap * getHeap() |
| { |
| return heap_; |
| } |
| |
| NABoolean useLibHdfs() { return useLibHdfs_; } |
| void setUseLibHdfs(NABoolean useLibHdfs) |
| { useLibHdfs_ = useLibHdfs; } |
| |
| void traceMessage(const char *logMessage, ExLobCursor *c, int line); |
| |
| public : |
| lobMap_t *lobMap_; |
| hdfsFS fs_; |
| pthread_t threadId_[NUM_WORKER_THREADS]; |
| typedef list<ExLobHdfsRequest *> reqList_t; |
| reqList_t reqQueue_; |
| ExLobLock reqQueueLock_; |
| preOpenList_t preOpenList_; |
| ExLobLock preOpenListLock_; |
| typedef list<ExLobCursorBuffer *> bufferList_t; |
| bufferList_t postfetchBufList_; |
| ExLobLock postfetchBufListLock_; |
| NABoolean isCliInitialized_; |
| NABoolean isHive_; |
| FILE *threadTraceFile_; |
| NAHeap *heap_; |
| NABoolean lobTrace_; |
| long numWorkerThreads_; |
| NABoolean useLibHdfs_; |
| }; |
| |
| |
| |
| #endif |