blob: bc7ee9d7109661ae97fa264fef1c4644be279af6 [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
#include "OrcFileReader.h"
#include "QRLogger.h"
// ===========================================================================
// ===== Class OrcFileReader
// ===========================================================================
JavaMethodInit* OrcFileReader::JavaMethods_ = NULL;
jclass OrcFileReader::javaClass_ = 0;
bool OrcFileReader::javaMethodsInitialized_ = false;
pthread_mutex_t OrcFileReader::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
static const char* const sfrErrorEnumStr[] =
{
"No more data."
,"JNI NewStringUTF() in initSerDe()"
,"Java exception in initSerDe()"
,"JNI NewStringUTF() in open()"
,"Java exception in open()"
,"Java exception in getPos()"
,"Java exception in seeknSync()"
,"Java exception in isEOF()"
,"Java exception in fetchNextRow()"
,"Java exception in close()"
,"Java exception in getRowNum()"
};
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
char* OrcFileReader::getErrorText(OFR_RetCode errEnum)
{
if (errEnum < (OFR_RetCode)JOI_LAST)
return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
else
return (char*)sfrErrorEnumStr[errEnum-JOI_LAST];
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OrcFileReader::~OrcFileReader()
{
close();
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::init()
{
static char className[]="org/trafodion/sql/OrcFileReader";
OFR_RetCode lv_retcode = OFR_OK;
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
LL_DEBUG,
"Enter OrcFileReader::init()");
if (isInitialized())
return lv_retcode;
if (javaMethodsInitialized_)
return (OFR_RetCode)JavaObjectInterface::init(className,
javaClass_,
JavaMethods_,
(Int32)JM_LAST, javaMethodsInitialized_);
else
{
pthread_mutex_lock(&javaMethodsInitMutex_);
if (javaMethodsInitialized_)
{
pthread_mutex_unlock(&javaMethodsInitMutex_);
return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
}
JavaMethods_ = new JavaMethodInit[JM_LAST];
JavaMethods_[JM_CTOR ].jm_name = "<init>";
JavaMethods_[JM_CTOR ].jm_signature = "()V";
JavaMethods_[JM_GETERROR ].jm_name = "getLastError";
JavaMethods_[JM_GETERROR ].jm_signature = "()Ljava/lang/String;";
JavaMethods_[JM_OPEN ].jm_name = "open";
JavaMethods_[JM_OPEN ].jm_signature = "(Ljava/lang/String;)Ljava/lang/String;";
JavaMethods_[JM_GETPOS ].jm_name = "getPosition";
JavaMethods_[JM_GETPOS ].jm_signature = "()J";
JavaMethods_[JM_SYNC ].jm_name = "seeknSync";
JavaMethods_[JM_SYNC ].jm_signature = "(J)Ljava/lang/String;";
JavaMethods_[JM_ISEOF ].jm_name = "isEOF";
JavaMethods_[JM_ISEOF ].jm_signature = "()Z";
// JavaMethods_[JM_FETCHROW ].jm_name = "fetchNextRow";
// JavaMethods_[JM_FETCHROW ].jm_signature = "()Ljava/lang/String;";
// JavaMethods_[JM_FETCHROW2 ].jm_name = "fetchNextRow";
// JavaMethods_[JM_FETCHROW2 ].jm_signature = "(J)Ljava/lang/String;";
// JavaMethods_[JM_FETCHROW ].jm_name = "fetchNextRow";
// JavaMethods_[JM_FETCHROW ].jm_signature = "(C)Ljava/lang/String;";
// JavaMethods_[JM_FETCHROW2 ].jm_name = "fetchNextRow";
// JavaMethods_[JM_FETCHROW2 ].jm_signature = "(C)Ljava/lang/String;";
JavaMethods_[JM_FETCHROW ].jm_name = "fetchNextRow";
JavaMethods_[JM_FETCHROW ].jm_signature = "()[B";
// JavaMethods_[JM_FETCHROW2 ].jm_name = "fetchNextRow";
// JavaMethods_[JM_FETCHROW2 ].jm_signature = "()[B";
JavaMethods_[JM_FETCHROW2 ].jm_name = "fetchNextRowObj";
JavaMethods_[JM_FETCHROW2 ].jm_signature = "()Lorg/trafodion/sql/OrcFileReader$OrcRowReturnSQL;";
JavaMethods_[JM_GETNUMROWS ].jm_name = "getNumberOfRows";
JavaMethods_[JM_GETNUMROWS ].jm_signature = "()J";
// JavaMethods_[JM_FETCHBUFF1].jm_name = "fetchArrayOfRows";
// JavaMethods_[JM_FETCHBUFF1].jm_signature = "(I)[Ljava/lang/String;";
// JavaMethods_[JM_FETCHBUFF2].jm_name = "fetchArrayOfRows";
// JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;";
JavaMethods_[JM_CLOSE ].jm_name = "close";
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
lv_retcode = (OFR_RetCode)JavaObjectInterface::init(className,
javaClass_,
JavaMethods_,
(Int32)JM_LAST, javaMethodsInitialized_);
if (lv_retcode == OFR_OK)
javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return lv_retcode;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::open(const char* path)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::open(%s) called.", path);
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_OPEN_PARAM;
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL)
{
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_OPEN_PARAM;
}
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::open()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_OPEN_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::getPosition(Int64& pos)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getPosition(%ld) called.", pos);
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_GETPOS_EXCEPTION;
// long getPosition();
tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::getPosition()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_GETPOS_EXCEPTION;
}
if (result == -1) {
logError(CAT_SQL_HDFS_ORC_FILE_READER,
"OrcFileReader::getPosition()",
getLastError());
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_GETPOS_EXCEPTION;
}
pos = result;
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::seeknSync(Int64 pos)
{
Int64 orcPos;
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_SYNC_EXCEPTION;
orcPos = pos -1; //When you position in ORC, reading the NEXT row will be one greater than what you wanted.
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::seeknSync(%ld) called.", pos);
// String seeknSync(long);
tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, orcPos);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::seeknSync()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_SYNC_EXCEPTION;
}
if (jresult != NULL) {
const char *my_string = jenv_->GetStringUTFChars(jresult,
JNI_FALSE);
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
LL_DEBUG,
"OrcFileReader::seeknSync(%ld) error: %s\n",
pos,
my_string);
logError(CAT_SQL_HDFS_ORC_FILE_READER,
"OrcFileReader::seeknSync()",
jresult);
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_SYNC_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::isEOF() called.");
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_ISEOF_EXCEPTION;
// boolean isEOF();
tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
bool result = jenv_->CallBooleanMethod(javaObj_,
JavaMethods_[JM_ISEOF].methodID);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::isEOF()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_ISEOF_EXCEPTION;
}
isEOF = result;
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
//OFR_RetCode OrcFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns)
{
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_FETCHROW_EXCEPTION;
/*
// java.lang.String fetchNextRow(long stopOffset);
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
if (jresult==NULL && getLastError())
{
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult == NULL)
{
return OFR_NOMORE;
}
const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
strcpy(buffer, char_result);
jenv_->ReleaseStringUTFChars(jresult, char_result);
jenv_->PopLocalFrame(NULL);
return OFR_OK;
*/
jfieldID fid;
tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
jobject jresult = (jobject)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::fetchNextRow()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult==NULL && getLastError())
{
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
return OFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult == NULL)
return (OFR_NOMORE); //No more rows
//Retrieve row and associated data
jclass cls = jenv_->GetObjectClass(jresult);
fid = jenv_->GetFieldID(cls,"m_row_length","I");
if (fid ==NULL)
{
jenv_->PopLocalFrame(NULL);
return (OFR_ERROR_FETCHROW_EXCEPTION);
}
jint row_length = (jint)jenv_->GetIntField(jresult, fid);
array_length = (long)row_length;
fid = jenv_->GetFieldID(cls,"m_column_count","I");
if (fid ==NULL)
{
jenv_->PopLocalFrame(NULL);
return(OFR_ERROR_FETCHROW_EXCEPTION);
}
jint column_count = (jint)jenv_->GetIntField(jresult, fid);
num_columns = column_count;
fid = jenv_->GetFieldID(cls,"m_row_number","J");
if (fid ==NULL)
{
jenv_->PopLocalFrame(NULL);
return(OFR_ERROR_FETCHROW_EXCEPTION);
}
jlong rowNum = (jlong)jenv_->GetIntField(jresult, fid);
rowNumber = rowNum;
// Get the actual row (it is a byte array). Use the row_length above to specify how much to copy
fid = jenv_->GetFieldID(cls,"m_row_ba","[B");
if (fid ==NULL)
{
jenv_->PopLocalFrame(NULL);
return (OFR_ERROR_FETCHROW_EXCEPTION);
}
jbyteArray jrow = (jbyteArray)jenv_->GetObjectField(jresult, fid);
if (jrow == NULL)
{
jenv_->PopLocalFrame(NULL);
return (OFR_ERROR_FETCHROW_EXCEPTION);
}
jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
jenv_->PopLocalFrame(NULL);
return (OFR_OK);
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::close()
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::close() called.");
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_GETPOS_EXCEPTION;
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::close()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_CLOSE_EXCEPTION;
}
if (jresult!=NULL) {
logError(CAT_SQL_HDFS_ORC_FILE_READER,
"OrcFileReader::close()",
jresult);
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_CLOSE_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::getRowCount(Int64& count)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getRowCount() called.");
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_GETNUMROWS_EXCEPTION;
tsRecentJMFromJNI = JavaMethods_[JM_GETNUMROWS].jm_full_name;
jlong jresult = (jlong)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETNUMROWS].methodID);
count = jresult;
if (jenv_->ExceptionCheck())
{
getExceptionDetails(__FILE__, __LINE__, "OrcFileReader::getRowCount()");
jenv_->PopLocalFrame(NULL);
return OFR_ERROR_GETNUMROWS_EXCEPTION;
}
jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
jstring OrcFileReader::getLastError()
{
// String getLastError();
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETERROR].methodID);
return jresult;
}
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::fetchRowsIntoBuffer(Int64 stopOffset,
char* buffer,
Int64 buffSize,
Int64& bytesRead,
char rowDelimiter)
{
/*
Removed until implemented
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::fetchRowsIntoBuffer(stopOffset: %ld, buffSize: %ld) called.", stopOffset, buffSize);
Int32 maxRowLength = 0;
char* pos = buffer;
Int64 limit = buffSize;
OFR_RetCode retCode;
long rowsRead=0;
bytesRead = 0;
do
{
retCode = fetchNextRow(stopOffset, pos);
if (retCode == OFR_OK)
{
rowsRead++;
Int32 rowLength = strlen(pos);
pos += rowLength;
*pos = rowDelimiter;
pos++;
*pos = 0;
bytesRead += rowLength+1;
if (maxRowLength < rowLength)
maxRowLength = rowLength;
limit = buffSize - maxRowLength*2;
}
} while (retCode == OFR_OK && bytesRead < limit);
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, " =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
return retCode;
*/
return (OFR_NOMORE);
}