blob: ecb217cfccf2057aff2ab7147d7ee71b8ee77abd [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h"
#include "lib/commons.h"
#include "jni_md.h"
#include "lib/jniutils.h"
#include "BatchHandler.h"
#include "lib/NativeObjectFactory.h"
// NativeBatchProcessor jni util methods
static jfieldID InputBufferFieldID = NULL;
static jfieldID OutputBufferFieldID = NULL;
static jmethodID FlushOutputMethodID = NULL;
static jmethodID FinishOutputMethodID = NULL;
static jmethodID SendCommandToJavaMethodID = NULL;
// BatchHandler methods
namespace NativeTask {
ReadWriteBuffer * JNU_ByteArraytoReadWriteBuffer(JNIEnv * jenv, jbyteArray src) {
if (NULL == src) {
return NULL;
jsize len = jenv->GetArrayLength(src);
ReadWriteBuffer * ret = new ReadWriteBuffer(len);
jenv->GetByteArrayRegion(src, 0, len, (jbyte*)ret->getBuff());
return ret;
jbyteArray JNU_ReadWriteBufferToByteArray(JNIEnv * jenv, ReadWriteBuffer * result) {
if (NULL == result || result->getWritePoint() == 0) {
return NULL;
jbyteArray ret = jenv->NewByteArray(result->getWritePoint());
jenv->SetByteArrayRegion(ret, 0, result->getWritePoint(), (jbyte*)result->getBuff());
return ret;
: _processor(NULL), _config(NULL) {
BatchHandler::~BatchHandler() {
if (NULL != _config) {
delete _config;
_config = NULL;
void BatchHandler::releaseProcessor() {
if (_processor != NULL) {
JNIEnv * env = JNU_GetJNIEnv();
_processor = NULL;
void BatchHandler::onInputData(uint32_t length) {
_in.rewind(0, length);
void BatchHandler::flushOutput() {
if (NULL == _out.base()) {
uint32_t length = _out.position();
if (length == 0) {
JNIEnv * env = JNU_GetJNIEnv();
env->CallVoidMethod((jobject)_processor, FlushOutputMethodID, (jint)length);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "FlushOutput throw exception");
void BatchHandler::finishOutput() {
if (NULL == _out.base()) {
JNIEnv * env = JNU_GetJNIEnv();
env->CallVoidMethod((jobject)_processor, FinishOutputMethodID);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "FinishOutput throw exception");
void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity,
char * outputBuffer, uint32_t outputBufferCapacity) {
this->_config = config;
_in.reset(inputBuffer, inputBufferCapacity);
if (NULL != outputBuffer) {
if (outputBufferCapacity <= 1024) {
THROW_EXCEPTION(IOException, "Output buffer size too small for BatchHandler");
_out.reset(outputBuffer, outputBufferCapacity);
_out.rewind(0, outputBufferCapacity);
LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d",
inputBufferCapacity, _out.limit());
ResultBuffer * BatchHandler::call(const Command& cmd, ParameterBuffer * param) {
JNIEnv * env = JNU_GetJNIEnv();
jbyteArray jcmdData = JNU_ReadWriteBufferToByteArray(env, param);
jbyteArray ret = (jbyteArray)env->CallObjectMethod((jobject)_processor, SendCommandToJavaMethodID,, jcmdData);
if (env->ExceptionCheck()) {
THROW_EXCEPTION(JavaException, "SendCommandToJava throw exception");
return JNU_ByteArraytoReadWriteBuffer(env, ret);
} // namespace NativeTask
// NativeBatchProcessor jni methods
using namespace NativeTask;
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: setupHandler
* Signature: (J)V
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
JNIEnv * jenv, jobject processor, jlong handler, jobjectArray configs) {
try {
NativeTask::Config * config = new NativeTask::Config();
jsize len = jenv->GetArrayLength(configs);
for (jsize i = 0; i + 1 < len; i += 2) {
jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
config->set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj));
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", "BatchHandler is null");
jobject jinputBuffer = jenv->GetObjectField(processor, InputBufferFieldID);
char * inputBufferAddr = NULL;
uint32_t inputBufferCapacity = 0;
if (NULL != jinputBuffer) {
inputBufferAddr = (char*)(jenv->GetDirectBufferAddress(jinputBuffer));
inputBufferCapacity = jenv->GetDirectBufferCapacity(jinputBuffer);
jobject joutputBuffer = jenv->GetObjectField(processor, OutputBufferFieldID);
char * outputBufferAddr = NULL;
uint32_t outputBufferCapacity = 0;
if (NULL != joutputBuffer) {
outputBufferAddr = (char*)(jenv->GetDirectBufferAddress(joutputBuffer));
outputBufferCapacity = jenv->GetDirectBufferCapacity(joutputBuffer);
batchHandler->onSetup(config, inputBufferAddr, inputBufferCapacity, outputBufferAddr,
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeProcessInput
* Signature: (JI)V
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
JNIEnv * jenv, jobject processor, jlong handler, jint length) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeFinish
* Signature: (J)V
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish(
JNIEnv * jenv, jobject processor, jlong handler) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData(
JNIEnv * jenv, jobject processor, jlong handler) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
* Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: nativeCommand
* Signature: (J[B)[B
jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
JNIEnv * jenv, jobject processor, jlong handler, jint command, jbyteArray cmdData) {
try {
NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
if (NULL == batchHandler) {
JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
"handler not instance of BatchHandler");
return NULL;
Command cmd(command);
ParameterBuffer * param = JNU_ByteArraytoReadWriteBuffer(jenv, cmdData);
ResultBuffer * result = batchHandler->onCall(cmd, param);
jbyteArray ret = JNU_ReadWriteBufferToByteArray(jenv, result);
delete result;
delete param;
return ret;
} catch (NativeTask::UnsupportException & e) {
JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
} catch (NativeTask::OutOfMemoryException & e) {
JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
} catch (NativeTask::IOException & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (const NativeTask::JavaException & e) {
LOG("JavaException: %s", e.what());
// Do nothing, let java side handle
} catch (std::exception & e) {
JNU_ThrowByName(jenv, "java/io/IOException", e.what());
} catch (...) {
JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
return NULL;
* Class: org_apace_hadoop_mapred_nativetask_NativeBatchProcessor
* Method: InitIDs
* Signature: ()V
void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(JNIEnv * jenv,
jclass processorClass) {
InputBufferFieldID = jenv->GetFieldID(processorClass, "rawOutputBuffer", "Ljava/nio/ByteBuffer;");
OutputBufferFieldID = jenv->GetFieldID(processorClass, "rawInputBuffer", "Ljava/nio/ByteBuffer;");
FlushOutputMethodID = jenv->GetMethodID(processorClass, "flushOutput", "(I)V");
FinishOutputMethodID = jenv->GetMethodID(processorClass, "finishOutput", "()V");
SendCommandToJavaMethodID = jenv->GetMethodID(processorClass, "sendCommandToJava", "(I[B)[B");