blob: f16b3da7854a953e3b5797d433505d7f15e7a2e8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <mesos/executor.hpp>
#include "jvm/jvm.hpp"
#include "construct.hpp"
#include "convert.hpp"
#include "org_apache_mesos_MesosExecutorDriver.h"
using namespace mesos;
using std::string;
class JNIExecutor : public Executor
{
public:
JNIExecutor(JNIEnv* _env, jweak _jdriver)
: jvm(nullptr), env(_env), jdriver(_jdriver)
{
env->GetJavaVM(&jvm);
}
~JNIExecutor() override {}
void registered(ExecutorDriver* driver,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo) override;
void reregistered(
ExecutorDriver* driver, const SlaveInfo& slaveInfo) override;
void disconnected(ExecutorDriver* driver) override;
void launchTask(ExecutorDriver* driver, const TaskInfo& task) override;
void killTask(ExecutorDriver* driver, const TaskID& taskId) override;
void frameworkMessage(ExecutorDriver* driver, const string& data) override;
void shutdown(ExecutorDriver* driver) override;
void error(ExecutorDriver* driver, const string& message) override;
JavaVM* jvm;
JNIEnv* env;
jweak jdriver;
};
void JNIExecutor::registered(ExecutorDriver* driver,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.registered(driver);
jmethodID registered =
env->GetMethodID(clazz, "registered",
"(Lorg/apache/mesos/ExecutorDriver;"
"Lorg/apache/mesos/Protos$ExecutorInfo;"
"Lorg/apache/mesos/Protos$FrameworkInfo;"
"Lorg/apache/mesos/Protos$SlaveInfo;)V");
jobject jexecutorInfo = convert<ExecutorInfo>(env, executorInfo);
jobject jframeworkInfo = convert<FrameworkInfo>(env, frameworkInfo);
jobject jslaveInfo = convert<SlaveInfo>(env, slaveInfo);
env->ExceptionClear();
env->CallVoidMethod(jexecutor, registered, jdriver, jexecutorInfo,
jframeworkInfo, jslaveInfo);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::reregistered(ExecutorDriver* driver,
const SlaveInfo& slaveInfo)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.reregistered(driver, slaveInfo);
jmethodID reregistered =
env->GetMethodID(clazz, "reregistered",
"(Lorg/apache/mesos/ExecutorDriver;"
"Lorg/apache/mesos/Protos$SlaveInfo;)V");
jobject jslaveInfo = convert<SlaveInfo>(env, slaveInfo);
env->ExceptionClear();
env->CallVoidMethod(jexecutor, reregistered, jdriver, jslaveInfo);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::disconnected(ExecutorDriver* driver)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.disconnected(driver);
jmethodID disconnected =
env->GetMethodID(clazz, "disconnected",
"(Lorg/apache/mesos/ExecutorDriver;)V");
env->ExceptionClear();
env->CallVoidMethod(jexecutor, disconnected, jdriver);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::launchTask(ExecutorDriver* driver, const TaskInfo& desc)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.launchTask(driver, desc);
jmethodID launchTask = env->GetMethodID(
clazz,
"launchTask",
"(Lorg/apache/mesos/ExecutorDriver;"
"Lorg/apache/mesos/Protos$TaskInfo;)V");
jobject jdesc = convert<TaskInfo>(env, desc);
env->ExceptionClear();
env->CallVoidMethod(jexecutor, launchTask, jdriver, jdesc);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::killTask(ExecutorDriver* driver, const TaskID& taskId)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.killTask(driver, taskId);
jmethodID killTask = env->GetMethodID(
clazz,
"killTask",
"(Lorg/apache/mesos/ExecutorDriver;"
"Lorg/apache/mesos/Protos$TaskID;)V");
jobject jtaskId = convert<TaskID>(env, taskId);
env->ExceptionClear();
env->CallVoidMethod(jexecutor, killTask, jdriver, jtaskId);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::frameworkMessage(ExecutorDriver* driver, const string& data)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.frameworkMessage(driver, data);
jmethodID frameworkMessage = env->GetMethodID(
clazz,
"frameworkMessage",
"(Lorg/apache/mesos/ExecutorDriver;"
"[B)V");
// byte[] data = ..;
jbyteArray jdata = env->NewByteArray(data.size());
env->SetByteArrayRegion(jdata, 0, data.size(), (jbyte*) data.data());
env->ExceptionClear();
env->CallVoidMethod(jexecutor, frameworkMessage, jdriver, jdata);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::shutdown(ExecutorDriver* driver)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.shutdown(driver);
jmethodID shutdown =
env->GetMethodID(clazz, "shutdown", "(Lorg/apache/mesos/ExecutorDriver;)V");
env->ExceptionClear();
env->CallVoidMethod(jexecutor, shutdown, jdriver);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
void JNIExecutor::error(ExecutorDriver* driver, const string& message)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jdriver);
jfieldID executor =
env->GetFieldID(clazz, "executor", "Lorg/apache/mesos/Executor;");
jobject jexecutor = env->GetObjectField(jdriver, executor);
clazz = env->GetObjectClass(jexecutor);
// executor.error(driver, message);
jmethodID error = env->GetMethodID(
clazz,
"error",
"(Lorg/apache/mesos/ExecutorDriver;"
"Ljava/lang/String;)V");
jobject jmessage = convert<string>(env, message);
env->ExceptionClear();
env->CallVoidMethod(jexecutor, error, jdriver, jmessage);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
driver->abort();
return;
}
jvm->DetachCurrentThread();
}
extern "C" {
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: initialize
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_org_apache_mesos_MesosExecutorDriver_initialize
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
// Create a weak global reference to the MesosExecutorDriver
// instance (we want a global reference so the GC doesn't collect
// the instance but we make it weak so the JVM can exit).
jweak jdriver = env->NewWeakGlobalRef(thiz);
// Create the C++ executor and initialize the __executor variable.
JNIExecutor* executor = new JNIExecutor(env, jdriver);
jfieldID __executor = env->GetFieldID(clazz, "__executor", "J");
env->SetLongField(thiz, __executor, (jlong) executor);
// Create the C++ driver and initialize the __driver variable.
MesosExecutorDriver* driver = new MesosExecutorDriver(executor);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
env->SetLongField(thiz, __driver, (jlong) driver);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: finalize
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_org_apache_mesos_MesosExecutorDriver_finalize
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
// Note that we DO NOT want to call 'abort' or 'stop' as this may be
// misinterpreted by the executor. It is possible, however, that
// since we haven't called 'abort' or 'stop' there are still threads
// executing within the executor callbacks but the
// MesosExecutorDriver destructor will wait until this is not the
// case before returning.
delete driver;
jfieldID __executor = env->GetFieldID(clazz, "__executor", "J");
JNIExecutor* executor = (JNIExecutor*) env->GetLongField(thiz, __executor);
env->DeleteWeakGlobalRef(executor->jdriver);
delete executor;
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: start
* Signature: ()Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosExecutorDriver_start
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->start();
return convert<Status>(env, status);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: stop
* Signature: (Z)Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosExecutorDriver_stop
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->stop();
return convert<Status>(env, status);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: abort
* Signature: ()Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosExecutorDriver_abort
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->abort();
return convert<Status>(env, status);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: join
* Signature: ()Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosExecutorDriver_join
(JNIEnv* env, jobject thiz)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->join();
return convert<Status>(env, status);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: sendStatusUpdate
* Signature: (Lorg/apache/mesos/Protos/TaskStatus;)Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL
Java_org_apache_mesos_MesosExecutorDriver_sendStatusUpdate(
JNIEnv* env, jobject thiz, jobject jstatus)
{
// Construct a C++ TaskStatus from the Java TaskStatus.
const TaskStatus& taskStatus = construct<TaskStatus>(env, jstatus);
// Now invoke the underlying driver.
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->sendStatusUpdate(taskStatus);
return convert<Status>(env, status);
}
/*
* Class: org_apache_mesos_MesosExecutorDriver
* Method: sendFrameworkMessage
* Signature: ([B)Lorg/apache/mesos/Protos/Status;
*/
JNIEXPORT jobject JNICALL
Java_org_apache_mesos_MesosExecutorDriver_sendFrameworkMessage(
JNIEnv* env, jobject thiz, jbyteArray jdata)
{
// Construct a C++ string from the Java byte array.
jbyte* data = env->GetByteArrayElements(jdata, nullptr);
jsize length = env->GetArrayLength(jdata);
string temp((char*) data, (size_t) length);
env->ReleaseByteArrayElements(jdata, data, 0);
// Now invoke the underlying driver.
jclass clazz = env->GetObjectClass(thiz);
jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
MesosExecutorDriver* driver =
(MesosExecutorDriver*) env->GetLongField(thiz, __driver);
Status status = driver->sendFrameworkMessage(temp);
return convert<Status>(env, status);
}
} // extern "C" {