blob: bb9b580688039abc0f1ea6edc906d54e6e6378e4 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: EXLOBaccess.cpp
* Description: class to store and retrieve LOB data.
*
*
* Created: 10/29/2012
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <malloc.h>
#include <string>
#include <errno.h>
#include <signal.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/time.h>
#include "hdfs.h"
#include "jni.h"
#include "ExpLOBstats.h"
#include "ExpLOBaccess.h"
#include "ExpLOBinterface.h"
#include "ExpLOBexternal.h"
#include "ExpLOB.h"
#include "NAVersionedObject.h"
#include "ComQueue.h"
#include "QRLogger.h"
#include "NAMemory.h"
#include "HdfsClient_JNI.h"
#include <seabed/ms.h>
#include <seabed/fserr.h>
#include <curl/curl.h>
#include <../../sqf/src/seabed/src/trans.h>
// Seabed transaction helpers, declared locally rather than via a header.
// NOTE(review): neither is called in the portion of this file reviewed here;
// presumably used further down — confirm before touching.
extern int ms_transid_get(bool pv_supp,
bool pv_trace,
MS_Mon_Transid_Type *pp_transid,
MS_Mon_Transseq_Type *pp_startid);
extern int ms_transid_reinstate(MS_Mon_Transid_Type, MS_Mon_Transseq_Type);
// short LobServerFNum;
// File-scope Seabed process handle.
// NOTE(review): presumably the handle of a LOB server process; it is not
// referenced in the reviewed portion of this file — confirm usage.
SB_Phandle_Type serverPhandle;
// Construct an ExLob with no storage bound yet. storage_ starts as
// Lob_Invalid_Storage, which initialize() checks before accepting a storage
// type, so initialize() must be called before the object is used.
// lobDataFile_ is constructed with the supplied heap. hdfsAccessStats is kept
// in stats_ and later handed to HdfsClient::newInstance() by initialize();
// it may presumably be NULL (TODO confirm with callers).
ExLob::ExLob(NAHeap * heap, ExHdfsScanStats *hdfsAccessStats) :
lobDataFile_(heap),
storage_(Lob_Invalid_Storage),
lobStorageLocation_(string()),
lobGlobalHeap_(NULL),
fs_(NULL),
fdData_(NULL),
openFlags_(0),
stats_(hdfsAccessStats),
lobTrace_(FALSE),
useLibHdfs_(FALSE),
hdfsClient_(NULL)
{
// nothing else to do
}
// Release whatever HDFS resources this ExLob still holds: an open libhdfs
// data-file handle and/or the JNI HdfsClient created by initialize().
ExLob::~ExLob()
{
  if (fdData_ != NULL)
    {
      // libhdfs path: close the data file still open from a previous write.
      hdfsCloseFile(fs_, fdData_);
      fdData_ = NULL;
    }

  if (hdfsClient_)
    {
      // JNI path: tear down the client instance.
      HdfsClient::deleteInstance(hdfsClient_);
      hdfsClient_ = NULL;
    }
}
// Bind this ExLob to its data file and HDFS connection.
//
// Builds lobDataFile_ as "<lobStorageLocation>/<lobFile>" (or just lobFile
// when no storage location is given). Records the storage type and the HDFS
// server/port. Obtains an HDFS handle: the shared libhdfs hdfsFS from
// lobGlobals when useLibHdfs_ is set, otherwise a new JNI HdfsClient.
// In EX_LOB_CREATE mode the data file is created. On the libhdfs path this
// fails if the file already exists. The JNI path additionally opens the file
// in every mode.
//
// Returns:
// - LOB_OPER_OK on success.
// - LOB_INIT_ERROR when storage_ is already set (presumably a repeated
//   initialize() call).
// - LOB_HDFS_CONNECT_ERROR when no HDFS handle can be obtained.
// - LOB_DATA_FILE_CREATE_ERROR / LOB_DATA_FILE_OPEN_ERROR for data-file failures.
// bufferSize/replication/blockSize are used only by the libhdfs create.
// lobLocation and lobMaxSize are not used by this function.
Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode,
char *lobStorageLocation,
LobsStorage storage,
char *hdfsServer, Int64 hdfsPort,
char *lobLocation,
int bufferSize , short replication ,
int blockSize, Int64 lobMaxSize, ExLobGlobals *lobGlobals)
{
int openFlags;
struct timespec startTime;
struct timespec endTime;
Int64 secs, nsecs, totalnsecs;
useLibHdfs_ = lobGlobals->useLibHdfs_;
if (lobStorageLocation)
{
if (lobStorageLocation_.empty())
{
lobStorageLocation_ = string(lobStorageLocation);
}
if (lobFile)
{
lobDataFile_ = lobStorageLocation;
lobDataFile_ += "/";
lobDataFile_ += lobFile;
}
}
else
{
if (lobFile)
lobDataFile_ = lobFile;
}
// Note: lobDataFile_ / lobStorageLocation_ above are updated even when the
// check below rejects the call with LOB_INIT_ERROR.
if (storage_ != Lob_Invalid_Storage)
{
return LOB_INIT_ERROR;
} else
{
storage_ = storage;
}
hdfsServer_ = hdfsServer;
hdfsPort_ = hdfsPort;
// lobLocation_ = lobLocation;
// NOTE(review): secs/nsecs/totalnsecs computed from these timestamps are not
// used anywhere in this function.
clock_gettime(CLOCK_MONOTONIC, &startTime);
lobGlobalHeap_ = lobGlobals->getHeap();
HDFS_Client_RetCode hdfsClientRetcode;
if (useLibHdfs_) {
// libhdfs: reuse the connection owned by lobGlobals.
if (lobGlobals->getHdfsFs() == NULL)
return LOB_HDFS_CONNECT_ERROR;
else
fs_ = lobGlobals->getHdfsFs();
hdfsClient_ = NULL;
}
else {
// JNI: each ExLob gets its own HdfsClient (released in ~ExLob).
hdfsClient_ = HdfsClient::newInstance(lobGlobalHeap_, stats_, hdfsClientRetcode);
fs_ = NULL;
if (hdfsClient_ == NULL)
return LOB_HDFS_CONNECT_ERROR;
}
clock_gettime(CLOCK_MONOTONIC, &endTime);
secs = endTime.tv_sec - startTime.tv_sec;
nsecs = endTime.tv_nsec - startTime.tv_nsec;
if (nsecs < 0)
{
secs--;
nsecs += NUM_NSECS_IN_SEC;
}
totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
if (! useLibHdfs_) {
if (mode == EX_LOB_CREATE) {
hdfsClientRetcode = hdfsClient_->hdfsCreate(lobDataFile_.data(), FALSE, FALSE);
if (hdfsClientRetcode != HDFS_CLIENT_OK)
return LOB_DATA_FILE_CREATE_ERROR;
}
hdfsClientRetcode = hdfsClient_->hdfsOpen(lobDataFile_.data(), FALSE);
if (hdfsClientRetcode != HDFS_CLIENT_OK)
return LOB_DATA_FILE_OPEN_ERROR;
// The JNI path never uses the libhdfs fdData_ handle.
fdData_ = NULL;
}
else
{
if (mode == EX_LOB_CREATE)
{
// check if file is already created
hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_.data());
if (fInfo != NULL)
{
hdfsFreeFileInfo(fInfo, 1);
return LOB_DATA_FILE_CREATE_ERROR;
}
openFlags = O_WRONLY | O_CREAT;
fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags, bufferSize, replication, blockSize);
if (!fdData_)
{
return LOB_DATA_FILE_CREATE_ERROR;
}
// Only create the file here; writeData() reopens it for append later.
hdfsCloseFile(fs_, fdData_);
fdData_ = NULL;
}
}
return LOB_OPER_OK;
}
// Fetch the next descriptor row from the LOB cursor opened for handleIn.
//
// On success, outOffset/outSize describe the next piece of LOB data. For an
// external LOB, the descriptor carries a file name. In that case lobDataFile_
// is redirected to that file and outSize is the file's size. isEOD is set to
// TRUE when the cursor is exhausted (CLI return code 100).
// Returns LOB_CURSOR_NOT_OPEN when no cursor exists for the handle,
// LOB_DESC_READ_ERROR on CLI failure, or any error from statSourceFile().
Ex_Lob_Error ExLob::fetchCursor(char *handleIn, Int32 handleLenIn, Int64 &outOffset, Int64 &outSize,NABoolean &isEOD, Int64 transId)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 dummyParam;
  int cliErr = 0;
  Int64 offset = 0;
  Int64 size = 0;
  lobCursors_it it = lobCursors_.find(string(handleIn, handleLenIn));
  char logBuf[4096];
  lobDebugInfo("In ExLob::fetchCursor",0,__LINE__,lobTrace_);

  // Receives the external data file name, if any. A stack buffer of the same
  // size replaces a heap allocation that was never released (one leak per fetch).
  char blackBox[MAX_LOB_FILE_NAME_LEN+6];
  Int32 blackBoxLen = 0;

  if (it == lobCursors_.end())
    {
      return LOB_CURSOR_NOT_OPEN;
    }
  void *cliInterface = it->second.cliInterface_;
  cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleLenIn,
                                    blackBox, &blackBoxLen,
                                    (char *)&dummyParam, (Lng32 *)&dummyParam,
                                    LOB_CLI_SELECT_FETCH, LOB_CLI_ExecImmed,
                                    &offset, &size,
                                    &dummyParam, &dummyParam,
                                    &cliInterface,
                                    transId,lobTrace_);
  if (cliErr < 0)
    {
      str_sprintf(logBuf, "LOB_CLI_SELECT_FETCH Returned cli error %d",cliErr);
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      err = LOB_DESC_READ_ERROR;
      return err;
    }
  if (cliErr == 100)
    {
      isEOD = TRUE;
    }
  else
    {
      if (blackBoxLen > 0)
        {
          // we have received the external data file name from the descriptor table
          // replace the contents of the lobDataFile with this name
          char temp[blackBoxLen+1];
          str_cpy_and_null(temp, blackBox, blackBoxLen, '\0', '0', TRUE);
          lobDataFile_ = temp;
          outOffset = offset;
          err = statSourceFile(temp,outSize);
          if (err != LOB_OPER_OK)
            return err;
        }
      else
        {
          outOffset = offset;
          outSize = size;
        }
    }
  str_sprintf(logBuf, " Returned after ::fetchCursor %ld,%ld",outOffset,outSize);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  return err;
}
// Look up the unique descriptor for handleIn through the CLI and copy its
// data offset and size into desc. blackBox/blackBoxLen and
// handleOut/handleOutLen are forwarded to the CLI call as output buffers.
// Returns LOB_DESC_READ_ERROR when the CLI call fails, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char *handleOut, Int32 &handleOutLen, Int64 transId)
{
  char logBuf[4096];
  Int64 descSize;
  Int64 descOffset;
  Int64 unusedOut = 0;

  lobDebugInfo("In ExLob::getDesc",0,__LINE__,lobTrace_);

  Int32 cliRc = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                         blackBox, blackBoxLen,
                                         handleOut, &handleOutLen,
                                         LOB_CLI_SELECT_UNIQUE, LOB_CLI_ExecImmed,
                                         &descOffset, &descSize,
                                         &unusedOut, &unusedOut,
                                         0,
                                         transId, lobTrace_);
  if (cliRc < 0)
    return LOB_DESC_READ_ERROR;

  desc.setOffset(descOffset);
  desc.setSize(descSize);

  str_sprintf(logBuf,"After Cli LOB_CLI_SELECT_UNIQUE:descOffset:%ld, descSize: %ld",descOffset,descSize);
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
  return LOB_OPER_OK;
}
// Append size bytes from data to the LOB data file.
//
// JNI path: writes through hdfsClient_ and sets operLen = size.
// libhdfs path: (re)opens lobDataFile_ for append if it is not already open
// that way, writes, then flushes. operLen receives hdfsWrite's result (-1 on
// failure). offset is not used; writes always append.
//
// Returns:
// - LOB_DATA_FILE_NOT_FOUND_ERROR when the data file does not exist.
// - LOB_DATA_FILE_OPEN_ERROR when it cannot be opened for append.
// - LOB_DATA_WRITE_ERROR / LOB_DATA_FLUSH_ERROR on I/O failure.
// - LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::writeData(Int64 offset, char *data, Int32 size, Int64 &operLen)
{
  if (! useLibHdfs_ ) {
    HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK;
    hdfsClient_->hdfsWriteImmediate(data, size, hdfsClientRetcode);
    if (hdfsClientRetcode != HDFS_CLIENT_OK)
      return LOB_DATA_WRITE_ERROR;
    operLen = size;
    return LOB_OPER_OK;
  }

  lobDebugInfo("In ExLob::writeData",0,__LINE__,lobTrace_);
  if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open for write
    {
      // Only existence matters here; release the path info right away.
      hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_.data());
      if (fInfo == NULL) {
        return LOB_DATA_FILE_NOT_FOUND_ERROR;
      }
      hdfsFreeFileInfo(fInfo, 1);

      if (fdData_)
        {
          hdfsCloseFile(fs_, fdData_);
          fdData_ = NULL;
        }
      openFlags_ = O_WRONLY | O_APPEND;
      fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags_, 0, 0, 0);
      if (!fdData_)
        {
          // Mark the mode unknown so the next call retries the open.
          openFlags_ = -1;
          return LOB_DATA_FILE_OPEN_ERROR;
        }
    }
  if ((operLen = hdfsWrite(fs_, fdData_, data, size)) == -1) {
    return LOB_DATA_WRITE_ERROR;
  }
  if (hdfsFlush(fs_, fdData_)) {
    return LOB_DATA_FLUSH_ERROR;
  }
  return LOB_OPER_OK;
}
// Write size bytes from data into an (expected empty) LOB data file.
//
// JNI path: delegates to writeData(). libhdfs path: when the file is not
// already open for append, it must exist and have size 0. It is then opened
// for append with the given bufferSize/replication/blockSize before writing.
// On success operLen = size.
//
// Returns:
// - LOB_DATA_FILE_NOT_FOUND_ERROR / LOB_DATA_FILE_NOT_EMPTY_ERROR /
//   LOB_DATA_FILE_OPEN_ERROR for file-state problems.
// - LOB_DATA_WRITE_ERROR / LOB_DATA_FLUSH_ERROR on I/O failure.
// - LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::writeDataSimple(char *data, Int64 size, LobsSubOper subOperation, Int64 &operLen,
                                    int bufferSize , short replication , int blockSize)
{
  if (! useLibHdfs_)
    return writeData(0,data, size, operLen);

  if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open for write
    {
      // get file info; release it on every path once the size is known
      hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_.data());
      if (fInfo == NULL) {
        return LOB_DATA_FILE_NOT_FOUND_ERROR;
      }
      Int64 existingSize = fInfo->mSize;
      hdfsFreeFileInfo(fInfo, 1);
      if (existingSize != 0) {
        return LOB_DATA_FILE_NOT_EMPTY_ERROR;
      }

      if (fdData_)
        hdfsCloseFile(fs_, fdData_);
      fdData_ = NULL;
      openFlags_ = O_WRONLY | O_APPEND ;
      fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags_, bufferSize, replication, blockSize);
      if (!fdData_) {
        openFlags_ = -1;
        return LOB_DATA_FILE_OPEN_ERROR;
      }
    }
  if (hdfsWrite(fs_, fdData_, data, size) == -1) {
    return LOB_DATA_WRITE_ERROR;
  }
  if (hdfsFlush(fs_, fdData_)) {
    return LOB_DATA_FLUSH_ERROR;
  }
  operLen = size;
  return LOB_OPER_OK;
}
// Check whether the data under dirPath changed after inputModTS.
//
// numOfPartLevels: 0, if not partitioned
//                  N, number of partitioning cols
// failedModTS: timestamp value that caused the mismatch (-1 when none)
//
// A non-positive inputModTS disables the check; in that case LOB_OPER_OK is
// returned and failedModTS is left untouched. Otherwise returns:
// - LOB_DATA_READ_ERROR when the modification time cannot be obtained.
// - LOB_DATA_MOD_CHECK_ERROR when the data is newer than inputModTS.
// - LOB_OPER_OK when the data is not newer.
// On any error, dirPath is copied (truncated, NUL-terminated) into
// failedLocBuf when it and failedLocBufLen are supplied.
Ex_Lob_Error ExLob::dataModCheck(
     char * dirPath,
     Int64 inputModTS,
     Lng32 numOfPartLevels,
     ExLobGlobals *lobGlobals,
     Int64 &failedModTS,
     char *failedLocBuf,
     Int32 *failedLocBufLen)
{
  if (inputModTS <= 0)
    return LOB_OPER_OK;

  failedModTS = -1;

  // libhdfs returns a second-resolution timestamp,
  // get a millisecond-resolution timestamp via JNI
  Int64 latestModTS;
  HDFS_Client_RetCode tsRc =
    HdfsClient::getHiveTableMaxModificationTs(latestModTS, dirPath, numOfPartLevels);

  Ex_Lob_Error status = LOB_OPER_OK;
  if (tsRc != HDFS_CLIENT_OK || latestModTS <= 0)
    {
      status = LOB_DATA_READ_ERROR;
    }
  else if (latestModTS > inputModTS)
    {
      failedModTS = latestModTS;
      status = LOB_DATA_MOD_CHECK_ERROR;
    }

  if (status == LOB_OPER_OK || !failedLocBuf || !failedLocBufLen)
    return status;

  // sorry, we lost the exact location for partitioned
  // files, user needs to search for him/herself
  Lng32 pathLen = strlen(dirPath);
  Lng32 maxCopy = *failedLocBufLen - 1;
  Lng32 copyLen = (pathLen > maxCopy) ? maxCopy : pathLen;
  str_cpy_and_null(failedLocBuf, dirPath, copyLen, '\0', ' ', TRUE);
  *failedLocBufLen = copyLen;
  return status;
}
// Delete the contents of the HDFS directory dirPath.
//
// JNI path: calls HdfsClient::hdfsDeletePath(dirPath).
// libhdfs path: deletes every plain file directly under dirPath, then
// recurses into each sub-directory. Individual deletion failures do not stop
// the sweep; they are reported at the end as LOB_DATA_FILE_DELETE_ERROR.
// Returns LOB_DIR_NAME_ERROR when dirPath does not exist, and
// LOB_DATA_FILE_NOT_FOUND_ERROR when listing a non-empty directory fails.
Ex_Lob_Error ExLob::emptyDirectory(char *dirPath,
                                   ExLobGlobals *lobGlobals)
{
  if (! useLibHdfs_) {
    HDFS_Client_RetCode hdfsClientRetcode = HdfsClient::hdfsDeletePath(dirPath);
    if (hdfsClientRetcode != HDFS_CLIENT_OK)
      return LOB_DATA_FILE_DELETE_ERROR;
    return LOB_OPER_OK;
  }

  // Existence check only; free the path info instead of leaking it.
  hdfsFileInfo *dirInfo = hdfsGetPathInfo(fs_, dirPath);
  if (dirInfo == NULL)
    {
      return LOB_DIR_NAME_ERROR;
    }
  hdfsFreeFileInfo(dirInfo, 1);

  Lng32 currNumFilesInDir = 0;
  hdfsFileInfo *fileInfos = hdfsListDirectory(fs_, dirPath, &currNumFilesInDir);
  if ((currNumFilesInDir > 0) && (fileInfos == NULL))
    {
      return LOB_DATA_FILE_NOT_FOUND_ERROR;
    }
  if ((currNumFilesInDir == 0) && (fileInfos == NULL)) // empty directory
    {
      return LOB_OPER_OK;
    }

  NABoolean error = FALSE;

  // delete all files in this directory
  for (Lng32 i = 0; i < currNumFilesInDir; i++)
    {
      hdfsFileInfo &fileInfo = fileInfos[i];
      if (fileInfo.mKind == kObjectKindFile &&
          hdfsDelete(fs_, fileInfo.mName, 0) != 0)
        error = TRUE;
    }

  // recursively delete all files in sub-dirs
  for (Lng32 i = 0; i < currNumFilesInDir; i++)
    {
      hdfsFileInfo &fileInfo = fileInfos[i];
      if (fileInfo.mKind == kObjectKindDirectory &&
          emptyDirectory(fileInfo.mName, lobGlobals) != LOB_OPER_OK)
        error = TRUE;
    }

  if (fileInfos)
    {
      hdfsFreeFileInfo(fileInfos, currNumFilesInDir);
    }
  if (error)
    return LOB_DATA_FILE_DELETE_ERROR;
  return LOB_OPER_OK;
}
// Accumulator passed to WriteMemoryCallback through CURLOPT_WRITEDATA
// (set up by readExternalSourceFile).
struct MemoryStruct {
char *memory;   // buffer allocated from heap; after data arrives it holds size bytes plus a NUL terminator
size_t size;    // number of data bytes currently stored in memory
NAHeap *heap;   // heap used for every allocation of memory
};
// libcurl CURLOPT_WRITEFUNCTION callback: append the received bytes to the
// MemoryStruct passed as userp, growing its buffer on the heap.
//
// The buffer is grown by allocating a new block, copying the bytes already
// received, and releasing the old block. Without the copy, earlier chunks are
// lost; without the release, every chunk leaks. The buffer is always kept
// NUL-terminated.
// Returns the number of bytes consumed, or 0 on allocation failure. libcurl
// treats 0 as an error and aborts the transfer. On failure the existing
// buffer is left intact so the caller can still release it.
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
  size_t realsize = size * nmemb;
  struct MemoryStruct *mem = (struct MemoryStruct *)userp;

  char *grown = (char *)(mem->heap)->allocateMemory(mem->size + realsize + 1);
  if (grown == NULL) {
    /* out of memory! */
    return 0;
  }
  if (mem->memory != NULL) {
    if (mem->size > 0)
      memcpy(grown, mem->memory, mem->size);
    mem->heap->deallocateMemory(mem->memory);
  }
  mem->memory = grown;

  memcpy(&(mem->memory[mem->size]), contents, realsize);
  mem->size += realsize;
  mem->memory[mem->size] = 0;
  return realsize;
}
// libcurl CURLOPT_HEADERFUNCTION callback used while probing a URL's size.
// Header contents are discarded, but every byte is reported as consumed so
// libcurl continues the transfer.
static size_t header_throw_away(void *ptr, size_t size, size_t nmemb, void *data)
{
  const size_t consumed = nmemb * size;
  return consumed;
}
// Determine the size in bytes of srcfile and return it in sourceEOF.
// srcfile may be an "hdfs://" file, a local file, or an "http://" URL.
//
// Every handle acquired here is released on all paths: the libhdfs file and
// its path info, the local fd and its flock, and the curl easy handle.
// For a URL, libcurl reports -1 when the length is unknown. That value is
// passed through; writeDesc rejects a negative length.
// Returns LOB_OPER_OK on success, or one of LOB_SOURCE_FILE_OPEN_ERROR,
// LOB_SOURCE_FILE_LOCK_ERROR or LOB_SOURCE_FILE_STAT_ERROR.
Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 &sourceEOF)
{
  char logBuf[4096];
  lobDebugInfo("In ExLob::statSourceFile",0,__LINE__,lobTrace_);
  // check if the source file is a hdfs file or from local file system.
  LobInputOutputFileType srcType = fileType(srcfile);
  HDFS_Client_RetCode hdfsClientRetcode;
  if (srcType == HDFS_FILE)
    {
      if (! useLibHdfs_) {
        sourceEOF = HdfsClient::hdfsSize(srcfile, hdfsClientRetcode);
        if (hdfsClientRetcode != HDFS_CLIENT_OK)
          return LOB_SOURCE_FILE_OPEN_ERROR;
        ex_assert(sourceEOF >= 0, "Offset is -1 possibly due to path being directory");
      }
      else {
        // Opening verifies the file is readable; the handle is not otherwise
        // needed, so close it once the path info has been fetched.
        hdfsFile sourceFile = hdfsOpenFile(fs_,srcfile,O_RDONLY,0,0,0);
        if (!sourceFile)
          return LOB_SOURCE_FILE_OPEN_ERROR;
        hdfsFileInfo *sourceFileInfo = hdfsGetPathInfo(fs_,srcfile);
        hdfsCloseFile(fs_, sourceFile);
        if (!sourceFileInfo)
          return LOB_SOURCE_FILE_OPEN_ERROR;
        // get EOD from source hdfs file.
        sourceEOF = sourceFileInfo->mSize;
        hdfsFreeFileInfo(sourceFileInfo, 1);
        str_sprintf(logBuf,"Returning EOF of %ld for file %s", sourceEOF,srcfile);
        lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
      }
    }
  else if (srcType == LOCAL_FILE)
    {
      int fdSrcFile = open(srcfile, O_RDONLY);
      if (fdSrcFile < 0) {
        return LOB_SOURCE_FILE_OPEN_ERROR;
      }
      if (flock(fdSrcFile, LOCK_EX) == -1) {
        close(fdSrcFile);
        return LOB_SOURCE_FILE_LOCK_ERROR;
      }
      struct stat statbuf;
      int statRc = stat(srcfile, &statbuf);
      flock(fdSrcFile, LOCK_UN);
      close(fdSrcFile);
      if (statRc != 0) {
        return LOB_SOURCE_FILE_STAT_ERROR;
      }
      sourceEOF = statbuf.st_size;
      str_sprintf(logBuf,"Returning EOF of %ld for file %s", sourceEOF,srcfile);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
    }
  else if (srcType == CURL_FILE)
    {
      // This is an http/ftp file. Use curl interface to determine size
      CURL *curl = curl_easy_init();
      if (!curl)
        return LOB_SOURCE_FILE_OPEN_ERROR;
      curl_easy_setopt(curl, CURLOPT_URL, srcfile);
      /* find file size from header; do not download the body */
      curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
      curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
      /* Ask for filetime */
      curl_easy_setopt(curl, CURLOPT_FILETIME, 1L);
      /* No header output: TODO 14.1 http-style HEAD output for ftp */
      curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION,header_throw_away);
      curl_easy_setopt(curl, CURLOPT_HEADER, 0L);

      double filesize = 0;
      CURLcode res = curl_easy_perform(curl);
      if (res == CURLE_OK)
        res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &filesize);
      curl_easy_cleanup(curl);
      if (res != CURLE_OK)
        return LOB_SOURCE_FILE_STAT_ERROR;
      sourceEOF = (Int64)filesize;
    }
  return LOB_OPER_OK;
}
// Read up to size bytes at offset from srcfile into a newly heap-allocated
// fileData buffer. The reader is chosen by where the file lives: HDFS, the
// local file system, or a curl-reachable URL. Unknown kinds yield
// LOB_SOURCE_FILE_OPEN_ERROR.
Ex_Lob_Error ExLob::readSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset)
{
  switch (fileType(srcfile))
    {
    case HDFS_FILE:
      return readHdfsSourceFile(srcfile, fileData, size, offset);
    case LOCAL_FILE:
      return readLocalSourceFile(srcfile, fileData, size, offset);
    case CURL_FILE:
      return readExternalSourceFile((char *)srcfile, fileData, size, offset);
    default:
      return LOB_SOURCE_FILE_OPEN_ERROR;
    }
}
// Read size bytes at offset from HDFS file srcfile into a buffer allocated
// from the LOB global heap. The buffer is returned in fileData; the caller
// owns it and must deallocate it.
//
// JNI path: uses a temporary HdfsClient and sets size to the bytes read.
// libhdfs path: uses hdfsPread; size is left unchanged.
// On any failure, no buffer or file handle is leaked, and fileData is NULL
// when it was allocated.
// Returns LOB_SOURCE_FILE_OPEN_ERROR, LOB_SOURCE_DATA_ALLOC_ERROR or
// LOB_SOURCE_FILE_READ_ERROR on failure, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset)
{
  char logBuf[4096];
  str_sprintf(logBuf,"Calling ::readHdfsSourceFile: %s Offset:%ld, Size: %d",srcfile, offset,size);
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
  HDFS_Client_RetCode hdfsClientRetcode;
  if (!useLibHdfs_) {
    HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
    ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error");
    hdfsClientRetcode = srcHdfsClient->hdfsOpen(srcfile, FALSE);
    if (hdfsClientRetcode != HDFS_CLIENT_OK) {
      HdfsClient::deleteInstance(srcHdfsClient);
      return LOB_SOURCE_FILE_OPEN_ERROR;
    }
    fileData = (char *) (getLobGlobalHeap())->allocateMemory(size);
    if (fileData == NULL || fileData == (char *)-1) {
      fileData = NULL;
      HdfsClient::deleteInstance(srcHdfsClient);
      return LOB_SOURCE_DATA_ALLOC_ERROR;
    }
    Int64 bytesRead = srcHdfsClient->hdfsRead(offset, fileData, size, hdfsClientRetcode);
    if (hdfsClientRetcode != HDFS_CLIENT_OK) {
      HdfsClient::deleteInstance(srcHdfsClient);
      getLobGlobalHeap()->deallocateMemory(fileData);
      fileData = NULL;
      return LOB_SOURCE_FILE_READ_ERROR;
    }
    size = bytesRead;
    // The per-call client is released here so it does not accumulate
    // across chunked reads.
    HdfsClient::deleteInstance(srcHdfsClient);
    return LOB_OPER_OK;
  }

  hdfsFile fdSrcFile = hdfsOpenFile(fs_,srcfile, O_RDONLY,0,0,0);
  if (fdSrcFile == NULL)
    return LOB_SOURCE_FILE_OPEN_ERROR;
  fileData = (char *) (getLobGlobalHeap())->allocateMemory(size);
  if (fileData == NULL || fileData == (char *)-1) {
    fileData = NULL;
    hdfsCloseFile(fs_,fdSrcFile);
    return LOB_SOURCE_DATA_ALLOC_ERROR;
  }
  if (hdfsPread(fs_,fdSrcFile, offset,fileData, size) == -1) {
    hdfsCloseFile(fs_,fdSrcFile);
    getLobGlobalHeap()->deallocateMemory(fileData);
    fileData = NULL;
    return LOB_SOURCE_FILE_READ_ERROR;
  }
  hdfsCloseFile(fs_,fdSrcFile);
  return LOB_OPER_OK;
}
// Read size bytes at offset from local file srcfile into a buffer allocated
// from the LOB global heap. The buffer is returned in fileData and is owned
// by the caller.
//
// The file is held under an exclusive flock for the duration of the read.
// The descriptor is closed on every exit path; closing also drops the lock.
// Returns LOB_SOURCE_FILE_OPEN_ERROR, LOB_SOURCE_FILE_LOCK_ERROR,
// LOB_SOURCE_FILE_STAT_ERROR, LOB_SOURCE_DATA_ALLOC_ERROR or
// LOB_SOURCE_FILE_READ_ERROR on failure, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset)
{
  char logBuf[4096];
  str_sprintf(logBuf,"Calling ::readLocalSourceFile: %s Offset:%ld, Size: %d",srcfile, offset,size);
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);

  int fdSrcFile = open(srcfile, O_RDONLY);
  if (fdSrcFile < 0) {
    return LOB_SOURCE_FILE_OPEN_ERROR;
  }
  if (flock(fdSrcFile, LOCK_EX) == -1) {
    close(fdSrcFile);
    return LOB_SOURCE_FILE_LOCK_ERROR;
  }
  struct stat statbuf;
  if (stat(srcfile, &statbuf) != 0) {
    flock(fdSrcFile, LOCK_UN);
    close(fdSrcFile);
    return LOB_SOURCE_FILE_STAT_ERROR;
  }
  fileData = (char *) (getLobGlobalHeap())->allocateMemory(size);
  if (fileData == NULL || fileData == (char *)-1) {
    fileData = NULL;
    flock(fdSrcFile, LOCK_UN);
    close(fdSrcFile);
    return LOB_SOURCE_DATA_ALLOC_ERROR;
  }
  if (pread(fdSrcFile, fileData, size, offset) == -1) {
    flock(fdSrcFile, LOCK_UN);
    close(fdSrcFile);
    getLobGlobalHeap()->deallocateMemory(fileData);
    fileData = NULL;
    return LOB_SOURCE_FILE_READ_ERROR;
  }
  flock(fdSrcFile, LOCK_UN);
  close(fdSrcFile);
  return LOB_OPER_OK;
}
// Download the resource at URL srcfile via libcurl into a buffer allocated
// from the LOB global heap. The buffer is returned in fileData and is owned
// by the caller.
//
// offset is not used: the whole resource is fetched. When fewer than size
// bytes arrive, size is reduced to the number received, so callers never
// read past the data actually downloaded.
// Returns LOB_SOURCE_FILE_OPEN_ERROR if curl cannot be initialised, and
// LOB_SOURCE_FILE_READ_ERROR if the transfer fails. On failure the buffer
// is released and fileData is not modified.
Ex_Lob_Error ExLob::readExternalSourceFile(char *srcfile, char *&fileData, Int32 &size,Int64 offset)
{
  struct MemoryStruct chunk;
  chunk.heap = getLobGlobalHeap();
  chunk.memory = (char *) (chunk.heap)->allocateMemory(size);
  chunk.size = 0; /* no data at this point */

  CURL *curl = curl_easy_init();
  if (!curl)
    {
      if (chunk.memory)
        chunk.heap->deallocateMemory(chunk.memory);
      return LOB_SOURCE_FILE_OPEN_ERROR;
    }
  curl_easy_setopt(curl, CURLOPT_URL, srcfile);
  /* send all data to this function */
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
  /* we pass our 'chunk' struct to the callback function */
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  CURLcode res = curl_easy_perform(curl);
  curl_easy_cleanup(curl);

  if (res != CURLE_OK)
    {
      if (chunk.memory)
        chunk.heap->deallocateMemory(chunk.memory);
      return LOB_SOURCE_FILE_READ_ERROR;
    }

  fileData = chunk.memory;
  if ((Int64)chunk.size < (Int64)size)
    size = (Int32)chunk.size;
  return LOB_OPER_OK;
}
// Return in outLobLen the length of the LOB identified by handleIn.
//
// For ordinary LOBs the length comes from the CLI (LOB_CLI_SELECT_LOBLENGTH).
// For external-file LOBs (so == Lob_External_File), the external file name is
// fetched from the descriptor (LOB_CLI_SELECT_UNIQUE). The length is then that
// file's size, as reported by statSourceFile().
// Returns LOB_DESC_READ_ERROR on CLI failure, any statSourceFile() error,
// or LOB_OPER_OK.
Ex_Lob_Error ExLob::getLength(char *handleIn, Int32 handleInLen,Int64 &outLobLen,LobsSubOper so, Int64 transId)
{
  char logBuf[4096];
  Int32 cliErr = 0;
  Ex_Lob_Error err = LOB_OPER_OK;

  if (so != Lob_External_File)
    {
      cliErr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,NULL,NULL,NULL,NULL,LOB_CLI_SELECT_LOBLENGTH,LOB_CLI_ExecImmed, 0,&outLobLen, 0, 0,0,transId,lobTrace_);
      if (cliErr < 0) {
        str_sprintf(logBuf,"CLI SELECT_LOBLENGTH returned error %d",cliErr);
        lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
        return LOB_DESC_READ_ERROR;
      }
      return err;
    }

  //Get the lob external filename from the descriptor file and get the length of the file.
  // A stack buffer replaces a heap allocation that was never released.
  char blackBox[MAX_LOB_FILE_NAME_LEN+6];
  Int32 blackBoxLen = 0;
  Int64 dummy = 0;
  cliErr = SQL_EXEC_LOBcliInterface(handleIn,
                                    handleInLen,
                                    blackBox, &blackBoxLen,
                                    NULL, 0,
                                    LOB_CLI_SELECT_UNIQUE, LOB_CLI_ExecImmed,
                                    &dummy, &dummy,
                                    &dummy, &dummy,
                                    0,
                                    transId,lobTrace_);
  if (cliErr < 0) {
    str_sprintf(logBuf,"CLI SELECT_UNIQUE returned error %d",cliErr);
    lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
    return LOB_DESC_READ_ERROR;
  }
  if (blackBoxLen > 0)
    {
      // we have received the external data file name from the
      // descriptor table
      char temp[blackBoxLen+1];
      str_cpy_and_null(temp, blackBox, blackBoxLen, '\0', '0', TRUE);
      err = statSourceFile(temp,outLobLen);
      if (err != LOB_OPER_OK)
        return err;
    }
  return err;
}
// Return in outLobOffset the data offset of the LOB identified by handleIn,
// as reported by the CLI (LOB_CLI_SELECT_LOBOFFSET). External-file LOBs are
// not queried; outLobOffset is left unchanged and LOB_OPER_OK is returned.
// Returns LOB_DESC_READ_ERROR when the CLI call fails.
Ex_Lob_Error ExLob::getOffset(char *handleIn, Int32 handleInLen,Int64 &outLobOffset,LobsSubOper so, Int64 transId)
{
  if (so == Lob_External_File)
    return LOB_OPER_OK;

  Int32 cliRc = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                         NULL, NULL, NULL, NULL,
                                         LOB_CLI_SELECT_LOBOFFSET, LOB_CLI_ExecImmed,
                                         &outLobOffset, 0,
                                         0, 0, 0,
                                         transId, lobTrace_);
  if (cliRc >= 0)
    return LOB_OPER_OK;

  char logBuf[4096];
  str_sprintf(logBuf,"CLI SELECT_LOBOFFSET returned error %d",cliRc);
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
  return LOB_DESC_READ_ERROR;
}
// Return the name of the file holding the LOB data for handleIn.
//
// For ordinary LOBs, this copies lobDataFile_ into outFileName (not
// NUL-terminated) and sets outFileLen to its length. For external-file LOBs,
// the CLI writes the external file name and its length from the descriptor
// (LOB_CLI_SELECT_UNIQUE).
// NOTE(review): outFileName is assumed large enough to hold the name; no
// bounds check is done here.
// Returns LOB_DESC_READ_ERROR on CLI failure, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::getFileName(char *handleIn, Int32 handleInLen, char *outFileName, Int32 &outFileLen , LobsSubOper so, Int64 transId)
{
  char logBuf[4096];
  Int32 cliErr = 0;
  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 dummy = 0;
  if (so != Lob_External_File)
    {
      // Return this LOB's data file name, and report its length just as the
      // external-file path does through the CLI.
      str_cpy_all(outFileName, (char *)lobDataFile_.data(),lobDataFile_.length());
      outFileLen = lobDataFile_.length();
    }
  else
    {
      //Get the lob external filename from the descriptor file
      cliErr = SQL_EXEC_LOBcliInterface(handleIn,
                                        handleInLen,
                                        (char *)outFileName, &outFileLen,
                                        NULL, 0,
                                        LOB_CLI_SELECT_UNIQUE, LOB_CLI_ExecImmed,
                                        &dummy, &dummy,
                                        &dummy, &dummy,
                                        0,
                                        transId,lobTrace_);
      if (cliErr < 0) {
        str_sprintf(logBuf,"CLI SELECT_FILENAME returned error %d",cliErr);
        lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
        return LOB_DESC_READ_ERROR;
      }
    }
  return err;
}
// Validate a new LOB value and insert its descriptor row.
//
// Lob_External_File: source must be an existing HDFS file; no data space is
// allocated. Lob_File: sourceLen is replaced by the size of file source.
// All other sub-operations allocate data space via allocateDesc().
// The length must be within [0, lobMaxSize]. The descriptor is then written
// with LOB_CLI_INSERT. operLen is reset to 0 once the length check passes.
// Returns LOB_SOURCE_FILE_READ_ERROR, LOB_MAX_LIMIT_ERROR,
// LOB_DESC_WRITE_ERROR, or any statSourceFile()/allocateDesc() error.
Ex_Lob_Error ExLob::writeDesc(Int64 &sourceLen, char *source, LobsSubOper subOper, Int64 &descNumOut, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize,Int64 lobGCLimit, char * handleIn, Int32 handleInLen, char *blackBox, Int32 *blackBoxLen, char *handleOut, Int32 &handleOutLen, Int64 xnId, void *lobGlobals)
{
  char logBuf[4096];
  lobDebugInfo("In ExLob::writeDesc",0,__LINE__,lobTrace_);

  if (subOper == Lob_External_File)
    {
      // External LOB data must already exist, and must live in HDFS.
      if (fileType(source) != HDFS_FILE)
        return LOB_SOURCE_FILE_READ_ERROR;
      Int64 extFileEOD = 0;
      if (statSourceFile(source, extFileEOD) != LOB_OPER_OK)
        return LOB_SOURCE_FILE_READ_ERROR;
    }
  else if (subOper == Lob_File)
    {
      // The length of a file-sourced LOB is the file's size.
      Ex_Lob_Error statErr = statSourceFile(source, sourceLen);
      if (statErr != LOB_OPER_OK)
        return statErr;
    }

  if (sourceLen < 0 || sourceLen > lobMaxSize)
    return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size

  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 dataOffset = 0;
  if (subOper != Lob_External_File)
    {
      lobDebugInfo("Calling ExLob::allocateDesc",0,__LINE__,lobTrace_);
      err = allocateDesc((unsigned int)sourceLen, descNumOut, dataOffset, lobMaxSize, lobMaxChunkMemSize,handleIn, handleInLen, lobGCLimit,lobGlobals);
    }
  operLen = 0;
  if (err != LOB_OPER_OK)
    return err;

  Int64 descPartnKeyOut = 0;
  Int64 descSyskeyOut = 0;
  lobDebugInfo("Calling cli LOB_CLI_INSERT",0,__LINE__,lobTrace_);
  Int32 cliRc = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                         blackBox, blackBoxLen,
                                         handleOut, &handleOutLen,
                                         LOB_CLI_INSERT, LOB_CLI_ExecImmed,
                                         &dataOffset, &sourceLen,
                                         &descPartnKeyOut, &descSyskeyOut,
                                         0,
                                         xnId, lobTrace_);
  if (cliRc < 0)
    {
      str_sprintf(logBuf,"CLI LOB_CLI_INSERT returned error %d",cliRc);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
      return LOB_DESC_WRITE_ERROR;
    }
  return err;
}
// Insert a descriptor row for a chunk located at offset with the given size,
// via LOB_CLI_INSERT. handleOutLen is reset to 0 before the call.
// blackBox, blackBoxLen and lobGlobals are accepted but not used here.
// Returns LOB_DESC_WRITE_ERROR when the CLI call fails, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::insertDesc(Int64 offset, Int64 size, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, char *blackBox, Int32 blackBoxLen,Int64 xnId, void *lobGlobals)
{
  char logBuf[4096];
  Int64 descSyskeyOut = 0;
  Int64 descPartnKeyOut = 0;
  Int32 chunkNumber = 1;

  handleOutLen = 0;
  lobDebugInfo("In ExLob::InsertDesc",0,__LINE__,lobTrace_);
  str_sprintf(logBuf,"Calling Cli LOB_CLI_INSERT: Offset:%ld, Size: %ld",
              offset,size);
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);

  Lng32 cliRc = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                         NULL, &chunkNumber,
                                         handleOut, &handleOutLen,
                                         LOB_CLI_INSERT, LOB_CLI_ExecImmed,
                                         &offset, &size,
                                         &descPartnKeyOut, &descSyskeyOut,
                                         0,
                                         xnId, lobTrace_);

  str_sprintf(logBuf,"After LOB_CLI_INSERT: ChunkNum:%d OutSyskey:%ld",
              chunkNumber,descSyskeyOut);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  lobDebugInfo("Leaving ExLob::InsertDesc",0,__LINE__,lobTrace_);

  if (cliRc >= 0)
    return LOB_OPER_OK;

  lobDebugInfo("LOB_CLI_INSERT cli call returned error :",cliRc,__LINE__,TRUE);
  return LOB_DESC_WRITE_ERROR;
}
// Copy sourceLen bytes of LOB data into the LOB data file, at most
// lobMaxChunkMemSize bytes at a time.
//
// Lob_File: source is a file name, and each chunk is read into a heap buffer
// via readSourceFile(). That buffer is released after the write, or when the
// write fails.
// Otherwise: source points to the data in memory.
// Lob_External_File: nothing is written.
// On the libhdfs path the data file is closed once all chunks are written.
// Returns the first readSourceFile()/writeData() error, or LOB_OPER_OK.
Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subOperation, Int64 tgtOffset,Int64 &operLen, Int64 lobMaxChunkMemSize)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  char logBuf[4096];
  lobDebugInfo("In ExLob::writeLobData",0,__LINE__,lobTrace_);
  char *inputAddr = source;
  Int64 writeOffset = tgtOffset;
  Int64 readOffset = 0;
  Int32 allocMemSize = 0;
  Int64 inputSize = sourceLen;

  if (subOperation == Lob_External_File)
    return LOB_OPER_OK;

  while (inputSize > 0)
    {
      allocMemSize = MINOF(lobMaxChunkMemSize, inputSize);
      if (subOperation == Lob_File)
        {
          str_sprintf(logBuf,"reading source file %s allocMemSize : %d, readOffset:%ld", source,allocMemSize,readOffset);
          lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
          err = readSourceFile(source, inputAddr, allocMemSize, readOffset);
          if (err != LOB_OPER_OK)
            {
              lobDebugInfo("readSouceFile returned an error",0,__LINE__,lobTrace_);
              return err;
            }
        }
      // else: in memory, inputAddr already points at the data

      err = writeData(writeOffset, inputAddr, allocMemSize, operLen);
      if (err != LOB_OPER_OK)
        {
          str_sprintf(logBuf,"::writeData returned error .writeOffset:%ld, allocMemSize:%d, operLen %ld ", writeOffset,allocMemSize,operLen);
          lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
          // The chunk buffer was allocated by readSourceFile; release it
          // before bailing out so a failed write does not leak it.
          if (subOperation == Lob_File)
            getLobGlobalHeap()->deallocateMemory(inputAddr);
          //handle errors that happen in one of the chunks.
          return err;
        }

      writeOffset = writeOffset+allocMemSize;
      inputSize = inputSize-lobMaxChunkMemSize;
      if (subOperation == Lob_File)
        {
          readOffset = readOffset+allocMemSize;
          getLobGlobalHeap()->deallocateMemory(inputAddr);
          str_sprintf(logBuf,"Bookkeeping for Lob_File source.writeOffset:%ld, readOffset:%ld, inputSize: %ld ", writeOffset,readOffset, inputSize);
          lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
        }
      else
        {
          inputAddr = inputAddr+allocMemSize;
          str_sprintf(logBuf,"Bookkeeping for Lob_Memory source. writeOffset:%ld, inputSize: %ld ", writeOffset, inputSize);
          lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
        }
    }
  lobDebugInfo("Leaving ExLob::writeLobData",0,__LINE__,lobTrace_);
  if (useLibHdfs_) {
    hdfsCloseFile(fs_, fdData_);
    fdData_ = NULL;
  }
  return err;
}
// Read up to size bytes of the LOB identified by handleIn into memAddr.
//
// The descriptor is fetched with getDesc(). If it names an external data
// file (blackBoxLen > 0), lobDataFile_ is redirected to that file. A
// blackBoxLen of -1 means the LOB spans multiple chunks; all size bytes are
// then requested. operLen receives the number of bytes read.
// Returns any getDesc()/readDataToMem() error, or LOB_OPER_OK.
Ex_Lob_Error ExLob::readToMem(char *memAddr, Int64 size, Int64 &operLen,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  NABoolean multipleChunks = FALSE;
  operLen = 0;
  ExLobDesc desc;
  Int64 sizeToRead = 0;
  char logBuf[4096];
  lobDebugInfo("In ExLob::readToMem",0,__LINE__,lobTrace_);

  err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,transId);
  if (err != LOB_OPER_OK)
    {
      return err;
    }
  sizeToRead = MINOF(size,desc.getSize());
  if (blackBox && blackBoxLen > 0)
    {
      // we have received the external data file name from the descriptor table
      // replace the contents of the lobDataFile with this name
      char temp[blackBoxLen+1];
      str_cpy_and_null(temp, blackBox, blackBoxLen, '\0', '0', TRUE);
      lobDataFile_ = temp;
    }
  if (blackBoxLen == -1)
    {
      lobDebugInfo("Reading multiple chunks",0,__LINE__,lobTrace_);
      sizeToRead = size;
      multipleChunks = TRUE;
    }
  // The descriptor size is 64-bit; cast explicitly so it matches %ld.
  str_sprintf(logBuf,"sizeToRead:%ld, desc.size :%ld", sizeToRead, (Int64)desc.getSize());
  lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
  err = readDataToMem(memAddr, desc.getOffset(),sizeToRead, operLen, handleIn,handleInLen, multipleChunks,transId);
  return err;
}
// Classify a LOB source/target name by its URI scheme prefix:
//   "hdfs://..." -> HDFS_FILE
//   "http://..." -> CURL_FILE
//   anything else, including "file://..." and bare paths -> LOCAL_FILE
//
// The prefix test is a true starts-with check (compare at position 0).
// It does not store a find() position in a narrow integer, which could
// truncate a large match position to 0 and misclassify long names.
LobInputOutputFileType ExLob::fileType(char *ioFileName)
{
  const std::string fileTgt(ioFileName);
  const std::string hdfsPrefix("hdfs://");
  const std::string httpPrefix("http://");

  if (fileTgt.compare(0, hdfsPrefix.size(), hdfsPrefix) == 0)
    return HDFS_FILE;
  if (fileTgt.compare(0, httpPrefix.size(), httpPrefix) == 0)
    return CURL_FILE;
  // "file://" URIs and plain paths are both treated as local files.
  return LOCAL_FILE;
}
// Extract the LOB identified by handleIn into tgtFileName. The target may be
// an HDFS file, a curl-reachable URL, or a local file.
//
// The descriptor is fetched with getDesc(). blackBoxLen == -1 marks a
// multi-chunk LOB, which readDataTo*File handles chunk by chunk. Otherwise
// tgtLength must be positive, and reading starts at the descriptor offset.
// lobDataFile_ is redirected only when the descriptor actually returned an
// external file name (blackBoxLen > 0), matching readToMem().
// Returns LOB_SOURCE_FILE_READ_ERROR for a non-positive tgtLength, or any
// getDesc()/readDataTo*File() error.
Ex_Lob_Error ExLob::readToFile(char *tgtFileName, Int64 tgtLength, Int64 &operLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn,Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, Int64 transId)
{
  lobDebugInfo("In ExLob::readToFile",0,__LINE__,lobTrace_);
  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 srcOffset = 0;
  LobInputOutputFileType tgtType = fileType(tgtFileName);
  ExLobDesc desc;
  NABoolean multipleChunks = FALSE;

  err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,transId);
  if (err != LOB_OPER_OK)
    return err;

  if (blackBoxLen == -1) // mxlobsrvr returned -1 indicating multiple chunks for this particular lob handle
    {
      lobDebugInfo("Reading multiple chunks",0,__LINE__,lobTrace_);
      multipleChunks = TRUE;
      //the data retrieval in chunks is handled in readDataToMem.
    }
  else if (tgtLength <= 0)
    {
      return LOB_SOURCE_FILE_READ_ERROR;
    }
  else
    {
      srcOffset = desc.getOffset();
    }

  // Only a positive length carries a file name. With -1 or 0 the copy
  // below would size a zero-length array, write out of bounds, and clobber
  // lobDataFile_.
  if (blackBox && blackBoxLen > 0)
    {
      // we have received the external data file name from the descriptor table
      // replace the contents of the lobDataFile with this name
      char temp[blackBoxLen+1];
      str_cpy_and_null(temp, blackBox, blackBoxLen, '\0', '0', TRUE);
      lobDataFile_ = temp;
    }

  if (tgtType == HDFS_FILE)
    {
      err = readDataToHdfsFile(tgtFileName, srcOffset , tgtLength,operLen, lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId);
      if (err != LOB_OPER_OK)
        return err;
    }
  else if (tgtType == CURL_FILE)
    {
      err = readDataToExternalFile(tgtFileName, srcOffset, tgtLength, operLen, lobMaxChunkMemLen, fileflags,handleIn, handleInLen,multipleChunks,transId);
      if (err != LOB_OPER_OK)
        return err;
    }
  else if (tgtType == LOCAL_FILE)
    {
      err = readDataToLocalFile(tgtFileName,srcOffset, tgtLength,operLen, lobMaxChunkMemLen, fileflags,handleIn,handleInLen,multipleChunks,transId);
      if (err != LOB_OPER_OK)
        return err;
    }
  else
    return LOB_TARGET_FILE_OPEN_ERROR; //unknown format
  return LOB_OPER_OK;
}
// Copy the LOB referenced by the source handle into the target LOB whose
// handle is handleIn.
//
// For so == Lob_External_Lob only the external file name recorded in the
// source descriptor is copied: a target descriptor of type Lob_External_File
// naming the same file is written, and no data is moved.
//
// Otherwise the source LOB length is fetched through the CLI, the data is
// read into a temporary buffer on the LOB global heap, a target descriptor is
// written, the target handle is patched with the new descriptor syskey (when
// a handle is returned in handleOut), and the data is written through
// insertData().  The temporary buffer is released on every path.
//
// srcLobPtr              : ExLob object used to read the source data.
// handleIn/handleInLen   : target LOB handle (updated in place).
// source/sourceLen       : source LOB handle; sourceLen must not exceed
//                          LOB_HANDLE_LEN.
// operLen                : not written by this function.
// blackBox/blackBoxLen   : buffer passed through to getDesc/writeDesc.
// handleOut/handleOutLen : output handle from the descriptor calls.
// Returns LOB_OPER_OK, or the error of the first failing step.
Ex_Lob_Error ExLob::insertSelect(ExLob *srcLobPtr,
                                 char *handleIn,Int32 handleInLen,char *source,
                                 Int64 sourceLen, Int64 &operLen,
                                 Int64 lobMaxSize, Int64 lobMaxChunkMemLen,
                                 Int64 lobGCLimit, char *blackBox,
                                 Int32 blackBoxLen,char *handleOut,
                                 Int32 &handleOutLen,LobsSubOper so, Int64 xnId,void *lobGlobals)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  Int32 cliRC;
  Int64 descNumOut = 0;
  Int64 descNumIn = 0;
  Int64 descSyskey = 0;
  Int32 lobType = 0;
  Int64 inputLobDataLen = 0;
  Int64 retOperLen = 0;
  Int64 dummy = 0;
  char logBuf[4096];
  char sourceHandle[LOB_HANDLE_LEN] = {};

  // sourceHandle is a fixed-size stack buffer; refuse handles that would
  // overflow it.
  if (sourceLen < 0 || sourceLen > LOB_HANDLE_LEN)
    {
      str_sprintf(logBuf,"insertSelect: invalid source handle length :%ld", sourceLen);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
      return LOB_DESC_READ_ERROR;
    }
  str_cpy_all(sourceHandle, source, sourceLen);

  if (so == Lob_External_Lob)
    {
      ExLobDesc desc;
      char extFileName[MAX_LOB_FILE_NAME_LEN+6];
      // retrieve the external file name from the source lob descriptor
      err = getDesc(desc,sourceHandle,sourceLen,blackBox,&blackBoxLen,handleOut,handleOutLen,xnId);
      if (err != LOB_OPER_OK)
        return LOB_DESC_READ_ERROR;
      // The name returned in blackBox, plus its terminating null, must fit
      // into extFileName (a negative length would index before the buffer).
      if (blackBoxLen < 0 || static_cast<size_t>(blackBoxLen) >= sizeof(extFileName))
        {
          str_sprintf(logBuf,"insertSelect: invalid external file name length :%d", blackBoxLen);
          lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
          return LOB_DESC_READ_ERROR;
        }
      str_cpy_all(extFileName, blackBox, blackBoxLen);
      extFileName[blackBoxLen]= '\0';
      Int64 extFileNameLen = blackBoxLen;
      // Now insert this into the target lob descriptor
      err = writeDesc(extFileNameLen, extFileName, Lob_External_File, descNumOut,
                      retOperLen, lobMaxSize, lobMaxChunkMemLen,lobGCLimit,
                      handleIn,handleInLen,(char *)blackBox, &blackBoxLen,
                      handleOut,handleOutLen,xnId,lobGlobals);
      return err;
    }

  // First retrieve length of the lob pointed to by source (input handle)
  cliRC = SQL_EXEC_LOBcliInterface(sourceHandle, sourceLen,
                                   NULL,NULL,NULL,NULL,
                                   LOB_CLI_SELECT_LOBLENGTH,LOB_CLI_ExecImmed,
                                   0,&inputLobDataLen, &dummy, &dummy,0,0,FALSE);
  if (cliRC < 0)
    {
      str_sprintf(logBuf,"cli LOB_CLI_SElECT_LOBLENGTH returned :%d", cliRC);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
      return LOB_DESC_READ_ERROR;
    }

  // Temporary buffer holding the whole source lob; owned by this function
  // and released below on every path.
  char *inputLobData = (char *)(getLobGlobalHeap()->allocateMemory(inputLobDataLen));
  if (inputLobData == NULL && inputLobDataLen > 0)
    return LOB_SOURCE_DATA_ALLOC_ERROR;

  // retrieve the section/sections from the lob pointed to by the input handle
  // into memory
  err = srcLobPtr->readToMem(inputLobData, inputLobDataLen,retOperLen,
                             sourceHandle,sourceLen,
                             blackBox, blackBoxLen,
                             handleOut, handleOutLen,xnId);

  // write the lob descriptor for the target lob
  if (err == LOB_OPER_OK)
    err = writeDesc(inputLobDataLen, inputLobData, Lob_Memory, descNumOut, retOperLen,
                    lobMaxSize, lobMaxChunkMemLen,lobGCLimit,handleIn,handleInLen,
                    (char *)blackBox, &blackBoxLen,handleOut,handleOutLen,xnId,lobGlobals);

  if (err == LOB_OPER_OK)
    {
      // Propagate the syskey of the newly written descriptor into handleIn.
      if (handleOutLen > 0)
        {
          ExpLOBoper::extractFromLOBhandle(NULL, &lobType, NULL, NULL, &descSyskey,
                                           NULL, NULL, NULL, handleOut);
          ExpLOBoper::updLOBhandle(descSyskey, 0, handleIn);
        }
      // write the lob data into the target lob data file
      err = insertData(inputLobData, inputLobDataLen, Lob_Memory, descNumIn, retOperLen,
                       lobMaxSize,lobMaxChunkMemLen,handleIn,handleInLen,
                       (char *)blackBox, blackBoxLen,handleOut,handleOutLen,lobGlobals);
    }

  if (inputLobData != NULL)
    getLobGlobalHeap()->deallocateMemory(inputLobData);
  return err;
}
// Append data to the LOB identified by handleIn.
//
// data/size : for Lob_Memory-style sub-operations, the bytes to append; for
//             Lob_File, a source name passed to statSourceFile(), which
//             presumably replaces sourceLen with the source size (TODO
//             confirm); for Lob_External_File, the external file name, which
//             is also passed to the CLI as the descriptor's blackBox.
// The length must be in (0, lobMaxSize].  Space is allocated through
// allocateDesc(), a descriptor is appended via LOB_CLI_INSERT_APPEND, and
// then the bytes are written at the allocated data offset.
// operLen receives the byte count reported by writeLobData().
// Returns LOB_MAX_LIMIT_ERROR, LOB_DESC_APPEND_ERROR, or the error of the
// failing step; LOB_OPER_OK on success.
Ex_Lob_Error ExLob::append(char *data, Int64 size, LobsSubOper so, Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize,Int64 lobMaxChunkMemSize, Int64 lobGCLimit, char *handleIn, Int32 handleInLen, char * handleOut, Int32 &handleOutLen,Int64 xnId,void *lobGlobals)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 dummyParam;
  Int64 dataOffset=0;
  Int64 sourceLen = size;
  Int32 clierr = 0;
  Int64 outDescPartnKey, outDescSyskey = 0;
  char logBuf[4096];
  char *blackBox = NULL;
  Int32 blackBoxLen = 0;
  // For an external file the descriptor stores the file name itself.
  if (so == Lob_External_File)
    {
      blackBox = data;
      blackBoxLen = (Int32)size;
    }
  lobDebugInfo("In ExLob::append",0,__LINE__,lobTrace_);
  if (so == Lob_File)
    {
      err = statSourceFile(data, sourceLen);
      if (err != LOB_OPER_OK)
        return err;
    }
  if (sourceLen <= 0 || sourceLen > lobMaxSize)
    {
      return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size
    }
  err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize,lobMaxChunkMemSize,handleIn, handleInLen,lobGCLimit,lobGlobals);
  if (err != LOB_OPER_OK)
    return err;
  lobDebugInfo("Calling cli LOB_CLI_INSERT_APPEND",0,__LINE__,lobTrace_);
  clierr = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                    blackBox, &blackBoxLen,
                                    handleOut, &handleOutLen,
                                    LOB_CLI_INSERT_APPEND, LOB_CLI_ExecImmed,
                                    &dataOffset, &sourceLen,
                                    &outDescPartnKey, &outDescSyskey,
                                    0,
                                    xnId,lobTrace_);
  if (clierr < 0 || clierr == 100) { // some error or EOD.
    str_sprintf(logBuf,"cli LOB_CLI_INSERT_APPEND returned :%d", clierr);
    lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
    return LOB_DESC_APPEND_ERROR;
  }
  char *inputAddr = data;
  str_sprintf(logBuf,"Calling writeLobData: inputAddr: %ld, InputSize%ld, tgtOffset:%ld",(long)inputAddr,sourceLen,dataOffset);
  // The message above was previously formatted but never emitted.
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  err = writeLobData(inputAddr, sourceLen,so,dataOffset,operLen,lobMaxChunkMemSize);
  if (err != LOB_OPER_OK)
    {
      lobDebugInfo("writeLobData returned error",0,__LINE__,lobTrace_);
      return err;
    }
  return LOB_OPER_OK;
}
// Write the data for a descriptor that has just been inserted for handleIn.
//
// The target offset and size are read back from the descriptor with
// getDesc() (transaction id 0); `size` itself is only logged.  data must be
// non-NULL.  operLen is reset to 0 and then receives the byte count reported
// by writeLobData().
// Returns LOB_DESC_READ_ERROR if the descriptor cannot be read,
// LOB_SOURCE_DATA_ERROR for NULL data, the writeLobData() error, or
// LOB_OPER_OK.
Ex_Lob_Error ExLob::insertData(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize,char * handleIn, Int32 handleInLen, char *blackBox, Int32 blackBoxLen, char * handleOut, Int32 &handleOutLen, void *lobGlobals)
{
  Ex_Lob_Error err=LOB_OPER_OK;
  ExLobDesc desc;
  operLen = 0;
  char logBuf[4096];
  lobDebugInfo("In ExLob::InsertData",0,__LINE__,lobTrace_);
  str_sprintf(logBuf,"data:%ld, size %ld, lobMaxSize:%ld, lobMaxChunkMemSize:%ld", (long)data, size,lobMaxSize,lobMaxChunkMemSize);
  // The message above was previously formatted but never emitted.
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  // get offset and input size from desc (the one that was just
  // inserted into the descriptor handle table)
  err = getDesc(desc,handleIn,handleInLen,blackBox, &blackBoxLen,handleOut,handleOutLen,0);
  if (err !=LOB_OPER_OK) { // some error or EOD.
    lobDebugInfo("getDesc returned error",0,__LINE__,lobTrace_);
    return LOB_DESC_READ_ERROR;
  }
  if (data == NULL) {
    return LOB_SOURCE_DATA_ERROR;
  }
  char *inputAddr = data;
  Int64 inputSize = desc.getSize();
  Int64 tgtOffset = desc.getOffset();
  str_sprintf(logBuf,"Calling writeLobData: inputAddr: %ld, InputSize%ld, tgtOffset:%ld",(long)inputAddr,inputSize,tgtOffset);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  err = writeLobData(inputAddr, inputSize,so, tgtOffset,
                     operLen,lobMaxChunkMemSize);
  if (err != LOB_OPER_OK){
    lobDebugInfo("writeLobData returned error",0,__LINE__,lobTrace_);
    return err;
  }
  return LOB_OPER_OK;
}
// Replace the contents of the LOB identified by handleIn.
//
// For Lob_File and Lob_External_File, data names a source that is passed to
// statSourceFile(), which presumably sets sourceLen to the source size (TODO
// confirm).  For Lob_External_File the name is also handed to the CLI as the
// descriptor's blackBox and no data-file space is allocated.  For all other
// sub-operations the length must be in [0, lobMaxSize] and space is
// allocated with allocateDesc().  The descriptor is rewritten through
// LOB_CLI_UPDATE_UNIQUE, and the data (if sourceLen > 0) is written at
// the allocated offset.  operLen receives the byte count reported by
// writeLobData().
// Returns LOB_MAX_LIMIT_ERROR, LOB_DESC_UPDATE_ERROR, the error of the
// failing step, or LOB_OPER_OK.
Ex_Lob_Error ExLob::update(char *data, Int64 size, LobsSubOper so,Int64 headDescNum, Int64 &operLen, Int64 lobMaxSize, Int64 lobMaxChunkMemSize, Int64 lobGCLimit, char *handleIn, Int32 handleInLen, char *handleOut, Int32 &handleOutLen, Int64 xnId,void *lobGlobals)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  Int64 dummyParam;
  Int64 dataOffset = 0;
  Int64 sourceLen = size;
  Int32 clierr = 0;
  Int64 outDescPartnKey,outDescSyskey = 0;
  char logBuf[4096];
  char *blackBox = NULL;
  Int32 blackBoxLen = 0;
  if (so == Lob_External_File)
    {
      blackBox = data;
      blackBoxLen = (Int32)size;
    }
  lobDebugInfo("In ExLob::update",0,__LINE__,lobTrace_);
  if ((so == Lob_File) || (so == Lob_External_File))
    {
      // Truncate the source name so the message always fits in logBuf,
      // matching how the rest of this file logs file names.
      size_t nameLen = (data != NULL) ? MINOF(strlen(data), sizeof(logBuf)-100) : 0;
      char dataSubstr[nameLen+1]; // +1 for trailing null
      if (nameLen > 0)
        strncpy(dataSubstr, data, nameLen);
      dataSubstr[nameLen] = '\0';
      str_sprintf(logBuf,"Calling statSourceFile: source:%s, sourceLen: %ld",
                  dataSubstr,sourceLen);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
      err = statSourceFile(data, sourceLen);
      if (err != LOB_OPER_OK)
        return err;
    }
  if (so != Lob_External_File)
    {
      if (sourceLen < 0 || sourceLen > lobMaxSize)
        {
          return LOB_MAX_LIMIT_ERROR; //exceeded the size of the max lob size
        }
      lobDebugInfo("Calling allocateDesc",0,__LINE__,lobTrace_);
      err = allocateDesc((unsigned int)sourceLen, dummyParam, dataOffset, lobMaxSize, lobMaxChunkMemSize, handleIn, handleInLen, lobGCLimit,lobGlobals);
      if (err != LOB_OPER_OK)
        return err;
    }
  lobDebugInfo("Calling CLI LOB_CLI_UPDATE_UNIQUE",0,__LINE__,lobTrace_);
  clierr = SQL_EXEC_LOBcliInterface(handleIn,
                                    handleInLen,
                                    blackBox, &blackBoxLen,
                                    handleOut, &handleOutLen,
                                    LOB_CLI_UPDATE_UNIQUE, LOB_CLI_ExecImmed,
                                    &dataOffset, &sourceLen,
                                    &outDescPartnKey, &outDescSyskey,
                                    0,
                                    xnId,lobTrace_);
  if (clierr < 0 || clierr == 100) { // some error or EOD.
    str_sprintf(logBuf,"cli LOB_CLI_UPDATE_UNIQUE returned :%d", clierr);
    lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
    return LOB_DESC_UPDATE_ERROR;
  }
  if (sourceLen == 0)
    {
      //No need to write any data
      return err;
    }
  char *inputAddr = data;
  str_sprintf(logBuf,"Calling writeLobData.sourceLen:%ld, dataOffset:%ld",sourceLen,dataOffset);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  err = writeLobData(inputAddr, sourceLen,so,dataOffset,operLen,lobMaxChunkMemSize);
  str_sprintf(logBuf,"writeLobData returned. operLen:%ld",operLen);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  if (err != LOB_OPER_OK){
    lobDebugInfo("writeLobData Failed",0,__LINE__,lobTrace_);
    return err;
  }
  return LOB_OPER_OK;
}
// Remove the descriptor row(s) of the LOB identified by handleIn by issuing
// LOB_CLI_DELETE through the CLI under transaction transId.  The CLI output
// arguments are not needed, so they all point at one scratch variable.
// Returns LOB_DESC_FILE_DELETE_ERROR when the CLI reports an error,
// LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::delDesc(char *handleIn, Int32 handleInLen, Int64 transId)
{
  Int64 scratch = 0;
  const Lng32 cliStatus = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                                   0, 0,
                                                   (char *)&scratch, (Lng32 *)&scratch,
                                                   LOB_CLI_DELETE, LOB_CLI_ExecImmed,
                                                   &scratch, &scratch,
                                                   &scratch, &scratch,
                                                   0,
                                                   transId, lobTrace_);
  return (cliStatus < 0) ? LOB_DESC_FILE_DELETE_ERROR : LOB_OPER_OK;
}
// Delete this LOB's data file (lobDataFile_), either through the JNI
// HdfsClient or, when useLibHdfs_ is set, through libhdfs hdfsDelete()
// (non-recursive).
// Returns LOB_DATA_FILE_DELETE_ERROR on failure, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::purgeLob()
{
  char logBuf[4096];
  if (! useLibHdfs_) {
    HDFS_Client_RetCode hdfsClientRetcode =
      HdfsClient::hdfsDeletePath(lobDataFile_.data());
    if (hdfsClientRetcode != HDFS_CLIENT_OK)
      {
        lobDebugInfo("In ExLob::purgeLob: hdfsDeletePath returned error",0,__LINE__,lobTrace_);
        return LOB_DATA_FILE_DELETE_ERROR;
      }
    return LOB_OPER_OK;
  }
  if (hdfsDelete(fs_, lobDataFile_.data(), 0) != 0)
    {
      // extract a substring small enough to fit into logBuf
      size_t len = MINOF(lobDataFile_.length(),sizeof(logBuf)-40);
      char lobDataFileSubstr[len+1]; // +1 for trailing null
      strncpy(lobDataFileSubstr,lobDataFile_.data(),len);
      lobDataFileSubstr[len] = '\0';
      str_sprintf(logBuf,"hdfsDelete of %s returned error",lobDataFileSubstr);
      // Log the formatted message (previously a fixed string was logged and
      // the file name was lost).
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      return LOB_DATA_FILE_DELETE_ERROR;
    }
  return LOB_OPER_OK;
}
// Open a CLI select cursor (LOB_CLI_SELECT_CURSOR) over the descriptors of
// the LOB identified by handleIn and register it in lobCursors_, keyed by the
// handle bytes.  The CLI context returned in cliInterface is stored in the
// cursor; closeCursor() uses it to close the statement.  The read position
// fields start at -1 and the eod/eor/eol flags at false.
// NOTE(review): an existing entry for the same handle is overwritten without
// closing its cliInterface_ first.
// Returns LOB_DESC_READ_ERROR if the CLI call fails, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::openCursor(char *handleIn, Int32 handleInLen,Int64 transId)
{
  cursor_t cursor;
  Int32 clierr;
  Int64 dummyParam = 0;
  void *cliInterface = NULL;
  char logBuf[4096];
  lobDebugInfo("In ExLob::openCursor",0,__LINE__,lobTrace_);
  clierr = SQL_EXEC_LOBcliInterface(handleIn,
                                    handleInLen,
                                    0,0,
                                    (char *)&dummyParam, (Lng32 *)&dummyParam,
                                    LOB_CLI_SELECT_CURSOR, LOB_CLI_ExecImmed,
                                    &dummyParam, &dummyParam,
                                    &dummyParam, &dummyParam,
                                    &cliInterface,
                                    transId,lobTrace_);
  if (clierr < 0) {
    str_sprintf(logBuf,"openCursor returned cliErr %d",clierr);
    // The message above was previously formatted but never emitted.
    lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
    return LOB_DESC_READ_ERROR;
  }
  cursor.bytesRead_ = -1;
  cursor.descOffset_ = -1;
  cursor.descSize_ = -1;
  cursor.cliInterface_ = cliInterface; // used only in lob process
  cursor.eod_ = false;
  cursor.eor_ = false;
  cursor.eol_ = false;
  lobCursors_it it = lobCursors_.find(string(handleIn, handleInLen));
  if (it == lobCursors_.end())
    {
      lobCursors_.insert(pair<string, cursor_t>
                         (string(handleIn, handleInLen), cursor));
    }
  else
    {
      it->second = cursor;
    }
  return LOB_OPER_OK;
}
// Create a data cursor named `file` in lobCursors_ and hand it to a worker
// thread via lobGlobals->enqueuePrefetchRequest().  If a cursor with that
// name already exists (pre-open case), return LOB_OPER_OK without changes.
//
// file            : cursor key; also stored in cursor.name_.
// type            : stored in cursor.type_.
// range           : stored in cursor.descOffset_ and used for the initial
//                   hdfsSeek when the data file is (re)opened here.
// bufMaxSize      : stored in cursor.bufMaxSize_.
// maxBytes        : stored in cursor.maxBytes_.
// waited          : cursor.prefetch_ is set to !waited.
// hdfsDetailError : if non-NULL, receives errno when hdfsOpenFile fails.
//
// lobCursorLock_ is acquired on entry and released on every return path.
// Returns LOB_DATA_FILE_OPEN_ERROR / LOB_DATA_FILE_POSITION_ERROR on libhdfs
// failures, LOB_OPER_OK otherwise.
//
// NOTE(review): in the "already open" branch openTime_ is written into the
// local `cursor`, which is then discarded; the existing map entry is not
// updated. Confirm whether it->second.openTime_ was intended.
// NOTE(review): when hdfsOpenFile/hdfsSeek fail, the cursor inserted into
// lobCursors_ stays there, so a later call with the same name takes the
// "already open" branch without opening the file or enqueuing a prefetch.
Ex_Lob_Error ExLob::openDataCursor(const char *file, LobsCursorType type,
Int64 range, Int64 bufMaxSize,
Int64 maxBytes, Int64 waited,
ExLobGlobals *lobGlobals,
Int32 *hdfsDetailError)
{
Ex_Lob_Error err;
cursor_t cursor;
clock_gettime(CLOCK_MONOTONIC, &cursor.openTime_);
// check to see if cursor is already open.
// occurs for pre-open cases
lobCursorLock_.lock();
lobCursors_it it = lobCursors_.find(string(file, strlen(file)));
if (it != lobCursors_.end()) {
// NOTE(review): updates the local copy only (see header note).
clock_gettime(CLOCK_MONOTONIC, &cursor.openTime_);
lobCursorLock_.unlock();
return LOB_OPER_OK;
}
// NOTE(review): this union is declared but never used in this function.
union ranges_t {
Int64 range64;
struct {
Lng32 beginRange;
Lng32 numRanges;
}r;
} ranges;
cursor.bytesRead_ = -1;
cursor.descOffset_ = -1;
cursor.descSize_ = -1;
cursor.cliInterface_ = NULL; // used only in lob process
cursor.eod_ = false;
cursor.eor_ = false;
cursor.eol_ = false;
cursor.type_ = type;
cursor.bufMaxSize_ = bufMaxSize;
cursor.maxBytes_ = maxBytes;
cursor.prefetch_ = !waited;
cursor.bufferHits_ = 0;
cursor.bufferMisses_ = 0;
cursor.name_ = file;
cursor.currentRange_ = -1;
cursor.endRange_ = -1;
cursor.currentStartOffset_ = -1;
// overrides the -1 assigned above: the cursor starts reading at `range`
cursor.descOffset_ = range;
cursor.currentFd_ = NULL;
cursor.currentBytesToRead_ = -1;
cursor.currentBytesRead_ = 0;
cursor.currentEod_ = false;
lobCursors_.insert(pair<string, cursor_t>
(string(file, strlen(file)), cursor));
it = lobCursors_.find(string(file, strlen(file))); // to get the actual cursor object in the map
// (Re)open the shared data file handle read-only if it is not already open
// that way, then position it at the cursor's starting offset.
if (!fdData_ || (openFlags_ != O_RDONLY))
{
hdfsCloseFile(fs_, fdData_);
fdData_ = NULL;
openFlags_ = O_RDONLY;
fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags_, 0, 0, 0);
if (!fdData_)
{
openFlags_ = -1;
if (hdfsDetailError)
*hdfsDetailError = errno;
lobCursorLock_.unlock();
return LOB_DATA_FILE_OPEN_ERROR;
}
if (hdfsSeek(fs_, fdData_, (it->second).descOffset_) == -1)
{
lobCursorLock_.unlock();
return LOB_DATA_FILE_POSITION_ERROR;
}
}
// start reading in a worker thread
// (passes the address of the map-resident cursor, not the local copy)
lobGlobals->enqueuePrefetchRequest(this, &(it->second));
lobCursorLock_.unlock();
return LOB_OPER_OK;
}
// Read up to tgtSize bytes into tgt from the cursor previously opened for
// handleIn (see openCursor).  operLen receives the number of bytes copied.
//
// If the cursor has already hit end-of-data, the cursor is closed and
// operLen is set to 0, signalling EOD to the caller.  Otherwise the data is
// read via readCursorData() on a local copy of the cursor, and the copy is
// stored back into lobCursors_ only when that read succeeds.
// Returns LOB_CURSOR_NOT_OPEN if no cursor exists for the handle, the
// readCursorData() error, or LOB_OPER_OK.
Ex_Lob_Error ExLob::readCursor(char *tgt, Int64 tgtSize, char *handleIn, Int32 handleInLen, Int64 &operLen,Int64 transId)
{
  char logBuf[4096];
  lobCursors_it entry = lobCursors_.find(string(handleIn, handleInLen));
  lobDebugInfo("In ExLob::readCursor",0,__LINE__,lobTrace_);
  if (entry == lobCursors_.end())
    return LOB_CURSOR_NOT_OPEN;

  cursor_t cursor = entry->second;
  str_sprintf(logBuf,"ExLob::readCursor:: cliInterface:%ld,bytesRead_:%ld,descOffset_:%lddescSize_:%ld,eod_:%d,eor_:%d,eol_:%d,",(long)cursor.cliInterface_,cursor.bytesRead_,cursor.descOffset_,cursor.descSize_,cursor.eod_,cursor.eor_,cursor.eol_);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);

  if (!cursor.eod_)
    {
      // readCursorData advances the local cursor copy.
      const Ex_Lob_Error status = readCursorData(tgt, tgtSize, cursor, operLen, handleIn,handleInLen,transId);
      if (status != LOB_OPER_OK)
        return status;
      entry->second = cursor;
      return LOB_OPER_OK;
    }

  // The server has already closed the cursor: drop it from the map and
  // report EOD (zero bytes) to SQL.
  closeCursor(handleIn, handleInLen,transId);
  operLen = 0;
  return LOB_OPER_OK;
}
// Close and forget the cursor registered for handleIn.
//
// A missing cursor is treated as already closed.  If the cursor carries a
// CLI context, the select statement is closed with LOB_CLI_SELECT_CLOSE;
// on a CLI error the cursor is left in lobCursors_ and LOB_DESC_READ_ERROR
// is returned.  Otherwise the entry is erased and LOB_OPER_OK is returned.
Ex_Lob_Error ExLob::closeCursor(char *handleIn, Int32 handleInLen, Int64 transId)
{
  char logBuf[4096];
  lobCursors_it entry = lobCursors_.find(string(handleIn, handleInLen));
  if (entry == lobCursors_.end())
    return LOB_OPER_OK;   // cursor already closed

  void *cliInterface = entry->second.cliInterface_;
  if (cliInterface != NULL)
    {
      Int64 scratch = 0;
      const Int32 cliStatus = SQL_EXEC_LOBcliInterface(handleIn, handleInLen,
                                                       NULL, NULL,
                                                       (char *)&scratch, (Lng32 *)&scratch,
                                                       LOB_CLI_SELECT_CLOSE, LOB_CLI_ExecImmed,
                                                       &scratch, &scratch,
                                                       &scratch, &scratch,
                                                       &cliInterface,
                                                       transId,lobTrace_);
      if (cliStatus < 0)
        {
          str_sprintf(logBuf, "LOB_CLI_SELECT_CLOSE Returned cli error %d",cliStatus);
          lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
          return LOB_DESC_READ_ERROR;
        }
    }

  lobDebugInfo("closing cursor for handle",0,__LINE__,lobTrace_);
  lobCursors_.erase(entry);
  return LOB_OPER_OK;
}
// Determine the offset in the LOB data file (lobDataFile_) at which `size`
// new bytes will be written, returning it in dataOffset.
//
// size == 0 means the LOB is being emptied: the data file is recreated empty
// (HdfsClient::hdfsCreate with overwrite, or, with libhdfs, rename to a
// "_save" copy, create a new file, and delete the copy) and dataOffset is 0.
//
// Otherwise dataOffset is the current size of the data file.  On the libhdfs
// path, if lobGCLimit != -1 and the file is larger than lobGCLimit, a
// garbage collection pass is run through SQL_EXEC_LOB_GC_Interface first and
// the size is re-read when it succeeds.
//
// descNum is not written by this function.
// Returns LOB_MAX_LIMIT_ERROR when size > lobMaxSize,
// LOB_DATA_FILE_WRITE_ERROR / LOB_DATA_FILE_OPEN_ERROR on HDFS failures,
// LOB_OPER_OK otherwise (a failed GC is logged but not an error).
Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, Int64 lobMaxSize, Int64 lobMaxChunkMemLen, char *handleIn, Int32 handleInLen, Int64 lobGCLimit, void *lobGlobals)
{
  NABoolean GCDone = FALSE;
  dataOffset = 0;
  if (size > lobMaxSize)
    return LOB_MAX_LIMIT_ERROR;
  char logBuf[4096];
  lobDebugInfo("In ExLob::allocateDesc",0,__LINE__,lobTrace_);

  if (! useLibHdfs_) {
    HDFS_Client_RetCode hdfsClientRetcode;
    if (size == 0) {
      // Delete and Create the Hdfs file by passing overwrite to TRUE
      hdfsClientRetcode = hdfsClient_->hdfsCreate(lobDataFile_.data(), TRUE, FALSE);
      if (hdfsClientRetcode != HDFS_CLIENT_OK)
        return LOB_DATA_FILE_WRITE_ERROR;
      dataOffset = 0;
      return LOB_OPER_OK;
    }
    dataOffset = hdfsClient_->hdfsSize(hdfsClientRetcode);
    if (hdfsClientRetcode != HDFS_CLIENT_OK)
      return LOB_DATA_FILE_WRITE_ERROR;
    ex_assert(dataOffset >= 0, "Offset is -1 possibly due to path being directory");
    return LOB_OPER_OK;
  }

  if (size == 0) //we are trying to empty this lob.
    {
      //rename lob datafile
      char saveLobDataFile[lobDataFile_.length() + sizeof("_save")]; // sizeof includes room for null terminator
      strcpy(saveLobDataFile,lobDataFile_.data());
      strcpy(saveLobDataFile+lobDataFile_.length(),"_save");
      Int32 rc2 = hdfsRename(fs_,lobDataFile_.data(),saveLobDataFile);
      if (rc2 == -1)
        {
          lobDebugInfo("Problem renaming datafile to save data file",0,__LINE__,lobTrace_);
          return LOB_DATA_FILE_WRITE_ERROR;
        }
      //create a new file of the same name.
      hdfsFile fdNew = hdfsOpenFile(fs_, lobDataFile_.data(),O_WRONLY|O_CREAT,0,0,0);
      if (!fdNew)
        {
          // extract a substring small enough to fit into logBuf
          size_t len = MINOF(lobDataFile_.length(),sizeof(logBuf)-40);
          char lobDataFileSubstr[len+1]; // +1 for trailing null
          strncpy(lobDataFileSubstr,lobDataFile_.data(),len);
          lobDataFileSubstr[len] = '\0';
          str_sprintf(logBuf,"Could not create/open file:%s",lobDataFileSubstr);
          lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
          //restore previous version
          Int32 rc3 = hdfsRename(fs_,saveLobDataFile,lobDataFile_.data());
          if (rc3 == -1)
            {
              lobDebugInfo("Problem restoring datafile . Will need to retry the update",0,__LINE__,lobTrace_);
              return LOB_DATA_FILE_WRITE_ERROR;
            }
          return LOB_DATA_FILE_OPEN_ERROR;
        }
      //A new empty data file has been created.
      // delete the saved data file
      Int32 rc3 = hdfsDelete(fs_,saveLobDataFile,FALSE);//ok to ignore error.
      if (rc3 == -1)
        {
          lobDebugInfo("Problem deleting saved datafile . Will need to manually cleanup saved datafile",0,__LINE__,lobTrace_);
        }
      hdfsCloseFile(fs_,fdNew);
      fdNew = NULL;
    }

  // hdfsGetPathInfo allocates the returned info; it is released below.
  hdfsFileInfo *fInfo = hdfsGetPathInfo(fs_, lobDataFile_.data());
  if (fInfo)
    dataOffset = fInfo->mSize;
  // if -1, don't do GC or if reached the limit do GC
  if ((lobGCLimit != -1) && (dataOffset > lobGCLimit))
    {
      str_sprintf(logBuf,"Starting GC. Current Offset : %ld",dataOffset);
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      Int32 rc = SQL_EXEC_LOB_GC_Interface(lobGlobals,handleIn,handleInLen,
                                           hdfsServer_,hdfsPort_,
                                           (char *)lobStorageLocation_.c_str(),
                                           lobMaxChunkMemLen,lobTrace_);
      if (rc<0)
        {
          lobDebugInfo("GC failed",0,__LINE__,lobTrace_);
          GCDone = FALSE;
        }
      else
        GCDone = TRUE;
    }
  if (GCDone) // recalculate the new offset
    {
      if (fInfo)
        hdfsFreeFileInfo(fInfo, 1);
      fInfo = hdfsGetPathInfo(fs_, lobDataFile_.data());
      if (fInfo)
        dataOffset = fInfo->mSize;
    }
  if (fInfo)
    {
      // Previously leaked on every call.
      hdfsFreeFileInfo(fInfo, 1);
      fInfo = NULL;
    }

  // extract a substring small enough to fit into logBuf
  size_t len = MINOF(lobDataFile_.length(),sizeof(logBuf)-70);
  char lobDataFileSubstr[len+1]; // +1 for trailing null
  strncpy(lobDataFileSubstr,lobDataFile_.data(),len);
  lobDataFileSubstr[len] = '\0';
  if (GCDone)
    str_sprintf(logBuf,"Done GC. Allocating new Offset %ld in %s",
                dataOffset,lobDataFileSubstr);
  else
    str_sprintf(logBuf,"Allocating new Offset %ld in %s ",
                dataOffset,lobDataFileSubstr);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
  // The offset is taken from the file size; hdfsTell() on fdData_ was found
  // to always return 0 and is not used.
  return LOB_OPER_OK;
}
// Rewrite the LOB data file keeping only the chunks listed in dcArray.
//
// Each of the numEntries chunks (getCurrentOffset()/getChunkLen()) is copied
// in order, through a transfer buffer of at most 64 MB, from lobDataFile_
// into "<lobDataFile_>_tmp".  On success, the original file is renamed to
// "<lobDataFile_>_save" and the temporary file is renamed to lobDataFile_.
// Both HdfsClient instances and the transfer buffer are released on every
// path.  The "_save" file is left in place; restoreLobDataFile() and
// purgeBackupLobDataFile() operate on that name.
//
// Returns LOB_DATA_FILE_OPEN_ERROR, LOB_SOURCE_DATA_ALLOC_ERROR,
// LOB_SOURCE_FILE_READ_ERROR, LOB_DATA_FILE_WRITE_ERROR, or LOB_OPER_OK.
Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries)
{
  char logBuf[4096];
  lobDebugInfo("In ExLob::compactLobDataFile",0,__LINE__,lobTrace_);
  Int64 maxMemChunk = 64*1024*1024; //64 MB limit for intermediate buffer for transfering data

  // make some temporary file names
  size_t len = lobDataFile_.length();
  char saveLobDataFile[len + sizeof("_save")]; // sizeof includes room for null terminator
  strcpy(saveLobDataFile,lobDataFile_.data());
  strcpy(saveLobDataFile+len,"_save");
  char tmpLobDataFile[len + sizeof("_tmp")]; // sizeof includes room for null terminator
  strcpy(tmpLobDataFile,lobDataFile_.data());
  strcpy(tmpLobDataFile+len,"_tmp");

  // extract small enough bits of these file names to fit in logBuf
  len = MINOF(lobDataFile_.length(),sizeof(logBuf)/3 - 20);
  char lobDataFileSubstr[len + 1];
  strncpy(lobDataFileSubstr,lobDataFile_.data(),len);
  lobDataFileSubstr[len] = '\0';
  len = MINOF(sizeof(tmpLobDataFile),sizeof(logBuf)/3 - 20);
  char tmpLobDataFileSubstr[len + 1];
  strncpy(tmpLobDataFileSubstr,tmpLobDataFile,len);
  tmpLobDataFileSubstr[len] = '\0';
  len = MINOF(sizeof(saveLobDataFile),sizeof(logBuf)/3 - 20);
  char saveLobDataFileSubstr[len + 1];
  strncpy(saveLobDataFileSubstr,saveLobDataFile,len);
  saveLobDataFileSubstr[len] = '\0';
  str_sprintf(logBuf,"DataFile %s, TempDataFile : %s, SaveDataFile : %s ",
              lobDataFileSubstr,tmpLobDataFileSubstr, saveLobDataFileSubstr);
  lobDebugInfo(logBuf,0,__LINE__,lobTrace_);

  HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK;
  HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
  ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error");
  HdfsClient *dstHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
  ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error");

  hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE);
  if (hdfsClientRetcode != HDFS_CLIENT_OK)
    {
      // extract substring small enough to fit in logBuf
      len = MINOF(lobDataFile_.length(),sizeof(logBuf) - 40);
      char lobDataFileSubstr2[len + 1];
      strncpy(lobDataFileSubstr2,lobDataFile_.data(),len);
      lobDataFileSubstr2[len] = '\0';
      str_sprintf(logBuf,"Could not open file:%s",lobDataFileSubstr2);
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      // Previously both clients leaked on this path.
      HdfsClient::deleteInstance(srcHdfsClient);
      HdfsClient::deleteInstance(dstHdfsClient);
      return LOB_DATA_FILE_OPEN_ERROR;
    }
  hdfsClientRetcode = dstHdfsClient->hdfsCreate(tmpLobDataFile, TRUE, FALSE);
  if (hdfsClientRetcode != HDFS_CLIENT_OK)
    {
      // extract substring small enough to fit in logBuf
      len = MINOF(sizeof(tmpLobDataFile),sizeof(logBuf)/3 - 20);
      char tmpLobDataFileSubstr2[len + 1];
      strncpy(tmpLobDataFileSubstr2,tmpLobDataFile,len);
      tmpLobDataFileSubstr2[len] = '\0';
      str_sprintf(logBuf,"Could not open file:%s",tmpLobDataFileSubstr2);
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      srcHdfsClient->hdfsClose();
      HdfsClient::deleteInstance(srcHdfsClient);
      // Previously the destination client leaked on this path.
      HdfsClient::deleteInstance(dstHdfsClient);
      return LOB_DATA_FILE_OPEN_ERROR;
    }

  Ex_Lob_Error saveError = LOB_OPER_OK;
  char *tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk);
  if (tgt == NULL)
    {
      lobDebugInfo("Could not allocate transfer buffer",0,__LINE__,lobTrace_);
      saveError = LOB_SOURCE_DATA_ALLOC_ERROR;
    }

  for (Int32 i = 0; (i < numEntries) && (saveError == LOB_OPER_OK); i++)
    {
      Int64 readLen = dcArray[i].getChunkLen();
      Int64 offset = dcArray[i].getCurrentOffset();
      while (readLen > 0)
        {
          Int64 chunkLen = MINOF(readLen, maxMemChunk);
          Int64 bytesRead = srcHdfsClient->hdfsRead(offset, tgt, chunkLen, hdfsClientRetcode);
          // A read that returns no data would otherwise copy stale buffer
          // contents into the compacted file.
          if (hdfsClientRetcode != HDFS_CLIENT_OK || bytesRead <= 0) {
            lobDebugInfo("Problem reading from data file",0,__LINE__,lobTrace_);
            saveError = LOB_SOURCE_FILE_READ_ERROR;
            break;
          }
          // Write only the bytes actually read, and advance by that amount,
          // so a short read cannot inject uninitialized data.
          Int64 bytesWritten = dstHdfsClient->hdfsWrite(tgt, bytesRead, hdfsClientRetcode);
          if (hdfsClientRetcode != HDFS_CLIENT_OK || bytesWritten != bytesRead) {
            lobDebugInfo("Problem writing temp data file",0,__LINE__,lobTrace_);
            saveError = LOB_DATA_FILE_WRITE_ERROR;
            break;
          }
          readLen -= bytesRead;
          offset += bytesRead;
        }
    }

  if (tgt != NULL)
    getLobGlobalHeap()->deallocateMemory(tgt);
  srcHdfsClient->hdfsClose();
  dstHdfsClient->hdfsClose();
  HdfsClient::deleteInstance(srcHdfsClient);
  HdfsClient::deleteInstance(dstHdfsClient);
  if (saveError != LOB_OPER_OK)
    return saveError;

  //Now save the data file and rename the tempfile to the original datafile
  hdfsClientRetcode = HdfsClient::hdfsRename(lobDataFile_.data(),saveLobDataFile);
  if (hdfsClientRetcode != HDFS_CLIENT_OK) {
    lobDebugInfo("Problem renaming datafile to save data file",0,__LINE__,lobTrace_);
    return LOB_DATA_FILE_WRITE_ERROR;
  }
  hdfsClientRetcode = HdfsClient::hdfsRename(tmpLobDataFile, lobDataFile_.data());
  if (hdfsClientRetcode != HDFS_CLIENT_OK) {
    lobDebugInfo("Problem renaming temp datafile to data file",0,__LINE__,lobTrace_);
    return LOB_DATA_FILE_WRITE_ERROR;
  }
  return LOB_OPER_OK;
}
// Put the "<lobDataFile_>_save" copy back in place of the current data file.
// The current file is deleted first; the result of that delete is ignored,
// and only the rename decides success.
// Returns LOB_OPER_ERROR if the rename fails, LOB_OPER_OK otherwise.
Ex_Lob_Error ExLob::restoreLobDataFile()
{
  lobDebugInfo("In ExLob::restoreLobDataFile",0,__LINE__,lobTrace_);
  const size_t baseLen = lobDataFile_.length();
  char backupName[baseLen + sizeof("_save")]; // sizeof includes room for null terminator
  strcpy(backupName, lobDataFile_.data());
  strcpy(backupName + baseLen, "_save");

  (void) HdfsClient::hdfsDeletePath(lobDataFile_.data());
  if (HdfsClient::hdfsRename(backupName, lobDataFile_.data()) == HDFS_CLIENT_OK)
    return LOB_OPER_OK;

  lobDebugInfo("Problem renaming savedatafile to data file",0,__LINE__,lobTrace_);
  return LOB_OPER_ERROR;
}
// Delete the "<lobDataFile_>_save" backup copy of the data file.  This is
// best effort: the delete status is ignored and LOB_OPER_OK is always
// returned.
Ex_Lob_Error ExLob::purgeBackupLobDataFile()
{
  lobDebugInfo("In ExLob::purgeBackupLobDataFile",0,__LINE__,lobTrace_);
  const size_t baseLen = lobDataFile_.length();
  char backupName[baseLen + sizeof("_save")]; // sizeof includes room for null terminator
  strcpy(backupName, lobDataFile_.data());
  strcpy(backupName + baseLen, "_save");
  (void) HdfsClient::hdfsDeletePath(backupName); // ok to ignore error
  return LOB_OPER_OK;
}
///////////////////////////////////////////////////////////////////////////////
// ExLobDescHeader definitions
///////////////////////////////////////////////////////////////////////////////
// Construct a descriptor header: freeDesc_ and dataOffset_ start at 0 and
// availSize_ is set to `size`.  Presumably availSize_ is the number of bytes
// still available to descriptors — TODO confirm against ExLobDescHeader's
// declaration.
ExLobDescHeader::ExLobDescHeader(unsigned int size) :
freeDesc_(0),
dataOffset_(0),
availSize_(size)
{
}
// Nothing to release.
ExLobDescHeader::~ExLobDescHeader()
{
}
///////////////////////////////////////////////////////////////////////////////
// ExLobDesc definitions
///////////////////////////////////////////////////////////////////////////////
// Construct a descriptor for a data chunk at `offset` of `size` bytes with
// the given `tail` value.  The data state starts as
// EX_LOB_DATA_INITIALIZING.  next_, prev_ and nextFree_ are set to -1,
// presumably meaning "no linked descriptor" (TODO confirm).
ExLobDesc::ExLobDesc(int offset, int size, int tail) :
dataOffset_(offset),
dataSize_(size),
dataState_(EX_LOB_DATA_INITIALIZING),
tail_(tail),
next_(-1),
prev_(-1),
nextFree_(-1)
{
}
// Nothing to release.
ExLobDesc::~ExLobDesc()
{
}
// Copy up to tgtSize bytes of LOB data into tgt, walking the descriptor
// chunks of an open cursor.
//
// Whenever the current chunk is exhausted (bytesRead_ == descSize_), the
// next chunk's offset/size is fetched with fetchCursor(). End of data, or a
// read returning 0 bytes, sets cursor.eod_.  Data is read through the JNI
// HdfsClient (a temporary client for Lob_External_HDFS_File storage, else
// hdfsClient_) or, with useLibHdfs_, with hdfsPread() on fdData_, which is
// (re)opened read-only if needed and closed before returning successfully.
//
// cursor   : advanced in place (bytesRead_, descOffset_, descSize_, eod_).
// operLen  : receives the number of bytes copied into tgt.
// Returns the first fetch/open/read error, or LOB_OPER_OK.
Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen, char *handleIn, Int32 handleLenIn, Int64 transId)
{
  Ex_Lob_Error err;
  Int64 bytesAvailable = 0;
  Int64 bytesToCopy = 0;
  Int64 bytesRead = 0;
  operLen = 0;
  tOffset offset;
  NABoolean isEOD=FALSE;
  Int64 outOffset = 0;
  Int64 outSize = 0;
  char logBuf[4096];
  lobDebugInfo("In ExLob::readCursorData",0,__LINE__,lobTrace_);
  while ( (operLen < tgtSize) && !cursor.eod_ )
    {
      if (cursor.bytesRead_ == cursor.descSize_) // time to read next chunk
        {
          err = fetchCursor(handleIn, handleLenIn,outOffset, outSize,isEOD,transId);
          if (err != LOB_OPER_OK)
            return err;
          if (isEOD)
            {
              cursor.eod_ = true; // subsequent call will return 100 and close the cursor
              continue;
            }
          cursor.descSize_ = outSize;
          cursor.descOffset_ = outOffset;
          cursor.bytesRead_ = 0;
          if (outSize == 0) // this is an empty lob entry
            continue;
        }
      bytesAvailable = cursor.descSize_ - cursor.bytesRead_;
      bytesToCopy = min(bytesAvailable, tgtSize - operLen);
      offset = cursor.descOffset_ + cursor.bytesRead_;
      if (!useLibHdfs_)
        {
          HDFS_Client_RetCode hdfsClientRetcode;
          if (storage_ == Lob_External_HDFS_File)
            {
              HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
              ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error");
              hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE);
              if (hdfsClientRetcode != HDFS_CLIENT_OK)
                {
                  // Previously leaked on this path (readDataToMem frees it).
                  HdfsClient::deleteInstance(srcHdfsClient);
                  return LOB_SOURCE_FILE_OPEN_ERROR;
                }
              bytesRead = srcHdfsClient->hdfsRead(offset, tgt, bytesToCopy, hdfsClientRetcode);
              HdfsClient::deleteInstance(srcHdfsClient);
              if (hdfsClientRetcode != HDFS_CLIENT_OK)
                return LOB_SOURCE_FILE_READ_ERROR;
            }
          else
            {
              bytesRead = hdfsClient_->hdfsRead(offset, tgt, bytesToCopy, hdfsClientRetcode);
              if (hdfsClientRetcode != HDFS_CLIENT_OK)
                return LOB_DATA_READ_ERROR;
            }
        }
      else
        {
          if (!fdData_ || (openFlags_ != O_RDONLY))
            {
              hdfsCloseFile(fs_, fdData_);
              fdData_=NULL;
              openFlags_ = O_RDONLY;
              fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags_, 0, 0, 0);
              if (!fdData_)
                {
                  openFlags_ = -1;
                  return LOB_DATA_FILE_OPEN_ERROR;
                }
            }
          bytesRead = hdfsPread(fs_, fdData_, offset, tgt, bytesToCopy);
          str_sprintf(logBuf,"After hdfsPread: BytesToCopy:%ld, Offset:%ld, tgt:%ld, BytesRead :%ld",
                      bytesToCopy,offset,(long)tgt,bytesRead);
          lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
        } // useLibHdfs
      if (bytesRead == -1) {
        return LOB_DATA_READ_ERROR;
      } else if (bytesRead == 0) {
        cursor.eod_ = true;
        continue;
      }
      cursor.bytesRead_ += bytesRead;
      operLen += bytesRead;
      tgt += bytesRead;
    }
  if (useLibHdfs_) {
    hdfsCloseFile(fs_, fdData_);
    fdData_ = NULL;
  }
  return LOB_OPER_OK;
}
// Read LOB data into memAddr.
//
// multipleChunks == FALSE: read `size` bytes at data-file `offset` in one
// call (JNI HdfsClient, or libhdfs hdfsPread when useLibHdfs_).
// multipleChunks == TRUE: open a descriptor cursor for handleIn and read up
// to `size` bytes through readCursor(); the cursor is closed afterwards
// (also when the read fails).
//
// With useLibHdfs_, fdData_ is always reopened read-only first, since a
// cached handle may be stale.
// operLen receives the number of bytes read.
// Returns the open/read error encountered, or LOB_OPER_OK.
Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
                                  Int64 offset, Int64 size, Int64 &operLen,
                                  char *handleIn, Int32 handleLenIn,
                                  NABoolean multipleChunks, Int64 transId)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  operLen = 0;
  Int64 bytesRead = 0;
  char logBuf[4096];
  lobDebugInfo("In ExLob::readToMem",0,__LINE__,lobTrace_);
  if (multipleChunks)
    {
      lobDebugInfo("Reading in multiple chunks",0,__LINE__,lobTrace_);
      //now we can fetch the descriptors for each chunk
      err = openCursor(handleIn,
                       handleLenIn,transId);
      if (err != LOB_OPER_OK)
        return err;
    }
  if (useLibHdfs_)
    {
      // we may have a stale handle. close and open to refresh.
      // Always open read-only: openFlags_ may still hold -1 from an earlier
      // failed open, or another mode from a previous use of fdData_.
      if (fdData_)
        {
          hdfsCloseFile(fs_, fdData_);
          fdData_=NULL;
        }
      openFlags_ = O_RDONLY;
      fdData_ = hdfsOpenFile(fs_, lobDataFile_.data(), openFlags_, 0, 0, 0);
      if (!fdData_)
        {
          openFlags_ = -1;
          return LOB_DATA_FILE_OPEN_ERROR;
        }
    } // useLibHdfs_

  if (!multipleChunks)
    {
      if (! useLibHdfs_) {
        HDFS_Client_RetCode hdfsClientRetcode;
        Int64 readLen;
        if (storage_ == Lob_External_HDFS_File) {
          HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
          ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error");
          hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE);
          if (hdfsClientRetcode != HDFS_CLIENT_OK) {
            HdfsClient::deleteInstance(srcHdfsClient);
            return LOB_SOURCE_FILE_OPEN_ERROR;
          }
          readLen = srcHdfsClient->hdfsRead(offset, memAddr, size, hdfsClientRetcode);
          HdfsClient::deleteInstance(srcHdfsClient);
          if (hdfsClientRetcode != HDFS_CLIENT_OK)
            return LOB_SOURCE_FILE_READ_ERROR;
          operLen = readLen;
        }
        else {
          readLen = hdfsClient_->hdfsRead(offset, memAddr, size, hdfsClientRetcode);
          if (hdfsClientRetcode != HDFS_CLIENT_OK)
            return LOB_DATA_READ_ERROR;
          operLen = readLen;
        }
        return LOB_OPER_OK;
      }
      lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_);
      if ((bytesRead = hdfsPread(fs_, fdData_, offset,
                                 memAddr, size)) == -1) {
        return LOB_DATA_READ_ERROR;
      }
      // extract a substring small enough to fit into logBuf
      size_t len = MINOF(lobDataFile_.length(),sizeof(logBuf)-100);
      char lobDataFileSubstr[len+1]; // +1 for trailing null
      strncpy(lobDataFileSubstr,lobDataFile_.data(),len);
      lobDataFileSubstr[len] = '\0';
      str_sprintf(logBuf,"After hdfsPread: File:%s, Offset:%ld, Size:%ld",
                  lobDataFileSubstr,offset,size);
      lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
      operLen = bytesRead;
      return LOB_OPER_OK;
    }

  //handle reading the multiple chunks like a cursor
  err = readCursor(memAddr,size, handleIn,
                   handleLenIn, operLen, transId);
  // Close the cursor on both paths so a failed read does not leave the CLI
  // statement open; the read error takes precedence.
  closeCursor(handleIn,
              handleLenIn,transId);
  if (err != LOB_OPER_OK)
    return err;
  return LOB_OPER_OK;
}
Ex_Lob_Error ExLob::readDataToLocalFile(char *fileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemSize, Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
{
Ex_Lob_Error err;
Int64 operLen = 0;
Int64 srcLen = size;
Int64 srcOffset = offset;
Int64 tgtOffset = 0;
char *lobData = 0;
Int64 chunkSize = 0;
char logBuf[4096];
lobDebugInfo("In ExLob::readDataToLocalFile",0,__LINE__,lobTrace_);
if (srcLen <=0)
return LOB_SOURCE_DATA_ALLOC_ERROR;
// open the target file for writing
int filePerms = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int openFlags = O_RDWR ; // O_DIRECT needs mem alignment
if (((LobTgtFileFlags)fileflags == Lob_Append_Or_Error ) ||
((LobTgtFileFlags)fileflags == Lob_Error_Or_Create ) ||
((LobTgtFileFlags)fileflags == Lob_Append_Or_Create))
openFlags |= O_APPEND;
else
openFlags |= O_TRUNC;
int fdDestFile = open(fileName, openFlags, filePerms);
if (fdDestFile >=0 )
{
if ((LobTgtFileFlags)fileflags == Lob_Error_Or_Create)
return LOB_TARGET_FILE_EXISTS_ERROR;
}
if (fdDestFile == -1)
{
if (((LobTgtFileFlags)fileflags == Lob_Append_Or_Error) ||
((LobTgtFileFlags)fileflags == Lob_Truncate_Or_Error))
return LOB_TARGET_FILE_OPEN_ERROR;
else
{
openFlags = O_CREAT | O_RDWR ;
fdDestFile = open(fileName, openFlags, filePerms);
if (fdDestFile == -1)
return LOB_TARGET_FILE_OPEN_ERROR;
}
}
if ((srcLen < lobMaxChunkMemSize) && (multipleChunks ==FALSE)) // simple single I/O case
{
lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_);
lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen);
if (lobData == NULL)
{
return LOB_SOURCE_DATA_ALLOC_ERROR;
}
err = readDataToMem(lobData, srcOffset,srcLen,operLen, handleIn,handleInLen, multipleChunks,transId);
if (err != LOB_OPER_OK)
{
getLobGlobalHeap()->deallocateMemory(lobData);
return err;
}
writeOperLen += pwrite(fdDestFile, lobData, srcLen, tgtOffset) ;
if (writeOperLen <= 0)
{
getLobGlobalHeap()->deallocateMemory(lobData);
return LOB_TARGET_FILE_WRITE_ERROR;
}
getLobGlobalHeap()->deallocateMemory(lobData);
}
else // multiple chunks to read
{
lobDebugInfo("Reading in multiple chunks into local file",0,__LINE__,lobTrace_);
err = openCursor(handleIn,
handleInLen,transId);
if (err != LOB_OPER_OK)
return err;
while ( srcLen > 0)
{
chunkSize = MINOF(srcLen, lobMaxChunkMemSize);
lobData = (char *) (getLobGlobalHeap())->allocateMemory(chunkSize);
if (lobData == NULL)
{
getLobGlobalHeap()->deallocateMemory(lobData);
return LOB_SOURCE_DATA_ALLOC_ERROR;
}
//handle reading the multiple chunks like a cursor
err = readCursor(lobData,chunkSize, handleIn,
handleInLen, operLen, transId);
if ((operLen == 0) && (err == LOB_OPER_OK)) //this may be an empty lob section
continue;
if ((err != LOB_OPER_OK) || (operLen != chunkSize))
{
getLobGlobalHeap()->deallocateMemory(lobData);
return LOB_DATA_READ_ERROR;
}
writeOperLen += pwrite(fdDestFile, lobData, chunkSize, tgtOffset) ;
if (writeOperLen <= 0)
{
getLobGlobalHeap()->deallocateMemory(lobData);
return LOB_TARGET_FILE_WRITE_ERROR;
}
getLobGlobalHeap()->deallocateMemory(lobData);
srcLen -= chunkSize;
tgtOffset += chunkSize;
}
closeCursor(handleIn,
handleInLen,transId);
}
close(fdDestFile);
return LOB_OPER_OK;
}
// Reads `size` bytes of LOB data (starting at `offset`, or through the LOB
// cursor for multi-chunk reads) and writes them to the HDFS file tgtFileName.
//
// fileflags (a LobTgtFileFlags value) controls how the target is opened:
//   existing file: Lob_Error_Or_Create fails; the append variants
//     (Lob_Append_Or_Error, Lob_Append_Or_Create) open with O_APPEND; the
//     truncate variants open with plain O_WRONLY.
//   missing file: Lob_Append_Or_Error / Lob_Truncate_Or_Error fail; the
//     other flags create it with O_WRONLY.
// writeOperLen is incremented by the number of bytes written.
// The target file is closed on every return path after it has been opened.
Ex_Lob_Error ExLob::readDataToHdfsFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &writeOperLen, Int64 lobMaxChunkMemLen, Int32 fileflags,char *handleIn, Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
{
  Ex_Lob_Error err;
  Int64 operLen = 0;
  Int64 srcLen = size;
  Int64 srcOffset = offset;
  char *lobData = 0;
  Int64 chunkSize = 0;
  hdfsFile fdTgtFile = NULL;
  LobTgtFileFlags tgtFlags = (LobTgtFileFlags)fileflags;

  lobDebugInfo("In ExLob::readDataToHdfsFile",0,__LINE__,lobTrace_);

  // Decide how to open the target file.
  int openFlags = O_WRONLY;
  if (hdfsExists(fs_,tgtFileName) == 0)
  {
    if (tgtFlags == Lob_Error_Or_Create)
      return LOB_TARGET_FILE_EXISTS_ERROR;
    // Both append variants must append to an existing file (matches
    // readDataToLocalFile); previously Lob_Append_Or_Create overwrote it.
    if ((tgtFlags == Lob_Append_Or_Error) ||
        (tgtFlags == Lob_Append_Or_Create))
      openFlags |= O_APPEND;
  }
  else
  {
    if ((tgtFlags == Lob_Append_Or_Error) ||
        (tgtFlags == Lob_Truncate_Or_Error))
      return LOB_TARGET_FILE_OPEN_ERROR;
  }
  fdTgtFile = hdfsOpenFile(fs_, tgtFileName, openFlags, 0,0,0);
  if (fdTgtFile == NULL)
    return LOB_TARGET_FILE_OPEN_ERROR;

  if ((srcLen < lobMaxChunkMemLen) && (multipleChunks == FALSE)) // simple single I/O case
  {
    lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_);
    lobData = (char *) (getLobGlobalHeap())->allocateMemory(srcLen);
    if (lobData == NULL)
    {
      hdfsCloseFile(fs_, fdTgtFile);
      return LOB_SOURCE_DATA_ALLOC_ERROR;
    }
    err = readDataToMem(lobData, srcOffset,srcLen,operLen,handleIn,handleInLen, multipleChunks,transId);
    if (err != LOB_OPER_OK)
    {
      getLobGlobalHeap()->deallocateMemory(lobData);
      hdfsCloseFile(fs_, fdTgtFile);
      return err;
    }
    // Check this write's own result, not the caller's running total.
    Int64 written = hdfsWrite(fs_,fdTgtFile,lobData, srcLen);
    if (written != srcLen)
    {
      getLobGlobalHeap()->deallocateMemory(lobData);
      hdfsCloseFile(fs_, fdTgtFile);
      return LOB_TARGET_FILE_WRITE_ERROR;
    }
    writeOperLen += written;
    if (hdfsFlush(fs_, fdTgtFile))
    {
      getLobGlobalHeap()->deallocateMemory(lobData);
      hdfsCloseFile(fs_, fdTgtFile);
      return LOB_DATA_FLUSH_ERROR;
    }
    getLobGlobalHeap()->deallocateMemory(lobData);
  }
  else
  {// multiple chunks to read
    lobDebugInfo("Reading in multiple chunks into local file",0,__LINE__,lobTrace_);
    err = openCursor(handleIn,
                     handleInLen,
                     transId);
    if (err != LOB_OPER_OK)
    {
      hdfsCloseFile(fs_, fdTgtFile);
      return err;
    }
    while (srcLen > 0)
    {
      chunkSize = MINOF(srcLen, lobMaxChunkMemLen);
      lobData = (char *) (getLobGlobalHeap())->allocateMemory(chunkSize);
      if (lobData == NULL)
      {
        hdfsCloseFile(fs_, fdTgtFile);
        return LOB_SOURCE_DATA_ALLOC_ERROR;
      }
      //handle reading the multiple chunks like a cursor
      err = readCursor(lobData,chunkSize, handleIn,
                       handleInLen, operLen, transId);
      if ((operLen == 0) && (err == LOB_OPER_OK)) //this may be an empty lob section
      {
        // Free this iteration's buffer before moving on (it used to leak).
        getLobGlobalHeap()->deallocateMemory(lobData);
        continue;
      }
      if ((err != LOB_OPER_OK) || (operLen != chunkSize))
      {
        getLobGlobalHeap()->deallocateMemory(lobData);
        hdfsCloseFile(fs_, fdTgtFile);
        return LOB_DATA_READ_ERROR;
      }
      Int64 written = hdfsWrite(fs_,fdTgtFile,lobData, chunkSize);
      if (written != chunkSize)
      {
        getLobGlobalHeap()->deallocateMemory(lobData);
        hdfsCloseFile(fs_, fdTgtFile);
        return LOB_TARGET_FILE_WRITE_ERROR;
      }
      writeOperLen += written;
      if (hdfsFlush(fs_, fdTgtFile))
      {
        getLobGlobalHeap()->deallocateMemory(lobData);
        hdfsCloseFile(fs_, fdTgtFile);
        return LOB_DATA_FLUSH_ERROR;
      }
      getLobGlobalHeap()->deallocateMemory(lobData);
      srcLen -= chunkSize;
    }
    closeCursor(handleIn,
                handleInLen,transId);
  }
  hdfsCloseFile(fs_, fdTgtFile);
  fdTgtFile = NULL;
  // Also release the LOB data file handle (NULL-safe; resets fdData_).
  closeFile();
  return LOB_OPER_OK;
}
// Placeholder for reading LOB data into an external file (TBD).
// Not implemented: every argument is ignored, operLen is left untouched,
// and success is reported unconditionally.
Ex_Lob_Error ExLob::readDataToExternalFile(char *tgtFileName, Int64 offset, Int64 size, Int64 &operLen,Int64 lobMaxChunkMemLen,Int32 fileflags,char *handleIn,Int32 handleInLen, NABoolean multipleChunks,Int64 transId)
{
  const Ex_Lob_Error status = LOB_OPER_OK;
  return status;
}
// Closes the LOB data file handle (fdData_) if one is open and clears it.
// Always returns LOB_OPER_OK.
Ex_Lob_Error ExLob::closeFile()
{
  if (fdData_ == NULL)
    return LOB_OPER_OK;   // nothing open
  hdfsCloseFile(fs_, fdData_);
  fdData_ = NULL;
  return LOB_OPER_OK;
}
// Points this LOB's stats_ at the caller-supplied buffer, reinterpreted as
// an ExHdfsScanStats block. Always returns LOB_OPER_OK.
Ex_Lob_Error ExLob::readStats(char *statsBuffer)
{
  ExHdfsScanStats *statsArea = reinterpret_cast<ExHdfsScanStats *>(statsBuffer);
  stats_ = statsArea;
  return LOB_OPER_OK;
}
// Performs no statistics set-up; always reports success.
Ex_Lob_Error ExLob::initStats()
{
  const Ex_Lob_Error status = LOB_OPER_OK;
  return status;
}
// Main driver of any LOB related operation.
//
// Looks up (or creates and initializes) the ExLob object for lobName in the
// LOB map held by globPtr, then dispatches on `operation` (and, for some
// operations, `subOperation`).
// - Lob_Init creates globPtr (on the heap passed in blackBox) when NULL.
// - Lob_Cleanup destroys the globals and resets globPtr to NULL.
// Any other operation with a NULL globPtr fails with LOB_GLOB_PTR_ERROR.
// Returns LOB_OPER_OK on success, otherwise the error from the failing step.
// NOTE: transId is reset to 0 on entry, so every operation below is issued
// with transId 0 regardless of the caller's value.
Ex_Lob_Error ExLobsOper (
                         char        *lobName,          // lob name
                         ExHdfsScanStats *hdfsAccessStats,
                         char        *handleIn,         // input handle (for cli calls)
                         Int32       handleInLen,       // input handle len
                         char       *hdfsServer,       // server where hdfs fs resides
                         Int64       hdfsPort,          // port number to access hdfs server
                         char        *handleOut,        // output handle (for cli calls)
                         Int32       &handleOutLen,     // output handle len
                         Int64       descNumIn,         // input desc Num (for flat files only)
                         Int64       &descNumOut,       // output desc Num (for flat files only)
                         Int64       &retOperLen,       // length of data involved in this operation
                         Int64       requestTagIn,      // only for checking status
                         Int64       &requestTagOut,    // returned with every request other than check status
                         Ex_Lob_Error  &requestStatus,  // returned req status
                         Int64       &cliError,         // err returned by cli call
                         char        *lobStorageLocation,              // directory in the storage
                         LobsStorage storage,           // storage type
                         char        *source,           // source (memory addr, filename, foreign lob etc)
                         Int64       sourceLen,         // source len (memory len, foreign desc offset etc)
                         Int64 cursorBytes,
                         char *cursorId,
                         LobsOper    operation,         // LOB operation
                         LobsSubOper subOperation,      // LOB sub operation
                         Int64       waited,            // waited or nowaited
                         ExLobGlobals  *&globPtr,       // ptr to the Lob objects.
                         Int64       transId,
                         void        *blackBox,         // black box to be sent to cli
                         Int32       blackBoxLen,       // length of black box
                         Int64       lobMaxSize,
                         Int64       lobMaxChunkMemSize,
                         Int64       lobGCLimit,
                         int         bufferSize ,
                         short       replication ,
                         int         blockSize,
                         Lng32       openType)
{
  Ex_Lob_Error err = LOB_OPER_OK;
  ExLob *lobPtr = NULL;
  struct timespec startTime;
  struct timespec endTime;
  Int64 secs, nsecs, totalnsecs;
  ExLobPreOpen *preOpenObj;
  ExLobGlobals *lobGlobals = NULL;
  transId = 0;
  retOperLen = 0;
  ExLobDesc desc;
  lobMap_t *lobMap = NULL;
  lobMap_it it;
  clock_gettime(CLOCK_MONOTONIC, &startTime);
  const char *fileName = lobName;
  if (globPtr == NULL)
    {
      if ((operation == Lob_Init))
        {
          NAHeap *lobHeap = (NAHeap *)blackBox;
          globPtr = new (lobHeap) ExLobGlobals(lobHeap);
          if (globPtr == NULL)
            return LOB_INIT_ERROR;
          lobGlobals = (ExLobGlobals *)globPtr;
          err = lobGlobals->initialize();
          if (err != LOB_OPER_OK)
            return err;
        }
      else
        {
          return LOB_GLOB_PTR_ERROR;
        }
    }
  if (globPtr != NULL)
    {
      lobGlobals = (ExLobGlobals *)globPtr;
      if ((operation != Lob_Init) && (operation != Lob_Cleanup))
        {
          lobMap = lobGlobals->getLobMap();
          it = lobMap->find(string(fileName));
          if (it == lobMap->end())
            {
              lobPtr = new (lobGlobals->getHeap())ExLob(lobGlobals->getHeap(), hdfsAccessStats);
              if (lobPtr == NULL)
                return LOB_ALLOC_ERROR;
              err = lobPtr->initialize(fileName, (operation == Lob_Create) ? EX_LOB_CREATE : EX_LOB_RW, lobStorageLocation, storage, hdfsServer, hdfsPort, lobStorageLocation,bufferSize, replication, blockSize,lobMaxSize,lobGlobals);
              if (err != LOB_OPER_OK)
                {
                  char buf[5000];
                  str_sprintf(buf,"Lob initialization failed;filename:%s;location:%s;hdfsserver:%s;lobMaxSize:%ld",fileName,lobStorageLocation,hdfsServer,lobMaxSize);
                  lobDebugInfo(buf,err,__LINE__,lobGlobals->lobTrace_);
                  // Not yet in the map, so nothing else owns it: free it here.
                  NADELETE(lobPtr, ExLob, lobGlobals->getHeap());
                  return err;
                }
              lobMap->insert(pair<string, ExLob*>(string(fileName), lobPtr));
            }
          else
            {
              lobPtr = it->second;
            }
          lobPtr->lobTrace_ = lobGlobals->lobTrace_;
        }
    }
  /*
  // **Note** This is code that needs to get called before sneding a request to the
  //mxlobsrvr process. It's inactive code currently
  MS_Mon_Transid_Type transIdBig;
  MS_Mon_Transseq_Type transStartId;
  if (!lobGlobals->isHive())
  {
  // get current transaction
  int transIdErr = ms_transid_get(false, false, &transIdBig, &transStartId);
  // set the pass thru request object values in the lob
  lobPtr->getRequest()->setValues(lobPtr->getDescFileName(),
  descNumIn, handleInLen, handleIn, storage,
  transId, transIdBig, transStartId,
  (char *)blackBox, blackBoxLen);
  }
  */
  switch(operation)
    {
    case Lob_Init:
    case Lob_Create:
      break;
    case Lob_InsertDesc:
      err = lobPtr->writeDesc(sourceLen, source, subOperation, descNumOut, retOperLen, lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,(char *)blackBox, &blackBoxLen,handleOut,handleOutLen,transId,lobGlobals);
      if (err != LOB_OPER_OK)
        {
          lobDebugInfo("writeDesc failed ",err,__LINE__,lobGlobals->lobTrace_);
        }
      break;
    case Lob_InsertData:
      err = lobPtr->insertData(source, sourceLen, subOperation, descNumIn, retOperLen, lobMaxSize,lobMaxChunkMemSize,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,lobGlobals);
      if (err != LOB_OPER_OK)
        {
          lobDebugInfo("insertData failed ",err,__LINE__,lobGlobals->lobTrace_);
        }
      break;
    case Lob_InsertDataSimple:
      err = lobPtr->writeDataSimple(source, sourceLen, subOperation, retOperLen,
                                    bufferSize , replication , blockSize);
      break;
    case Lob_InsSel:
      {
        ExLob *srcLobPtr;
        Int16 flags;
        Lng32 lobNum;
        Int32 lobType;
        Int64 uid, inDescSyskey, descPartnKey;
        short schNameLen;
        char schName[ComAnsiNamePart::MAX_IDENTIFIER_EXT_LEN+1];
        char sourceHandle[LOB_HANDLE_LEN] = {};
        // Never copy more than the fixed-size local handle buffer can hold.
        Int64 handleCopyLen = MINOF(sourceLen, (Int64)LOB_HANDLE_LEN);
        str_cpy_all(sourceHandle, source, handleCopyLen);
        ExpLOBoper::extractFromLOBhandle(&flags, &lobType, &lobNum, &uid,
                                         &inDescSyskey, &descPartnKey,
                                         &schNameLen, schName,
                                         sourceHandle);
        char srcLobNameBuf[LOB_NAME_LEN];
        char * srcLobName =
          ExpLOBoper::ExpGetLOBname(uid, lobNum, srcLobNameBuf, LOB_NAME_LEN);
        lobMap_it it2;
        it2 = lobMap->find(string(srcLobName));
        if (it2 == lobMap->end())
          {
            srcLobPtr = new (lobGlobals->getHeap())ExLob(lobGlobals->getHeap(), hdfsAccessStats);
            if (srcLobPtr == NULL)
              return LOB_ALLOC_ERROR;
            err = srcLobPtr->initialize(srcLobName, EX_LOB_RW, lobStorageLocation, storage, hdfsServer, hdfsPort, lobStorageLocation,bufferSize, replication, blockSize,lobMaxSize,lobGlobals);
            if (err != LOB_OPER_OK)
              {
                char buf[5000];
                str_sprintf(buf,"Lob initialization failed;filename:%s;location:%s;hdfsserver:%s;lobMaxSize:%ld",srcLobName,lobStorageLocation,hdfsServer,lobMaxSize);
                lobDebugInfo(buf,err,__LINE__,lobGlobals->lobTrace_);
                // Not yet in the map, so nothing else owns it: free it here.
                NADELETE(srcLobPtr, ExLob, lobGlobals->getHeap());
                return err;
              }
            lobMap->insert(pair<string, ExLob*>(string(srcLobName), srcLobPtr));
          }
        else
          srcLobPtr = it2->second;
        err = lobPtr->insertSelect(srcLobPtr,handleIn, handleInLen, source, sourceLen, retOperLen, lobMaxSize, lobMaxChunkMemSize,lobGCLimit,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,subOperation,transId, lobGlobals);
        if (err != LOB_OPER_OK)
          {
            lobDebugInfo("insertSelect failed ",err,__LINE__,lobGlobals->lobTrace_);
          }
      }
      break;
    case Lob_Read:
      {
        // Only free what is allocated here; a caller-supplied blackBox
        // belongs to the caller and must not be released by this function.
        char *allocatedBlackBox = NULL;
        if (storage == Lob_External_HDFS_File)
          {
            //Allocate storage to read the lob external file name from the
            //descriptor tables to get the data from.
            // TODO: do we ever take this code path for Hive files?
            allocatedBlackBox = new(lobGlobals->getHeap()) char[MAX_LOB_FILE_NAME_LEN+6];
            blackBox = allocatedBlackBox;
          }
        if (subOperation == Lob_Memory)
          {
            err = lobPtr->readToMem(source,sourceLen,retOperLen,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,transId);
            if (err != LOB_OPER_OK)
              {
                lobDebugInfo("readToMem failed ",err,__LINE__,lobGlobals->lobTrace_);
              }
          }
        else if (subOperation == Lob_File)
          {
            err = lobPtr->readToFile(source, sourceLen, retOperLen, lobMaxChunkMemSize, openType,handleIn,handleInLen,(char *)blackBox, blackBoxLen,handleOut,handleOutLen,transId);
            if (err != LOB_OPER_OK)
              {
                lobDebugInfo("readToFile failed ",err,__LINE__,lobGlobals->lobTrace_);
              }
          }
        else
          err = LOB_SUBOPER_ERROR;
        if (allocatedBlackBox)
          (lobGlobals->getHeap())->deallocateMemory(allocatedBlackBox);
      }
      break;
    case Lob_GetLength:
      {
        err = lobPtr->getLength(handleIn, handleInLen,retOperLen,subOperation,transId);
      }
      break;
    case Lob_GetOffset:
      {
        err = lobPtr->getOffset(handleIn, handleInLen,retOperLen,subOperation,transId);
      }
      break;
    case Lob_GetFileName:
      {
        err = lobPtr->getFileName(handleIn, handleInLen, (char *)blackBox, blackBoxLen, subOperation, transId);
      }
      break;
    case Lob_ReadDesc: // read desc only. Needed for pass thru.
      err = lobPtr->getDesc(desc,handleIn,handleInLen,(char *)blackBox, &blackBoxLen,handleOut,handleOutLen,transId);
      retOperLen = 0;
      break;
    case Lob_OpenCursor:
      err = lobPtr->openCursor(handleIn, handleInLen,transId);
      break;
    case Lob_OpenDataCursorSimple:
      {
        size_t dataFileNameLen = strlen(lobPtr->getDataFileName());
        size_t cursorIdLen = strlen(cursorId);
        if (openType == 1) { // preopen
          char temp1[30]; // big enough for :%Lx:
          sprintf(temp1, ":%Lx:",(long long unsigned int)lobName);
          char fn[dataFileNameLen + sizeof(temp1) + cursorIdLen + 1];
          strcpy(fn,lobPtr->getDataFileName());
          strcpy(fn + dataFileNameLen, temp1);
          strcpy(fn + dataFileNameLen + strlen(temp1), cursorId);
          preOpenObj = new (lobGlobals->getHeap()) ExLobPreOpen(lobPtr, fn, descNumIn, sourceLen,
                                                                cursorBytes, waited, lobGlobals->getHeap());
          lobGlobals->addToPreOpenList(preOpenObj);
        } else if (openType == 2) { // must open
          char temp2[30]; // big enough for :%Lx:
          sprintf(temp2, ":%Lx:",(long long unsigned int)lobName);
          char fn[dataFileNameLen + sizeof(temp2) + cursorIdLen + 1];
          strcpy(fn,lobPtr->getDataFileName());
          strcpy(fn + dataFileNameLen, temp2);
          strcpy(fn + dataFileNameLen + strlen(temp2), cursorId);
          err = lobPtr->openDataCursor(fn, Lob_Cursor_Simple, descNumIn, sourceLen, cursorBytes, waited, lobGlobals, (Int32 *)blackBox);
        } else
          err = LOB_SUBOPER_ERROR;
      }
      break;
    case Lob_ReadCursor:
      if ((subOperation == Lob_Memory) || (subOperation == Lob_Buffer))
        err = lobPtr->readCursor(source, sourceLen, handleIn, handleInLen, retOperLen,transId);
      else if (subOperation == Lob_File)
        err = lobPtr->readCursor(source, -1, handleIn, handleInLen, retOperLen,transId);
      else
        err = LOB_SUBOPER_ERROR;
      break;
    case Lob_ReadDataCursorSimple:
      {
        char temp3[30]; // big enough for :%Lx:
        sprintf(temp3, ":%Lx:",(long long unsigned int)lobName);
        size_t dataFileNameLen = strlen(lobPtr->getDataFileName());
        size_t cursorIdLen = strlen(cursorId);
        char fn[dataFileNameLen + sizeof(temp3) + cursorIdLen + 1];
        strcpy(fn,lobPtr->getDataFileName());
        strcpy(fn + dataFileNameLen, temp3);
        strcpy(fn + dataFileNameLen + strlen(temp3), cursorId);
        err = lobPtr->readDataCursorSimple(fn, source, sourceLen, retOperLen, lobGlobals);
      }
      break;
    case Lob_CloseFile:
      if (lobPtr->hasNoOpenCursors()) {
        lobGlobals->traceMessage("Lob_CloseFile",NULL,__LINE__);
        err = lobPtr->closeFile();
      }
      break;
    case Lob_CloseCursor:
      err = lobPtr->closeCursor(handleIn, handleInLen,transId);
      break;
    case Lob_CloseDataCursorSimple:
      {
        char temp4[30]; // big enough for :%Lx:
        sprintf(temp4, ":%Lx:",(long long unsigned int)lobName);
        size_t dataFileNameLen = strlen(lobPtr->getDataFileName());
        size_t cursorIdLen = strlen(cursorId);
        char fn[dataFileNameLen + sizeof(temp4) + cursorIdLen + 1];
        strcpy(fn,lobPtr->getDataFileName());
        strcpy(fn + dataFileNameLen, temp4);
        strcpy(fn + dataFileNameLen + strlen(temp4), cursorId);
        err = lobPtr->closeDataCursorSimple(fn, lobGlobals);
      }
      break;
    case Lob_Append:
      if ((subOperation == Lob_Memory) ||(subOperation == Lob_Buffer) || (subOperation ==Lob_External_File))
        {
          err = lobPtr->append(source, sourceLen, subOperation, descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,transId, lobGlobals);
          if (err != LOB_OPER_OK)
            {
              lobDebugInfo("append(Memory,Buffer) failed ",err,__LINE__,lobGlobals->lobTrace_);
            }
        }
      else if (subOperation == Lob_File)
        {
          err = lobPtr->append(source, -1, subOperation, descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,transId,lobGlobals);
          if (err != LOB_OPER_OK)
            {
              lobDebugInfo("append(File) failed ",err,__LINE__,lobGlobals->lobTrace_);
            }
        }
      else
        err = LOB_SUBOPER_ERROR;
      break;
    case Lob_Update:
      if ((subOperation == Lob_Memory)||(subOperation == Lob_Buffer)||(subOperation ==Lob_External_File))
        {
          err = lobPtr->update(source, sourceLen, subOperation, descNumIn, retOperLen, lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,transId, lobGlobals);
          if (err != LOB_OPER_OK)
            {
              lobDebugInfo("update(Memory,Buffer) failed ",err,__LINE__,lobGlobals->lobTrace_);
            }
        }
      else if (subOperation == Lob_File)
        {
          err = lobPtr->update(source, -1, subOperation,descNumIn, retOperLen,lobMaxSize, lobMaxChunkMemSize,lobGCLimit,handleIn,handleInLen,handleOut,handleOutLen,transId,lobGlobals);
          if (err != LOB_OPER_OK)
            {
              lobDebugInfo("update(Memory,Buffer) failed ",err,__LINE__,lobGlobals->lobTrace_);
            }
        }
      else
        err = LOB_SUBOPER_ERROR;
      break;
    case Lob_Delete:
      err = lobPtr->delDesc(handleIn, handleInLen,transId);
      break;
    case Lob_Drop:
      err = lobPtr->purgeLob();
      it = lobMap->find(string(lobName));
      if (it != lobMap->end())   // erase(end()) would be undefined
        lobMap->erase(it);
      NADELETE(lobPtr, ExLob,lobGlobals->getHeap()) ;
      lobPtr = NULL;
      if (err != LOB_OPER_OK)
        lobDebugInfo("purgeLob failed ",err,__LINE__,lobGlobals->lobTrace_);
      break;
    case Lob_Purge:
      err = lobPtr->purgeLob();
      it = lobMap->find(string(lobName));
      if (it != lobMap->end())   // erase(end()) would be undefined
        lobMap->erase(it);
      NADELETE(lobPtr, ExLob,lobGlobals->getHeap()) ;
      lobPtr = NULL;
      if (err != LOB_OPER_OK)
        lobDebugInfo("purgeLob failed ",err,__LINE__,lobGlobals->lobTrace_);
      break;
    case Lob_Empty_Directory:
      err = lobPtr->emptyDirectory(lobStorageLocation, lobGlobals);
      break;
    case Lob_Data_Mod_Check:
      {
        Int64 inputModTS = *(Int64*)blackBox;
        Int32 inputNumOfPartLevels =
          *(Lng32*)&((char*)blackBox)[sizeof(inputModTS)];
        Int32 * failedLocBufLen =
          (Int32*)&((char*)blackBox)[sizeof(inputModTS)+
                                     sizeof(inputNumOfPartLevels)];
        char * failedLocBuf = &((char*)blackBox)[sizeof(inputModTS)+
                                                 sizeof(inputNumOfPartLevels)+
                                                 sizeof(*failedLocBufLen)];
        Int64 failedModTS = -1;
        err =
          lobPtr->dataModCheck(lobStorageLocation,
                               inputModTS, inputNumOfPartLevels,
                               lobGlobals, failedModTS,
                               failedLocBuf, failedLocBufLen);
        descNumOut = failedModTS;
      }
      break;
    case Lob_Cleanup:
      NADELETE(lobGlobals,ExLobGlobals, lobGlobals->getHeap());
      // Do not leave the caller holding a pointer to the freed globals.
      lobGlobals = NULL;
      globPtr = NULL;
      break;
    case Lob_PerformGC:
      err = lobPtr->compactLobDataFile((ExLobInMemoryDescChunksEntry *)source,sourceLen);
      if (err != LOB_OPER_OK)
        lobDebugInfo("compactLobDataFile failed ",err,__LINE__,lobGlobals->lobTrace_);
      break;
    case Lob_RestoreLobDataFile:
      err = lobPtr->restoreLobDataFile();
      if (err != LOB_OPER_OK)
        lobDebugInfo("restoreLobDataFile failed ",err,__LINE__,lobGlobals->lobTrace_);
      break;
    case Lob_PurgeBackupLobDataFile:
      err = lobPtr->purgeBackupLobDataFile();
      if (err != LOB_OPER_OK)
        lobDebugInfo("purgeBackupLobDataFile failed ",err,__LINE__,lobGlobals->lobTrace_);
      break;
    default:
      err = LOB_OPER_ERROR;
      break;
    }
  /*
  //**Note ** This code is needed to reinstate the master transaction after
  // returning from the mxlobsrvr process. This is inactive code for now
  if (!lobGlobals->isHive() )
  {
  if (lobPtr)
  // set the pass thru request object values from the lob
  lobPtr->getRequest()->getValues(descNumOut, handleOutLen, handleOut,
  requestStatus, cliError,
  (char *)blackBox, blackBoxLen); // reinstate the transaction
  if (TRANSID_IS_VALID(transIdBig)) {
  ms_transid_reinstate(transIdBig, transStartId);
  }
  }
  */
  clock_gettime(CLOCK_MONOTONIC, &endTime);
  secs = endTime.tv_sec - startTime.tv_sec;
  nsecs = endTime.tv_nsec - startTime.tv_nsec;
  if (nsecs < 0) {
    secs--;
    nsecs += NUM_NSECS_IN_SEC;
  }
  totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
  /*
  if (lobPtr && lobPtr->getStats())
  lobPtr->getStats()->hdfsAccessLayerTime += totalnsecs;
  */
  return err;
}
void cleanupLOBDataDescFiles(const char *lobHdfsServer,int lobHdfsPort,const char *lobHdfsLoc)
{
HDFS_Client_RetCode hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobHdfsLoc);//ok to ignore error.
return;
}
// The following methods are used for Hive access.
/*
 (1) The main thread issues an open for a range of 128 MB and wakes up a
 worker thread. It does not wait; it issues a pre-open on the next range.
 The open and pre-open requests arrive via the Lob_OpenDataCursorSimple
 operation in ExLobsOper.
 The worker threads do their work in ::doWorkInThread, ::performRequest and
 ::readCursorDataSimple (note: this is a different method from the
 ::readDataCursorSimple that the main thread calls).
 (2) The main thread then issues a read (::readDataCursorSimple). Since the
 worker thread already began fetching 16KB buffers in (1), the main thread
 most likely will not need to wait, and the data will be ready. It keeps
 consuming the buffers, recycling them back into postfetchBufList.
 (3) When done, the main thread closes the cursor (::closeDataCursorSimple).
 This is determined by whether we have reached the end of the range or the
 end of data for that file.
 The worker threads, on the other hand, read 16KB of data at a time and
 buffer it in prefetchBufList. They continue doing this until the end of the
 range is reached or the buffer limit (128MB) has been reached.
*/
// Main-thread read for a simple data cursor. Copies up to tgtSize bytes from
// the cursor's prefetched buffers into tgt, setting operLen to the number of
// bytes copied.
//
// Returns LOB_CURSOR_NOT_OPEN if no cursor named `file` exists; otherwise
// LOB_OPER_OK. Stops early when the cursor has no buffered data and its
// eor_/eod_ flag is set, or when eol_ is set. Fully drained buffers are moved
// to lobGlobals->postfetchBufList_, and the cursor condition is signalled.
Ex_Lob_Error ExLob::readDataCursorSimple(const char *file, char *tgt, Int64 tgtSize,
                                         Int64 &operLen, ExLobGlobals *lobGlobals)
{
  operLen = 0;

  // Find the cursor under the cursor-map lock.
  cursor_t *cur = NULL;
  lobCursorLock_.lock();
  lobCursors_it found = lobCursors_.find(string(file, strlen(file)));
  if (found != lobCursors_.end())
    cur = &(found->second);
  lobCursorLock_.unlock();
  if (cur == NULL)
    return LOB_CURSOR_NOT_OPEN;

  char *dest = tgt;
  bool finished = false;
  while ((operLen < tgtSize) && !finished && !cur->eol_)
  {
    lobGlobals->traceMessage("locking cursor",cur,__LINE__);
    cur->lock_.lock();

    if (cur->prefetchBufList_.size() == 0)
    {
      // Nothing buffered: stop at end of range/data, otherwise wait for the
      // prefetch thread to signal, then re-check from the top.
      if (cur->eor_ || cur->eod_)
      {
        finished = true;
      }
      else
      {
        cur->bufferMisses_++;
        lobGlobals->traceMessage("wait on condition cursor",cur,__LINE__);
        cur->lock_.wait();
      }
      lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
      cur->lock_.unlock();
      continue;
    }

    // Take the oldest prefetched buffer; consume it outside the lock.
    ExLobCursor::bufferList_t::iterator head = cur->prefetchBufList_.begin();
    ExLobCursorBuffer *headBuf = *head;
    lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
    cur->lock_.unlock();

    Int64 chunk = min(headBuf->bytesRemaining_, tgtSize - operLen);
    memcpy(dest, headBuf->data_ + headBuf->bytesUsed_, chunk);
    dest += chunk;

    if (chunk != headBuf->bytesRemaining_)
    {
      // Partially consumed: advance within the buffer.
      headBuf->bytesUsed_ += chunk;
      headBuf->bytesRemaining_ -= chunk;
    }
    else
    {
      // Fully drained: recycle it and drop it from the cursor's list.
      headBuf->bytesRemaining_ = -1;
      headBuf->bytesUsed_ = -1;
      lobGlobals->postfetchBufListLock_.lock();
      lobGlobals->postfetchBufList_.push_back(headBuf);
      lobGlobals->postfetchBufListLock_.unlock();
      lobGlobals->traceMessage("locking cursor",cur,__LINE__);
      cur->lock_.lock();
      cur->prefetchBufList_.erase(head);
      lobGlobals->traceMessage("signal condition cursor",cur,__LINE__);
      cur->lock_.wakeOne(); // wake up prefetch thread if it was waiting for an empty buffer.
      lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
      cur->lock_.unlock();
    }
    //stats_.bytesPrefetched += chunk;
    operLen += chunk;
  }
  /*
  // update stats
  stats_.bytesRead += operLen;
  stats_.bytesToRead += tgtSize;
  stats_.numReadReqs++;
  */
  return LOB_OPER_OK;
}
// Closes the simple data cursor named fileName (no-op if it does not exist).
// Records the cursor's close time.
// - If the prefetch side already hit end of data/range (eod_ or eor_), the
//   queued buffers are recycled and the cursor is erased here.
// - Otherwise eol_ is set and the prefetch thread is woken to finish the
//   teardown.
// Always returns LOB_OPER_OK.
Ex_Lob_Error ExLob::closeDataCursorSimple(const char *fileName, ExLobGlobals *lobGlobals)
{
  Int64 elapsedSecs = 0;
  Int64 elapsedNsecs = 0;

  lobCursorLock_.lock();
  lobCursors_it found = lobCursors_.find(string(fileName, strlen(fileName)));
  if (found != lobCursors_.end())
  {
    cursor_t *cur = &(found->second);
    lobGlobals->traceMessage("locking cursor",cur,__LINE__);
    cur->lock_.lock();
    clock_gettime(CLOCK_MONOTONIC, &cur->closeTime_);
    elapsedSecs = cur->closeTime_.tv_sec - cur->openTime_.tv_sec;
    elapsedNsecs = cur->closeTime_.tv_nsec - cur->openTime_.tv_nsec;

    const bool prefetchFinished = (cur->eod_ || cur->eor_);
    if (!prefetchFinished)
    {
      cur->eol_ = true; // prefetch thread will do the eol rituals
      lobGlobals->traceMessage("signal condition cursor",cur,__LINE__);
      cur->lock_.wakeOne(); // wakeup prefetch thread
      lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
      cur->lock_.unlock();
    }
    else
    {
      // Prefetch thread is already done, so the cursor is torn down here.
      cur->emptyPrefetchList(lobGlobals);
      lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
      cur->lock_.unlock();
      lobCursors_.erase(found); // cursor object is gone after this
    }
  }
  lobCursorLock_.unlock();

  // Normalize the elapsed time (kept for the disabled stats below).
  if (elapsedNsecs < 0)
  {
    elapsedSecs--;
    elapsedNsecs += NUM_NSECS_IN_SEC;
  }
  Int64 elapsedTotal = (elapsedSecs * NUM_NSECS_IN_SEC) + elapsedNsecs;
  //stats_.cursorElapsedTime += elapsedTotal;
  (void)elapsedTotal;
  return LOB_OPER_OK;
}
// Worker-thread handler for one queued HDFS request.
//
// For Lob_Hdfs_Cursor_Prefetch:
// - Repeatedly takes an empty buffer from postfetchBufList_, or allocates a
//   new one.
// - Fills it via ExLob::readCursorDataSimple and appends it to the cursor's
//   prefetchBufList_, signalling the reader.
// - Stops on end of range (short read), end of data (zero bytes), or eol_.
// - While the queued bytes exceed LOB_CURSOR_PREFETCH_BYTES_MAX and no free
//   buffer exists, waits on the cursor condition.
// - Finally runs processPreOpens().
// Unknown request types set request->error_. Always returns LOB_OPER_OK.
Ex_Lob_Error ExLobGlobals::performRequest(ExLobHdfsRequest *request)
{
  ExLob *lobPtr;
  ExLobCursorBuffer *buf;
  ExLobCursor *cursor;
  Int64 size;
  NABoolean seenEOR = false;
  NABoolean seenEOD = false;
  ExLobCursor::bufferList_t::iterator c_it;
  Int64 totalBufSize;
  switch (request->reqType_)
    {
    case Lob_Hdfs_Cursor_Prefetch :
      lobPtr = request->lobPtr_;
      cursor = request->cursor_;
      traceMessage("locking cursor",cursor,__LINE__);
      cursor->lock_.lock();
      // cursor->lock_ is held whenever the loop condition is evaluated.
      while (!cursor->eod_ && !cursor->eor_ && !cursor->eol_)
        {
          postfetchBufListLock_.lock();
          c_it = postfetchBufList_.begin();
          if (c_it != postfetchBufList_.end()) {
            buf = *c_it;
            postfetchBufList_.erase(c_it);
            postfetchBufListLock_.unlock();
            traceMessage("unlocking cursor",cursor,__LINE__);
            cursor->lock_.unlock();
          } else {
            postfetchBufListLock_.unlock();
            // there are no empty buffers.
            // if prefetch list already has the max, wait for one to free up.
            totalBufSize = cursor->prefetchBufList_.size() * cursor->bufMaxSize_;
            if (totalBufSize > LOB_CURSOR_PREFETCH_BYTES_MAX) {
              traceMessage("wait on condition cursor",cursor,__LINE__);
              cursor->lock_.wait();
              char buffer2[2048];
              sprintf(buffer2, "cursor->eod_ %d cursor->eor_ %d "
                      "cursor->eol_ %d", cursor->eod_,
                      cursor->eor_, cursor->eol_);
              traceMessage(buffer2, cursor, __LINE__);
              continue;
            }
            // create a new buffer
            traceMessage("unlocking cursor",cursor,__LINE__);
            cursor->lock_.unlock();
            buf = new (getHeap()) ExLobCursorBuffer();
            buf->data_ = (char *) (getHeap())->allocateMemory( cursor->bufMaxSize_);
            //lobPtr->stats_.buffersUsed++;
          }
          size = min(cursor->bufMaxSize_, (cursor->maxBytes_ - cursor->bytesRead_));
          if (buf->data_) {
            lobPtr->readCursorDataSimple(buf->data_, size, *cursor, buf->bytesRemaining_);
            buf->bytesUsed_ = 0;
            traceMessage("locking cursor",cursor,__LINE__);
            cursor->lock_.lock();
            if (size < (cursor->bufMaxSize_)) {
              cursor->eor_ = true;
              seenEOR = true;
            }
            if (buf->bytesRemaining_) {
              cursor->prefetchBufList_.push_back(buf);
              traceMessage("signal condition cursor",cursor,__LINE__);
              cursor->lock_.wakeOne();
              traceMessage("unlocking cursor",cursor,__LINE__);
              cursor->lock_.unlock();
            } else {
              cursor->eod_ = true;
              seenEOD = true;
              traceMessage("signal condition cursor",cursor,__LINE__);
              cursor->lock_.wakeOne();
              traceMessage("unlocking cursor",cursor,__LINE__);
              cursor->lock_.unlock();
              postfetchBufListLock_.lock();
              postfetchBufList_.push_back(buf);
              postfetchBufListLock_.unlock();
            }
          } else {
            // assert("...") on a string literal can never fire; negate it so
            // a missing buffer is actually caught in checked builds.
            assert(!"data_ is null");
          }
          // Important! Break and do not access cursor object if we have reached
          // end of data or range.
          // The main thread could have destroyed the cursor
          // in ::closeDataCursorSimple
          if (seenEOD || seenEOR)
            {
              char buffer2[2048];
              sprintf(buffer2, "seenEOD %d seenEOR %d",
                      seenEOD, seenEOR);
              traceMessage(buffer2, cursor, __LINE__);
              break;
            }
          traceMessage("locking cursor",cursor,__LINE__);
          cursor->lock_.lock();
        } // while
      if (!seenEOD && !seenEOR)
        {
          // The loop ended on its condition with cursor->lock_ still held.
          traceMessage("unlocking cursor",cursor,__LINE__);
          cursor->lock_.unlock();
          // eol_ is set by ::closeDataCursorSimple when it leaves the
          // teardown to this thread.
          if (cursor->eol_) {
            lobPtr->deleteCursor(cursor->name_.c_str(), this);
          }
        }
      processPreOpens();
      break;
    default:
      request->error_ = LOB_HDFS_REQUEST_UNKNOWN;
    }
  return LOB_OPER_OK;
}
// Worker-thread read: fills tgt with up to tgtSize bytes using sequential
// hdfsRead calls on fdData_ (from its current position).
// - operLen receives the number of bytes read; cursor.bytesRead_ advances by
//   the same amount (a value of -1 is treated as "not started" and reset to 0).
// - Stops early when hdfsRead returns 0.
// Returns LOB_CURSOR_NOT_OPEN_ERROR if no data file is open,
// LOB_DATA_READ_ERROR if hdfsRead fails, otherwise LOB_OPER_OK.
Ex_Lob_Error ExLob::readCursorDataSimple(char *tgt, Int64 tgtSize, cursor_t &cursor, Int64 &operLen)
{
  operLen = 0;
  if (!fdData_)
    return LOB_CURSOR_NOT_OPEN_ERROR;

  if (cursor.bytesRead_ == -1)   // starting
    cursor.bytesRead_ = 0;

  struct timespec startTime;
  clock_gettime(CLOCK_MONOTONIC, &startTime);

  char *dest = tgt;
  while (operLen < tgtSize)
  {
    Int64 wanted = tgtSize - operLen;
    // Sequential read (hdfsRead rather than hdfsPread at an explicit offset).
    Int64 got = hdfsRead(fs_, fdData_, dest, wanted);
    //stats_.numHdfsReqs++;
    if (got == -1)
      return LOB_DATA_READ_ERROR;
    cursor.bytesRead_ += got;
    operLen += got;
    dest += got;
    if (got == 0)
      break;       // nothing more to read
  }

  struct timespec endTime;
  clock_gettime(CLOCK_MONOTONIC, &endTime);
  Int64 secs = endTime.tv_sec - startTime.tv_sec;
  Int64 nsecs = endTime.tv_nsec - startTime.tv_nsec;
  if (nsecs < 0)
  {
    secs--;
    nsecs += NUM_NSECS_IN_SEC;
  }
  Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
  //stats_.CumulativeReadTime += totalnsecs;
  (void)totalnsecs;
  return LOB_OPER_OK;
}
// Moves every buffer still queued in this cursor's prefetchBufList_ to
// lobGlobals->postfetchBufList_, taking the postfetch list lock once per
// buffer. Leaves prefetchBufList_ empty.
void ExLobCursor::emptyPrefetchList(ExLobGlobals *lobGlobals)
{
  for (ExLobCursor::bufferList_t::iterator pos = prefetchBufList_.begin();
       pos != prefetchBufList_.end(); )
  {
    ExLobCursorBuffer *recycled = *pos;
    lobGlobals->postfetchBufListLock_.lock();
    lobGlobals->postfetchBufList_.push_back(recycled);
    lobGlobals->postfetchBufListLock_.unlock();
    pos = prefetchBufList_.erase(pos);
  }
}
// Removes the cursor named cursorName, if present: its queued prefetch
// buffers are returned to the global postfetch list and the cursor is erased
// from lobCursors_. Always returns LOB_OPER_OK.
// Called from ExLobGlobals::performRequest when a cursor's eol_ flag is set;
// ::closeDataCursorSimple normally destroys the cursor itself. The teardown
// here mirrors closeDataCursorSimple for correctness.
Ex_Lob_Error ExLob::deleteCursor(const char *cursorName, ExLobGlobals *lobGlobals)
{
  lobCursorLock_.lock();
  lobCursors_it victim = lobCursors_.find(string(cursorName, strlen(cursorName)));
  if (victim != lobCursors_.end())
  {
    cursor_t *cur = &(victim->second);
    lobGlobals->traceMessage("locking cursor",cur,__LINE__);
    cur->lock_.lock();
    cur->emptyPrefetchList(lobGlobals);
    lobGlobals->traceMessage("unlocking cursor",cur,__LINE__);
    cur->lock_.unlock();
    lobCursors_.erase(victim);
  }
  lobCursorLock_.unlock();
  return LOB_OPER_OK;
}
//*** Note - sample code to send and receive
// Placeholder for sending a request to the LOB server: no message is sent.
// Returns LOB_OPER_OK. The result is initialized explicitly, because
// returning an uninitialized local is undefined behavior and would hand
// callers an arbitrary status.
Ex_Lob_Error ExLob::sendReqToLobServer()
{
  Ex_Lob_Error err = LOB_OPER_OK;
  return err;
}
///////////////////////////////////////////////////////////////////////////////
// ExLobGlobals definitions
///////////////////////////////////////////////////////////////////////////////
ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) :
     lobMap_(NULL),
     fs_(NULL),
     isCliInitialized_(FALSE),
     threadTraceFile_(NULL),
     lobTrace_(FALSE),
     numWorkerThreads_(0),
     heap_(lobHeap),
     useLibHdfs_(FALSE)
{
  // Worker-thread tracing: when TRACE_HDFS_THREAD_ACTIONS is set, append to
  // trace_threads.<pid> (a relative path, so in the current directory).
  if (getenv("TRACE_HDFS_THREAD_ACTIONS") != NULL)
  {
    char traceName[50] = "";
    snprintf(traceName, sizeof(traceName), "trace_threads.%d", getpid());
    threadTraceFile_ = fopen(traceName, "a");
  }

  // LOB action tracing is switched on by TRACE_LOB_ACTIONS.
  lobTrace_ = (getenv("TRACE_LOB_ACTIONS") != NULL) ? TRUE : FALSE;
}
// Tears down the process-wide LOB globals. The order of the steps matters:
//  1. enqueue one shutdown request per started worker thread, then join
//     each worker (numWorkerThreads_ counts down to 0);
//  2. only after the workers are gone, free the pre-open list, the pending
//     request queue, the post-fetch buffers and lobMap_, since a slow worker
//     could otherwise still be using them;
//  3. close the thread trace file, if one was opened.
ExLobGlobals::~ExLobGlobals()
{
ExLobCursor::bufferList_t::iterator c_it;
ExLobCursorBuffer *buf = NULL;
if (numWorkerThreads_ > 0) {
// Enqueue exactly numWorkerThreads_ shutdown requests (never more than
// NUM_WORKER_THREADS); each worker consumes one and exits its loop.
for (int i=0; numWorkerThreads_-i > 0 && i < NUM_WORKER_THREADS; i++) {
QRLogger::log(CAT_SQL_EXE, LL_DEBUG, 0, NULL,
"Worker Thread Shutdown Requested %ld ",
threadId_[i]);
enqueueShutdownRequest();
}
// Join workers in creation order. startWorkerThreads fills threadId_ from
// index 0 upward, so the first numWorkerThreads_ slots are the live threads.
for (int i=0; numWorkerThreads_ > 0 && i < NUM_WORKER_THREADS; i++) {
pthread_join(threadId_[i], NULL);
QRLogger::log(CAT_SQL_EXE, LL_DEBUG, 0, NULL,
"Worker Thread Completed %ld ",
threadId_[i]);
numWorkerThreads_--;
}
}
//Free the preOpenList AFTER the worker threads have left to avoid the
//case where a slow worker thread is still processing a preOpen and
//may access the preOpenList.
preOpenListLock_.lock();
ExLobPreOpen *po = NULL;
preOpenList_t::iterator p_it;
p_it = preOpenList_.begin();
while (p_it != preOpenList_.end())
{
po = *p_it;
NADELETE(po,ExLobPreOpen,heap_);
p_it = preOpenList_.erase(p_it);
}
preOpenListLock_.unlock();
//Free the request list: any requests that no worker dequeued are still
//owned by the queue and are released here.
ExLobHdfsRequest *request;
reqList_t::iterator it;
reqQueueLock_.lock();
it = reqQueue_.begin();
while (it != reqQueue_.end())
{
request = *it;
NADELETE(request,ExLobHdfsRequest,heap_);
it = reqQueue_.erase(it);
}
reqQueueLock_.unlock();
// Free the post-fetch buffer list AFTER the worker threads have left. This
// avoids the master deallocating these buffers while a slow worker is still
// stuck using them, which could cause a lock-up.
// NOTE(review): only buf->data_ is returned to heap_; the ExLobCursorBuffer
// object itself is not freed here. Confirm who owns/deletes it.
postfetchBufListLock_.lock();
c_it = postfetchBufList_.begin();
while (c_it != postfetchBufList_.end()) {
buf = *c_it;
if (buf->data_) {
heap_->deallocateMemory( buf->data_);
}
c_it = postfetchBufList_.erase(c_it);
}
postfetchBufListLock_.unlock();
//delete the lobMap AFTER the worker threads have finished their pending
//work since they may still be using an object that was fetched off the lobMap_
if (lobMap_)
{
lobMap_it it2;
for (it2 = lobMap_->begin(); it2 != lobMap_->end() ; ++it2)
{
ExLob *lobPtr = it2->second;
NADELETE(lobPtr, ExLob, heap_);
}
lobMap_->clear();
NADELETE(lobMap_,lobMap_t,heap_);
lobMap_ = NULL;
}
//msg_mon_close_process(&serverPhandle);
if (threadTraceFile_)
fclose(threadTraceFile_);
threadTraceFile_ = NULL;
}
// Allocates the LOB map on this object's heap. Meant to be called once per
// process. Returns LOB_INIT_ERROR if the allocation yields NULL and
// LOB_OPER_OK otherwise.
Ex_Lob_Error ExLobGlobals::initialize()
{
  lobMap_ = (lobMap_t *) new (getHeap()) lobMap_t;
  return (lobMap_ != NULL) ? LOB_OPER_OK : LOB_INIT_ERROR;
}
static void *workerThreadMain(void *arg)
{
// parameter passed to the thread is an instance of the ExLobHdfs object
ExLobGlobals *glob = (ExLobGlobals *)arg;
glob->doWorkInThread();
return NULL;
}
// Starts NUM_WORKER_THREADS threads running workerThreadMain(this).
// numWorkerThreads_ is incremented after every successful pthread_create, so
// the destructor knows how many workers to stop and join. On the first
// creation failure, the remaining threads are not started and
// LOB_HDFS_THREAD_CREATE_ERROR is returned.
Ex_Lob_Error ExLobGlobals::startWorkerThreads()
{
  for (int slot = 0; slot < NUM_WORKER_THREADS; ++slot)
  {
    if (pthread_create(&threadId_[slot], NULL, workerThreadMain, this) != 0)
      return LOB_HDFS_THREAD_CREATE_ERROR;

    QRLogger::log(CAT_SQL_EXE, LL_DEBUG, 0, NULL,
                  "Worker Thread Created %ld ",
                  threadId_[slot]);
    numWorkerThreads_++;
  }
  return LOB_OPER_OK;
}
///////////////////////////////////////////////////////////////////////////////
// ExLobLock, ExLobHdfsRequest and worker request-queue definitions
///////////////////////////////////////////////////////////////////////////////
// Creates an unlocked mutex with default attributes plus the condition
// variable ("work bell") used by wait()/wakeOne()/wakeAll().
ExLobLock::ExLobLock()
  : bellRang_(false),
    waiters_(0)
{
  pthread_mutexattr_t mutexAttr;
  pthread_mutexattr_init(&mutexAttr);
  pthread_mutex_init(&mutex_, &mutexAttr);
  // The attributes object is only needed by pthread_mutex_init. Destroying it
  // afterwards does not affect the initialized mutex, and it pairs the
  // pthread_mutexattr_init call so the attributes object is not leaked.
  pthread_mutexattr_destroy(&mutexAttr);
  pthread_cond_init(&workBell_, NULL);
}
// Releases the mutex and condition variable.
// The mutex is deliberately NOT unlocked here. Callers in this file release
// the lock before destroying its owner. For a default-type mutex, unlocking
// one the calling thread does not hold is undefined behavior. With glibc,
// the stray unlock also left the mutex in a state where
// pthread_mutex_destroy failed with EBUSY.
ExLobLock::~ExLobLock()
{
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&workBell_);
}
// Acquires the underlying pthread mutex, blocking until it is available.
void ExLobLock::lock()
{
  (void) pthread_mutex_lock(&mutex_);
}
// Releases the underlying pthread mutex acquired with lock().
void ExLobLock::unlock()
{
  (void) pthread_mutex_unlock(&mutex_);
}
// Signals the condition variable, waking at most one thread blocked in wait().
void ExLobLock::wakeOne()
{
  (void) pthread_cond_signal(&workBell_);
}
// Broadcasts on the condition variable, waking every thread blocked in wait().
void ExLobLock::wakeAll()
{
  (void) pthread_cond_broadcast(&workBell_);
}
// Blocks on the condition variable until wakeOne()/wakeAll() (or a spurious
// wakeup). The caller must already hold the mutex via lock(). Because
// pthread_cond_wait re-acquires the mutex before returning, waiters_ is only
// touched while the mutex is held.
void ExLobLock::wait()
{
  ++waiters_;
  (void) pthread_cond_wait(&workBell_, &mutex_);
  --waiters_;
}
// Builds a request tied to a cursor only. The LOB pointer, buffer and size
// start out zero, and the error status starts as LOB_OPER_OK.
ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLobCursor *cursor) :
  reqType_(reqType),
  cursor_(cursor)
{
  error_ = LOB_OPER_OK;
  size_ = 0;
  lobPtr_ = 0;
  buffer_ = 0;
}
// Builds a request tied to both a LOB and one of its cursors (used for
// prefetches). The buffer and size start out zero, and the error status
// starts as LOB_OPER_OK.
ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType, ExLob *lobPtr, ExLobCursor *cursor) :
  reqType_(reqType),
  lobPtr_(lobPtr),
  cursor_(cursor)
{
  error_ = LOB_OPER_OK;
  size_ = 0;
  buffer_ = 0;
}
// Builds a request that carries only its type (e.g. a shutdown request).
// Every pointer and the size start out zero, and the error status starts as
// LOB_OPER_OK.
ExLobHdfsRequest::ExLobHdfsRequest(LobsHdfsRequestType reqType) :
  reqType_(reqType)
{
  error_ = LOB_OPER_OK;
  size_ = 0;
  lobPtr_ = 0;
  cursor_ = 0;
  buffer_ = 0;
}
// Releases nothing: buffer_, lobPtr_ and cursor_ are left untouched.
ExLobHdfsRequest::~ExLobHdfsRequest()
{
  // intentionally empty
}
// Appends a request to the tail of the shared worker queue and signals one
// waiting worker thread. Always returns LOB_OPER_OK.
Ex_Lob_Error ExLobGlobals::enqueueRequest(ExLobHdfsRequest *request)
{
  char traceLine[2048];
  snprintf(traceLine, sizeof(traceLine), "enqueue request %d", request->reqType_);
  traceMessage(traceLine, NULL, __LINE__);

  reqQueueLock_.lock();
  reqQueue_.push_back(request);
  reqQueueLock_.wakeOne();   // one new item, so one worker needs to wake
  reqQueueLock_.unlock();

  return LOB_OPER_OK;
}
// Queues an asynchronous prefetch for `cursor` of `lobPtr`. The request is
// allocated on heap_. It is freed either by the worker after performRequest()
// or, if still queued at teardown, by ~ExLobGlobals. Always returns
// LOB_OPER_OK.
Ex_Lob_Error ExLobGlobals::enqueuePrefetchRequest(ExLob *lobPtr, ExLobCursor *cursor)
{
  ExLobHdfsRequest *prefetch =
    new (heap_) ExLobHdfsRequest(Lob_Hdfs_Cursor_Prefetch, lobPtr, cursor);
  // NOTE(review): a NULL allocation is not reported here, and enqueueRequest
  // would dereference it. Presumably heap_ allocation failure is fatal; confirm.
  enqueueRequest(prefetch);
  return LOB_OPER_OK;
}
// Queues one shutdown request. The worker thread that dequeues it leaves its
// request loop. Always returns LOB_OPER_OK.
Ex_Lob_Error ExLobGlobals::enqueueShutdownRequest()
{
  ExLobHdfsRequest *shutdown = new (heap_) ExLobHdfsRequest(Lob_Hdfs_Shutdown);
  // NOTE(review): a NULL allocation is not reported here, and enqueueRequest
  // would dereference it. Presumably heap_ allocation failure is fatal; confirm.
  enqueueRequest(shutdown);
  return LOB_OPER_OK;
}
// Removes and returns the request at the head of the shared queue, blocking
// on reqQueueLock_ while the queue is empty. NULL entries found at the head
// are discarded, and the wait continues until a non-NULL request is obtained.
ExLobHdfsRequest* ExLobGlobals::getHdfsRequest()
{
  ExLobHdfsRequest *request = NULL;

  reqQueueLock_.lock();
  while (request == NULL)
  {
    if (reqQueue_.empty())
    {
      // Re-check after every wakeup (spurious wakeups, other workers racing).
      reqQueueLock_.wait();
      continue;
    }
    reqList_t::iterator head = reqQueue_.begin();
    request = *head;
    reqQueue_.erase(head);
  }
  reqQueueLock_.unlock();

  char traceLine[2048];
  snprintf(traceLine, sizeof(traceLine), "got request %d", request->reqType_);
  traceMessage(traceLine, NULL, __LINE__);
  return request;
}
// Body of every worker thread. It first blocks all signals for the calling
// thread (and returns immediately if that fails), then serves requests from
// the shared queue until a shutdown request arrives. Every request taken off
// the queue, including the shutdown request, is freed here on heap_.
void ExLobGlobals::doWorkInThread()
{
  // Mask all signals in this worker; presumably so that signals are delivered
  // to the process's non-worker threads instead.
  sigset_t mask;
  sigfillset(&mask);
  int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
  if (rc != 0) {
    return;
  }

  // enter processing zone
  for (;;)
  {
    ExLobHdfsRequest *request = getHdfsRequest(); // waits until a request arrives
    if (request->isShutDown())
    {
      // Once dequeued, the shutdown request (allocated on heap_ by
      // enqueueShutdownRequest) is no longer in reqQueue_. The destructor will
      // not see it, so free it here instead of leaking it.
      NADELETE(request, ExLobHdfsRequest, heap_);
      // Wake up the next worker before going away.
      reqQueueLock_.lock();
      reqQueueLock_.wakeOne();
      reqQueueLock_.unlock();
      break;
    }
    performRequest(request);
    NADELETE(request, ExLobHdfsRequest, heap_);
  }
  pthread_exit(0);
}
// Appends a pre-open descriptor to the shared pre-open list under
// preOpenListLock_. processPreOpens() consumes the entries; ~ExLobGlobals
// frees any that are still queued. Always returns LOB_OPER_OK.
Ex_Lob_Error ExLobGlobals::addToPreOpenList(ExLobPreOpen *preOpenObj)
{
  preOpenListLock_.lock();
  preOpenList_.push_back(preOpenObj);
  preOpenListLock_.unlock();

  return LOB_OPER_OK;
}
// Takes at most one pending pre-open off the shared list and opens a simple
// data cursor for it on its LOB. The dequeued descriptor is then freed.
// Always returns LOB_OPER_OK; the openDataCursor result is not propagated.
Ex_Lob_Error ExLobGlobals::processPreOpens()
{
  ExLobPreOpen *preOpenObj = NULL;

  // Only the dequeue is done under the lock; the open itself runs unlocked.
  preOpenListLock_.lock();
  if (!preOpenList_.empty())
  {
    preOpenList_t::iterator p_it = preOpenList_.begin();
    preOpenObj = *p_it;
    preOpenList_.erase(p_it);
  }
  preOpenListLock_.unlock();

  if (preOpenObj != NULL)
  {
    ExLob *lobPtr = preOpenObj->lobPtr_;
    lobPtr->openDataCursor(preOpenObj->cursorName_.data(), Lob_Cursor_Simple, preOpenObj->range_,
                           preOpenObj->bufMaxSize_, preOpenObj->maxBytes_,
                           preOpenObj->waited_, this, 0);
    // After being dequeued the descriptor is no longer reachable from
    // preOpenList_, so ~ExLobGlobals will not free it. Release it here with
    // the same heap/macro the destructor uses for queued entries.
    // NOTE(review): assumes openDataCursor copies the cursor name (cursors
    // are keyed by std::string) and keeps no pointer into preOpenObj; confirm.
    NADELETE(preOpenObj, ExLobPreOpen, heap_);
  }

  return LOB_OPER_OK;
}
//Set envvar TRACE_HDFS_THREAD_ACTIONS to enable tracing.
//The output file will be named trace_threads.<pid> on each node.
void ExLobGlobals::traceMessage(const char *logMessage, ExLobCursor *cursor,
                                int line)
{
  // Nothing to do unless tracing was enabled in the constructor and there is
  // a message to write.
  if (threadTraceFile_ == NULL || logMessage == NULL)
    return;

  fprintf(threadTraceFile_,
          "Thread: 0x%lx Line: %d %s 0x%lx\n" ,
          (unsigned long) pthread_self(), line, logMessage,
          (unsigned long) cursor);
  // Flush each line so the trace is complete even if the process dies.
  fflush(threadTraceFile_);
}
//Set envvar TRACE_LOB_ACTIONS to enable tracing.
//The output file will be in the
//$TRAF_HOME/logs directory on each node.
// When lobTrace is true, logs "LOB : <logMessage>" as an SQL/MX debug event
// carrying errorcode (narrowed to short) and line; otherwise does nothing.
void lobDebugInfo(const char *logMessage, Int32 errorcode,
                  Int32 line, NABoolean lobTrace)
{
  if (!lobTrace)
    return;

  NAString logString("LOB : ");
  logString += logMessage;
  SQLMXLoggingArea::logSQLMXDebugEvent(logString.data(), (short) errorcode, line);
}
// ExLobRequest definitions
///////////////////////////////////////////////////////////////////////////////
// Builds an empty request. The request number and transId_ are 0, the
// descriptor/handle/offset/length fields are -1, type and storage hold their
// "invalid" enumerators, error_ and status_ hold LOB_INVALID_ERROR_VAL,
// cliError_ is -1, and the big transaction id is set to its null value.
ExLobRequest::ExLobRequest()
  : reqNum_(0),
    descNumIn_(-1),
    descNumOut_(-1),
    handleInLen_(-1),
    handleOutLen_(-1),
    dataOffset_(-1),
    type_(Lob_Req_Invalid),
    storage_(Lob_Invalid_Storage),
    operLen_(-1),
    error_(LOB_INVALID_ERROR_VAL),
    cliError_(-1),
    status_(LOB_INVALID_ERROR_VAL),
    transId_(0)
{
  TRANSID_SET_NULL(transIdBig_);
}
// Loads the inputs of a new LOB server call into this request and resets the
// result fields (cliError_ = -1, error_/status_ = LOB_INVALID_ERROR_VAL).
// handleIn and blackBox are copied only when non-NULL and of positive length;
// descFileName is always copied with strcpy.
// NOTE(review): none of the copies are bounds-checked against the destination
// members (presumably fixed-size arrays, since send() transmits the object's
// raw bytes). Callers must keep the lengths within those sizes.
void ExLobRequest::setValues(char *descFileName, Int64 descNumIn, Int64 handleInLen,
                             char *handleIn, LobsStorage storage, Int64 transId,
                             SB_Transid_Type transIdBig,
                             SB_Transseq_Type transStartId,
                             char *blackBox, Int64 blackBoxLen)
{
  // Request identity and inputs.
  strcpy(descFileName_, descFileName);
  descNumIn_ = descNumIn;
  storage_ = storage;
  handleInLen_ = handleInLen;
  if (handleInLen > 0 && handleIn != NULL)
    memcpy(handleIn_, handleIn, handleInLen);

  // Results not yet known.
  status_ = LOB_INVALID_ERROR_VAL;
  error_ = LOB_INVALID_ERROR_VAL;
  cliError_ = -1;

  // Transaction context.
  transId_ = transId;
  transIdBig_ = transIdBig;
  transStartId_ = transStartId;

  // Opaque caller payload.
  blackBoxLen_ = blackBoxLen;
  if (blackBoxLen > 0 && blackBox != NULL)
    memcpy(blackBox_, blackBox, blackBoxLen);
}
// Copies the results of a completed request into the caller's variables.
// handleOut and blackBox receive bytes only when the caller passed a non-NULL
// buffer and the stored length is positive; the caller's buffers must be
// large enough for handleOutLen_ / blackBoxLen_ bytes.
void ExLobRequest::getValues(Int64 &descNumOut, Int64 &handleOutLen,
                             char *handleOut, Ex_Lob_Error &requestStatus,
                             Int64 &cliError,
                             char *blackBox, Int64 &blackBoxLen)
{
  requestStatus = error_;
  cliError = cliError_;
  descNumOut = descNumOut_;

  handleOutLen = handleOutLen_;
  if (handleOutLen_ > 0 && handleOut != NULL)
    memcpy(handleOut, handleOut_, handleOutLen_);

  blackBoxLen = blackBoxLen_;
  if (blackBoxLen_ > 0 && blackBox != NULL)
    memcpy(blackBox, blackBox_, blackBoxLen_);
}
// Nothing to release explicitly.
ExLobRequest::~ExLobRequest()
{
  // intentionally empty
}
// Sends this request object (its raw bytes) to the LOB server process
// identified by serverPhandle and copies the raw reply bytes back over *this.
// The send is a BMSG_LINK_ followed by BMSG_BREAK_ to collect the reply. It is
// attempted at most 3 times while the break result equals XZFIL_ERR_PATHDOWN.
// Returns LOB_SEND_MSG_ERROR if the final result is not XZFIL_ERR_OK,
// otherwise LOB_OPER_OK.
// NOTE(review): the BMSG_LINK_ return code is overwritten by BMSG_BREAK_
// without being checked. The inner check compares against -XZFIL_ERR_PATHDOWN
// while the loop condition uses +XZFIL_ERR_PATHDOWN; confirm which sign
// BMSG_BREAK_ actually returns.
// NOTE(review): req_data_len / rep_data_max are `short`; sizeof(ExLobRequest)
// must fit in one. TODO confirm.
Ex_Lob_Error ExLobRequest::send()
{
int msgid;
int oid;
MS_Result_Type result;
short req_ctrl[BUFSIZ];
short rep_ctrl[BUFSIZ];
// The request object itself is the message payload.
char *req_data = (char *)this;
ExLobRequest rep_data;
short req_data_len = sizeof(ExLobRequest);
short rep_data_max = sizeof(ExLobRequest);
int err=0;
int inx=0;
int retries = 3;
incrReqNum();
status_ = LOB_OPER_REQ_IN_PROGRESS;
do
{
err = BMSG_LINK_(&serverPhandle,
&msgid,
req_ctrl,
(ushort) (inx &1),
rep_ctrl,
1,
req_data,
req_data_len,
(char *)&rep_data,
rep_data_max,
0,0,0,0);
retries--;
// Wait for the reply; this result replaces the LINK result in err.
err = BMSG_BREAK_(msgid, (short *) &result, &serverPhandle);
if (err == -XZFIL_ERR_PATHDOWN) {
//lobGlobals>resetServerPhandle();
}
} while ( (err == XZFIL_ERR_PATHDOWN) && (retries > 0) ); // 201 if lobserver got restared
status_ = LOB_OPER_REQ_DONE;
if (err != XZFIL_ERR_OK)
return LOB_SEND_MSG_ERROR;
// Overwrite this object with the reply bytes; this also replaces status_
// with whatever the reply carries.
memcpy(this, &rep_data, rep_data_max);
return LOB_OPER_OK;
}