| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <signal.h> |
| #include <stdio.h> |
| |
| #include <sys/wait.h> |
| |
| #include <iostream> |
| #include <list> |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/executor.hpp> |
| |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/future.hpp> |
| #include <process/io.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| #include <process/subprocess.hpp> |
| #include <process/reap.hpp> |
| #include <process/timer.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/flags.hpp> |
| #include <stout/path.hpp> |
| #include <stout/protobuf.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/strings.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/type_utils.hpp" |
| #include "common/status_utils.hpp" |
| |
| #include "logging/logging.hpp" |
| |
| #include "messages/messages.hpp" |
| |
| #include "slave/constants.hpp" |
| |
| using process::wait; // Necessary on some OS's to disambiguate. |
| |
| using std::cout; |
| using std::cerr; |
| using std::endl; |
| using std::string; |
| using std::vector; |
| |
| namespace mesos { |
| namespace internal { |
| |
| using namespace process; |
| |
| class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess> |
| { |
| public: |
| CommandExecutorProcess(Option<char**> override, const string& _healthCheckDir) |
| : launched(false), |
| killed(false), |
| killedByHealthCheck(false), |
| pid(-1), |
| healthPid(-1), |
| escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT), |
| driver(None()), |
| healthCheckDir(_healthCheckDir), |
| override(override) {} |
| |
| virtual ~CommandExecutorProcess() {} |
| |
| void registered( |
| ExecutorDriver* _driver, |
| const ExecutorInfo& executorInfo, |
| const FrameworkInfo& frameworkInfo, |
| const SlaveInfo& slaveInfo) |
| { |
| cout << "Registered executor on " << slaveInfo.hostname() << endl; |
| driver = _driver; |
| } |
| |
| void reregistered( |
| ExecutorDriver* driver, |
| const SlaveInfo& slaveInfo) |
| { |
| cout << "Re-registered executor on " << slaveInfo.hostname() << endl; |
| } |
| |
| void disconnected(ExecutorDriver* driver) {} |
| |
| void launchTask(ExecutorDriver* driver, const TaskInfo& task) |
| { |
| if (launched) { |
| TaskStatus status; |
| status.mutable_task_id()->MergeFrom(task.task_id()); |
| status.set_state(TASK_FAILED); |
| status.set_message( |
| "Attempted to run multiple tasks using a \"command\" executor"); |
| |
| driver->sendStatusUpdate(status); |
| return; |
| } |
| |
| // Skip sanity checks for TaskInfo if override is provided since |
| // the executor will be running the override command. |
| if (override.isNone()) { |
| // Sanity checks. |
| CHECK(task.has_command()) << "Expecting task " << task.task_id() |
| << " to have a command!"; |
| |
| // TODO(jieyu): For now, we just fail the executor if the task's |
| // CommandInfo is not valid. The framework will receive |
| // TASK_FAILED for the task, and will most likely find out the |
| // cause with some debugging. This is a temporary solution. A more |
| // correct solution is to perform this validation at master side. |
| if (task.command().shell()) { |
| CHECK(task.command().has_value()) |
| << "Shell command of task " << task.task_id() |
| << " is not specified!"; |
| } else { |
| CHECK(task.command().has_value()) |
| << "Executable of task " << task.task_id() |
| << " is not specified!"; |
| } |
| } |
| |
| cout << "Starting task " << task.task_id() << endl; |
| |
| // TODO(benh): Clean this up with the new 'Fork' abstraction. |
| // Use pipes to determine which child has successfully changed |
| // session. This is needed as the setsid call can fail from other |
| // processes having the same group id. |
| int pipes[2]; |
| if (pipe(pipes) < 0) { |
| perror("Failed to create a pipe"); |
| abort(); |
| } |
| |
| // Set the FD_CLOEXEC flags on these pipes |
| Try<Nothing> cloexec = os::cloexec(pipes[0]); |
| if (cloexec.isError()) { |
| cerr << "Failed to cloexec(pipe[0]): " << cloexec.error() << endl; |
| abort(); |
| } |
| |
| cloexec = os::cloexec(pipes[1]); |
| if (cloexec.isError()) { |
| cerr << "Failed to cloexec(pipe[1]): " << cloexec.error() << endl; |
| abort(); |
| } |
| |
| // Prepare the argv before fork as it's not async signal safe. |
| char **argv = new char*[task.command().arguments().size() + 1]; |
| for (int i = 0; i < task.command().arguments().size(); i++) { |
| argv[i] = (char*) task.command().arguments(i).c_str(); |
| } |
| argv[task.command().arguments().size()] = NULL; |
| |
| // Prepare the command log message. |
| string command; |
| if (override.isSome()) { |
| char** argv = override.get(); |
| // argv is guaranteed to be NULL terminated and we rely on |
| // that fact to print command to be executed. |
| for (int i = 0; argv[i] != NULL; i++) { |
| command += string(argv[i]) + " "; |
| } |
| } else if (task.command().shell()) { |
| command = "sh -c '" + task.command().value() + "'"; |
| } else { |
| command = |
| "[" + task.command().value() + ", " + |
| strings::join(", ", task.command().arguments()) + "]"; |
| } |
| |
| if ((pid = fork()) == -1) { |
| cerr << "Failed to fork to run " << command << ": " |
| << strerror(errno) << endl; |
| abort(); |
| } |
| |
| // TODO(jieyu): Make the child process async signal safe. |
| if (pid == 0) { |
| // In child process, we make cleanup easier by putting process |
| // into it's own session. |
| os::close(pipes[0]); |
| |
| // NOTE: We setsid() in a loop because setsid() might fail if another |
| // process has the same process group id as the calling process. |
| while ((pid = setsid()) == -1) { |
| perror("Could not put command in its own session, setsid"); |
| |
| cout << "Forking another process and retrying" << endl; |
| |
| if ((pid = fork()) == -1) { |
| perror("Failed to fork to launch command"); |
| abort(); |
| } |
| |
| if (pid > 0) { |
| // In parent process. It is ok to suicide here, because |
| // we're not watching this process. |
| exit(0); |
| } |
| } |
| |
| if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) { |
| perror("Failed to write PID on pipe"); |
| abort(); |
| } |
| |
| os::close(pipes[1]); |
| |
| cout << command << endl; |
| |
| // The child has successfully setsid, now run the command. |
| if (override.isNone()) { |
| if (task.command().shell()) { |
| execl( |
| "/bin/sh", |
| "sh", |
| "-c", |
| task.command().value().c_str(), |
| (char*) NULL); |
| } else { |
| execvp(task.command().value().c_str(), argv); |
| } |
| } else { |
| char** argv = override.get(); |
| execvp(argv[0], argv); |
| } |
| |
| perror("Failed to exec"); |
| abort(); |
| } |
| |
| delete[] argv; |
| |
| // In parent process. |
| os::close(pipes[1]); |
| |
| // Get the child's pid via the pipe. |
| if (read(pipes[0], &pid, sizeof(pid)) == -1) { |
| cerr << "Failed to get child PID from pipe, read: " << strerror(errno) |
| << endl; |
| abort(); |
| } |
| |
| os::close(pipes[0]); |
| |
| cout << "Forked command at " << pid << endl; |
| |
| launchHealthCheck(task); |
| |
| // Monitor this process. |
| process::reap(pid) |
| .onAny(defer(self(), |
| &Self::reaped, |
| driver, |
| task.task_id(), |
| pid, |
| lambda::_1)); |
| |
| TaskStatus status; |
| status.mutable_task_id()->MergeFrom(task.task_id()); |
| status.set_state(TASK_RUNNING); |
| driver->sendStatusUpdate(status); |
| |
| launched = true; |
| } |
| |
| void killTask(ExecutorDriver* driver, const TaskID& taskId) |
| { |
| shutdown(driver); |
| if (healthPid != -1) { |
| // Cleanup health check process |
| ::kill(healthPid, SIGKILL); |
| } |
| } |
| |
| void frameworkMessage(ExecutorDriver* driver, const string& data) {} |
| |
| void shutdown(ExecutorDriver* driver) |
| { |
| cout << "Shutting down" << endl; |
| |
| if (pid > 0 && !killed) { |
| cout << "Sending SIGTERM to process tree at pid " |
| << pid << endl; |
| |
| Try<std::list<os::ProcessTree> > trees = |
| os::killtree(pid, SIGTERM, true, true); |
| |
| if (trees.isError()) { |
| cerr << "Failed to kill the process tree rooted at pid " |
| << pid << ": " << trees.error() << endl; |
| |
| // Send SIGTERM directly to process 'pid' as it may not have |
| // received signal before os::killtree() failed. |
| ::kill(pid, SIGTERM); |
| } else { |
| cout << "Killing the following process trees:\n" |
| << stringify(trees.get()) << endl; |
| } |
| |
| // TODO(nnielsen): Make escalationTimeout configurable through |
| // slave flags and/or per-framework/executor. |
| escalationTimer = delay( |
| escalationTimeout, |
| self(), |
| &Self::escalated); |
| |
| killed = true; |
| } |
| } |
| |
| virtual void error(ExecutorDriver* driver, const string& message) {} |
| |
| protected: |
| virtual void initialize() |
| { |
| install<TaskHealthStatus>( |
| &CommandExecutorProcess::taskHealthUpdated, |
| &TaskHealthStatus::task_id, |
| &TaskHealthStatus::healthy, |
| &TaskHealthStatus::kill_task); |
| } |
| |
| void taskHealthUpdated( |
| const TaskID& taskID, |
| const bool& healthy, |
| const bool& initiateTaskKill) |
| { |
| if (driver.isNone()) { |
| return; |
| } |
| |
| cout << "Received task health update, healthy: " |
| << stringify(healthy) << endl; |
| |
| TaskStatus status; |
| status.mutable_task_id()->CopyFrom(taskID); |
| status.set_healthy(healthy); |
| status.set_state(TASK_RUNNING); |
| driver.get()->sendStatusUpdate(status); |
| |
| if (initiateTaskKill) { |
| killedByHealthCheck = true; |
| killTask(driver.get(), taskID); |
| } |
| } |
| |
| |
| private: |
| void reaped( |
| ExecutorDriver* driver, |
| const TaskID& taskId, |
| pid_t pid, |
| const Future<Option<int> >& status_) |
| { |
| TaskState state; |
| string message; |
| |
| Timer::cancel(escalationTimer); |
| |
| if (!status_.isReady()) { |
| state = TASK_FAILED; |
| message = |
| "Failed to get exit status for Command: " + |
| (status_.isFailed() ? status_.failure() : "future discarded"); |
| } else if (status_.get().isNone()) { |
| state = TASK_FAILED; |
| message = "Failed to get exit status for Command"; |
| } else { |
| int status = status_.get().get(); |
| CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status; |
| |
| if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { |
| state = TASK_FINISHED; |
| } else if (killed) { |
| // Send TASK_KILLED if the task was killed as a result of |
| // killTask() or shutdown(). |
| state = TASK_KILLED; |
| } else { |
| state = TASK_FAILED; |
| } |
| |
| message = "Command " + WSTRINGIFY(status); |
| } |
| |
| cout << message << " (pid: " << pid << ")" << endl; |
| |
| TaskStatus taskStatus; |
| taskStatus.mutable_task_id()->MergeFrom(taskId); |
| taskStatus.set_state(state); |
| taskStatus.set_message(message); |
| if (killed && killedByHealthCheck) { |
| taskStatus.set_healthy(false); |
| } |
| |
| driver->sendStatusUpdate(taskStatus); |
| |
| // A hack for now ... but we need to wait until the status update |
| // is sent to the slave before we shut ourselves down. |
| os::sleep(Seconds(1)); |
| driver->stop(); |
| } |
| |
| void escalated() |
| { |
| cout << "Process " << pid << " did not terminate after " |
| << escalationTimeout << ", sending SIGKILL to " |
| << "process tree at " << pid << endl; |
| |
| // TODO(nnielsen): Sending SIGTERM in the first stage of the |
| // shutdown may leave orphan processes hanging off init. This |
| // scenario will be handled when PID namespace encapsulated |
| // execution is in place. |
| Try<std::list<os::ProcessTree> > trees = |
| os::killtree(pid, SIGKILL, true, true); |
| |
| if (trees.isError()) { |
| cerr << "Failed to kill the process tree rooted at pid " |
| << pid << ": " << trees.error() << endl; |
| |
| // Process 'pid' may not have received signal before |
| // os::killtree() failed. To make sure process 'pid' is reaped |
| // we send SIGKILL directly. |
| ::kill(pid, SIGKILL); |
| } else { |
| cout << "Killed the following process trees:\n" << stringify(trees.get()) |
| << endl; |
| } |
| } |
| |
| void launchHealthCheck(const TaskInfo& task) |
| { |
| if (task.has_health_check()) { |
| JSON::Object json = JSON::Protobuf(task.health_check()); |
| |
| // Launch the subprocess using 'exec' style so that quotes can |
| // be properly handled. |
| vector<string> argv(4); |
| argv[0] = "mesos-health-check"; |
| argv[1] = "--executor=" + stringify(self()); |
| argv[2] = "--health_check_json=" + stringify(json); |
| argv[3] = "--task_id=" + task.task_id().value(); |
| |
| cout << "Launching health check process: " |
| << path::join(healthCheckDir, "mesos-health-check") |
| << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl; |
| |
| Try<Subprocess> healthProcess = |
| process::subprocess( |
| path::join(healthCheckDir, "mesos-health-check"), |
| argv, |
| // Intentionally not sending STDIN to avoid health check |
| // commands that expect STDIN input to block. |
| Subprocess::PATH("/dev/null"), |
| Subprocess::FD(STDOUT_FILENO), |
| Subprocess::FD(STDERR_FILENO)); |
| |
| if (healthProcess.isError()) { |
| cerr << "Unable to launch health process: " << healthProcess.error(); |
| } else { |
| healthPid = healthProcess.get().pid(); |
| |
| cout << "Health check process launched at pid: " |
| << stringify(healthPid) << endl; |
| } |
| } |
| } |
| |
| bool launched; |
| bool killed; |
| bool killedByHealthCheck; |
| pid_t pid; |
| pid_t healthPid; |
| Duration escalationTimeout; |
| Timer escalationTimer; |
| Option<ExecutorDriver*> driver; |
| string healthCheckDir; |
| Option<char**> override; |
| }; |
| |
| |
| class CommandExecutor: public Executor |
| { |
| public: |
| CommandExecutor(Option<char**> override, string healthCheckDir) |
| { |
| process = new CommandExecutorProcess(override, healthCheckDir); |
| spawn(process); |
| } |
| |
| virtual ~CommandExecutor() |
| { |
| terminate(process); |
| wait(process); |
| delete process; |
| } |
| |
| virtual void registered( |
| ExecutorDriver* driver, |
| const ExecutorInfo& executorInfo, |
| const FrameworkInfo& frameworkInfo, |
| const SlaveInfo& slaveInfo) |
| { |
| dispatch(process, |
| &CommandExecutorProcess::registered, |
| driver, |
| executorInfo, |
| frameworkInfo, |
| slaveInfo); |
| } |
| |
| virtual void reregistered( |
| ExecutorDriver* driver, |
| const SlaveInfo& slaveInfo) |
| { |
| dispatch(process, |
| &CommandExecutorProcess::reregistered, |
| driver, |
| slaveInfo); |
| } |
| |
| virtual void disconnected(ExecutorDriver* driver) |
| { |
| dispatch(process, &CommandExecutorProcess::disconnected, driver); |
| } |
| |
| virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task) |
| { |
| dispatch(process, &CommandExecutorProcess::launchTask, driver, task); |
| } |
| |
| virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) |
| { |
| dispatch(process, &CommandExecutorProcess::killTask, driver, taskId); |
| } |
| |
| virtual void frameworkMessage(ExecutorDriver* driver, const string& data) |
| { |
| dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data); |
| } |
| |
| virtual void shutdown(ExecutorDriver* driver) |
| { |
| dispatch(process, &CommandExecutorProcess::shutdown, driver); |
| } |
| |
| virtual void error(ExecutorDriver* driver, const string& data) |
| { |
| dispatch(process, &CommandExecutorProcess::error, driver, data); |
| } |
| |
| private: |
| CommandExecutorProcess* process; |
| }; |
| |
| } // namespace internal { |
| } // namespace mesos { |
| |
| |
| void usage(const char* argv0, const flags::FlagsBase& flags) |
| { |
| cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl |
| << endl |
| << "Supported options:" << endl |
| << flags.usage(); |
| } |
| |
| |
| class Flags : public flags::FlagsBase |
| { |
| public: |
| Flags() |
| { |
| add(&override, |
| "override", |
| "Whether or not to override the command the executor should run\n" |
| "when the task is launched. Only this flag is expected to be on\n" |
| "the command line and all arguments after the flag will be used as\n" |
| "the subsequent 'argv' to be used with 'execvp'", |
| false); |
| |
| // TODO(nnielsen): Add 'prefix' option to enable replacing |
| // 'sh -c' with user specified wrapper. |
| } |
| |
| bool override; |
| }; |
| |
| |
| int main(int argc, char** argv) |
| { |
| Flags flags; |
| |
| bool help; |
| flags.add(&help, |
| "help", |
| "Prints this help message", |
| false); |
| |
| // Load flags from environment and command line. |
| Try<Nothing> load = flags.load(None(), &argc, &argv); |
| |
| if (load.isError()) { |
| cerr << load.error() << endl; |
| usage(argv[0], flags); |
| return -1; |
| } |
| |
| if (help) { |
| usage(argv[0], flags); |
| return -1; |
| } |
| |
| // After flags.load(..., &argc, &argv) all flags will have been |
| // stripped from argv. Additionally, arguments after a "--" |
| // terminator will be preservered in argv and it is therefore |
| // possible to pass override and prefix commands which use |
| // "--foobar" style flags. |
| Option<char**> override = None(); |
| if (flags.override) { |
| if (argc > 1) { |
| override = argv + 1; |
| } |
| } |
| |
| string path = os::getenv("MESOS_LAUNCHER_DIR", false); |
| if (path.empty()) { |
| path = os::realpath(dirname(argv[0])).get(); |
| } |
| mesos::internal::CommandExecutor executor(override, path); |
| mesos::MesosExecutorDriver driver(&executor); |
| return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1; |
| } |