blob: 7afd0e960ed7a908a66b21d0bd9bdc0e7f8a2e59 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <queue>
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/unreachable.hpp>
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "jvm/jvm.hpp"
#include "master/constants.hpp"
#include "master/validation.hpp"
#include "convert.hpp"
#include "construct.hpp"
#include "org_apache_mesos_v1_scheduler_V0Mesos.h"
using namespace mesos;
using namespace mesos::internal::master;
using std::queue;
using std::string;
using std::vector;
using mesos::internal::devolve;
using mesos::internal::evolve;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using process::Clock;
using process::Owned;
using process::Timer;
class V0ToV1AdapterProcess; // Forward declaration.
// This class acts as an adapter from the v0 (driver + scheduler) API to the
// v1 Mesos scheduler API.
//
// It implements the v0 `mesos::Scheduler` callback interface, so that it can
// be registered with a `MesosSchedulerDriver`, and the v1 `MesosBase`
// interface, through which calls are sent. Every driver callback and every
// `send()` is dispatched to the `V0ToV1AdapterProcess` owned by this object.
class V0ToV1Adapter : public mesos::Scheduler, public v1::scheduler::MesosBase
{
public:
// Spawns the adapter process, then creates and starts a
// `MesosSchedulerDriver` (with implicit acknowledgements disabled) that
// reports back to this object. `credential`, when set, is handed to the
// driver.
V0ToV1Adapter(
JNIEnv* env,
jweak jmesos,
const FrameworkInfo& frameworkInfo,
const string& master,
const Option<Credential>& credential);
// Terminates the adapter process and waits for it to exit.
~V0ToV1Adapter() override;
// v0 Scheduler interface overrides.
// Each of these ignores the `driver` argument and dispatches the remaining
// arguments to the method of the same name on the adapter process.
void registered(
SchedulerDriver* driver,
const FrameworkID& frameworkId,
const MasterInfo& masterInfo) override;
void reregistered(
SchedulerDriver* driver,
const MasterInfo& masterInfo) override;
void disconnected(SchedulerDriver* driver) override;
void resourceOffers(
SchedulerDriver* driver,
const vector<Offer>& offers) override;
void offerRescinded(
SchedulerDriver* driver,
const OfferID& offerId) override;
void statusUpdate(
SchedulerDriver* driver,
const TaskStatus& status) override;
void frameworkMessage(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data) override;
void slaveLost(
SchedulerDriver* driver,
const SlaveID& slaveId) override;
void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status) override;
void error(
SchedulerDriver* driver,
const string& message) override;
// v1 MesosBase interface overrides.
// Dispatches `call`, together with a pointer to the owned driver, to the
// adapter process, which translates it into the matching driver invocation.
void send(const v1::scheduler::Call& call) override;
void reconnect() override
{
// The driver does not support explicit reconnection with the master.
UNREACHABLE();
}
process::Future<v1::scheduler::APIResult> call(
const v1::scheduler::Call& callMessage) override
{
// The driver does not support sending a `v1::scheduler::Call` that returns
// a `v1::scheduler::Response`.
UNREACHABLE();
}
// Public because the JNI `finalize` entry point reads `process->jmesos` in
// order to release the weak global reference.
process::Owned<V0ToV1AdapterProcess> process;
private:
// The v0 driver that talks to the master on behalf of this adapter. Declared
// after `process`, so it is destroyed before `process` is.
Owned<MesosSchedulerDriver> driver;
};
// The process (below) is responsible for ensuring synchronized access between
// callbacks received from the driver and calls invoked by the adapter.
//
// Driver callbacks are converted into v1 `Event`s and queued in `pending`;
// queued events are delivered to the Java scheduler only once a SUBSCRIBE
// call has been seen (see `received()` and `send()`).
class V0ToV1AdapterProcess : public process::Process<V0ToV1AdapterProcess>
{
public:
// Stores `env` and `jmesos` and looks up the `JavaVM` owning `env`.
V0ToV1AdapterProcess(JNIEnv* env, jweak jmesos);
~V0ToV1AdapterProcess() override = default;
// Handlers for the v0 driver callbacks. Apart from `disconnected()`, each
// builds the corresponding v1 `Event` and passes it to `received()`.
void registered(const FrameworkID& frameworkId, const MasterInfo& masterInfo);
void reregistered(const MasterInfo& masterInfo);
void disconnected();
void resourceOffers(const vector<Offer>& offers);
void offerRescinded(const OfferID& offerId);
void statusUpdate(const TaskStatus& status);
void frameworkMessage(
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data);
void slaveLost(const SlaveID& slaveId);
void executorLost(
const ExecutorID& executorId,
const SlaveID& slaveId,
int status);
void error(const string& message);
// Devolves and validates a v1 call, then performs the equivalent operation
// on the given v0 driver.
void send(SchedulerDriver*, const v1::scheduler::Call& call);
// JNI state. `jvm` is obtained from `env` in the constructor; `env` is
// overwritten each time the process attaches its current thread to the JVM.
JavaVM* jvm;
JNIEnv* env;
// Weak global reference to the Java `V0Mesos` object; released by the JNI
// `finalize` entry point.
jweak jmesos;
protected:
// Queues `event` and, if a SUBSCRIBE call was already seen, flushes the queue.
void received(const Event& event);
// Delivers every queued event, in order, via `__received()`.
void _received();
// Invokes `scheduler.received(mesos, event)` on the Java scheduler.
void __received(const Event& event);
// Invokes `scheduler.connected(mesos)` on the Java scheduler.
void connect();
// Timer callback: emits a HEARTBEAT event and re-arms the timer.
void heartbeat();
// Invokes `scheduler.disconnected(mesos)` on the Java scheduler.
void disconnect();
private:
// True between a SUBSCRIBE call and the next `disconnected()` callback.
bool subscribeCall;
// Heartbeat period; initialized from `DEFAULT_HEARTBEAT_INTERVAL`.
const Duration interval;
// Events waiting to be delivered to the Java scheduler.
queue<Event> pending;
// Framework id recorded by `registered()`; reused by `reregistered()`.
Option<FrameworkID> frameworkId;
// Pending heartbeat timer, if any; cancelled and reset on disconnection.
Option<Timer> heartbeatTimer;
};
// Builds the adapter: the helper process is spawned first so that it can
// accept the callbacks the driver starts issuing as soon as it is started.
V0ToV1Adapter::V0ToV1Adapter(
    JNIEnv* env,
    jweak jmesos,
    const FrameworkInfo& frameworkInfo,
    const string& master,
    const Option<Credential>& credential)
  : process(new V0ToV1AdapterProcess(env, jmesos))
{
  spawn(process.get());

  // Disable implicit acks.
  const bool implicitAcknowledgements = false;

  if (credential.isSome()) {
    driver.reset(new MesosSchedulerDriver(
        this,
        frameworkInfo,
        master,
        implicitAcknowledgements,
        credential.get()));
  } else {
    driver.reset(new MesosSchedulerDriver(
        this,
        frameworkInfo,
        master,
        implicitAcknowledgements));
  }

  driver->start();
}
// Stops the helper process and blocks until it has fully exited.
V0ToV1Adapter::~V0ToV1Adapter()
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  terminate(adapterProcess);
  wait(adapterProcess);
}
// v0 `error()` callback: hands the message over to the adapter process.
void V0ToV1Adapter::error(
    SchedulerDriver*,
    const string& message)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::error,
      message);
}
// v0 `executorLost()` callback: hands the executor id, agent id and exit
// status over to the adapter process.
void V0ToV1Adapter::executorLost(
    SchedulerDriver*,
    const ExecutorID& executorId,
    const SlaveID& slaveId,
    int status)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::executorLost,
      executorId,
      slaveId,
      status);
}
// v0 `slaveLost()` callback: hands the agent id over to the adapter process.
void V0ToV1Adapter::slaveLost(
    SchedulerDriver*,
    const SlaveID& slaveId)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::slaveLost,
      slaveId);
}
// v0 `frameworkMessage()` callback: hands the sender ids and the payload over
// to the adapter process.
void V0ToV1Adapter::frameworkMessage(
    SchedulerDriver*,
    const ExecutorID& executorId,
    const SlaveID& slaveId,
    const string& data)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::frameworkMessage,
      executorId,
      slaveId,
      data);
}
// v0 `statusUpdate()` callback: hands the task status over to the adapter
// process.
void V0ToV1Adapter::statusUpdate(
    SchedulerDriver*,
    const TaskStatus& status)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::statusUpdate,
      status);
}
// v0 `offerRescinded()` callback: hands the rescinded offer id over to the
// adapter process.
void V0ToV1Adapter::offerRescinded(
    SchedulerDriver*,
    const OfferID& offerId)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::offerRescinded,
      offerId);
}
// v0 `resourceOffers()` callback: hands the offers over to the adapter
// process.
void V0ToV1Adapter::resourceOffers(
    SchedulerDriver*,
    const vector<Offer>& offers)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::resourceOffers,
      offers);
}
// v0 `registered()` callback: hands the assigned framework id and the master
// information over to the adapter process.
void V0ToV1Adapter::registered(
    SchedulerDriver*,
    const FrameworkID& frameworkId,
    const MasterInfo& masterInfo)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::registered,
      frameworkId,
      masterInfo);
}
// v0 `reregistered()` callback: hands the master information over to the
// adapter process.
void V0ToV1Adapter::reregistered(
    SchedulerDriver*,
    const MasterInfo& masterInfo)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(
      adapterProcess,
      &V0ToV1AdapterProcess::reregistered,
      masterInfo);
}
// v0 `disconnected()` callback: notifies the adapter process.
void V0ToV1Adapter::disconnected(SchedulerDriver*)
{
  V0ToV1AdapterProcess* const adapterProcess = process.get();

  process::dispatch(adapterProcess, &V0ToV1AdapterProcess::disconnected);
}
void V0ToV1Adapter::send(const Call& call)
{
process::dispatch(
process.get(), &V0ToV1AdapterProcess::send, driver.get(), call);
}
// Records the JNI environment and the weak reference to the Java `V0Mesos`
// object, and resolves the `JavaVM` that owns the environment so that the
// process can attach its own threads later on.
V0ToV1AdapterProcess::V0ToV1AdapterProcess(
    JNIEnv* _env,
    jweak _jmesos)
  : ProcessBase(process::ID::generate("SchedulerV0ToV1Adapter")),
    jvm(nullptr),
    env(_env),
    jmesos(_jmesos),
    subscribeCall(false),
    interval(DEFAULT_HEARTBEAT_INTERVAL)
{
  // `env` was just initialized from `_env`, so either name refers to the same
  // environment here.
  _env->GetJavaVM(&jvm);
}
void V0ToV1AdapterProcess::registered(
const FrameworkID& _frameworkId,
const MasterInfo& masterInfo)
{
LOG(INFO) << "Registered with the Mesos master; invoking connected callback";
connect();
// We need this copy to populate the fields in `Event::Subscribed` upon
// receiving a `reregistered()` callback later.
frameworkId = _frameworkId;
// These events are queued and delivered to the scheduler upon receiving the
// subscribe call later. See comments in `send()` for more details.
{
Event event;
event.set_type(Event::SUBSCRIBED);
Event::Subscribed* subscribed = event.mutable_subscribed();
subscribed->mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
subscribed->set_heartbeat_interval_seconds(interval.secs());
subscribed->mutable_master_info()->CopyFrom(evolve(masterInfo));
received(event);
}
{
Event event;
event.set_type(Event::HEARTBEAT);
received(event);
}
}
void V0ToV1AdapterProcess::reregistered(const MasterInfo& masterInfo)
{
CHECK_SOME(frameworkId);
registered(frameworkId.get(), masterInfo);
}
void V0ToV1AdapterProcess::disconnected()
{
// Upon noticing a disconnection with the master, we drain the pending
// events in the queue that were waiting to be sent to the scheduler
// upon receiving the subscribe call.
// It's fine to do so because:
// - Any outstanding offers are invalidated by the master upon a scheduler
// (re-)registration.
// - Any task status updates could be reconciled by the scheduler.
LOG(INFO) << "Dropping " << pending.size() << " pending event(s)"
<< " because master disconnected";
pending = queue<Event>();
subscribeCall = false;
if (heartbeatTimer.isSome()) {
Clock::cancel(heartbeatTimer.get());
heartbeatTimer = None();
}
LOG(INFO) << "Disconnected with the Mesos master;"
<< " invoking disconnected callback";
disconnect();
}
void V0ToV1AdapterProcess::resourceOffers(const vector<Offer>& _offers)
{
Event event;
event.set_type(Event::OFFERS);
Event::Offers* offers = event.mutable_offers();
foreach (const Offer& offer, _offers) {
offers->add_offers()->CopyFrom(evolve(offer));
}
received(event);
}
void V0ToV1AdapterProcess::offerRescinded(const OfferID& offerId)
{
Event event;
event.set_type(Event::RESCIND);
event.mutable_rescind()->mutable_offer_id()->
CopyFrom(evolve(offerId));
received(event);
}
void V0ToV1AdapterProcess::statusUpdate(const TaskStatus& status)
{
Event event;
event.set_type(Event::UPDATE);
event.mutable_update()->mutable_status()->
CopyFrom(mesos::internal::evolve(status));
received(event);
}
void V0ToV1AdapterProcess::frameworkMessage(
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data)
{
Event event;
event.set_type(Event::MESSAGE);
event.mutable_message()->mutable_agent_id()->
CopyFrom(mesos::internal::evolve(slaveId));
event.mutable_message()->mutable_executor_id()->
CopyFrom(mesos::internal::evolve(executorId));
event.mutable_message()->set_data(data.data());
received(event);
}
void V0ToV1AdapterProcess::slaveLost(const SlaveID& slaveId)
{
Event event;
event.set_type(Event::FAILURE);
event.mutable_failure()->mutable_agent_id()->
CopyFrom(mesos::internal::evolve(slaveId));
received(event);
}
void V0ToV1AdapterProcess::executorLost(
const ExecutorID& executorId,
const SlaveID& slaveId,
int status)
{
Event event;
event.set_type(Event::FAILURE);
event.mutable_failure()->mutable_agent_id()->
CopyFrom(mesos::internal::evolve(slaveId));
event.mutable_failure()->mutable_executor_id()->
CopyFrom(mesos::internal::evolve(executorId));
event.mutable_failure()->set_status(status);
received(event);
}
void V0ToV1AdapterProcess::error(const string& message)
{
Event event;
event.set_type(Event::ERROR);
event.mutable_error()->set_message(message);
// There might be an error during the communication with the master or
// implicit registration happening on driver initialization. Since
// `Scheduler.connect` is called upon a successful registration only, the
// scheduler will never try to subscribe and hence will never receive the
// error. This workaround satisfies the invariant of the v1 interface that
// a scheduler can receive an event only after successfully connecting with
// the master.
if (!subscribeCall) {
LOG(INFO) << "Implicitly connecting the scheduler to send an error";
connect();
}
received(event);
}
// Executes a v1 scheduler call on the v0 driver.
//
// The call is devolved to its v0 representation and validated; invalid calls
// are logged and dropped. Valid calls are mapped onto the matching driver
// method. Call types without a driver equivalent are either logged as
// unexpected or silently ignored (see the individual cases).
void V0ToV1AdapterProcess::send(SchedulerDriver* driver, const Call& _call)
{
  CHECK_NOTNULL(driver);

  const scheduler::Call call = devolve(_call);

  const Option<Error> invalid = validation::scheduler::call::validate(call);
  if (invalid.isSome()) {
    LOG(WARNING) << "Dropping " << call.type() << ": due to error "
                 << invalid->message;
    return;
  }

  switch (call.type()) {
    case scheduler::Call::SUBSCRIBE: {
      subscribeCall = true;
      heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);

      // The driver already subscribed with the master on initialization, so
      // the subscribed event (or a subscription error) is sitting in the
      // queue. To look like the v1 interface, it is released only now that
      // the scheduler asked to subscribe.
      _received();
      break;
    }

    case scheduler::Call::TEARDOWN: {
      driver->stop(false);
      break;
    }

    case scheduler::Call::ACCEPT: {
      const scheduler::Call::Accept& accept = call.accept();

      const vector<OfferID> offerIds(
          accept.offer_ids().begin(),
          accept.offer_ids().end());

      const vector<Offer::Operation> operations(
          accept.operations().begin(),
          accept.operations().end());

      if (accept.has_filters()) {
        driver->acceptOffers(offerIds, operations, accept.filters());
      } else {
        driver->acceptOffers(offerIds, operations);
      }
      break;
    }

    case scheduler::Call::ACCEPT_INVERSE_OFFERS:
    case scheduler::Call::DECLINE_INVERSE_OFFERS:
    case scheduler::Call::SHUTDOWN:
    case scheduler::Call::UPDATE_FRAMEWORK: {
      // TODO(anand): Throw java error.
      LOG(ERROR) << "Received an unexpected " << call.type() << " call";
      break;
    }

    case scheduler::Call::DECLINE: {
      const scheduler::Call::Decline& decline = call.decline();
      const bool filtered = decline.has_filters();

      // The driver declines one offer at a time.
      for (const OfferID& offerId : decline.offer_ids()) {
        if (filtered) {
          driver->declineOffer(offerId, decline.filters());
        } else {
          driver->declineOffer(offerId);
        }
      }
      break;
    }

    case scheduler::Call::REVIVE: {
      driver->reviveOffers();
      break;
    }

    case scheduler::Call::KILL: {
      driver->killTask(call.kill().task_id());
      break;
    }

    case scheduler::Call::ACKNOWLEDGE: {
      const scheduler::Call::Acknowledge& acknowledge = call.acknowledge();

      // The driver acknowledges by task status, so rebuild one that carries
      // the identifying fields of the acknowledgement.
      TaskStatus status;
      status.mutable_task_id()->CopyFrom(acknowledge.task_id());
      status.mutable_slave_id()->CopyFrom(acknowledge.slave_id());
      status.set_uuid(acknowledge.uuid());

      driver->acknowledgeStatusUpdate(status);
      break;
    }

    // TODO(greggomann): Implement operation status acknowledgement.
    case scheduler::Call::ACKNOWLEDGE_OPERATION_STATUS:
      break;

    case scheduler::Call::RECONCILE: {
      vector<TaskStatus> statuses;

      for (const scheduler::Call::Reconcile::Task& task :
           call.reconcile().tasks()) {
        TaskStatus status;
        status.mutable_task_id()->CopyFrom(task.task_id());
        statuses.push_back(status);
      }

      driver->reconcileTasks(statuses);
      break;
    }

    // TODO(greggomann): Implement operation reconciliation.
    case scheduler::Call::RECONCILE_OPERATIONS:
      break;

    case scheduler::Call::MESSAGE: {
      const scheduler::Call::Message& message = call.message();

      driver->sendFrameworkMessage(
          message.executor_id(),
          message.slave_id(),
          message.data());
      break;
    }

    case scheduler::Call::REQUEST: {
      const vector<Request> requests(
          call.request().requests().begin(),
          call.request().requests().end());

      driver->requestResources(requests);
      break;
    }

    case scheduler::Call::SUPPRESS: {
      driver->suppressOffers();
      break;
    }

    case scheduler::Call::UNKNOWN: {
      EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
                         << " call";
      break;
    }
  }
}
// Queues `event` for the Java scheduler. For compatibility with the v1
// interface, queued events are only delivered once the scheduler has sent the
// subscribe call; until then they simply accumulate in `pending`.
void V0ToV1AdapterProcess::received(const Event& event)
{
  pending.push(event);

  if (subscribeCall) {
    _received();
  }
}
// Delivers all queued events to the Java scheduler in FIFO order. Must only
// be called after the subscribe call has been seen. Each event is removed
// from the queue after its delivery returned.
void V0ToV1AdapterProcess::_received()
{
  CHECK(subscribeCall);

  for (; !pending.empty(); pending.pop()) {
    __received(pending.front());
  }
}
void V0ToV1AdapterProcess::__received(const Event& event)
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jmesos);
jfieldID scheduler =
env->GetFieldID(clazz, "scheduler",
"Lorg/apache/mesos/v1/scheduler/Scheduler;");
jobject jscheduler = env->GetObjectField(jmesos, scheduler);
clazz = env->GetObjectClass(jscheduler);
// scheduler.received(mesos, event);
jmethodID received =
env->GetMethodID(clazz, "received",
"(Lorg/apache/mesos/v1/scheduler/Mesos;"
"Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
jobject jevent = convert<Event>(env, event);
env->ExceptionClear();
env->CallVoidMethod(jscheduler, received, jmesos, jevent);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
ABORT("Exception thrown during `received` call");
}
jvm->DetachCurrentThread();
}
void V0ToV1AdapterProcess::connect()
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jmesos);
jfieldID scheduler =
env->GetFieldID(clazz, "scheduler",
"Lorg/apache/mesos/v1/scheduler/Scheduler;");
jobject jscheduler = env->GetObjectField(jmesos, scheduler);
clazz = env->GetObjectClass(jscheduler);
// scheduler.connected(mesos);
jmethodID connected =
env->GetMethodID(clazz, "connected",
"(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
env->ExceptionClear();
env->CallVoidMethod(jscheduler, connected, jmesos);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
ABORT("Exception thrown during `connected` call");
}
jvm->DetachCurrentThread();
}
void V0ToV1AdapterProcess::heartbeat()
{
// It is possible that we were unable to cancel this timer upon a
// disconnection. If this occurs, don't bother sending the heartbeat
// event.
if (heartbeatTimer.isNone() || !heartbeatTimer->timeout().expired()) {
return;
}
CHECK(subscribeCall)
<< "Cannot send heartbeat events to the scheduler without receiving a "
<< "subscribe call";
Event event;
event.set_type(Event::HEARTBEAT);
received(event);
heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
}
void V0ToV1AdapterProcess::disconnect()
{
jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
jclass clazz = env->GetObjectClass(jmesos);
jfieldID scheduler =
env->GetFieldID(clazz, "scheduler",
"Lorg/apache/mesos/v1/scheduler/Scheduler;");
jobject jscheduler = env->GetObjectField(jmesos, scheduler);
clazz = env->GetObjectClass(jscheduler);
// scheduler.disconnected(mesos);
jmethodID disconnected =
env->GetMethodID(clazz, "disconnected",
"(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
env->ExceptionClear();
env->CallVoidMethod(jscheduler, disconnected, jmesos);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
jvm->DetachCurrentThread();
ABORT("Exception thrown during `disconnected` call");
}
jvm->DetachCurrentThread();
}
extern "C" {
/*
* Class: org_apache_mesos_v1_scheduler_V0Mesos
* Method: initialize
* Signature: ()V
*
*/
JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_initialize
  (JNIEnv* env, jobject thiz)
{
  // Reads the `framework`, `master` and `credential` fields of the Java
  // `V0Mesos` object, builds the native adapter from them and stores its
  // address in the `__mesos` field.
  jclass clazz = env->GetObjectClass(thiz);

  // The native side refers to the Java object through a weak global
  // reference: global so that it stays usable after this call returns, weak
  // so that it does not keep the JVM from exiting.
  jweak jmesos = env->NewWeakGlobalRef(thiz);

  // FrameworkInfo passed into the Java constructor.
  jfieldID frameworkField = env->GetFieldID(
      clazz,
      "framework",
      "Lorg/apache/mesos/v1/Protos$FrameworkInfo;");
  jobject jframework = env->GetObjectField(thiz, frameworkField);

  // Master passed into the Java constructor.
  jfieldID masterField =
    env->GetFieldID(clazz, "master", "Ljava/lang/String;");
  jobject jmaster = env->GetObjectField(thiz, masterField);

  // Credential passed into the Java constructor; may be null.
  jfieldID credentialField = env->GetFieldID(
      clazz,
      "credential",
      "Lorg/apache/mesos/v1/Protos$Credential;");
  jobject jcredential = env->GetObjectField(thiz, credentialField);

  Option<Credential> credential;
  if (!env->IsSameObject(jcredential, nullptr)) {
    credential = construct<Credential>(env, jcredential);
  }

  // Create the C++ scheduler and initialize the `__mesos` variable.
  V0ToV1Adapter* adapter = new V0ToV1Adapter(
      env,
      jmesos,
      devolve(construct<v1::FrameworkInfo>(env, jframework)),
      construct<string>(env, jmaster),
      credential);

  jfieldID adapterField = env->GetFieldID(clazz, "__mesos", "J");
  env->SetLongField(thiz, adapterField, reinterpret_cast<jlong>(adapter));
}
/*
* Class: org_apache_mesos_v1_scheduler_V0Mesos
* Method: finalize
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_finalize
  (JNIEnv* env, jobject thiz)
{
  // Destroys the native adapter referenced by the `__mesos` field and
  // releases the weak global reference created in `initialize`.
  jclass clazz = env->GetObjectClass(thiz);

  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
  V0ToV1Adapter* mesos =
    reinterpret_cast<V0ToV1Adapter*>(env->GetLongField(thiz, __mesos));

  // `__mesos` is only assigned as the last step of `initialize`. If that
  // never completed, the field presumably still holds 0 (the Java default
  // for a `long`), and there is nothing to clean up.
  if (mesos == nullptr) {
    return;
  }

  // Copy the reference out first: the adapter process that stores it is
  // destroyed together with the adapter.
  jweak jmesos = mesos->process->jmesos;

  // Destroying the adapter terminates the adapter process and waits for it,
  // so no callback into Java can still be using `jmesos` afterwards.
  delete mesos;

  // Only now is it safe to release the weak global reference; releasing it
  // before the process has stopped would let an in-flight callback use a
  // deleted reference.
  env->DeleteWeakGlobalRef(jmesos);
}
/*
* Class: org_apache_mesos_v1_scheduler_V0Mesos
* Method: send
* Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
*/
JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_send
  (JNIEnv* env, jobject thiz, jobject jcall)
{
  // Looks up the native adapter stored in the `__mesos` field and sends it
  // the call converted from its Java representation.
  jclass clazz = env->GetObjectClass(thiz);
  jfieldID adapterField = env->GetFieldID(clazz, "__mesos", "J");

  const jlong address = env->GetLongField(thiz, adapterField);
  V0ToV1Adapter* adapter = reinterpret_cast<V0ToV1Adapter*>(address);

  adapter->send(construct<Call>(env, jcall));
}
} // extern "C" {