blob: e8db407a48158802be2625378f189bbde0e20756 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <signal.h>
#include <glog/logging.h>
#include <iostream>
#include <string>
#include <sstream>
#include <mesos/executor.hpp>
#include <process/dispatch.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include "common/fatal.hpp"
#include "common/lock.hpp"
#include "common/logging.hpp"
#include "common/type_utils.hpp"
#include "common/utils.hpp"
#include "common/uuid.hpp"
#include "messages/messages.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace process;
using std::string;
using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos {
namespace internal {
// Libprocess actor backing a MesosExecutorDriver. It registers with the
// slave, receives the slave's protobuf messages and relays them to the
// user-supplied Executor, and sends status updates and framework
// messages back to the slave on the driver's behalf.
class ExecutorProcess : public ProtobufProcess<ExecutorProcess>
{
public:
  // Stores the collaborators and identifiers, and wires each incoming
  // protobuf message type to the member function that handles it.
  ExecutorProcess(const UPID& _slave,
                  MesosExecutorDriver* _driver,
                  Executor* _executor,
                  const FrameworkID& _frameworkId,
                  const ExecutorID& _executorId,
                  bool _local,
                  const std::string& _directory)
    : slave(_slave),
      driver(_driver),
      executor(_executor),
      frameworkId(_frameworkId),
      executorId(_executorId),
      local(_local),
      aborted(false),
      directory(_directory)
  {
    // Registration acknowledgement from the slave.
    install<ExecutorRegisteredMessage>(
        &ExecutorProcess::registered,
        &ExecutorRegisteredMessage::executor_info,
        &ExecutorRegisteredMessage::framework_id,
        &ExecutorRegisteredMessage::framework_info,
        &ExecutorRegisteredMessage::slave_id,
        &ExecutorRegisteredMessage::slave_info);

    // Task lifecycle requests.
    install<RunTaskMessage>(
        &ExecutorProcess::runTask,
        &RunTaskMessage::task);

    install<KillTaskMessage>(
        &ExecutorProcess::killTask,
        &KillTaskMessage::task_id);

    // Opaque data sent by the framework.
    install<FrameworkToExecutorMessage>(
        &ExecutorProcess::frameworkMessage,
        &FrameworkToExecutorMessage::slave_id,
        &FrameworkToExecutorMessage::framework_id,
        &FrameworkToExecutorMessage::executor_id,
        &FrameworkToExecutorMessage::data);

    // Request to shut the executor down.
    install<ShutdownExecutorMessage>(
        &ExecutorProcess::shutdown);
  }

  virtual ~ExecutorProcess() {}

protected:
  // Links to the slave and announces this executor to it by sending a
  // RegisterExecutorMessage carrying the framework and executor ids.
  virtual void initialize()
  {
    VLOG(1) << "Executor started at: " << self();

    link(slave);

    RegisterExecutorMessage message;
    message.mutable_framework_id()->MergeFrom(frameworkId);
    message.mutable_executor_id()->MergeFrom(executorId);
    send(slave, message);
  }

  // Handles ExecutorRegisteredMessage: remembers the slave id and tells
  // the executor it is registered. Only logs once the driver is aborted.
  void registered(const ExecutorInfo& executorInfo,
                  const FrameworkID& frameworkId,
                  const FrameworkInfo& frameworkInfo,
                  const SlaveID& slaveId,
                  const SlaveInfo& slaveInfo)
  {
    if (!aborted) {
      VLOG(1) << "Executor registered on slave " << slaveId;

      // The parameter shadows the member, hence the explicit 'this'.
      this->slaveId = slaveId;

      executor->registered(driver, executorInfo, frameworkInfo, slaveInfo);
    } else {
      VLOG(1) << "Ignoring registered message from slave " << slaveId
              << " because the driver is aborted!";
    }
  }

  // Handles RunTaskMessage by asking the executor to launch the task.
  // Only logs once the driver is aborted.
  void runTask(const TaskInfo& task)
  {
    if (!aborted) {
      VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";

      executor->launchTask(driver, task);
    } else {
      VLOG(1) << "Ignoring run task message for task " << task.task_id()
              << " because the driver is aborted!";
    }
  }

  // Handles KillTaskMessage by asking the executor to kill the task.
  // Only logs once the driver is aborted.
  void killTask(const TaskID& taskId)
  {
    if (!aborted) {
      VLOG(1) << "Executor asked to kill task '" << taskId << "'";

      executor->killTask(driver, taskId);
    } else {
      VLOG(1) << "Ignoring kill task message for task " << taskId
              << " because the driver is aborted!";
    }
  }

  // Handles FrameworkToExecutorMessage. Only the payload is passed on
  // to the executor; the ids in the message are not used here.
  void frameworkMessage(const SlaveID& slaveId,
                        const FrameworkID& frameworkId,
                        const ExecutorID& executorId,
                        const string& data)
  {
    if (!aborted) {
      VLOG(1) << "Executor received framework message";

      executor->frameworkMessage(driver, data);
    } else {
      VLOG(1) << "Ignoring framework message because the driver is aborted!";
    }
  }

  // Handles ShutdownExecutorMessage: notifies the executor, then either
  // terminates this actor (local mode) or exits the OS process.
  void shutdown()
  {
    if (aborted) {
      VLOG(1) << "Ignoring shutdown message because the driver is aborted!";
      return;
    }

    VLOG(1) << "Executor asked to shutdown";

    // TODO(benh): Any need to invoke driver.stop?
    executor->shutdown(driver);

    if (local) {
      terminate(this);
    } else {
      exit(0);
    }
  }

  // Marks the driver as aborted; from then on the message handlers in
  // this class only log and return.
  void abort()
  {
    VLOG(1) << "De-activating the executor libprocess";
    aborted = true;
  }

  // Exit notification (the only process linked here is the slave):
  // asks the executor to shut down and then tears everything down.
  virtual void exited(const UPID& pid)
  {
    if (aborted) {
      VLOG(1) << "Ignoring exited event because the driver is aborted!";
      return;
    }

    VLOG(1) << "Slave exited, trying to shutdown";

    // TODO: Pass an argument to shutdown to tell it this is abnormal?
    executor->shutdown(driver);

    // With no slave left we are in a pretty bad state. Instead of a
    // plain exit, kill our whole process group (which includes this
    // process) in the hope of also cleaning up anything the executor
    // launched itself.
    // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
    if (local) {
      terminate(this);
    } else {
      killpg(0, SIGKILL);
    }
  }

  // Wraps 'status' in a StatusUpdateMessage and sends it to the slave.
  // A TASK_STAGING status is rejected: the driver is aborted and the
  // executor's error callback is invoked instead.
  void sendStatusUpdate(const TaskStatus& status)
  {
    VLOG(1) << "Executor sending status update for task "
            << status.task_id() << " in state " << status.state();

    if (status.state() == TASK_STAGING) {
      VLOG(1) << "Executor is not allowed to send "
              << "TASK_STAGING status updates. Aborting!";

      driver->abort();

      executor->error(driver, "Attempted to send TASK_STAGING status update");

      return;
    }

    StatusUpdateMessage envelope;

    StatusUpdate* const update = envelope.mutable_update();
    update->mutable_framework_id()->MergeFrom(frameworkId);
    update->mutable_executor_id()->MergeFrom(executorId);
    update->mutable_slave_id()->MergeFrom(slaveId);
    update->mutable_status()->MergeFrom(status);
    update->set_timestamp(Clock::now());
    update->set_uuid(UUID::random().toBytes());

    send(slave, envelope);
  }

  // Sends 'data' to the slave in an ExecutorToFrameworkMessage tagged
  // with this executor's slave, framework and executor ids.
  void sendFrameworkMessage(const string& data)
  {
    ExecutorToFrameworkMessage envelope;
    envelope.mutable_slave_id()->MergeFrom(slaveId);
    envelope.mutable_framework_id()->MergeFrom(frameworkId);
    envelope.mutable_executor_id()->MergeFrom(executorId);
    envelope.set_data(data);

    send(slave, envelope);
  }

private:
  friend class mesos::MesosExecutorDriver;

  UPID slave;                   // Slave we link to and send messages to.
  MesosExecutorDriver* driver;  // Driver passed back to every callback.
  Executor* executor;           // User-supplied callbacks.
  FrameworkID frameworkId;
  ExecutorID executorId;
  SlaveID slaveId;              // Set when the registered message arrives.
  bool local;                   // Selects terminate() over exit/killpg.
  bool aborted;                 // Set by abort(); handlers then do nothing.
  const std::string directory;  // Stored at construction; unused in this class.
};
} // namespace internal {
} // namespace mesos {
// Implementation of C++ API.
// Builds a driver around the given executor. The driver starts out in
// DRIVER_NOT_STARTED with no backing process; this constructor only
// prepares the synchronization primitives and initializes libprocess.
MesosExecutorDriver::MesosExecutorDriver(Executor* _executor)
  : executor(_executor), status(DRIVER_NOT_STARTED), process(NULL)
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;

  // The driver mutex is created with the recursive type; the attribute
  // object is only needed while the mutex is being initialized.
  pthread_mutexattr_t recursive;
  pthread_mutexattr_init(&recursive);
  pthread_mutexattr_settype(&recursive, PTHREAD_MUTEX_RECURSIVE);
  pthread_mutex_init(&mutex, &recursive);
  pthread_mutexattr_destroy(&recursive);

  // Condition variable with default attributes.
  pthread_cond_init(&cond, NULL);

  // TODO(benh): Initialize glog.

  // Bring up libprocess. NOTE(review): the 'false' argument presumably
  // asks libprocess not to initialize glog itself -- confirm against
  // process::initialize.
  process::initialize(false);
}
// Waits for the backing ExecutorProcess (if any) to finish, frees it,
// and then destroys the driver's mutex and condition variable.
MesosExecutorDriver::~MesosExecutorDriver()
{
  // 'process' stays NULL until start() creates it, so a driver that was
  // never started has nothing to wait for. Skipping the wait in that
  // case avoids handing a NULL process to libprocess.
  if (process != NULL) {
    // Just as in SchedulerProcess, we might wait here indefinitely if
    // MesosExecutorDriver::stop has not been invoked.
    wait(process);
    delete process;
  }

  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&cond);
}
// Starts the driver: reads the executor's configuration from the
// environment, then creates and spawns the ExecutorProcess.
//
// Environment variables read:
//   MESOS_LOCAL         optional; its mere presence selects local mode.
//   MESOS_SLAVE_PID     required; must parse as a valid UPID.
//   MESOS_FRAMEWORK_ID  required.
//   MESOS_EXECUTOR_ID   required.
//   MESOS_DIRECTORY     required.
// A missing or unparsable required variable is reported through fatal().
//
// Returns the current status unchanged if the driver is not in
// DRIVER_NOT_STARTED, otherwise DRIVER_RUNNING.
Status MesosExecutorDriver::start()
{
  Lock lock(&mutex);

  if (status != DRIVER_NOT_STARTED) {
    return status;
  }

  // Set stream buffering mode to flush on newlines so that we capture logs
  // from user processes even when output is redirected to a file.
  setvbuf(stdout, 0, _IOLBF, 0);
  setvbuf(stderr, 0, _IOLBF, 0);

  // Check if this is local (for example, for testing).
  const bool local = getenv("MESOS_LOCAL") != NULL;

  const char* value;

  // Get slave PID from environment.
  value = getenv("MESOS_SLAVE_PID");
  if (value == NULL) {
    fatal("expecting MESOS_SLAVE_PID in environment");
  }

  UPID slave = UPID(value);
  if (!slave) {
    fatal("cannot parse MESOS_SLAVE_PID");
  }

  // Get framework ID from environment.
  value = getenv("MESOS_FRAMEWORK_ID");
  if (value == NULL) {
    fatal("expecting MESOS_FRAMEWORK_ID in environment");
  }

  FrameworkID frameworkId;
  frameworkId.set_value(value);

  // Get executor ID from environment.
  value = getenv("MESOS_EXECUTOR_ID");
  if (value == NULL) {
    fatal("expecting MESOS_EXECUTOR_ID in environment");
  }

  ExecutorID executorId;
  executorId.set_value(value);

  // Get working directory from environment.
  value = getenv("MESOS_DIRECTORY");
  if (value == NULL) {
    fatal("expecting MESOS_DIRECTORY in environment");
  }

  std::string workDirectory = value;

  // The process must not exist yet: it is only ever created here.
  CHECK(process == NULL);

  process =
    new ExecutorProcess(slave, this, executor, frameworkId,
                        executorId, local, workDirectory);

  spawn(process);

  return status = DRIVER_RUNNING;
}
// Stops a running or aborted driver: terminates the backing process,
// signals the condition variable that join() waits on, and moves the
// driver to DRIVER_STOPPED.
//
// Returns the unchanged status if the driver is neither running nor
// aborted; DRIVER_ABORTED if it had been aborted before this call;
// DRIVER_STOPPED otherwise.
Status MesosExecutorDriver::stop()
{
  Lock lock(&mutex);

  const bool wasAborted = (status == DRIVER_ABORTED);

  if (!(status == DRIVER_RUNNING || wasAborted)) {
    return status;
  }

  CHECK(process != NULL);

  terminate(process);

  // TODO(benh): Set the condition variable in ExecutorProcess just as
  // we do with the MesosSchedulerDriver and SchedulerProcess:
  // dispatch(process, &ExecutorProcess::stop);
  pthread_cond_signal(&cond);

  status = DRIVER_STOPPED;

  // An earlier abort is still reported to the caller even though the
  // stored status has now become DRIVER_STOPPED.
  if (wasAborted) {
    return DRIVER_ABORTED;
  }

  return status;
}
// Aborts a running driver: asks the backing process to stop acting on
// incoming messages, signals the condition variable that join() waits
// on, and moves the driver to DRIVER_ABORTED.
//
// Returns DRIVER_ABORTED if the driver was running, otherwise the
// unchanged current status.
Status MesosExecutorDriver::abort()
{
  Lock lock(&mutex);

  if (status == DRIVER_RUNNING) {
    CHECK(process != NULL);

    // TODO(benh): Set the condition variable in ExecutorProcess just as
    // we do with the MesosSchedulerDriver and SchedulerProcess.
    dispatch(process, &ExecutorProcess::abort);

    pthread_cond_signal(&cond);

    status = DRIVER_ABORTED;
  }

  return status;
}
// Blocks the caller while the driver is running and returns the status
// it ended up in. If the driver is not running when called, returns the
// current status immediately.
Status MesosExecutorDriver::join()
{
  Lock lock(&mutex);

  if (status == DRIVER_RUNNING) {
    // Re-check the status after every wake-up; only leaving
    // DRIVER_RUNNING ends the wait.
    do {
      pthread_cond_wait(&cond, &mutex);
    } while (status == DRIVER_RUNNING);

    CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
  }

  return status;
}
// Convenience wrapper: starts the driver and, if that leaves it
// running, joins it. If start() reports anything other than
// DRIVER_RUNNING, that status is returned directly.
Status MesosExecutorDriver::run()
{
  const Status started = start();

  if (started != DRIVER_RUNNING) {
    return started;
  }

  return join();
}
// Hands 'taskStatus' to the backing process for delivery to the slave.
// Nothing is dispatched unless the driver is running. Returns the
// driver's current status either way.
Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus)
{
  Lock lock(&mutex);

  if (status == DRIVER_RUNNING) {
    CHECK(process != NULL);

    dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
  }

  return status;
}
// Hands 'data' to the backing process, which forwards it to the slave
// as a message for the framework. Nothing is dispatched unless the
// driver is running. Returns the driver's current status either way.
Status MesosExecutorDriver::sendFrameworkMessage(const string& data)
{
  Lock lock(&mutex);

  if (status == DRIVER_RUNNING) {
    CHECK(process != NULL);

    dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
  }

  return status;
}