blob: ecb217cfccf2057aff2ab7147d7ee71b8ee77abd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef QUICK_BUILD
#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h"
#endif
#include "lib/commons.h"
#include "jni_md.h"
#include "lib/jniutils.h"
#include "BatchHandler.h"
#include "lib/NativeObjectFactory.h"
///////////////////////////////////////////////////////////////
// NativeBatchProcessor jni util methods
///////////////////////////////////////////////////////////////
static jfieldID InputBufferFieldID = NULL;
static jfieldID OutputBufferFieldID = NULL;
static jmethodID FlushOutputMethodID = NULL;
static jmethodID FinishOutputMethodID = NULL;
static jmethodID SendCommandToJavaMethodID = NULL;
///////////////////////////////////////////////////////////////
// BatchHandler methods
///////////////////////////////////////////////////////////////
namespace NativeTask {
ReadWriteBuffer * JNU_ByteArraytoReadWriteBuffer(JNIEnv * jenv, jbyteArray src) {
if (NULL == src) {
return NULL;
}
jsize len = jenv->GetArrayLength(src);
ReadWriteBuffer * ret = new ReadWriteBuffer(len);
jenv->GetByteArrayRegion(src, 0, len, (jbyte*)ret->getBuff());
ret->setWritePoint(len);
return ret;
}
jbyteArray JNU_ReadWriteBufferToByteArray(JNIEnv * jenv, ReadWriteBuffer * result) {
if (NULL == result || result->getWritePoint() == 0) {
return NULL;
}
jbyteArray ret = jenv->NewByteArray(result->getWritePoint());
jenv->SetByteArrayRegion(ret, 0, result->getWritePoint(), (jbyte*)result->getBuff());
return ret;
}
BatchHandler::BatchHandler()
: _processor(NULL), _config(NULL) {
}
BatchHandler::~BatchHandler() {
releaseProcessor();
if (NULL != _config) {
delete _config;
_config = NULL;
}
}
void BatchHandler::releaseProcessor() {
if (_processor != NULL) {
JNIEnv * env = JNU_GetJNIEnv();
env->DeleteGlobalRef((jobject)_processor);
_processor = NULL;
}
}
void BatchHandler::onInputData(uint32_t length) {
_in.rewind(0, length);
handleInput(_in);
}
void BatchHandler::flushOutput() {
if (NULL == _out.base()) {
return;
}
uint32_t length = _out.position();
_out.position(0);
if (length == 0) {
return;
}
JNIEnv * env = JNU_GetJNIEnv();
env->CallVoidMethod((jobject)_processor, FlushOutputMethodID, (jint)length);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "FlushOutput throw exception");
}
}
void BatchHandler::finishOutput() {
if (NULL == _out.base()) {
return;
}
JNIEnv * env = JNU_GetJNIEnv();
env->CallVoidMethod((jobject)_processor, FinishOutputMethodID);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "FinishOutput throw exception");
}
}
void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
char * outputBuffer, uint32_t outputBufferCapacity) {
this->_config = config;
_in.reset(inputBuffer, inputBufferCapacity);
if (NULL != outputBuffer) {
if (outputBufferCapacity <= 1024) {
THROW_EXCEPTION(IOException, "Output buffer size too small for BatchHandler");
}
_out.reset(outputBuffer, outputBufferCapacity);
_out.rewind(0, outputBufferCapacity);
LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d",
inputBufferCapacity, _out.limit());
}
configure(_config);
}
ResultBuffer * BatchHandler::call(const Command& cmd, ParameterBuffer * param) {
JNIEnv * env = JNU_GetJNIEnv();
jbyteArray jcmdData = JNU_ReadWriteBufferToByteArray(env, param);
jbyteArray ret = (jbyteArray)env->CallObjectMethod((jobject)_processor, SendCommandToJavaMethodID,
cmd.id(), jcmdData);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "SendCommandToJava throw exception");
}
return JNU_ByteArraytoReadWriteBuffer(env, ret);
}
} // namespace NativeTask
///////////////////////////////////////////////////////////////
// NativeBatchProcessor jni methods
///////////////////////////////////////////////////////////////
using namespace NativeTask;
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: setupHandler
* Signature: (J)V
*/
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
JNIEnv * jenv, jobject processor, jlong handler, jobjectArray configs) {
try {
NativeTask::Config * config = new NativeTask::Config();
jsize len = jenv->GetArrayLength(configs);
for (jsize i = 0; i + 1 < len; i += 2) {
jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
config->set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj));
}
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", "BatchHandler is null");
return;
}
jobject jinputBuffer = jenv->GetObjectField(processor, InputBufferFieldID);
char * inputBufferAddr = NULL;
uint32_t inputBufferCapacity = 0;
if (NULL != jinputBuffer) {
inputBufferAddr = (char*)(jenv->GetDirectBufferAddress(jinputBuffer));
inputBufferCapacity = jenv->GetDirectBufferCapacity(jinputBuffer);
}
jobject joutputBuffer = jenv->GetObjectField(processor, OutputBufferFieldID);
char * outputBufferAddr = NULL;
uint32_t outputBufferCapacity = 0;
if (NULL != joutputBuffer) {
outputBufferAddr = (char*)(jenv->GetDirectBufferAddress(joutputBuffer));
outputBufferCapacity = jenv->GetDirectBufferCapacity(joutputBuffer);
}
batchHandler->setProcessor(jenv->NewGlobalRef(processor));
batchHandler->onSetup(config, inputBufferAddr, inputBufferCapacity, outputBufferAddr,
outputBufferCapacity);
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
}
}
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeProcessInput
* Signature: (JI)V
*/
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
JNIEnv * jenv, jobject processor, jlong handler, jint length) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
return;
}
batchHandler->onInputData(length);
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
}
}
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeFinish
* Signature: (J)V
*/
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
JNIEnv * jenv, jobject processor, jlong handler) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
return;
}
batchHandler->onFinish();
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
}
}
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
JNIEnv * jenv, jobject processor, jlong handler) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
return;
}
batchHandler->onLoadData();
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
}
}
/*
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeCommand
* Signature: (J[B)[B
*/
jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
JNIEnv * jenv, jobject processor, jlong handler, jint command, jbyteArray cmdData) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
return NULL;
}
Command cmd(command);
ParameterBuffer * param = JNU_ByteArraytoReadWriteBuffer(jenv, cmdData);
ResultBuffer * result = batchHandler->onCall(cmd, param);
jbyteArray ret = JNU_ReadWriteBufferToByteArray(jenv, result);
delete result;
delete param;
return ret;
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (const NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
}
return NULL;
}
/*
* Class: org_apace_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: InitIDs
* Signature: ()V
*/
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(JNIEnv * jenv,
jclass processorClass) {
InputBufferFieldID = jenv->GetFieldID(processorClass, "rawOutputBuffer", "Ljava/nio/ByteBuffer;");
OutputBufferFieldID = jenv->GetFieldID(processorClass, "rawInputBuffer", "Ljava/nio/ByteBuffer;");
FlushOutputMethodID = jenv->GetMethodID(processorClass, "flushOutput", "(I)V");
FinishOutputMethodID = jenv->GetMethodID(processorClass, "finishOutput", "()V");
SendCommandToJavaMethodID = jenv->GetMethodID(processorClass, "sendCommandToJava", "(I[B)[B");
}