blob: aff2945dc033d6a130b44c5fc063285acb7a6dfb [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
#include "SequenceFileReader.h"
#include "QRLogger.h"
#include "Globals.h"
// ===========================================================================
// ===== Class SequenceFileReader
// ===========================================================================
JavaMethodInit* SequenceFileReader::JavaMethods_ = NULL;
jclass SequenceFileReader::javaClass_ = 0;
bool SequenceFileReader::javaMethodsInitialized_ = false;
pthread_mutex_t SequenceFileReader::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
static const char* const sfrErrorEnumStr[] =
{
"No more data."
,"JNI NewStringUTF() in initSerDe()"
,"Java exception in initSerDe()"
,"JNI NewStringUTF() in open()"
,"Java exception in open()"
,"Java exception in getPos()"
,"Java exception in seeknSync()"
,"Java exception in isEOF()"
,"Java exception in fetchNextRow()"
,"Java exception in close()"
};
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
char* SequenceFileReader::getErrorText(SFR_RetCode errEnum)
{
if (errEnum < (SFR_RetCode)JOI_LAST)
return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
else
return (char*)sfrErrorEnumStr[errEnum-JOI_LAST];
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SequenceFileReader::~SequenceFileReader()
{
close();
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::init()
{
static char className[]="org/trafodion/sql/SequenceFileReader";
SFR_RetCode rc;
if (javaMethodsInitialized_)
return (SFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
else
{
pthread_mutex_lock(&javaMethodsInitMutex_);
if (javaMethodsInitialized_)
{
pthread_mutex_unlock(&javaMethodsInitMutex_);
return (SFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
}
JavaMethods_ = new JavaMethodInit[JM_LAST];
JavaMethods_[JM_CTOR ].jm_name = "<init>";
JavaMethods_[JM_CTOR ].jm_signature = "()V";
// JavaMethods_[JM_INITSERDE ].jm_name = "initSerDe";
// JavaMethods_[JM_INITSERDE ].jm_signature = "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V";
JavaMethods_[JM_GETERROR ].jm_name = "getLastError";
JavaMethods_[JM_GETERROR ].jm_signature = "()Ljava/lang/String;";
JavaMethods_[JM_OPEN ].jm_name = "open";
JavaMethods_[JM_OPEN ].jm_signature = "(Ljava/lang/String;)Ljava/lang/String;";
JavaMethods_[JM_GETPOS ].jm_name = "getPosition";
JavaMethods_[JM_GETPOS ].jm_signature = "()J";
JavaMethods_[JM_SYNC ].jm_name = "seeknSync";
JavaMethods_[JM_SYNC ].jm_signature = "(J)Ljava/lang/String;";
JavaMethods_[JM_ISEOF ].jm_name = "isEOF";
JavaMethods_[JM_ISEOF ].jm_signature = "()Z";
// JavaMethods_[JM_FETCHCOLS ].jm_name = "fetchArrayOfColumns";
// JavaMethods_[JM_FETCHCOLS ].jm_signature = "()[Ljava/lang/String;";
JavaMethods_[JM_FETCHROW ].jm_name = "fetchNextRow";
JavaMethods_[JM_FETCHROW ].jm_signature = "()Ljava/lang/String;";
JavaMethods_[JM_FETCHROW2 ].jm_name = "fetchNextRow";
JavaMethods_[JM_FETCHROW2 ].jm_signature = "(J)Ljava/lang/String;";
JavaMethods_[JM_FETCHBUFF1].jm_name = "fetchArrayOfRows";
JavaMethods_[JM_FETCHBUFF1].jm_signature = "(I)[Ljava/lang/String;";
JavaMethods_[JM_FETCHBUFF2].jm_name = "fetchArrayOfRows";
JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;";
JavaMethods_[JM_CLOSE ].jm_name = "close";
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
setHBaseCompatibilityMode(FALSE);
rc = (SFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
//SFR_RetCode SequenceFileReader::initSerDe(int numColumns,
// const char* fieldDelim,
// const char* columns,
// const char* colTypes,
// const char* nullFormat)
//{
// char numColumns_s[10];
// sprintf(numColumns_s, "%d", numColumns);
// jstring js_numColumns = jenv_->NewStringUTF(numColumns_s);
// if (js_numColumns == NULL)
// return SFR_ERROR_INITSERDE_PARAMS;
//
// jstring js_delim = jenv_->NewStringUTF(fieldDelim);
// if (js_delim == NULL)
// return SFR_ERROR_INITSERDE_PARAMS;
//
// jstring js_columns = jenv_->NewStringUTF(columns);
// if (js_columns == NULL)
// return SFR_ERROR_INITSERDE_PARAMS;
//
// jstring js_colTypes = jenv_->NewStringUTF(colTypes);
// if (js_colTypes == NULL)
// return SFR_ERROR_INITSERDE_PARAMS;
//
// jstring js_nullFormat = jenv_->NewStringUTF(nullFormat);
// if (js_nullFormat == NULL)
// return SFR_ERROR_INITSERDE_PARAMS;
//
// jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_INITSERDE].methodID, js_numColumns, js_delim, js_columns, js_colTypes, js_nullFormat);
//
// jenv_->DeleteLocalRef(js_numColumns);
// jenv_->DeleteLocalRef(js_delim);
// jenv_->DeleteLocalRef(js_columns);
// jenv_->DeleteLocalRef(js_colTypes);
// jenv_->DeleteLocalRef(js_nullFormat);
//
// if (jenv_->ExceptionCheck())
// {
// //jenv_->ExceptionDescribe();
// jenv_->ExceptionClear();
// return SFR_ERROR_INITSERDE_EXCEPTION;
// }
//
// return SFR_OK;
//}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::open(const char* path)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::open(%s) called.", path);
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL)
return SFR_ERROR_OPEN_PARAM;
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
jenv_->DeleteLocalRef(js_path);
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult);
return SFR_ERROR_OPEN_EXCEPTION;
}
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::getPosition(%ld) called.", pos);
// long getPosition();
tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
if (result == -1)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::getPosition()", getLastError());
return SFR_ERROR_GETPOS_EXCEPTION;
}
pos = result;
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::seeknSync(%ld) called.", pos);
// String seeknSync(long);
tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, pos);
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::seeknSync()", jresult);
return SFR_ERROR_SYNC_EXCEPTION;
}
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::isEOF() called.");
// boolean isEOF();
tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID);
isEOF = result;
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
//char** SequenceFileReader::fetchArrayOfColumns()
//{
// // java.lang.String[] fetchArrayOfColumns();
// jobjectArray jresult = (jobjectArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHCOLS].methodID);
// if (jenv_->ExceptionCheck())
// {
// jenv_->ExceptionDescribe();
// jenv_->ExceptionClear();
// jenv_->DeleteLocalRef(jresult);
// return NULL;
// }
//
// if (jresult == NULL)
// {
// return NULL;
// }
//
// return SequenceFileReader::JStringArray2CharsArray(jresult);
//}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
{
// java.lang.String fetchNextRow(long stopOffset);
tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
if (jresult==NULL && getLastError())
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::fetchNextRow()", getLastError());
return SFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult == NULL)
{
return SFR_NOMORE;
}
const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
strcpy(buffer, char_result);
jenv_->ReleaseStringUTFChars(jresult, char_result);
jenv_->DeleteLocalRef(jresult);
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::close()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::close() called.");
if (javaObj_ == NULL)
{
// Maybe there was an initialization error.
return SFR_OK;
}
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
if (jresult!=NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::close()", jresult);
return SFR_ERROR_CLOSE_EXCEPTION;
}
return SFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
jstring SequenceFileReader::getLastError()
{
// String getLastError();
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETERROR].methodID);
return jresult;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
//char** SequenceFileReader::JStringArray2CharsArray(jobjectArray jarray)
//{
// char **chars_array;
// int len;
// int i;
// const char *ret_val;
// jstring jst_ret;
//
// len = jenv_->GetArrayLength(jarray);
//
// chars_array = (char **)malloc(sizeof(char*) * (len + 1));
// if (chars_array == NULL)
// {
// //TRACE(stderr, "<%s:%d> malloc() failed\n", __FILE__, __LINE__);
// return NULL;
// }
//
// for (i = 0; i < len; i++)
// {
// ret_val = "";
//
// jst_ret = (jstring)jenv_->GetObjectArrayElement(jarray, i);
// if (jst_ret != NULL)
// {
// ret_val = jenv_->GetStringUTFChars(jst_ret, 0);
// }
//
// //TRACE(stderr, "<%s:%d> %d) => %s\n", __FILE__, __LINE__, i, ret_val);
//
// chars_array[i] = strdup(ret_val);
// if (chars_array[i] == NULL)
// {
// // TODO: Add error handling
// return NULL;
// }
//
// if (jst_ret != NULL)
// {
// jenv_->ReleaseStringUTFChars(jst_ret, ret_val);
// jenv_->DeleteLocalRef(jst_ret);
// }
// }
// chars_array[i] = NULL;
// jenv_->DeleteLocalRef(jarray);
//
// return chars_array;
//}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::fetchRowsIntoBuffer(Int64 stopOffset,
char* buffer,
Int64 buffSize,
Int64& bytesRead,
char rowDelimiter)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::fetchRowsIntoBuffer(stopOffset: %ld, buffSize: %ld) called.", stopOffset, buffSize);
Int32 maxRowLength = 0;
char* pos = buffer;
Int64 limit = buffSize;
SFR_RetCode retCode;
long rowsRead=0;
bytesRead = 0;
do
{
retCode = fetchNextRow(stopOffset, pos);
if (retCode == SFR_OK)
{
rowsRead++;
Int32 rowLength = strlen(pos);
pos += rowLength;
*pos = rowDelimiter;
pos++;
*pos = 0;
bytesRead += rowLength+1;
if (maxRowLength < rowLength)
maxRowLength = rowLength;
limit = buffSize - maxRowLength*2;
}
} while (retCode == SFR_OK && bytesRead < limit);
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, " =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
return retCode;
}
// ===========================================================================
// ===== Class SequenceFileWriter
// ===========================================================================
JavaMethodInit* SequenceFileWriter::JavaMethods_ = NULL;
jclass SequenceFileWriter::javaClass_ = 0;
bool SequenceFileWriter::javaMethodsInitialized_ = false;
pthread_mutex_t SequenceFileWriter::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
static const char* const sfwErrorEnumStr[] =
{
"JNI NewStringUTF() in open() for writing."
,"Java exception in open() for writing."
,"JNI NewStringUTF() in write()"
,"Java exception in write()"
,"Java exception in close() after writing."
,"JNI NewStringUTF() in hdfsCreate()."
,"Java exception in hdfsCreate()."
,"JNI NewStringUTF() in hdfsWrite()."
,"Java exception in hdfsWrite()."
,"Java exception in hdfsClose()."
,"JNI NewStringUTF() in hdfsMergeFiles()."
,"Java exception in hdfsMergeFiles()."
,"JNI NewStringUTF() in hdfsCleanUnloadPath()."
,"Java exception in hdfsCleanUnloadPath()."
,"JNI NewStringUTF() in hdfsExists()."
,"Java exception in hdfsExists()."
,"file already exists."
,"Java exception in release()."
,"JNI NewStringUTF() in hdfsDeletePath()."
,"Java exception in hdfsDeletePath()."
};
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
char* SequenceFileWriter::getErrorText(SFW_RetCode errEnum)
{
if (errEnum < (SFW_RetCode)JOI_LAST)
return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
else
return (char*)sfwErrorEnumStr[errEnum-SFW_FIRST];
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SequenceFileWriter::~SequenceFileWriter()
{
close();
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::init()
{
static char className[]="org/trafodion/sql/SequenceFileWriter";
SFW_RetCode rc;
if (javaMethodsInitialized_)
return (SFW_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
else
{
pthread_mutex_lock(&javaMethodsInitMutex_);
if (javaMethodsInitialized_)
{
pthread_mutex_unlock(&javaMethodsInitMutex_);
return (SFW_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
}
JavaMethods_ = new JavaMethodInit[JM_LAST];
JavaMethods_[JM_CTOR ].jm_name = "<init>";
JavaMethods_[JM_CTOR ].jm_signature = "()V";
JavaMethods_[JM_OPEN ].jm_name = "open";
JavaMethods_[JM_OPEN ].jm_signature = "(Ljava/lang/String;I)Ljava/lang/String;";
JavaMethods_[JM_WRITE ].jm_name = "write";
JavaMethods_[JM_WRITE ].jm_signature = "(Ljava/lang/String;)Ljava/lang/String;";
JavaMethods_[JM_CLOSE ].jm_name = "close";
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
JavaMethods_[JM_HDFS_CREATE ].jm_name = "hdfsCreate";
JavaMethods_[JM_HDFS_CREATE ].jm_signature = "(Ljava/lang/String;Z)Z";
JavaMethods_[JM_HDFS_WRITE ].jm_name = "hdfsWrite";
JavaMethods_[JM_HDFS_WRITE ].jm_signature = "([BJ)Z";
JavaMethods_[JM_HDFS_CLOSE ].jm_name = "hdfsClose";
JavaMethods_[JM_HDFS_CLOSE ].jm_signature = "()Z";
JavaMethods_[JM_HDFS_MERGE_FILES].jm_name = "hdfsMergeFiles";
JavaMethods_[JM_HDFS_MERGE_FILES].jm_signature = "(Ljava/lang/String;Ljava/lang/String;)Z";
JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_name = "hdfsCleanUnloadPath";
JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_signature = "(Ljava/lang/String;)Z";
JavaMethods_[JM_HDFS_EXISTS].jm_name = "hdfsExists";
JavaMethods_[JM_HDFS_EXISTS].jm_signature = "(Ljava/lang/String;)Z";
JavaMethods_[JM_HDFS_DELETE_PATH].jm_name = "hdfsDeletePath";
JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z";
setHBaseCompatibilityMode(FALSE);
rc = (SFW_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::open(%s) called.", path);
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL)
return SFW_ERROR_OPEN_PARAM;
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path, compression);
jenv_->DeleteLocalRef(js_path);
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::open()", jresult);
return SFW_ERROR_OPEN_EXCEPTION;
}
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::write(const char* data)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::write(%s) called.", data);
jstring js_data = jenv_->NewStringUTF(data);
if (js_data == NULL)
return SFW_ERROR_WRITE_PARAM;
// String write(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_WRITE].methodID, js_data);
jenv_->DeleteLocalRef(js_data);
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult);
return SFW_ERROR_WRITE_EXCEPTION;
}
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::writeBuffer(char* data, Int64 buffSize, const char* rowDelimiter)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::writeBuffer() called.");
// Point to the first row.
char* nextRow = data;
char* buffEnd = data + buffSize;
char* rowEnd = NULL;
do
{
// Find the end of the row
rowEnd = strchr(nextRow, *rowDelimiter);
if (rowEnd && rowEnd < buffEnd)
{
// We need the row to become a null-terminated string.
*rowEnd = '\0';
SFW_RetCode result = write(nextRow);
if (result != SFW_OK)
return result;
// Skip to the next row.
nextRow = rowEnd+1;
}
} while (rowEnd && nextRow < buffEnd);
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::close()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
if (javaObj_ == NULL)
{
// Maybe there was an initialization error.
return SFW_OK;
}
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::close()", jresult);
return SFW_ERROR_CLOSE_EXCEPTION;
}
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::hdfsCreate(const char* path, NABoolean compress)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsCreate(%s) called.", path);
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_WRITE_EXCEPTION;
}
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_CREATE_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CREATE_PARAM;
}
jboolean j_compress = compress;
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CREATE].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CREATE].methodID, js_path, j_compress);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsCreate()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CREATE_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsCreaten()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CREATE_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsWrite(%ld) called.", len);
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_WRITE_EXCEPTION;
}
//Write the requisite bytes into the file
jbyteArray jbArray = jenv_->NewByteArray( len);
if (!jbArray) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_WRITE_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_WRITE_PARAM;
}
jenv_->SetByteArrayRegion(jbArray, 0, len, (const jbyte*)data);
jlong j_len = len;
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID,jbArray , j_len);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsWrite()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_WRITE_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsWrite()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_WRITE_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
SFW_RetCode SequenceFileWriter::hdfsClose()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
if (javaObj_ == NULL)
{
// Maybe there was an initialization error.
return SFW_OK;
}
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CLOSE].methodID);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
}
return SFW_OK;
}
SFW_RetCode SequenceFileWriter::hdfsCleanUnloadPath( const NAString& uldPath)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsCleanUnloadPath(%s) called.",
uldPath.data());
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_CLEANUP_EXCEPTION;
}
jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
if (js_UldPath == NULL) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_CLEANUP_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CLEANUP_PARAM;
}
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].methodID, js_UldPath);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsCleanUnloadPath()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CLEANUP_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
SFW_RetCode SequenceFileWriter::hdfsMergeFiles( const NAString& srcPath,
const NAString& dstPath)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsMergeFiles(%s, %s) called.",
srcPath.data(), dstPath.data());
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
}
jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data());
if (js_SrcPath == NULL) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_MERGE_FILES_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_MERGE_FILES_PARAM;
}
jstring js_DstPath= jenv_->NewStringUTF(dstPath.data());
if (js_DstPath == NULL) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_MERGE_FILES_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_MERGE_FILES_PARAM;
}
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_MERGE_FILES].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_MERGE_FILES].methodID, js_SrcPath, js_DstPath);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsMergeFiles()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsMergeFiles()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
SFW_RetCode SequenceFileWriter::hdfsDeletePath( const NAString& delPath)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsDeletePath(%s called.",
delPath.data());
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
}
jstring js_delPath = jenv_->NewStringUTF(delPath.data());
if (js_delPath == NULL) {
GetCliGlobals()->setJniErrorStr(getErrorText(SFW_ERROR_HDFS_DELETE_PATH_PARAM));
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_DELETE_PATH_PARAM;
}
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_DELETE_PATH].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_DELETE_PATH].methodID, js_delPath);
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsDeletePath()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsDeletePath()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
SFW_RetCode SequenceFileWriter::hdfsExists( const NAString& uldPath, NABoolean & exist)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsExists(%s) called.",
uldPath.data());
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
getExceptionDetails();
return SFW_ERROR_HDFS_EXISTS_EXCEPTION;
}
jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
if (js_UldPath == NULL) {
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_EXISTS_PARAM;
}
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_EXISTS].jm_full_name;
jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_EXISTS].methodID, js_UldPath);
exist = jresult;
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsExists()", getLastError());
jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_EXISTS_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return SFW_OK;
}