| //********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| |
| #include "QRLogger.h" |
| #include "Globals.h" |
| #include "Context.h" |
| #include "jni.h" |
| #include "HdfsClient_JNI.h" |
| #include "org_trafodion_sql_HDFSClient.h" |
| #include "ComCompressionInfo.h" |
| |
| // =========================================================================== |
| // ===== Class HdfsScan |
| // =========================================================================== |
| |
| JavaMethodInit* HdfsScan::JavaMethods_ = NULL; |
| jclass HdfsScan::javaClass_ = 0; |
| bool HdfsScan::javaMethodsInitialized_ = false; |
| pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER; |
| |
| static const char* const hdfsScanErrorEnumStr[] = |
| { |
| "Error in HdfsScan::setScanRanges" |
| ,"Java exception in HdfsScan::setScanRanges" |
| ,"Error in HdfsScan::trafHdfsRead" |
| ,"Java exception in HdfsScan::trafHdfsRead" |
| , "Hdfs scan End of Ranges" |
| ,"Error in HdfsScan::stop" |
| ,"Java exception in HdfsScan::stop" |
| }; |
| |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // |
| ////////////////////////////////////////////////////////////////////////////// |
| HdfsScan::~HdfsScan() |
| { |
| if (j_buf1_ != NULL) { |
| jenv_->DeleteGlobalRef(j_buf1_); |
| j_buf1_ = NULL; |
| } |
| if (j_buf2_ != NULL) { |
| jenv_->DeleteGlobalRef(j_buf2_); |
| j_buf2_ = NULL; |
| } |
| } |
| |
| HDFS_Scan_RetCode HdfsScan::init() |
| { |
| static char className[]="org/trafodion/sql/HdfsScan"; |
| HDFS_Scan_RetCode rc; |
| |
| if (javaMethodsInitialized_) |
| return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| else |
| { |
| pthread_mutex_lock(&javaMethodsInitMutex_); |
| if (javaMethodsInitialized_) |
| { |
| pthread_mutex_unlock(&javaMethodsInitMutex_); |
| return (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| } |
| JavaMethods_ = new JavaMethodInit[JM_LAST]; |
| |
| JavaMethods_[JM_CTOR ].jm_name = "<init>"; |
| JavaMethods_[JM_CTOR ].jm_signature = "()V"; |
| JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges"; |
| JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[S)V"; |
| JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead"; |
| JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I"; |
| JavaMethods_[JM_STOP].jm_name = "stop"; |
| JavaMethods_[JM_STOP].jm_signature = "()V"; |
| |
| rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| if (rc == HDFS_SCAN_OK) |
| javaMethodsInitialized_ = TRUE; |
| pthread_mutex_unlock(&javaMethodsInitMutex_); |
| } |
| return rc; |
| } |
| |
| char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum) |
| { |
| if (errEnum < (HDFS_Scan_RetCode)JOI_LAST) |
| return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum); |
| else |
| return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST]; |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, int hdfsIoByteArraySizeInKB, |
| HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| |
| jobject j_buf1 = jenv_->NewDirectByteBuffer(hdfsScanBuf[0].buf_, scanBufSize); |
| if (j_buf1 == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| } |
| j_buf1_ = jenv_->NewGlobalRef(j_buf1); |
| if (j_buf1_ == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| } |
| jobject j_buf2 = jenv_->NewDirectByteBuffer(hdfsScanBuf[1].buf_, scanBufSize); |
| if (j_buf2 == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| } |
| j_buf2_ = jenv_->NewGlobalRef(j_buf2); |
| if (j_buf2_ == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| } |
| jint j_hdfsIoByteArraySizeInKB = hdfsIoByteArraySizeInKB; |
| jobjectArray j_filenames = NULL; |
| jlongArray j_offsets = NULL; |
| jlongArray j_lens = NULL; |
| jintArray j_rangenums = NULL; |
| jshortArray j_compress = NULL; |
| HdfsFileInfo *hdfo; |
| jstring j_obj; |
| |
| HDFS_Scan_RetCode hdfsScanRetCode = HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM; |
| int arrayLen = hdfsFileInfoArray->entries(); |
| |
| for (Int32 i = beginRangeNum, rangeCount=0; i < arrayLen; i++, rangeCount++) { |
| if (rangeCount >= numRanges) |
| break; |
| hdfo = hdfsFileInfoArray->at(i); |
| j_obj = jenv_->NewStringUTF(hdfo->fileName()); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| if (j_filenames == NULL) { |
| j_filenames = jenv_->NewObjectArray(numRanges, jenv_->GetObjectClass(j_obj), NULL); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| } |
| jenv_->SetObjectArrayElement(j_filenames, rangeCount, (jobject)j_obj); |
| jenv_->DeleteLocalRef(j_obj); |
| |
| if (j_offsets == NULL) { |
| j_offsets = jenv_->NewLongArray(numRanges); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| } |
| long offset = hdfo->getStartOffset(); |
| jenv_->SetLongArrayRegion(j_offsets, rangeCount, 1, &offset); |
| |
| if (j_lens == NULL) { |
| j_lens = jenv_->NewLongArray(numRanges); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| } |
| long len; |
| if (hdfo->getBytesToRead() > (LONG_MAX-rangeTailIOSize)) |
| len = LONG_MAX; |
| else |
| len = hdfo->getBytesToRead()+rangeTailIOSize; |
| jenv_->SetLongArrayRegion(j_lens, rangeCount, 1, &len); |
| |
| if (j_rangenums == NULL) { |
| j_rangenums = jenv_->NewIntArray(numRanges); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| } |
| jint tdbRangeNum = i; |
| jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum); |
| |
| if (j_compress == NULL) { |
| j_compress = jenv_->NewShortArray(numRanges); |
| if (jenv_->ExceptionCheck()) { |
| jenv_->PopLocalFrame(NULL); |
| return hdfsScanRetCode; |
| } |
| } |
| short compressionMethod = (short)hdfo->getCompressionMethod(); |
| ex_assert(compressionMethod >= 0 && compressionMethod < ComCompressionInfo::SUPPORTED_COMPRESSIONS, "Illegal CompressionMethod Value"); |
| jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod); |
| } |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name; |
| jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySizeInKB, |
| j_filenames, j_offsets, j_lens, j_rangenums, j_compress); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsScan::setScanRanges()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_SET_SCAN_RANGES_EXCEPTION; |
| } |
| return HDFS_SCAN_OK; |
| } |
| |
| HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, |
| int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize, |
| ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return NULL; |
| hdfsScanRetCode = HDFS_SCAN_OK; |
| HdfsScan *hdfsScan = new (heap) HdfsScan(heap); |
| if (hdfsScan != NULL) { |
| hdfsScanRetCode = hdfsScan->init(); |
| if (hdfsScanRetCode == HDFS_SCAN_OK) |
| hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySizeInKB, |
| hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); |
| if (hdfsScanRetCode == HDFS_SCAN_OK) |
| hdfsScan->setHdfsStats(hdfsStats); |
| else { |
| NADELETE(hdfsScan, HdfsScan, heap); |
| hdfsScan = NULL; |
| } |
| } |
| return hdfsScan; |
| } |
| |
| |
| HDFS_Scan_RetCode HdfsScan::trafHdfsRead(int retArray[], short arrayLen) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::trafHdfsRead() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM; |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_TRAF_HDFS_READ].jm_full_name; |
| jintArray j_retArray = (jintArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_TRAF_HDFS_READ].methodID); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsScan::setScanRanges()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION; |
| } |
| if (j_retArray == NULL) |
| return HDFS_SCAN_EOR; |
| |
| short retArrayLen = jenv_->GetArrayLength(j_retArray); |
| ex_assert(retArrayLen == arrayLen, "HdfsScan::trafHdfsRead() InternalError: retArrayLen != arrayLen"); |
| jenv_->GetIntArrayRegion(j_retArray, 0, 4, retArray); |
| return HDFS_SCAN_OK; |
| } |
| |
| HDFS_Scan_RetCode HdfsScan::stop() |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::stop() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_SCAN_ERROR_STOP_PARAM; |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_STOP].jm_full_name; |
| jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_STOP].methodID); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsScan::stop()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_SCAN_ERROR_STOP_EXCEPTION; |
| } |
| return HDFS_SCAN_OK; |
| } |
| |
| // =========================================================================== |
| // ===== Class HdfsClient |
| // =========================================================================== |
| |
| JavaMethodInit* HdfsClient::JavaMethods_ = NULL; |
| jclass HdfsClient::javaClass_ = 0; |
| bool HdfsClient::javaMethodsInitialized_ = false; |
| pthread_mutex_t HdfsClient::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER; |
| |
| static const char* const hdfsClientErrorEnumStr[] = |
| { |
| "JNI NewStringUTF() in HdfsClient::hdfsCreate()." |
| ,"Java exception in HdfsClient::hdfsCreate()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsOpen()." |
| ,"Java exception in HdfsClient::hdfsOpen()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsWrite()." |
| ,"Java exception in HdfsClient::hdfsWrite()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsWriteImmediate()." |
| ,"Java exception in HdfsClient::hdfsWriteImmediate()." |
| ,"Error in HdfsClient::hdfsRead()." |
| ,"Java exception in HdfsClient::hdfsRead()." |
| ,"Java exception in HdfsClient::hdfsClose()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsMergeFiles()." |
| ,"Java exception in HdfsClient::hdfsMergeFiles()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsCleanUnloadPath()." |
| ,"Java exception in HdfsClient::hdfsCleanUnloadPath()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsExists()." |
| ,"Java exception in HdfsClient::hdfsExists()." |
| ,"JNI NewStringUTF() in HdfsClient::hdfsDeletePath()." |
| ,"Java exception in HdfsClient::hdfsDeletePath()." |
| ,"Error in HdfsClient::setHdfsFileInfo()." |
| ,"Error in HdfsClient::hdfsListDirectory()." |
| ,"Java exception in HdfsClient::hdfsListDirectory()." |
| ,"preparing parameters for HdfsClient::getHiveTableMaxModificationTs()." |
| ,"java exception in HdfsClient::getHiveTableMaxModificationTs()." |
| ,"Error in HdfsClient::getFsDefaultName()." |
| ,"Java exception in HdfsClient::getFsDefaultName()." |
| ,"Buffer is small in HdfsClient::getFsDefaultName()." |
| ,"Error in HdfsClient::hdfsCreateDirectory()." |
| ,"Java exception in HdfsClient::hdfsCreateDirectory()." |
| ,"Error in HdfsClient::hdfsRename()." |
| ,"Java exception in HdfsClient::hdfsRename()." |
| ,"Error in HdfsClient::hdfsSize()." |
| ,"Java exception in HdfsClient::hdfsSize()." |
| }; |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // |
| ////////////////////////////////////////////////////////////////////////////// |
| HdfsClient::~HdfsClient() |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::~HdfsClient() called."); |
| deleteHdfsFileInfo(); |
| if (path_ != NULL) |
| NADELETEBASIC(path_, getHeap()); |
| path_ = NULL; |
| } |
| |
| void HdfsClient::deleteHdfsFileInfo() |
| { |
| for (int i = 0; i < numFiles_ ; i ++) { |
| NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap()); |
| NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap()); |
| NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap()); |
| } |
| if (hdfsFileInfo_ != NULL) |
| NADELETEBASICARRAY(hdfsFileInfo_, getHeap()); |
| numFiles_ = 0; |
| hdfsFileInfo_ = NULL; |
| } |
| |
| HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, int hdfsIoByteArraySizeInKB) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return NULL; |
| retCode = HDFS_CLIENT_OK; |
| HdfsClient *hdfsClient = new (heap) HdfsClient(heap); |
| if (hdfsClient != NULL) { |
| retCode = hdfsClient->init(); |
| if (retCode == HDFS_CLIENT_OK) { |
| hdfsClient->setHdfsStats(hdfsStats); |
| hdfsClient->setIoByteArraySize(hdfsIoByteArraySizeInKB); |
| } |
| else { |
| NADELETE(hdfsClient, HdfsClient, heap); |
| hdfsClient = NULL; |
| } |
| } |
| return hdfsClient; |
| } |
| |
| HdfsClient* HdfsClient::getInstance() |
| { |
| ContextCli *currContext = GetCliGlobals()->currContext(); |
| HdfsClient *hdfsClient = currContext->getHDFSClient(); |
| HDFS_Client_RetCode retcode; |
| if (hdfsClient == NULL) { |
| NAHeap *heap = currContext->exHeap(); |
| hdfsClient = newInstance(heap, NULL, retcode); |
| if (retcode != HDFS_CLIENT_OK) |
| return NULL; |
| currContext->setHDFSClient(hdfsClient); |
| } |
| return hdfsClient; |
| } |
| |
| void HdfsClient::deleteInstance(HdfsClient *hdfsClient) |
| { |
| hdfsClient->hdfsClose(); |
| NADELETE(hdfsClient, HdfsClient, hdfsClient->getHeap()); |
| } |
| |
| HDFS_Client_RetCode HdfsClient::init() |
| { |
| static char className[]="org/trafodion/sql/HDFSClient"; |
| HDFS_Client_RetCode rc; |
| |
| if (javaMethodsInitialized_) |
| return (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| else |
| { |
| pthread_mutex_lock(&javaMethodsInitMutex_); |
| if (javaMethodsInitialized_) |
| { |
| pthread_mutex_unlock(&javaMethodsInitMutex_); |
| return (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| } |
| JavaMethods_ = new JavaMethodInit[JM_LAST]; |
| |
| JavaMethods_[JM_CTOR ].jm_name = "<init>"; |
| JavaMethods_[JM_CTOR ].jm_signature = "()V"; |
| JavaMethods_[JM_HDFS_CREATE ].jm_name = "hdfsCreate"; |
| JavaMethods_[JM_HDFS_CREATE ].jm_signature = "(Ljava/lang/String;ZZ)Z"; |
| JavaMethods_[JM_HDFS_OPEN ].jm_name = "hdfsOpen"; |
| JavaMethods_[JM_HDFS_OPEN ].jm_signature = "(Ljava/lang/String;Z)Z"; |
| JavaMethods_[JM_HDFS_WRITE ].jm_name = "hdfsWrite"; |
| JavaMethods_[JM_HDFS_WRITE ].jm_signature = "([B)I"; |
| JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_name = "hdfsWriteImmediate"; |
| JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_signature = "([B)J"; |
| JavaMethods_[JM_HDFS_READ ].jm_name = "hdfsRead"; |
| JavaMethods_[JM_HDFS_READ ].jm_signature = "(JLjava/nio/ByteBuffer;)I"; |
| JavaMethods_[JM_HDFS_CLOSE ].jm_name = "hdfsClose"; |
| JavaMethods_[JM_HDFS_CLOSE ].jm_signature = "()Z"; |
| JavaMethods_[JM_HDFS_MERGE_FILES].jm_name = "hdfsMergeFiles"; |
| JavaMethods_[JM_HDFS_MERGE_FILES].jm_signature = "(Ljava/lang/String;Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_name = "hdfsCleanUnloadPath"; |
| JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_signature = "(Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_EXISTS].jm_name = "hdfsExists"; |
| JavaMethods_[JM_HDFS_EXISTS].jm_signature = "(Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_DELETE_PATH].jm_name = "hdfsDeletePath"; |
| JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_name = "hdfsListDirectory"; |
| JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_signature = "(Ljava/lang/String;J)I"; |
| JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].jm_name = "getHiveTableMaxModificationTs"; |
| JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS ].jm_signature = "(Ljava/lang/String;I)J"; |
| JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_name = "getFsDefaultName"; |
| JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_signature = "()Ljava/lang/String;"; |
| JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_name = "hdfsCreateDirectory"; |
| JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_signature = "(Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_RENAME].jm_name = "hdfsRename"; |
| JavaMethods_[JM_HDFS_RENAME].jm_signature = "(Ljava/lang/String;Ljava/lang/String;)Z"; |
| JavaMethods_[JM_HDFS_SIZE].jm_name = "hdfsSize"; |
| JavaMethods_[JM_HDFS_SIZE].jm_signature = "()J"; |
| JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_name = "hdfsSize"; |
| JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_signature = "(Ljava/lang/String;)J"; |
| rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); |
| if (rc == HDFS_CLIENT_OK) |
| javaMethodsInitialized_ = TRUE; |
| pthread_mutex_unlock(&javaMethodsInitMutex_); |
| } |
| return rc; |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // |
| ////////////////////////////////////////////////////////////////////////////// |
| char* HdfsClient::getErrorText(HDFS_Client_RetCode errEnum) |
| { |
| if (errEnum < (HDFS_Client_RetCode)HDFS_CLIENT_FIRST) |
| return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum); |
| else |
| return (char*)hdfsClientErrorEnumStr[errEnum-HDFS_CLIENT_FIRST]; |
| } |
| |
| void HdfsClient::setPath(const char *path) |
| { |
| if (path_ != NULL) |
| NADELETEBASIC(path_, getHeap()); |
| size_t len = strlen(path); |
| path_ = new (getHeap()) char[len+1]; |
| strcpy(path_, path); |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean overwrite, NABoolean compress) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM; |
| setPath(path); |
| jstring js_path = jenv_->NewStringUTF(path); |
| if (js_path == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM; |
| } |
| |
| jboolean j_compress = compress; |
| jboolean j_overwrite = overwrite; |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CREATE].jm_full_name; |
| jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CREATE].methodID, js_path, j_overwrite, j_compress); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsCreate()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CREATE_EXCEPTION; |
| } |
| |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreate()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsOpen(%s) called.", path); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; |
| setPath(path); |
| jstring js_path = jenv_->NewStringUTF(path); |
| if (js_path == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; |
| } |
| |
| jboolean j_compress = compress; |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_OPEN].jm_full_name; |
| jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_OPEN].methodID, js_path, j_compress); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsOpen()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION; |
| } |
| |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsOpen()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| Int64 HdfsClient::hdfsSize(HDFS_Client_RetCode &hdfsClientRetcode) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called."); |
| |
| if (initJNIEnv() != JOI_OK) { |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM; |
| return -1; |
| } |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE].jm_full_name; |
| jlong jresult = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_HDFS_SIZE].methodID); |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()"); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION; |
| return -1; |
| } |
| hdfsClientRetcode = HDFS_CLIENT_OK; |
| return jresult; |
| } |
| |
| |
| Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); |
| |
| if (initJNIEnv() != JOI_OK) { |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; |
| return 0; |
| } |
| Int64 lenRemain = len; |
| Int64 writeLen; |
| Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0)); |
| Int64 offset = 0; |
| jint bytesWritten; |
| do |
| { |
| if ((chunkLen > 0) && (lenRemain > chunkLen)) |
| writeLen = chunkLen; |
| else |
| writeLen = lenRemain; |
| //Write the requisite bytes into the file |
| jbyteArray jbArray = jenv_->NewByteArray(writeLen); |
| if (!jbArray) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM; |
| return 0; |
| } |
| jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset)); |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name; |
| // Java method returns the cumulative bytes written |
| bytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray); |
| |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()"); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION; |
| return 0; |
| } |
| lenRemain -= writeLen; |
| offset += writeLen; |
| } while (lenRemain > 0); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_OK; |
| return bytesWritten; |
| } |
| |
| Int64 HdfsClient::hdfsWriteImmediate(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWriteImmediate(%ld) called.", len); |
| |
| if (initJNIEnv() != JOI_OK) { |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION; |
| return 0; |
| } |
| Int64 lenRemain = len; |
| Int64 writeLen; |
| Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0)); |
| Int64 offset = 0; |
| jlong writeOffset = -1; |
| jlong chunkWriteOffset; |
| do |
| { |
| if ((chunkLen > 0) && (lenRemain > chunkLen)) |
| writeLen = chunkLen; |
| else |
| writeLen = lenRemain; |
| //Write the requisite bytes into the file |
| jbyteArray jbArray = jenv_->NewByteArray(writeLen); |
| if (!jbArray) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM; |
| return 0; |
| } |
| jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset)); |
| |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_full_name; |
| // Java method returns the cumulative bytes written |
| chunkWriteOffset = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].methodID, jbArray); |
| if (writeOffset == -1) |
| writeOffset = chunkWriteOffset; |
| |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()"); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION; |
| return 0; |
| } |
| lenRemain -= writeLen; |
| offset += writeLen; |
| } while (lenRemain > 0); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_OK; |
| return writeOffset; |
| } |
| |
| Int32 HdfsClient::hdfsRead(Int64 pos, const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsRead(%ld) called.", len); |
| |
| if (initJNIEnv() != JOI_OK) { |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION; |
| return 0; |
| } |
| jobject j_buf = jenv_->NewDirectByteBuffer((BYTE *)data, len); |
| if (j_buf == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_READ_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_READ_PARAM; |
| } |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_READ].jm_full_name; |
| jint bytesRead = 0; |
| jlong j_pos = pos; |
| bytesRead = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_READ].methodID, j_pos, j_buf); |
| |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsRead()"); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION; |
| return 0; |
| } |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_OK; |
| return bytesRead; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsClose() |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::close() called."); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; |
| |
| // String close(); |
| if (hdfsStats_ != NULL) |
| hdfsStats_->getHdfsTimer().start(); |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name; |
| jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CLOSE].methodID); |
| |
| if (hdfsStats_ != NULL) { |
| hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); |
| hdfsStats_->incHdfsCalls(); |
| } |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsClose()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; |
| } |
| |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsClose()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| Int64 HdfsClient::hdfsSize(const char *filename, HDFS_Client_RetCode &hdfsClientRetcode) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called."); |
| |
| if (initJNIEnv() != JOI_OK) { |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM; |
| return -1; |
| } |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_SIZE_PARAM; |
| |
| jstring j_filename = jenv_->NewStringUTF(filename); |
| if (j_filename == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_SIZE_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_SIZE_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_full_name; |
| jlong jresult = jenv_->CallStaticLongMethod(javaClass_, JavaMethods_[JM_HDFS_SIZE_FOR_FILE].methodID, j_filename); |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()"); |
| jenv_->PopLocalFrame(NULL); |
| hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION; |
| return -1; |
| } |
| hdfsClientRetcode = HDFS_CLIENT_OK; |
| return jresult; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.", |
| uldPath.data()); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM; |
| jstring js_UldPath = jenv_->NewStringUTF(uldPath.data()); |
| if (js_UldPath == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].jm_full_name; |
| jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_CLEAN_UNLOAD_PATH].methodID, js_UldPath); |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsCleanUnloadPath()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_CLEANUP_EXCEPTION; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath, |
| const NAString& dstPath) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsMergeFiles(%s, %s) called.", |
| srcPath.data(), dstPath.data()); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM; |
| jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data()); |
| |
| if (js_SrcPath == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM; |
| } |
| jstring js_DstPath= jenv_->NewStringUTF(dstPath.data()); |
| if (js_DstPath == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM; |
| } |
| |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_MERGE_FILES].jm_full_name; |
| jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_MERGE_FILES].methodID, js_SrcPath, js_DstPath); |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsMergeFiles()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION; |
| } |
| |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsMergeFiles()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.", |
| delPath.data()); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM; |
| |
| jstring js_delPath = jenv_->NewStringUTF(delPath.data()); |
| if (js_delPath == NULL) { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_DELETE_PATH].jm_full_name; |
| jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_DELETE_PATH].methodID, js_delPath); |
| |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsDeletePath()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; |
| } |
| |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsDeletePath()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; |
| } |
| |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsListDirectory(const char *pathStr, HDFS_FileInfo **hdfsFileInfo, int *numFiles) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsListDirectory(%s) called.", pathStr); |
| |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM; |
| |
| jstring js_pathStr = jenv_->NewStringUTF(pathStr); |
| if (js_pathStr == NULL) { |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM; |
| } |
| jlong jniObj = (long)this; |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_full_name; |
| |
| jint retNumFiles = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_LIST_DIRECTORY].methodID, |
| js_pathStr, jniObj); |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsListDirectory()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_LIST_DIR_EXCEPTION; |
| } |
| *numFiles = retNumFiles; |
| *hdfsFileInfo = hdfsFileInfo_; |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & exist) |
| { |
| QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.", |
| uldPath.data()); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HDFS_EXISTS_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_HDFS_EXISTS_PARAM; |
| |
| jstring js_UldPath = jenv_->NewStringUTF(uldPath.data()); |
| if (js_UldPath == NULL) { |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_EXISTS_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_EXISTS].jm_full_name; |
| jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_EXISTS].methodID, js_UldPath); |
| exist = jresult; |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsExists()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HDFS_EXISTS_EXCEPTION; |
| } |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::getHiveTableMaxModificationTs( Int64& maxModificationTs, const char * tableDirPaths, int levelDeep) |
| { |
| QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::getHiveTableMaxModificationTs(%s) called.",tableDirPaths); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM; |
| jstring js_tableDirPaths = jenv_->NewStringUTF(tableDirPaths); |
| if (js_tableDirPaths == NULL) |
| { |
| GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM)); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM; |
| } |
| |
| jint jlevelDeep = levelDeep; |
| tsRecentJMFromJNI = JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].jm_full_name; |
| jlong jresult = jenv_->CallStaticLongMethod(javaClass_, |
| JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].methodID, |
| js_tableDirPaths, jlevelDeep); |
| jenv_->DeleteLocalRef(js_tableDirPaths); |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::getHiveTableMaxModificationTS()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_EXCEPTION; |
| } |
| QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, |
| "Exit HDFSClient_JNI::getHiveTableMaxModificationTs() called."); |
| maxModificationTs = jresult; |
| jenv_->PopLocalFrame(NULL); |
| |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::getFsDefaultName(char* buf, int buf_len) |
| { |
| QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::getFsDefaultName() called."); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_PARAM; |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_full_name; |
| jstring jresult = |
| (jstring)jenv_->CallStaticObjectMethod(javaClass_, |
| JavaMethods_[JM_GET_FS_DEFAULT_NAME].methodID); |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::getFsDefaultName()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_EXCEPTION; |
| } |
| const char* char_result = jenv_->GetStringUTFChars(jresult, 0); |
| |
| HDFS_Client_RetCode retcode = HDFS_CLIENT_OK; |
| if ( buf_len >= strlen(char_result) ) { |
| strcpy(buf, char_result); |
| } else |
| retcode = HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_BUFFER_TOO_SMALL; |
| |
| jenv_->ReleaseStringUTFChars(jresult, char_result); |
| jenv_->PopLocalFrame(NULL); |
| |
| return retcode; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsCreateDirectory(const NAString &dirName) |
| { |
| QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::createDirectory() called."); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM; |
| |
| jstring js_dirName = jenv_->NewStringUTF(dirName.data()); |
| if (js_dirName == NULL) { |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_full_name; |
| jstring jresult = |
| (jstring)jenv_->CallStaticObjectMethod(javaClass_, |
| JavaMethods_[JM_HDFS_CREATE_DIRECTORY].methodID, js_dirName); |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsCreateDirectory()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION; |
| } |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreateDirectory()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION; |
| } |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::hdfsRename(const NAString &fromPath, const NAString &toPath) |
| { |
| QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::hdfsRename() called."); |
| if (initJNIEnv() != JOI_OK) |
| return HDFS_CLIENT_ERROR_RENAME_PARAM; |
| if (getInstance() == NULL) |
| return HDFS_CLIENT_ERROR_RENAME_PARAM; |
| |
| jstring js_fromPath = jenv_->NewStringUTF(fromPath.data()); |
| if (js_fromPath == NULL) { |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_RENAME_PARAM; |
| } |
| |
| jstring js_toPath = jenv_->NewStringUTF(toPath.data()); |
| if (js_toPath == NULL) { |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_RENAME_PARAM; |
| } |
| |
| tsRecentJMFromJNI = JavaMethods_[JM_HDFS_RENAME].jm_full_name; |
| jstring jresult = |
| (jstring)jenv_->CallStaticObjectMethod(javaClass_, |
| JavaMethods_[JM_HDFS_RENAME].methodID, js_fromPath, js_toPath); |
| if (jenv_->ExceptionCheck()) |
| { |
| getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsRename()"); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_RENAME_EXCEPTION; |
| } |
| if (jresult == false) |
| { |
| logError(CAT_SQL_HDFS, "HdfsClient::hdfsRename()", getLastError()); |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_ERROR_RENAME_EXCEPTION; |
| } |
| jenv_->PopLocalFrame(NULL); |
| return HDFS_CLIENT_OK; |
| } |
| |
| HDFS_Client_RetCode HdfsClient::setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, |
| jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, |
| jstring owner, jstring group, jshort permissions, jlong accessTime) |
| { |
| HDFS_FileInfo *hdfsFileInfo; |
| |
| if (fileNo == 0 && hdfsFileInfo_ != NULL) |
| deleteHdfsFileInfo(); |
| |
| if (hdfsFileInfo_ == NULL) { |
| hdfsFileInfo_ = new (getHeap()) HDFS_FileInfo[numFiles]; |
| numFiles_ = numFiles; |
| } |
| |
| if (fileNo >= numFiles_) |
| return HDFS_CLIENT_ERROR_SET_HDFSFILEINFO; |
| hdfsFileInfo = &hdfsFileInfo_[fileNo]; |
| if (isDir) |
| hdfsFileInfo->mKind = HDFS_DIRECTORY_KIND; |
| else |
| hdfsFileInfo->mKind = HDFS_FILE_KIND; |
| hdfsFileInfo->mLastMod = modTime; |
| hdfsFileInfo->mSize = len; |
| hdfsFileInfo->mReplication = numReplicas; |
| hdfsFileInfo->mBlockSize = blockSize; |
| hdfsFileInfo->mPermissions = permissions; |
| hdfsFileInfo->mLastAccess = accessTime; |
| jint tempLen = jenv->GetStringUTFLength(filename); |
| hdfsFileInfo->mName = new (getHeap()) char[tempLen+1]; |
| const char *temp = jenv->GetStringUTFChars(filename, NULL); |
| strncpy(hdfsFileInfo->mName, temp, tempLen); |
| hdfsFileInfo->mName[tempLen] = '\0'; |
| jenv_->ReleaseStringUTFChars(filename, temp); |
| tempLen = jenv->GetStringUTFLength(owner); |
| hdfsFileInfo->mOwner = new (getHeap()) char[tempLen+1]; |
| temp = jenv->GetStringUTFChars(owner, NULL); |
| strncpy(hdfsFileInfo->mOwner, temp, tempLen); |
| hdfsFileInfo->mOwner[tempLen] = '\0'; |
| jenv_->ReleaseStringUTFChars(owner, temp); |
| tempLen = jenv->GetStringUTFLength(group); |
| hdfsFileInfo->mGroup = new (getHeap()) char[tempLen+1]; |
| temp = jenv->GetStringUTFChars(group, NULL); |
| strncpy(hdfsFileInfo->mGroup, temp, tempLen); |
| hdfsFileInfo->mGroup[tempLen] = '\0'; |
| jenv_->ReleaseStringUTFChars(group, temp); |
| return HDFS_CLIENT_OK; |
| } |
| |
| |
| #ifdef __cplusplus |
| extern "C" { |
| #endif |
| |
| jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus |
| (JNIEnv *jenv, jobject j_obj, jlong hdfsClientJniObj, jint numFiles, jint fileNo, jboolean isDir, |
| jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, |
| jstring owner, jstring group, jshort permissions, jlong accessTime) |
| { |
| HDFS_Client_RetCode retcode; |
| HdfsClient *hdfsClient = (HdfsClient *)hdfsClientJniObj; |
| retcode = hdfsClient->setHdfsFileInfo(jenv, numFiles, fileNo, isDir, filename, modTime, len, numReplicas, blockSize, owner, |
| group, permissions, accessTime); |
| return (jint) retcode; |
| } |
| |
| JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer |
| (JNIEnv *jenv, jobject j_obj, jobject j_buf, jint offset, jbyteArray j_bufArray, jint copyLen) |
| { |
| void *bufBacking; |
| |
| bufBacking = jenv->GetDirectBufferAddress(j_buf); |
| if (bufBacking == NULL) |
| return -1; |
| jlong capacity = jenv->GetDirectBufferCapacity(j_buf); |
| jbyte *byteBufferAddr = (jbyte *)bufBacking + offset; |
| if ((offset + copyLen) > capacity) |
| return -2; |
| jenv->GetByteArrayRegion(j_bufArray, 0, copyLen, byteBufferAddr); |
| return 0; |
| } |
| |
| #ifdef __cplusplus |
| } |
| #endif |