// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <signal.h>
#include <stdio.h>
#ifndef __WINDOWS__
#include <sys/wait.h>
#endif // __WINDOWS__
#include <iostream>
#include <list>
#include <queue>
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/v1/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/io.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/subprocess.hpp>
#include <process/reap.hpp>
#include <process/time.hpp>
#include <process/timer.hpp>
#ifdef __WINDOWS__
#include <process/windows/winsock.hpp> // WSAStartup code.
#endif // __WINDOWS__
#include <stout/duration.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/strings.hpp>
#include <stout/uuid.hpp>
#ifdef __WINDOWS__
#include <stout/windows.hpp>
#endif // __WINDOWS__
#include <stout/os/kill.hpp>
#include <stout/os/killtree.hpp>
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/status_utils.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#ifdef __linux__
#include "linux/fs.hpp"
#endif
#include "executor/v0_v1executor.hpp"
#include "health-check/health_checker.hpp"
#include "launcher/executor.hpp"
#include "logging/logging.hpp"
#include "messages/messages.hpp"
#include "slave/constants.hpp"
#ifdef __linux__
namespace fs = mesos::internal::fs;
#endif
using namespace mesos::internal::slave;
using std::cout;
using std::cerr;
using std::endl;
using std::queue;
using std::string;
using std::vector;
using process::Clock;
using process::Future;
using process::Owned;
using process::Subprocess;
using process::Time;
using process::Timer;
using mesos::internal::devolve;
using mesos::internal::evolve;
using mesos::internal::HealthChecker;
using mesos::internal::TaskHealthStatus;
using mesos::internal::protobuf::frameworkHasCapability;
using mesos::v1::ExecutorID;
using mesos::v1::FrameworkID;
using mesos::v1::executor::Call;
using mesos::v1::executor::Event;
using mesos::v1::executor::Mesos;
using mesos::v1::executor::MesosBase;
using mesos::v1::executor::V0ToV1Adapter;
namespace mesos {
namespace v1 {
namespace internal {
class CommandExecutor: public ProtobufProcess<CommandExecutor>
{
public:
CommandExecutor(
const string& _launcherDir,
const Option<string>& _rootfs,
const Option<string>& _sandboxDirectory,
const Option<string>& _workingDirectory,
const Option<string>& _user,
const Option<string>& _taskCommand,
const FrameworkID& _frameworkId,
const ExecutorID& _executorId,
const Duration& _shutdownGracePeriod)
: state(DISCONNECTED),
launched(false),
killed(false),
killedByHealthCheck(false),
terminated(false),
pid(-1),
shutdownGracePeriod(_shutdownGracePeriod),
frameworkInfo(None()),
taskId(None()),
launcherDir(_launcherDir),
rootfs(_rootfs),
sandboxDirectory(_sandboxDirectory),
workingDirectory(_workingDirectory),
user(_user),
taskCommand(_taskCommand),
frameworkId(_frameworkId),
executorId(_executorId),
task(None())
{
#ifdef __WINDOWS__
processHandle = INVALID_HANDLE_VALUE;
#endif
}
virtual ~CommandExecutor()
{
#ifdef __WINDOWS__
if (processHandle != INVALID_HANDLE_VALUE) {
::CloseHandle(processHandle);
}
#endif // __WINDOWS__
}
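// Invoked when a connection with the agent is (re-)established;
// kicks off the retrying subscription loop below.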
void connected()
{
state = CONNECTED;
doReliableRegistration();
}
void disconnected()
{
state = DISCONNECTED;
}
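// Invoked with a batch of events from the agent; the events are
// drained and handled in the order in which they arrived.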
void received(queue<Event> events)
{
while (!events.empty()) {
Event event = events.front();
events.pop();
cout << "Received " << event.type() << " event" << endl;
switch (event.type()) {
case Event::SUBSCRIBED: {
cout << "Subscribed executor on "
<< event.subscribed().agent_info().hostname() << endl;
frameworkInfo = event.subscribed().framework_info();
state = SUBSCRIBED;
break;
}
case Event::LAUNCH: {
launch(event.launch().task());
break;
}
case Event::KILL: {
Option<KillPolicy> override = event.kill().has_kill_policy()
? Option<KillPolicy>(event.kill().kill_policy())
: None();
kill(event.kill().task_id(), override);
break;
}
case Event::ACKNOWLEDGED: {
// Remove the corresponding update.
updates.erase(UUID::fromBytes(event.acknowledged().uuid()).get());
// Remove the corresponding task.
task = None();
break;
}
case Event::SHUTDOWN: {
shutdown();
break;
}
case Event::MESSAGE: {
break;
}
case Event::ERROR: {
cerr << "Error: " << event.error().message() << endl;
break;
}
case Event::UNKNOWN: {
LOG(WARNING) << "Received an UNKNOWN event; ignoring";
break;
}
}
}
}
protected:
virtual void initialize()
{
// TODO(qianzhang): Currently, the `mesos-health-check` binary can only
// send unversioned `TaskHealthStatus` messages. This needs to be revisited
// as part of MESOS-5103.
install<TaskHealthStatus>(
&CommandExecutor::taskHealthUpdated,
&TaskHealthStatus::task_id,
&TaskHealthStatus::healthy,
&TaskHealthStatus::kill_task);
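// The agent controls (via the environment) whether we speak the v1
// HTTP executor API natively, or fall back to the driver-based v0
// API wrapped in `V0ToV1Adapter`.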
Option<string> value = os::getenv("MESOS_HTTP_COMMAND_EXECUTOR");
// We initialize the library here to ensure that callbacks are only invoked
// after the process has spawned.
if (value.isSome() && value.get() == "1") {
mesos.reset(new Mesos(
mesos::ContentType::PROTOBUF,
defer(self(), &Self::connected),
defer(self(), &Self::disconnected),
defer(self(), &Self::received, lambda::_1)));
} else {
mesos.reset(new V0ToV1Adapter(
defer(self(), &Self::connected),
defer(self(), &Self::disconnected),
defer(self(), &Self::received, lambda::_1)));
}
}
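// Invoked via the unversioned `TaskHealthStatus` message installed in
// `initialize()` whenever the `mesos-health-check` binary reports a
// change in the task's health.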
void taskHealthUpdated(
const mesos::TaskID& taskID,
const bool healthy,
const bool initiateTaskKill)
{
cout << "Received task health update, healthy: "
<< stringify(healthy) << endl;
update(evolve(taskID), TASK_RUNNING, healthy);
if (initiateTaskKill) {
killedByHealthCheck = true;
kill(evolve(taskID));
}
}
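// Resends the SUBSCRIBE call (together with any unacknowledged
// updates and the unacknowledged task) every second until a
// SUBSCRIBED event arrives or the connection is dropped.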
void doReliableRegistration()
{
if (state == SUBSCRIBED || state == DISCONNECTED) {
return;
}
Call call;
call.set_type(Call::SUBSCRIBE);
call.mutable_framework_id()->CopyFrom(frameworkId);
call.mutable_executor_id()->CopyFrom(executorId);
Call::Subscribe* subscribe = call.mutable_subscribe();
// Send all unacknowledged updates.
foreach (const Call::Update& update, updates.values()) {
subscribe->add_unacknowledged_updates()->MergeFrom(update);
}
// Send the unacknowledged task.
if (task.isSome()) {
subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
}
mesos->send(call);
delay(Seconds(1), self(), &Self::doReliableRegistration);
}
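// Handles the LAUNCH event: determines the command to run (either
// from the `--task_command` flag or from the `TaskInfo`), forks the
// task, optionally starts the health checker, and reports
// TASK_RUNNING.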
void launch(const TaskInfo& _task)
{
CHECK_EQ(SUBSCRIBED, state);
if (launched) {
update(
_task.task_id(),
TASK_FAILED,
None(),
"Attempted to run multiple tasks using a \"command\" executor");
return;
}
// Capture the task.
task = _task;
// Capture the TaskID.
taskId = task->task_id();
// Capture the kill policy.
if (task->has_kill_policy()) {
killPolicy = task->kill_policy();
}
// Determine the command to launch the task.
CommandInfo command;
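// `taskCommand` is a JSON-encoded `CommandInfo`, e.g. (an
// illustrative value, not taken from this code):
//   {"shell": true, "value": "sleep 100"}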
if (taskCommand.isSome()) {
// Get CommandInfo from a JSON string.
Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
if (object.isError()) {
ABORT("Failed to parse JSON: " + object.error());
}
Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
if (parse.isError()) {
ABORT("Failed to parse protobuf: " + parse.error());
}
command = parse.get();
} else if (task->has_command()) {
command = task->command();
} else {
LOG(FATAL) << "Expecting task '" << task->task_id() << "' "
<< "to have a command";
}
// TODO(jieyu): For now, we just fail the executor if the task's
// CommandInfo is not valid. The framework will receive
// TASK_FAILED for the task, and will most likely find out the
// cause with some debugging. This is a temporary solution. A more
// correct solution is to perform this validation on the master side.
if (command.shell()) {
CHECK(command.has_value())
<< "Shell command of task '" << task->task_id()
<< "' is not specified!";
} else {
CHECK(command.has_value())
<< "Executable of task '" << task->task_id()
<< "' is not specified!";
}
cout << "Starting task " << task->task_id() << endl;
#ifndef __WINDOWS__
pid = launchTaskPosix(
command,
launcherDir,
user,
rootfs,
sandboxDirectory,
workingDirectory);
#else
// A Windows process is started using the `CREATE_SUSPENDED` flag
// and is made part of a job object. As long as the process handle
// is kept open, the reap function will work.
PROCESS_INFORMATION processInformation = launchTaskWindows(
command,
rootfs);
pid = processInformation.dwProcessId;
::ResumeThread(processInformation.hThread);
CloseHandle(processInformation.hThread);
processHandle = processInformation.hProcess;
#endif
cout << "Forked command at " << pid << endl;
if (task->has_health_check()) {
Try<Owned<HealthChecker>> _checker = HealthChecker::create(
devolve(task->health_check()),
self(),
devolve(task->task_id()));
if (_checker.isError()) {
// TODO(gilbert): Consider ABORT and return a TASK_FAILED here.
cerr << "Failed to create health checker: "
<< _checker.error() << endl;
} else {
checker = _checker.get();
checker->healthCheck()
.onAny([](const Future<Nothing>& future) {
// The health check future can only fail; it never
// completes successfully.
if (future.isFailed()) {
cerr << "Health check failed" << endl;
}
});
}
}
// Monitor this process.
process::reap(pid)
.onAny(defer(self(), &Self::reaped, pid, lambda::_1));
update(task->task_id(), TASK_RUNNING);
launched = true;
}
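// Handles both KILL events and health-check initiated kills:
// resolves the effective grace period (the override from the KILL
// event wins over the task's kill policy, which wins over the 3s
// default) and delegates to the private `kill()` overload below.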
void kill(const TaskID& taskId, const Option<KillPolicy>& override = None())
{
// Default grace period is set to 3s for backwards compatibility.
//
// TODO(alexr): Replace it with a more meaningful default, e.g.
// `shutdownGracePeriod` after the deprecation cycle, started in 1.0.
Duration gracePeriod = Seconds(3);
// Kill policy provided in the `Kill` event takes precedence
// over kill policy specified when the task was launched.
if (override.isSome() && override->has_grace_period()) {
gracePeriod = Nanoseconds(override->grace_period().nanoseconds());
} else if (killPolicy.isSome() && killPolicy->has_grace_period()) {
gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
}
cout << "Received kill for task " << taskId.value()
<< " with grace period of " << gracePeriod << endl;
kill(taskId, gracePeriod);
}
void shutdown()
{
cout << "Shutting down" << endl;
// NOTE: We leave a small buffer of time to do the forced kill, otherwise
// the agent may destroy the container before we can send `TASK_KILLED`.
//
// TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
// immediately after the watched process has exited.
Duration gracePeriod =
shutdownGracePeriod - process::MAX_REAP_INTERVAL() - Seconds(1);
// Since the command executor manages a single task,
// shutdown boils down to killing this task.
//
// TODO(bmahler): If a shutdown arrives after a kill task within
// the grace period of the `KillPolicy`, we may need to escalate
// more quickly (e.g. the shutdown grace period allotted by the
// agent is smaller than the kill grace period).
if (launched) {
CHECK_SOME(taskId);
kill(taskId.get(), gracePeriod);
}
}
private:
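// Kills the task via signal escalation: SIGTERM is sent to the
// process tree first, followed by SIGKILL once `gracePeriod` expires
// (see `escalated()`). Subsequent kill requests may only shorten the
// remaining grace period.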
void kill(const TaskID& _taskId, const Duration& gracePeriod)
{
if (terminated) {
return;
}
// If the task is being killed but has not terminated yet and
// we receive another kill request, check whether we need to
// adjust the remaining grace period.
if (killed && !terminated) {
// When a kill request arrives on the executor, we cannot simply
// restart the escalation timer, because the scheduler may retry
// and this must be a no-op.
//
// The escalation grace period can only be decreased. We intentionally
// do not support increasing the total grace period for a terminating
// task: we do not want users to "slow down" a kill that is already in
// progress, and a subsequent kill overriding the previous one to give
// the task _more_ time to clean up would be confusing. Other systems,
// e.g., docker, do not allow this either.
//
// Here are some examples to illustrate:
//
// 20, 30 -> Increased grace period is a no-op, grace period remains 20.
// 20, 20 -> Retries are a no-op, grace period remains 20.
// 20, 5  -> if `elapsed` >= 5:
//             SIGKILL immediately, total grace period is `elapsed`.
//           if `elapsed` < 5:
//             SIGKILL in (5 - `elapsed`), total grace period is 5.
CHECK_SOME(killGracePeriodStart);
CHECK_SOME(killGracePeriodTimer);
if (killGracePeriodStart.get() + gracePeriod >
killGracePeriodTimer->timeout().time()) {
return;
}
Duration elapsed = Clock::now() - killGracePeriodStart.get();
Duration remaining = gracePeriod > elapsed
? gracePeriod - elapsed
: Duration::zero();
cout << "Rescheduling escalation to SIGKILL in " << remaining
<< " from now" << endl;
Clock::cancel(killGracePeriodTimer.get());
killGracePeriodTimer = delay(
remaining, self(), &Self::escalated, gracePeriod);
}
// Issue the kill signal if the task has been launched
// and this is the first time we've received the kill.
if (launched && !killed) {
// Send TASK_KILLING if the framework can handle it.
CHECK_SOME(frameworkInfo);
CHECK_SOME(taskId);
CHECK(taskId.get() == _taskId);
if (frameworkHasCapability(
devolve(frameworkInfo.get()),
mesos::FrameworkInfo::Capability::TASK_KILLING_STATE)) {
update(taskId.get(), TASK_KILLING);
}
// Now perform signal escalation to begin killing the task.
CHECK_GT(pid, 0);
cout << "Sending SIGTERM to process tree at pid " << pid << endl;
Try<std::list<os::ProcessTree>> trees =
os::killtree(pid, SIGTERM, true, true);
if (trees.isError()) {
cerr << "Failed to kill the process tree rooted at pid " << pid
<< ": " << trees.error() << endl;
// Send SIGTERM directly to process 'pid' as it may not have
// received the signal before os::killtree() failed.
os::kill(pid, SIGTERM);
} else {
cout << "Sent SIGTERM to the following process trees:\n"
<< stringify(trees.get()) << endl;
}
cout << "Scheduling escalation to SIGKILL in " << gracePeriod
<< " from now" << endl;
killGracePeriodTimer =
delay(gracePeriod, self(), &Self::escalated, gracePeriod);
killGracePeriodStart = Clock::now();
killed = true;
}
}
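// Invoked once the task's process has been reaped: translates the
// exit status into a terminal state (TASK_FINISHED, TASK_KILLED or
// TASK_FAILED) and sends the corresponding status update.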
void reaped(pid_t pid, const Future<Option<int>>& status_)
{
terminated = true;
TaskState taskState;
string message;
if (killGracePeriodTimer.isSome()) {
Clock::cancel(killGracePeriodTimer.get());
}
if (!status_.isReady()) {
taskState = TASK_FAILED;
message =
"Failed to get exit status for Command: " +
(status_.isFailed() ? status_.failure() : "future discarded");
} else if (status_.get().isNone()) {
taskState = TASK_FAILED;
message = "Failed to get exit status for Command";
} else {
int status = status_.get().get();
CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
taskState = TASK_FINISHED;
} else if (killed) {
// Send TASK_KILLED if the task was killed as a result of
// kill() or shutdown().
taskState = TASK_KILLED;
} else {
taskState = TASK_FAILED;
}
message = "Command " + WSTRINGIFY(status);
}
cout << message << " (pid: " << pid << ")" << endl;
CHECK_SOME(taskId);
if (killed && killedByHealthCheck) {
update(taskId.get(), taskState, false, message);
} else {
update(taskId.get(), taskState, None(), message);
}
// TODO(qianzhang): Remove this hack since the executor now receives
// acknowledgements for status updates. The executor can terminate
// after it receives an ACK for a terminal status update.
os::sleep(Seconds(1));
terminate(self());
}
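// Invoked when the kill grace period expires before the task has
// terminated: escalates from SIGTERM to SIGKILL.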
void escalated(const Duration& timeout)
{
if (terminated) {
return;
}
cout << "Process " << pid << " did not terminate after " << timeout
<< ", sending SIGKILL to process tree at " << pid << endl;
// TODO(nnielsen): Sending SIGTERM in the first stage of the
// shutdown may leave orphan processes hanging off init. This
// scenario will be handled when PID namespace encapsulated
// execution is in place.
Try<std::list<os::ProcessTree>> trees =
os::killtree(pid, SIGKILL, true, true);
if (trees.isError()) {
cerr << "Failed to kill the process tree rooted at pid "
<< pid << ": " << trees.error() << endl;
// Process 'pid' may not have received the signal before
// os::killtree() failed. To make sure process 'pid' is reaped
// we send SIGKILL directly.
os::kill(pid, SIGKILL);
} else {
cout << "Killed the following process trees:\n" << stringify(trees.get())
<< endl;
}
}
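// Sends a status update to the agent and records it, keyed by its
// UUID, until the matching ACKNOWLEDGED event removes it in
// `received()`; unacknowledged updates are replayed on resubscription.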
void update(
const TaskID& taskID,
const TaskState& state,
const Option<bool>& healthy = None(),
const Option<string>& message = None())
{
UUID uuid = UUID::random();
TaskStatus status;
status.mutable_task_id()->CopyFrom(taskID);
status.mutable_executor_id()->CopyFrom(executorId);
status.set_state(state);
status.set_source(TaskStatus::SOURCE_EXECUTOR);
status.set_uuid(uuid.toBytes());
if (healthy.isSome()) {
status.set_healthy(healthy.get());
}
if (message.isSome()) {
status.set_message(message.get());
}
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->CopyFrom(frameworkId);
call.mutable_executor_id()->CopyFrom(executorId);
call.mutable_update()->mutable_status()->CopyFrom(status);
// Capture the status update.
updates[uuid] = call.update();
mesos->send(call);
}
enum State
{
CONNECTED,
DISCONNECTED,
SUBSCRIBED
} state;
// TODO(alexr): Introduce a state enum and document transitions,
// see MESOS-5252.
bool launched;
bool killed;
bool killedByHealthCheck;
bool terminated;
Option<Time> killGracePeriodStart;
Option<Timer> killGracePeriodTimer;
pid_t pid;
#ifdef __WINDOWS__
HANDLE processHandle;
#endif
Duration shutdownGracePeriod;
Option<KillPolicy> killPolicy;
Option<FrameworkInfo> frameworkInfo;
Option<TaskID> taskId;
string launcherDir;
Option<string> rootfs;
Option<string> sandboxDirectory;
Option<string> workingDirectory;
Option<string> user;
Option<string> taskCommand;
const FrameworkID frameworkId;
const ExecutorID executorId;
Owned<MesosBase> mesos;
LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
Option<TaskInfo> task; // Unacknowledged task.
Owned<HealthChecker> checker;
};
} // namespace internal {
} // namespace v1 {
} // namespace mesos {
class Flags : public flags::FlagsBase
{
public:
Flags()
{
add(&rootfs,
"rootfs",
"The path to the root filesystem for the task");
// The following flags are only applicable when a rootfs is
// provisioned for this command.
add(&sandbox_directory,
"sandbox_directory",
"The absolute path for the directory in the container where the\n"
"sandbox is mapped to");
add(&working_directory,
"working_directory",
"The working directory for the task in the container.");
add(&user,
"user",
"The user that the task should be running as.");
add(&task_command,
"task_command",
"If specified, this is the overrided command for launching the\n"
"task (instead of the command from TaskInfo).");
add(&launcher_dir,
"launcher_dir",
"Directory path of Mesos binaries.",
PKGLIBEXECDIR);
// TODO(nnielsen): Add 'prefix' option to enable replacing
// 'sh -c' with user specified wrapper.
}
Option<string> rootfs;
Option<string> sandbox_directory;
Option<string> working_directory;
Option<string> user;
Option<string> task_command;
string launcher_dir;
};
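// The command executor is launched by the agent, with configuration
// passed via the flags above and via environment variables; `main`
// wires these into a `CommandExecutor` process and blocks until it
// terminates.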
int main(int argc, char** argv)
{
Flags flags;
FrameworkID frameworkId;
ExecutorID executorId;
#ifdef __WINDOWS__
process::Winsock winsock;
#endif
// Load flags from command line.
Try<flags::Warnings> load = flags.load(None(), &argc, &argv);
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
Option<string> value = os::getenv("MESOS_FRAMEWORK_ID");
if (value.isNone()) {
EXIT(EXIT_FAILURE)
<< "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
}
frameworkId.set_value(value.get());
value = os::getenv("MESOS_EXECUTOR_ID");
if (value.isNone()) {
EXIT(EXIT_FAILURE)
<< "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
}
executorId.set_value(value.get());
// Get executor shutdown grace period from the environment.
//
// NOTE: We avoided introducing a command executor flag for this
// because the command executor exits if it sees an unknown flag.
// This makes it difficult to add or remove command executor flags
// that are unconditionally set by the agent.
Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
if (value.isSome()) {
Try<Duration> parse = Duration::parse(value.get());
if (parse.isError()) {
cerr << "Failed to parse value '" << value.get() << "'"
<< " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
return EXIT_FAILURE;
}
shutdownGracePeriod = parse.get();
}
Owned<mesos::v1::internal::CommandExecutor> executor(
new mesos::v1::internal::CommandExecutor(
flags.launcher_dir,
flags.rootfs,
flags.sandbox_directory,
flags.working_directory,
flags.user,
flags.task_command,
frameworkId,
executorId,
shutdownGracePeriod));
process::spawn(executor.get());
process::wait(executor.get());
return EXIT_SUCCESS;
}