blob: 4097d62e577f301c164be3d7d2ae511d9bc9bd58 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_lob.h
* Description: header file for LOB storage and retrieval
*
*
* Created: 10/29/2012
* Language: C++
*
*
*
*
*****************************************************************************
*/
#ifndef EXP_LOB_ACCESS_H
#define EXP_LOB_ACCESS_H
#include <string>
#include <map>
#include <list>
#include "ExpLOBenums.h"
#include "ExpLOBstats.h"
#include "ExpLOBinterface.h"
#include "ComSmallDefs.h"
#include "Globals.h"
#include <seabed/ms.h>
#include <seabed/fs.h>
#include "ex_stdh.h"
#include "ExStats.h"
#ifdef LOB_DEBUG_STANDALONE
#define Int64 long long
#define Lng32 long
#else
#endif
#define SQ_USE_HDFS 1
#include "hdfs.h"
using namespace std;
class ExLobGlobals;
// This class defines the request used to construct the message to send over
// to the mxlobsrvr process. It's currently not used. All lob functionailty is
// now in te master process. We are retaining it here for furture use.
class ExLobRequest
{
public:
ExLobRequest();
~ExLobRequest();
Ex_Lob_Error send();
void setValues(char *descFileName, Int64 descNumIn, Int64 handleInLen,
char *handleIn, LobsStorage storage, Int64 transId,
SB_Transid_Type transIdBig,
SB_Transseq_Type transStartId,
char *blackBox, Int64 blackBoxLen);
void getValues(Int64 &descNumOut, Int64 &handleOutLen,
char *handleOut, Ex_Lob_Error &requestStatus,
Int64 &cliError,
char *blackBox, Int64 &blackBoxLen);
Int64 getDescNumIn() { return descNumIn_; }
void setDescNumIn(Int64 descNum) { descNumIn_ = descNum; }
Int64 getDescNumOut() { return descNumOut_; }
void setDescNumOut(Int64 descNum) { descNumOut_ = descNum; }
Int64 getDataOffset() { return dataOffset_; }
void setDataOffset(int dataOffset) { dataOffset_ = dataOffset; }
LobsRequest getType() { return type_; }
void setType(LobsRequest type) { type_ = type; }
char *getDescFileName() { return descFileName_; }
Int64 getOperLen() { return operLen_; }
void setOperLen(Int64 len) { operLen_ = len; }
void log();
void setError(Ex_Lob_Error err) { error_ = err; }
Ex_Lob_Error getError() { return error_; }
Ex_Lob_Error getStatus() { return status_; }
char *getHandleIn() { return handleIn_; }
Int64 getHandleInLen() { return handleInLen_; }
char *getHandleOut() { return handleOut_; }
void setHandleOutLen(Lng32 len) { handleOutLen_ = len; }
void setCliError(int cliErr) { cliError_ = cliErr; }
int getCliError() { return (int)cliError_; }
Int64 getTransId() { return transId_; }
SB_Transid_Type getTransIdBig() { return transIdBig_; }
SB_Transseq_Type getTransStartId() { return transStartId_; }
Int64 getBlackBoxLen() { return blackBoxLen_; };
void setBlackBoxLen(Int64 len) { blackBoxLen_ = len; }
char *getBlackBox() { return blackBox_; }
void incrReqNum() { reqNum_++; }
Int64 getReqNum() { return reqNum_; }
private:
Int64 reqNum_;
Int64 descNumIn_;
Int64 descNumOut_;
char handleIn_[LOB_HANDLE_LEN];
Int64 handleInLen_;
char handleOut_[LOB_HANDLE_LEN];
Int64 handleOutLen_;
Int64 dataOffset_;
LobsRequest type_;
LobsStorage storage_;
Int64 operLen_;
Ex_Lob_Error error_;
Int64 cliError_;
Ex_Lob_Error status_;
Int64 transId_;
SB_Transid_Type transIdBig_;
SB_Transseq_Type transStartId_;
char descFileName_[MAX_LOB_FILE_NAME_LEN]; // TODO: If we ever use this again, change to use string or NAString
char blackBox_[MAX_BLACK_BOX_LEN];
Int64 blackBoxLen_;
};
///////////////////////////////////////////////////////////////////////////////
// THE SQL LOB API
///////////////////////////////////////////////////////////////////////////////
Ex_Lob_Error ExLobsOper (
char *lobName, // lob name
ExHdfsScanStats *hdfsAccessStats, // Statistics for Lob access
char *handleIn, // input handle (for cli calls)
Int32 handleInLen, // input handle len
char *hdfsServer, // server where hdfs fs resides
Int64 hdfsPort, // port number to access hdfs server
char *handleOut, // output handle (for cli calls)
Int32 &handleOutLen, // output handle len
Int64 descNumIn, // input desc Num (for flat files only)
Int64 &descNumOut, // output desc Num (for flat files only)
Int64 &retOperLen, // length of data involved in this operation
Int64 requestTagIn, // only for checking status
Int64 &requestTagOut, // returned with every request other than check status
Ex_Lob_Error &requestStatus, // returned req status
Int64 &cliError, // err returned by cli call
char *dir, // directory in the storage
LobsStorage storage, // storage type
char *source, // source (memory addr, filename, foreign lob etc)
Int64 sourceLen, // source len (memory len, foreign desc offset etc)
Int64 cursorBytes,
char *cursorId,
LobsOper operation, // LOB operation
LobsSubOper subOperation, // LOB sub operation
Int64 waited, // waited or nowaited
ExLobGlobals *&globPtr, // ptr to the Lob objects.
Int64 transId,
void *blackBox, // black box to be sent to cli
Int32 blackBoxLen, // length of black box
Int64 lobMaxSize = 0, // max size of lob.
Int64 lobMaxChunkMemSize = 0 ,//max length of intermediate mem buffer used to do i/o.
Int64 lobGCLimit = 0, // size at which GC must be triggered
int bufferSize =0,
short replication =0,
int blocksize=0,
Lng32 openType=0
);
class ExLobLock
{
public:
ExLobLock();
~ExLobLock();
void lock();
void unlock();
void wakeOne();
void wakeAll();
void wait();
Int64 waiters() { return waiters_; }
private:
pthread_mutex_t mutex_;
pthread_cond_t workBell_;
Int64 waiters_;
bool bellRang_;
};
///////////////////////////////////////////////////////////////////////////////
// ExLobDescHeader
///////////////////////////////////////////////////////////////////////////////
class ExLobDescHeader
{
public:
ExLobDescHeader(unsigned int size);
ExLobDescHeader() { };
~ExLobDescHeader();
int getFreeDesc() { return freeDesc_; }
void incFreeDesc() { freeDesc_++; }
unsigned int getAvailSize() { return availSize_; }
void decAvailSize(int size) { availSize_ -= size; }
int getDataOffset() { return dataOffset_; }
void incDataOffset(int size) { dataOffset_ += size; }
private:
int freeDesc_;
int delDesc_;
int dataOffset_;
unsigned int availSize_;
};
///////////////////////////////////////////////////////////////////////////////
// ExLobDesc
///////////////////////////////////////////////////////////////////////////////
class ExLobDesc
{
public:
ExLobDesc(int offset, int size, int tail);
ExLobDesc() :
dataOffset_(0),
dataSize_(0),
dataState_(0),
tail_(0),
next_(0),
prev_(0),
nextFree_(0){};
~ExLobDesc();
int getTailDescNum() { return tail_; }
void setTailDescNum(int tail) { tail_ = tail; }
int getNextDescNum() { return next_; }
void setNextDescNum(int next) { next_ = next; }
void setPrevDescNum(int prev) { prev_ = prev; }
int getPrevDescNum() { return prev_; }
int getSize() { return dataSize_; }
void setSize(int size) { dataSize_ = size; }
int getOffset() { return dataOffset_; }
void setOffset(int offset) { dataOffset_ = offset; }
void setDataState(int dataState) { dataState_ = dataState; }
private:
int dataOffset_;
int dataSize_;
int dataState_;
int tail_;
int next_;
int prev_;
int nextFree_;
};
class ExLobInMemoryDescChunksEntry
{
public:
ExLobInMemoryDescChunksEntry():
currentOffset_ (-1),
newOffset_( -1),
descPartnKey_(-1),
descSyskey_(-1),
chunkLen_(-1),
chunkNum_(-1)
{}
ExLobInMemoryDescChunksEntry(Int64 co,Int64 partnKey,Int64 syskey,Int64 chunkLen,Int32 chunkNum)
{
currentOffset_ = co; newOffset_ = -1;descPartnKey_ = partnKey; descSyskey_ = syskey; chunkLen_ = chunkLen; chunkNum_ =chunkNum;
}
void setCurrentOffset(Int64 co) { currentOffset_ = co;}
Int64 getCurrentOffset(){return currentOffset_;}
void setNewOffset(Int64 no) { newOffset_ = no;}
Int64 getNewOffset(){return newOffset_;}
void setDescPartnKey(Int64 dk) { descPartnKey_ = dk;}
Int64 getDescPartnKey(){return descPartnKey_;}
void setSyskey(Int64 sk) { descSyskey_ = sk;}
Int64 getSyskey(){return descSyskey_;}
void setChunkLen(Int64 cl) { chunkLen_ = cl;}
Int64 getChunkLen(){return chunkLen_;}
void setChunkNum(Int32 cn) { chunkNum_ = cn;}
Int32 getChunkNum(){return chunkNum_;}
private:
Int64 currentOffset_;
Int64 newOffset_;
Int64 descPartnKey_;
Int64 descSyskey_;
Int64 chunkLen_;
Int32 chunkNum_;
};
class ExLobCursorBuffer
{
public:
ExLobCursorBuffer() :
data_(NULL),
bytesRemaining_(-1),
bytesUsed_(-1)
{
}
~ExLobCursorBuffer()
{
// do nothing here as this object is copied between lists.
}
public:
char *data_;
Int64 bytesRemaining_;
Int64 bytesUsed_;
};
class ExLobCursor
{
public:
ExLobCursor() { }
~ExLobCursor(){ };
void emptyPrefetchList(ExLobGlobals *lobGlobals);
public:
void *cliInterface_; // passed back and forth for cursors.
Int64 bytesRead_; // bytes already read
Int64 descOffset_; // from this desc offset
Int64 descSize_; // which is of this max size.
bool eod_; // end of file, error 100 from cli.
bool eor_; // end of range
bool eol_; // end of life
LobsCursorType type_; // simple or multiple
Int64 bufMaxSize_; // max size of buffer
Int64 maxBytes_; // bytesLeft to prefetch
Int64 prefetch_; // prefetch or not to prefetch
string name_; // TODO: change to NAString when ExLobCursor is allocated off of an NAHeap
Lng32 currentRange_; // current index of file for multi cursor
Lng32 endRange_; // end index of file for multi cursor
struct hdfsFile_internal* currentFd_; // file pointed by currentRange_
bool currentEod_; // eod of currentFd_
Int64 currentStartOffset_; // start offset in currentFd_
Int64 currentBytesToRead_; // bytes to read in currentFd_
Int64 currentBytesRead_; // bytes read in currentFd_
ExLobLock lock_; // lock for this cursor
typedef list<ExLobCursorBuffer *> bufferList_t;
bufferList_t prefetchBufList_;
struct timespec openTime_;
struct timespec closeTime_;
Int64 bufferHits_;
Int64 bufferMisses_;
};
typedef class ExLobCursor cursor_t;
typedef map<string, cursor_t> lobCursors_t; // handle, offset
typedef map<string, cursor_t>::iterator lobCursors_it;
void cleanupLOBDataDescFiles(const char *hdfsServer, int hdfsPort, const char *hdfsLoc);
///////////////////////////////////////////////////////////////////////////////
// ExLob
///////////////////////////////////////////////////////////////////////////////
class ExLob : public NABasicObject
{
public:
ExLob(NAHeap * heap, ExHdfsScanStats *hdfsAccessStats); // default constructor
virtual ~ExLob(); // default desctructor
void setUseLibHdfs(NABoolean useLibHdfs)
{ useLibHdfs_ = useLibHdfs; }
NABoolean useLibHdfs() { return useLibHdfs_; }
Ex_Lob_Error initialize(const char *lobFile, Ex_Lob_Mode mode, char *dir,
LobsStorage storage, char *hdfsServer, Int64 hdfsPort,
char *lobLocation,
int bufferSize = 0, short replication =0,
int blocksize=0, Int64 lobMaxSize = 0,
ExLobGlobals *lobGlobals = NULL);
Ex_Lob_Error writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOperation, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,Int64 lobGCLimit, char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals);
Ex_Lob_Error writeLobData(char *source, Int64 sourceLen,
LobsSubOper subOperation,
Int64 tgtOffset,Int64 &operLen,
Int64 lobMaxMemChunkLen);
Ex_Lob_Error writeDataSimple(char *data, Int64 size, LobsSubOper subOperation, Int64 &operLen,
int bufferSize = 0, short replication =0, int blocksize=0);
Ex_Lob_Error readToMem(char *memAddr, Int64 size, Int64 &operLen,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId);
Ex_Lob_Error readToFile(char *fileName, Int64 tgtLen,Int64 &operLen,Int64 lobMaxChunkMemLen, Int32 fileflags, char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId);
Ex_Lob_Error readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int32 handleInLen, Int64 &operLen,Int64 transId);
Ex_Lob_Error readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen,char *handleIn, Int32 handeLenIn,Int64 transId);
Ex_Lob_Error readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen);
Ex_Lob_Error readDataCursorSimple(const char *fileName, char *tgt, Int64 tgtSize, Int64 &operLen, ExLobGlobals *lobGlobals);
bool hasNoOpenCursors() { return lobCursors_.empty(); }
Ex_Lob_Error openCursor(char *handleIn, Int32 handleInLen,Int64 transId);
Ex_Lob_Error openDataCursor(const char *fileName, LobsCursorType type, Int64 range,
Int64 bytesLeft, Int64 bufMaxSize, Int64 prefetch, ExLobGlobals *lobGlobals, Int32 *hdfsDetailError = NULL);
Ex_Lob_Error deleteCursor(const char *cursorName, ExLobGlobals *lobGlobals);
Ex_Lob_Error fetchCursor(char *handleIn, Int32 handleLenIn, Int64 &outOffset, Int64 &outSize,NABoolean &isEOD,Int64 transId);
Ex_Lob_Error insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals);
Ex_Lob_Error insertSelect(ExLob *srcLobPtr,char *handleIn,Int32 handleInLen, char *source, Int64 sourceLen, Int64 &operLen,Int64 lobMaxSize, Int64 lobMaxChunkMemLen,Int64 lobGCLimit, char *blackBox, Int32 blackBoxLen, char *handleOut, Int32 &handleOutLen,LobsSubOper so,Int64 xnId,void *lobGlobals);
Ex_Lob_Error append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemLen,Int64 lobGCLimit, char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals);
Ex_Lob_Error update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemLen,Int64 lobGCLimit,char *handleIn,Int32 handleInLen, char * handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals);
Ex_Lob_Error readSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset);
Ex_Lob_Error readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset);
Ex_Lob_Error readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset);
Ex_Lob_Error readExternalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset);
Ex_Lob_Error statSourceFile(char *srcfile, Int64 &sourceEOF);
Ex_Lob_Error delDesc(char *handleIn, Int32 handleInLen, Int64 transId);
Ex_Lob_Error purgeLob();
Ex_Lob_Error closeFile();
LobInputOutputFileType fileType(char *ioFileName);
Ex_Lob_Error closeCursor(char *handleIn, Int32 handleInLen,Int64 transId);
Ex_Lob_Error closeDataCursorSimple(const char *fileName, ExLobGlobals *lobGlobals);
Ex_Lob_Error doSanityChecks(char *dir, LobsStorage storage,
Int32 handleInLen, Int32 handleOutLen,
Int32 blackBoxLen);
Ex_Lob_Error allocateDesc(unsigned int size, Int64 &descNum, Int64 &dataOffset,Int64 lobMaxSize,Int64 lobMaxChunkMemSize, char *handleIn, Int32 handleInLen,Int64 lobGCLimit, void *lobGlobals);
Ex_Lob_Error readStats(char *buffer);
Ex_Lob_Error initStats();
Ex_Lob_Error insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,Int64 xnId,void *lobGlobals) ;
Ex_Lob_Error lockDesc();
Ex_Lob_Error unlockDesc();
const char *getDataFileName() { return lobDataFile_.data(); }
int getErrNo();
Ex_Lob_Error getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId);
Ex_Lob_Error writeData(Int64 offset, char *data, Int32 size, Int64 &operLen);
Ex_Lob_Error readDataToMem(char *memAddr, Int64 offset, Int64 size,
Int64 &operLen,char *handleIn, Int32 handleLenIn,
NABoolean multipleChunks, Int64 transId);
Ex_Lob_Error readDataToLocalFile(char *fileName, Int64 offset, Int64 size,Int64 &operLen,Int64 lobMaxChunkMemLen ,Int32 fileFlags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId);
Ex_Lob_Error readDataToHdfsFile(char *fileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId);
Ex_Lob_Error readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId);
Ex_Lob_Error readDataFromFile(char *memAddr, Int64 len, Int64 &operLen);
Ex_Lob_Error compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries);
Ex_Lob_Error restoreLobDataFile();
Ex_Lob_Error purgeBackupLobDataFile();
// dirPath: path to needed directory (includes directory name)
// modTS is the latest timestamp on any file/dir under dirPath.
// This method validates that current modTS is not greater then input modTS.
// On return, failedModTS contains current timestamp that caused mismatch.
// Return: LOB_OPER_OK, if passes. LOB_DATA_MOD_CHECK_ERROR, if fails.
Ex_Lob_Error dataModCheck(
char * dirPath,
Int64 modTS,
Lng32 numOfPartLevels,
ExLobGlobals *lobGlobals,
Int64 &failedModTS,
char *failedLocBuf,
Int32 *failedLocBufLen);
Ex_Lob_Error emptyDirectory(char* dirPath, ExLobGlobals* lobGlobals);
ExHdfsScanStats *getStats() { return stats_; }
NAHeap *getLobGlobalHeap() { return lobGlobalHeap_;}
Ex_Lob_Error getLength(char *handleIn, Int32 handleInLen,Int64 &outLobLen,LobsSubOper so, Int64 transId);
Ex_Lob_Error getOffset(char *handleIn, Int32 handleInLen,Int64 &outOffset,LobsSubOper so, Int64 transId);
Ex_Lob_Error getFileName(char *handleIn, Int32 handleInLen, char *outFileName, Int32 &outFileLen, LobsSubOper so, Int64 transId);
// ExLobRequest *getRequest() { return &request_; }
//The next 2 functions are not active at this point. They serve as an example
//on how to send requests across to the mxlobsrvr process from the master
//process
Ex_Lob_Error getDesc(ExLobRequest *request);
Ex_Lob_Error sendReqToLobServer() ;
public:
NAString lobDataFile_;
lobCursors_t lobCursors_;
ExLobLock lobCursorLock_;
LobsStorage storage_;
string lobStorageLocation_; // lob data directory
char *hdfsServer_;
Int64 hdfsPort_;
hdfsFS fs_;
hdfsFile fdData_;
int openFlags_;
ExHdfsScanStats *stats_;
bool prefetchQueued_;
NAHeap *lobGlobalHeap_;
NABoolean lobTrace_;
NABoolean useLibHdfs_;
HdfsClient *hdfsClient_;
};
typedef map<string, ExLob *> lobMap_t;
typedef map<string, ExLob *>::iterator lobMap_it;
///////////////////////////////////////////////////////////////////////////////
// ExLobHdfsRequest
///////////////////////////////////////////////////////////////////////////////
class ExLobHdfsRequest
{
public:
ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLobCursor *cursor);
ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLob *lobPtr, ExLobCursor *cursor);
ExLobHdfsRequest(LobsHdfsRequestType reqType);
~ExLobHdfsRequest();
bool isShutDown() { return reqType_ == Lob_Hdfs_Shutdown; }
public :
LobsHdfsRequestType reqType_;
ExLob *lobPtr_;
ExLobCursor *cursor_;
char *buffer_;
int size_;
Ex_Lob_Error error_;
};
class ExLobPreOpen : public NABasicObject
{
public :
ExLobPreOpen(ExLob *lobPtr, const char *cursorName, Int64 range,
Int64 bufMaxSize, Int64 maxBytes, Int64 waited,
NAHeap * heap)
: lobPtr_(lobPtr),
cursorName_(cursorName,heap),
range_(range),
bufMaxSize_(bufMaxSize),
maxBytes_(maxBytes),
waited_(waited)
{
// nothing else to do
}
virtual ~ExLobPreOpen()
{
// nothing else to do
};
public :
ExLob *lobPtr_;
NAString cursorName_;
Int64 range_;
Int64 bufMaxSize_;
Int64 maxBytes_;
Int64 waited_;
};
void lobDebugInfo(const char *logMessage,Int32 errorcode,
Int32 line, NABoolean lobTrace);
typedef list<ExLobPreOpen *> preOpenList_t;
typedef list<ExLobPreOpen *>::iterator preOpenList_it;
class ExLobGlobals
{
public :
ExLobGlobals(NAHeap *lobHeap);
~ExLobGlobals();
Ex_Lob_Error initialize();
lobMap_t * getLobMap() { return lobMap_; }
Ex_Lob_Error getLobPtr(char *lobName, ExLob *& lobPtr);
Ex_Lob_Error delLobPtr(char *lobName);
hdfsFS getHdfsFs() { return fs_; }
void setHdfsFs(hdfsFS fs) { fs_ = fs; }
Ex_Lob_Error startWorkerThreads();
void doWorkInThread();
ExLobHdfsRequest* getHdfsRequest();
Ex_Lob_Error enqueueRequest(ExLobHdfsRequest *request);
Ex_Lob_Error enqueuePrefetchRequest(ExLob *lobPtr, ExLobCursor *cursor);
Ex_Lob_Error enqueueShutdownRequest();
Ex_Lob_Error performRequest(ExLobHdfsRequest *request);
Ex_Lob_Error addToPreOpenList(ExLobPreOpen *preOpenObj);
Ex_Lob_Error processPreOpens();
NABoolean isCliInitialized()
{
return isCliInitialized_;
}
void setCliInitialized()
{
isCliInitialized_ = TRUE;
}
void setHeap(void * heap)
{
heap_ = (NAHeap *) heap;
}
NAHeap * getHeap()
{
return heap_;
}
NABoolean useLibHdfs() { return useLibHdfs_; }
void setUseLibHdfs(NABoolean useLibHdfs)
{ useLibHdfs_ = useLibHdfs; }
void traceMessage(const char *logMessage, ExLobCursor *c, int line);
public :
lobMap_t *lobMap_;
hdfsFS fs_;
pthread_t threadId_[NUM_WORKER_THREADS];
typedef list<ExLobHdfsRequest *> reqList_t;
reqList_t reqQueue_;
ExLobLock reqQueueLock_;
preOpenList_t preOpenList_;
ExLobLock preOpenListLock_;
typedef list<ExLobCursorBuffer *> bufferList_t;
bufferList_t postfetchBufList_;
ExLobLock postfetchBufListLock_;
NABoolean isCliInitialized_;
NABoolean isHive_;
FILE *threadTraceFile_;
NAHeap *heap_;
NABoolean lobTrace_;
long numWorkerThreads_;
NABoolean useLibHdfs_;
};
#endif