// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <signal.h>
#include <stdio.h>
#include <sys/wait.h>
#include <iostream>
#include <list>
#include <string>
#include <vector>
#include <mesos/executor.hpp>
#include <mesos/type_utils.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/io.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/subprocess.hpp>
#include <process/reap.hpp>
#include <process/timer.hpp>
#include <stout/duration.hpp>
#include <stout/flags.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/strings.hpp>
#include "common/http.hpp"
#include "common/status_utils.hpp"
#ifdef __linux__
#include "linux/fs.hpp"
#endif
#include "logging/logging.hpp"
#include "messages/messages.hpp"
#include "slave/constants.hpp"
using namespace mesos::internal::slave;
using process::wait; // Necessary on some OS's to disambiguate.
using std::cout;
using std::cerr;
using std::endl;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
using namespace process;
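// Process that implements the command executor: it registers with the
// slave, forks and monitors a single task's command (optionally inside
// a provided rootfs), launches an external health check process, and
// performs SIGTERM/SIGKILL signal escalation when the task is killed
// or the executor is shut down.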
class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
{
public:
CommandExecutorProcess(
const Option<char**>& override,
const string& _healthCheckDir,
const Option<string>& _sandboxDirectory,
const Option<string>& _workingDirectory,
const Option<string>& _user,
const Option<string>& _taskCommand)
: state(REGISTERING),
launched(false),
killed(false),
killedByHealthCheck(false),
pid(-1),
healthPid(-1),
escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
driver(None()),
frameworkInfo(None()),
taskId(None()),
healthCheckDir(_healthCheckDir),
override(override),
sandboxDirectory(_sandboxDirectory),
workingDirectory(_workingDirectory),
user(_user),
taskCommand(_taskCommand) {}
virtual ~CommandExecutorProcess() {}
void registered(
ExecutorDriver* _driver,
const ExecutorInfo& _executorInfo,
const FrameworkInfo& _frameworkInfo,
const SlaveInfo& slaveInfo)
{
CHECK_EQ(REGISTERING, state);
cout << "Registered executor on " << slaveInfo.hostname() << endl;
driver = _driver;
frameworkInfo = _frameworkInfo;
state = REGISTERED;
}
void reregistered(
ExecutorDriver* driver,
const SlaveInfo& slaveInfo)
{
CHECK(state == REGISTERED || state == REGISTERING) << state;
cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
state = REGISTERED;
}
void disconnected(ExecutorDriver* driver) {}
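// Launches the single task this executor manages: resolves the
// CommandInfo (from --task_command JSON, the TaskInfo, or the
// --override argv), optionally prepares the rootfs mounts, forks the
// command into its own session, starts the health check, reaps the
// child asynchronously, and sends TASK_RUNNING.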
void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
CHECK_EQ(REGISTERED, state);
if (launched) {
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
status.set_state(TASK_FAILED);
status.set_message(
"Attempted to run multiple tasks using a \"command\" executor");
driver->sendStatusUpdate(status);
return;
}
// Capture the TaskID.
taskId = task.task_id();
// Determine the command to launch the task.
CommandInfo command;
if (taskCommand.isSome()) {
// Get CommandInfo from a JSON string.
Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
if (object.isError()) {
cerr << "Failed to parse JSON: " << object.error() << endl;
abort();
}
Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
if (parse.isError()) {
cerr << "Failed to parse protobuf: " << parse.error() << endl;
abort();
}
command = parse.get();
} else if (task.has_command()) {
command = task.command();
} else {
CHECK_SOME(override)
<< "Expecting task '" << task.task_id()
<< "' to have a command!";
}
if (override.isNone()) {
// TODO(jieyu): For now, we just fail the executor if the task's
// CommandInfo is not valid. The framework will receive
// TASK_FAILED for the task, and will most likely find out the
// cause with some debugging. This is a temporary solution. A more
// correct solution is to perform this validation on the master side.
if (command.shell()) {
CHECK(command.has_value())
<< "Shell command of task '" << task.task_id()
<< "' is not specified!";
} else {
CHECK(command.has_value())
<< "Executable of task '" << task.task_id()
<< "' is not specified!";
}
}
cout << "Starting task " << task.task_id() << endl;
// TODO(benh): Clean this up with the new 'Fork' abstraction.
// Use pipes to determine which child has successfully changed
// session. This is needed because the setsid call can fail if the
// calling process is already a process group leader.
int pipes[2];
if (pipe(pipes) < 0) {
perror("Failed to create a pipe");
abort();
}
// Set the FD_CLOEXEC flags on these pipes.
Try<Nothing> cloexec = os::cloexec(pipes[0]);
if (cloexec.isError()) {
cerr << "Failed to cloexec(pipe[0]): " << cloexec.error() << endl;
abort();
}
cloexec = os::cloexec(pipes[1]);
if (cloexec.isError()) {
cerr << "Failed to cloexec(pipe[1]): " << cloexec.error() << endl;
abort();
}
Option<string> rootfs;
if (sandboxDirectory.isSome()) {
// If 'sandbox_directory' is specified, that means the user
// task specifies a root filesystem, and that root filesystem has
// already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH.
// The command executor is responsible for mounting the sandbox
// into the root filesystem, chrooting into it and changing the
// user before exec-ing the user process.
//
// TODO(gilbert): Consider a better way to detect if a root
// filesystem is specified for the command task.
#ifdef __linux__
Result<string> user = os::user();
if (user.isError()) {
cerr << "Failed to get current user: " << user.error() << endl;
abort();
} else if (user.isNone()) {
cerr << "Current username is not found" << endl;
abort();
} else if (user.get() != "root") {
cerr << "The command executor requires root with rootfs" << endl;
abort();
}
rootfs = path::join(
os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
if (!os::exists(sandbox)) {
Try<Nothing> mkdir = os::mkdir(sandbox);
if (mkdir.isError()) {
cerr << "Failed to create sandbox mount point at '"
<< sandbox << "': " << mkdir.error() << endl;
abort();
}
}
// Mount the sandbox into the container rootfs.
// We need to perform a recursive mount because we want all the
// volume mounts in the sandbox to be also mounted in the container
// root filesystem. However, since the container root filesystem
// is also mounted in the sandbox, after the recursive mount we
// also need to unmount the root filesystem in the mounted sandbox.
Try<Nothing> mount = fs::mount(
os::getcwd(),
sandbox,
None(),
MS_BIND | MS_REC,
NULL);
if (mount.isError()) {
cerr << "Unable to mount the work directory into container "
<< "rootfs: " << mount.error() << endl;;
abort();
}
// Umount the root filesystem path in the mounted sandbox after
// the recursive mount.
Try<Nothing> unmountAll = fs::unmountAll(path::join(
sandbox,
COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
if (unmountAll.isError()) {
cerr << "Unable to unmount rootfs under mounted sandbox: "
<< unmountAll.error() << endl;
abort();
}
#else
cerr << "Not expecting root volume with non-linux platform." << endl;
abort();
#endif // __linux__
}
// Prepare the argv before fork as it's not async signal safe.
char **argv = new char*[command.arguments().size() + 1];
for (int i = 0; i < command.arguments().size(); i++) {
argv[i] = (char*) command.arguments(i).c_str();
}
argv[command.arguments().size()] = NULL;
// Prepare the command log message.
string commandString;
if (override.isSome()) {
char** argv = override.get();
// argv is guaranteed to be NULL terminated and we rely on
// that fact to print the command to be executed.
for (int i = 0; argv[i] != NULL; i++) {
commandString += string(argv[i]) + " ";
}
} else if (command.shell()) {
commandString = "sh -c '" + command.value() + "'";
} else {
commandString =
"[" + command.value() + ", " +
strings::join(", ", command.arguments()) + "]";
}
if ((pid = fork()) == -1) {
cerr << "Failed to fork to run " << commandString << ": "
<< os::strerror(errno) << endl;
abort();
}
// TODO(jieyu): Make the child process async signal safe.
if (pid == 0) {
// In the child process, we make cleanup easier by putting the
// process into its own session.
os::close(pipes[0]);
// NOTE: We setsid() in a loop because setsid() might fail if another
// process has the same process group id as the calling process.
while ((pid = setsid()) == -1) {
perror("Could not put command in its own session, setsid");
cout << "Forking another process and retrying" << endl;
if ((pid = fork()) == -1) {
perror("Failed to fork to launch command");
abort();
}
if (pid > 0) {
// In parent process. It is ok to suicide here, because
// we're not watching this process.
exit(0);
}
}
if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
perror("Failed to write PID on pipe");
abort();
}
os::close(pipes[1]);
if (rootfs.isSome()) {
#ifdef __linux__
if (user.isSome()) {
// This is a workaround for the problem that, after we chroot, a
// subsequent os::su call may fail because the linker may not be
// able to find the necessary libraries in the rootfs.
// We call os::su before chroot here to force the linker to load
// those libraries into memory.
// We also assume it's safe to su to the "root" user since
// filesystem/linux.cpp checks for root already.
os::su("root");
}
Try<Nothing> chroot = fs::chroot::enter(rootfs.get());
if (chroot.isError()) {
cerr << "Failed to enter chroot '" << rootfs.get()
<< "': " << chroot.error() << endl;;
abort();
}
// Determine the current working directory for the executor.
string cwd;
if (workingDirectory.isSome()) {
cwd = workingDirectory.get();
} else {
CHECK_SOME(sandboxDirectory);
cwd = sandboxDirectory.get();
}
Try<Nothing> chdir = os::chdir(cwd);
if (chdir.isError()) {
cerr << "Failed to chdir into current working directory '"
<< cwd << "': " << chdir.error() << endl;
abort();
}
if (user.isSome()) {
Try<Nothing> su = os::su(user.get());
if (su.isError()) {
cerr << "Failed to change user to '" << user.get() << "': "
<< su.error() << endl;
abort();
}
}
#else
cerr << "Rootfs is only supported on Linux" << endl;
abort();
#endif // __linux__
}
cout << commandString << endl;
// The child has successfully setsid, now run the command.
if (override.isNone()) {
if (command.shell()) {
execlp(
"sh",
"sh",
"-c",
command.value().c_str(),
(char*) NULL);
} else {
execvp(command.value().c_str(), argv);
}
} else {
char** argv = override.get();
execvp(argv[0], argv);
}
perror("Failed to exec");
abort();
}
delete[] argv;
// In parent process.
os::close(pipes[1]);
// Get the child's pid via the pipe.
if (read(pipes[0], &pid, sizeof(pid)) == -1) {
cerr << "Failed to get child PID from pipe, read: "
<< os::strerror(errno) << endl;
abort();
}
os::close(pipes[0]);
cout << "Forked command at " << pid << endl;
launchHealthCheck(task);
// Monitor this process.
process::reap(pid)
.onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
status.set_state(TASK_RUNNING);
driver->sendStatusUpdate(status);
launched = true;
}
void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
cout << "Received killTask" << endl;
// Since the command executor manages a single task, we
// shut down completely when we receive a killTask.
shutdown(driver);
}
void frameworkMessage(ExecutorDriver* driver, const string& data) {}
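// Kills the task by signal escalation: sends a TASK_KILLING update if
// the framework advertises the TASK_KILLING_STATE capability, delivers
// SIGTERM to the process tree, and schedules a timer that escalates to
// SIGKILL after 'escalationTimeout'. Any running health check process
// is killed as well.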
void shutdown(ExecutorDriver* driver)
{
cout << "Shutting down" << endl;
if (launched && !killed) {
// Send TASK_KILLING if the framework can handle it.
CHECK_SOME(frameworkInfo);
CHECK_SOME(taskId);
foreach (const FrameworkInfo::Capability& c,
frameworkInfo->capabilities()) {
if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
TaskStatus status;
status.mutable_task_id()->CopyFrom(taskId.get());
status.set_state(TASK_KILLING);
driver->sendStatusUpdate(status);
break;
}
}
// Now perform signal escalation to begin killing the task.
CHECK_GT(pid, 0);
cout << "Sending SIGTERM to process tree at pid " << pid << endl;
Try<std::list<os::ProcessTree> > trees =
os::killtree(pid, SIGTERM, true, true);
if (trees.isError()) {
cerr << "Failed to kill the process tree rooted at pid " << pid
<< ": " << trees.error() << endl;
// Send SIGTERM directly to process 'pid' as it may not have
// received the signal before os::killtree() failed.
::kill(pid, SIGTERM);
} else {
cout << "Sent SIGTERM to the following process trees:\n"
<< stringify(trees.get()) << endl;
}
// TODO(nnielsen): Make escalationTimeout configurable through
// slave flags and/or per-framework/executor.
escalationTimer = delay(
escalationTimeout,
self(),
&Self::escalated);
killed = true;
}
// Cleanup health check process.
//
// TODO(bmahler): Consider doing this after the task has been
// reaped, since a framework may be interested in health
// information while the task is being killed (consider a
// task that takes 30 minutes to be cleanly killed).
if (healthPid != -1) {
os::killtree(healthPid, SIGKILL);
}
}
virtual void error(ExecutorDriver* driver, const string& message) {}
protected:
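// Installs a protobuf message handler so that TaskHealthStatus
// messages sent by the health check process are routed to
// 'taskHealthUpdated'.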
virtual void initialize()
{
install<TaskHealthStatus>(
&CommandExecutorProcess::taskHealthUpdated,
&TaskHealthStatus::task_id,
&TaskHealthStatus::healthy,
&TaskHealthStatus::kill_task);
}
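// Invoked for each TaskHealthStatus message: forwards the health
// result to the framework as a TASK_RUNNING update and, if requested,
// initiates a task kill.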
void taskHealthUpdated(
const TaskID& taskID,
const bool& healthy,
const bool& initiateTaskKill)
{
if (driver.isNone()) {
return;
}
cout << "Received task health update, healthy: "
<< stringify(healthy) << endl;
TaskStatus status;
status.mutable_task_id()->CopyFrom(taskID);
status.set_healthy(healthy);
status.set_state(TASK_RUNNING);
driver.get()->sendStatusUpdate(status);
if (initiateTaskKill) {
killedByHealthCheck = true;
killTask(driver.get(), taskID);
}
}
private:
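// Invoked once the forked command has been reaped: cancels the
// escalation timer, translates the exit status into TASK_FINISHED,
// TASK_KILLED, or TASK_FAILED, sends the terminal status update, and
// stops the driver.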
void reaped(
ExecutorDriver* driver,
pid_t pid,
const Future<Option<int> >& status_)
{
TaskState taskState;
string message;
Clock::cancel(escalationTimer);
if (!status_.isReady()) {
taskState = TASK_FAILED;
message =
"Failed to get exit status for Command: " +
(status_.isFailed() ? status_.failure() : "future discarded");
} else if (status_.get().isNone()) {
taskState = TASK_FAILED;
message = "Failed to get exit status for Command";
} else {
int status = status_.get().get();
CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
taskState = TASK_FINISHED;
} else if (killed) {
// Send TASK_KILLED if the task was killed as a result of
// killTask() or shutdown().
taskState = TASK_KILLED;
} else {
taskState = TASK_FAILED;
}
message = "Command " + WSTRINGIFY(status);
}
cout << message << " (pid: " << pid << ")" << endl;
CHECK_SOME(taskId);
TaskStatus taskStatus;
taskStatus.mutable_task_id()->MergeFrom(taskId.get());
taskStatus.set_state(taskState);
taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
taskStatus.set_healthy(false);
}
driver->sendStatusUpdate(taskStatus);
// This is a hack to ensure the message is sent to the
// slave before we exit the process. Without this, we
// may exit before libprocess has sent the data over
// the socket. See MESOS-4111.
os::sleep(Seconds(1));
driver->stop();
}
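// Invoked by the escalation timer when the task has not terminated
// within 'escalationTimeout': sends SIGKILL to the process tree.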
void escalated()
{
cout << "Process " << pid << " did not terminate after "
<< escalationTimeout << ", sending SIGKILL to "
<< "process tree at " << pid << endl;
// TODO(nnielsen): Sending SIGTERM in the first stage of the
// shutdown may leave orphan processes hanging off init. This
// scenario will be handled when PID namespace encapsulated
// execution is in place.
Try<std::list<os::ProcessTree> > trees =
os::killtree(pid, SIGKILL, true, true);
if (trees.isError()) {
cerr << "Failed to kill the process tree rooted at pid "
<< pid << ": " << trees.error() << endl;
// Process 'pid' may not have received the signal before
// os::killtree() failed. To make sure process 'pid' is reaped,
// we send SIGKILL directly.
::kill(pid, SIGKILL);
} else {
cout << "Killed the following process trees:\n" << stringify(trees.get())
<< endl;
}
}
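// If the task defines a health check, launches the external
// 'mesos-health-check' binary as a subprocess, passing the check as
// JSON; its results come back as TaskHealthStatus messages handled by
// 'taskHealthUpdated'.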
void launchHealthCheck(const TaskInfo& task)
{
if (task.has_health_check()) {
JSON::Object json = JSON::protobuf(task.health_check());
// Launch the subprocess using 'exec' style so that quotes can
// be properly handled.
vector<string> argv(4);
argv[0] = "mesos-health-check";
argv[1] = "--executor=" + stringify(self());
argv[2] = "--health_check_json=" + stringify(json);
argv[3] = "--task_id=" + task.task_id().value();
cout << "Launching health check process: "
<< path::join(healthCheckDir, "mesos-health-check")
<< " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
Try<Subprocess> healthProcess =
process::subprocess(
path::join(healthCheckDir, "mesos-health-check"),
argv,
// Intentionally not sending STDIN, so that health check
// commands that expect STDIN input do not block.
Subprocess::PATH("/dev/null"),
Subprocess::FD(STDOUT_FILENO),
Subprocess::FD(STDERR_FILENO));
if (healthProcess.isError()) {
cerr << "Unable to launch health process: " << healthProcess.error();
} else {
healthPid = healthProcess.get().pid();
cout << "Health check process launched at pid: "
<< stringify(healthPid) << endl;
}
}
}
enum State
{
REGISTERING, // Executor is launched but not (re-)registered yet.
REGISTERED, // Executor has (re-)registered.
} state;
bool launched;
bool killed;
bool killedByHealthCheck;
pid_t pid;
pid_t healthPid;
Duration escalationTimeout;
Timer escalationTimer;
Option<ExecutorDriver*> driver;
Option<FrameworkInfo> frameworkInfo;
Option<TaskID> taskId;
string healthCheckDir;
Option<char**> override;
Option<string> sandboxDirectory;
Option<string> workingDirectory;
Option<string> user;
Option<string> taskCommand;
};
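// Thin Executor implementation that forwards every driver callback to
// the underlying CommandExecutorProcess via libprocess dispatch.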
class CommandExecutor: public Executor
{
public:
CommandExecutor(
const Option<char**>& override,
const string& healthCheckDir,
const Option<string>& sandboxDirectory,
const Option<string>& workingDirectory,
const Option<string>& user,
const Option<string>& taskCommand)
{
process = new CommandExecutorProcess(override,
healthCheckDir,
sandboxDirectory,
workingDirectory,
user,
taskCommand);
spawn(process);
}
virtual ~CommandExecutor()
{
terminate(process);
wait(process);
delete process;
}
virtual void registered(
ExecutorDriver* driver,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo)
{
dispatch(process,
&CommandExecutorProcess::registered,
driver,
executorInfo,
frameworkInfo,
slaveInfo);
}
virtual void reregistered(
ExecutorDriver* driver,
const SlaveInfo& slaveInfo)
{
dispatch(process,
&CommandExecutorProcess::reregistered,
driver,
slaveInfo);
}
virtual void disconnected(ExecutorDriver* driver)
{
dispatch(process, &CommandExecutorProcess::disconnected, driver);
}
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
}
virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
}
virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
{
dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
}
virtual void shutdown(ExecutorDriver* driver)
{
dispatch(process, &CommandExecutorProcess::shutdown, driver);
}
virtual void error(ExecutorDriver* driver, const string& data)
{
dispatch(process, &CommandExecutorProcess::error, driver, data);
}
private:
CommandExecutorProcess* process;
};
} // namespace internal {
} // namespace mesos {
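// Command line flags accepted by the command executor binary.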
class Flags : public flags::FlagsBase
{
public:
Flags()
{
// TODO(gilbert): Deprecate the 'override' flag since no one is
// using it, and it may cause confusion with the 'task_command' flag.
add(&override,
"override",
"Whether to override the command the executor should run when the\n"
"task is launched. Only this flag is expected to be on the command\n"
"line and all arguments after the flag will be used as the\n"
"subsequent 'argv' to be used with 'execvp'",
false);
// The following flags are only applicable when a rootfs is
// provisioned for this command.
add(&sandbox_directory,
"sandbox_directory",
"The absolute path for the directory in the container where the\n"
"sandbox is mapped to");
add(&working_directory,
"working_directory",
"The working directory for the task in the container.");
add(&user,
"user",
"The user that the task should be running as.");
add(&task_command,
"task_command",
"If specified, this is the overrided command for launching the\n"
"task (instead of the command from TaskInfo).");
// TODO(nnielsen): Add 'prefix' option to enable replacing
// 'sh -c' with user specified wrapper.
}
bool override;
Option<string> sandbox_directory;
Option<string> working_directory;
Option<string> user;
Option<string> task_command;
};
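// Entry point: loads flags, captures any '--override' argv, determines
// the health check (launcher) directory, and runs the executor driver.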
int main(int argc, char** argv)
{
Flags flags;
// Load flags from command line.
Try<Nothing> load = flags.load(None(), &argc, &argv);
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
// After flags.load(..., &argc, &argv) all flags will have been
// stripped from argv. Additionally, arguments after a "--"
// terminator will be preserved in argv and it is therefore
// possible to pass override and prefix commands which use
// "--foobar" style flags.
Option<char**> override = None();
if (flags.override) {
if (argc > 1) {
override = argv + 1;
}
}
const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
string path = envPath.isSome()
? envPath.get()
: os::realpath(Path(argv[0]).dirname()).get();
mesos::internal::CommandExecutor executor(
override,
path,
flags.sandbox_directory,
flags.working_directory,
flags.user,
flags.task_command);
mesos::MesosExecutorDriver driver(&executor);
return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
}