blob: fa8029325122f6b1f4ce2ea8492e7a33a7a982f3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <errno.h>
#include <signal.h>
#include <stdio.h> // For perror.
#include <string.h>
#include <list>
#include <map>
#include <set>
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/uuid.hpp>
#include "common/type_utils.hpp"
#include "slave/flags.hpp"
#include "slave/process_isolator.hpp"
#include "slave/state.hpp"
using namespace process;
using std::map;
using std::set;
using std::string;
using process::defer;
using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos {
namespace internal {
namespace slave {
using launcher::ExecutorLauncher;
using state::SlaveState;
using state::FrameworkState;
using state::ExecutorState;
using state::RunState;
// Constructs the isolator, handing ProcessBase an ID generated from
// "process-isolator". The isolator starts out uninitialized; the
// launch/kill/resourcesChanged entry points CHECK that 'initialize'
// has been called first.
ProcessIsolator::ProcessIsolator()
  : ProcessBase(ID::generate("process-isolator")),
    initialized(false)
{
}
// Records the configuration handed over by the slave and marks the
// isolator as ready for use.
//
//   _flags: slave flags (later consulted for work_dir, launcher_dir, ...).
//   _:      resources argument; accepted but not used by this isolator.
//   _local: whether the slave is running in local mode.
//   _slave: PID of the slave to dispatch executor events to.
void ProcessIsolator::initialize(
    const Flags& _flags,
    const Resources& _,
    bool _local,
    const PID<Slave>& _slave)
{
  slave = _slave;
  local = _local;
  flags = _flags;

  // Flip the flag last, once all of the state above is in place.
  initialized = true;
}
// Forks a child process that puts itself into its own session and
// then execs 'mesos-launcher' (looked up in flags.launcher_dir) to
// run the given executor.
//
// In the parent, the pid reported by the child over a pipe is
// recorded in 'infos', registered with the reaper (so that 'reaped'
// is invoked for it) and announced to the slave through
// Slave::executorStarted.
//
// Failing to create the pipe, to set FD_CLOEXEC on it, to fork, or to
// read from the pipe is fatal for the calling process. Failures in
// the child are reported with perror/EXIT and terminate the child
// only.
void ProcessIsolator::launchExecutor(
    const SlaveID& slaveId,
    const FrameworkID& frameworkId,
    const FrameworkInfo& frameworkInfo,
    const ExecutorInfo& executorInfo,
    const UUID& uuid,
    const string& directory,
    const Resources& resources)
{
  CHECK(initialized) << "Cannot launch executors before initialization!";

  const ExecutorID& executorId = executorInfo.executor_id();

  LOG(INFO) << "Launching " << executorId
            << " (" << executorInfo.command().value() << ")"
            << " in " << directory
            << " with resources " << resources
            << " for framework " << frameworkId;

  ProcessInfo* info = new ProcessInfo(frameworkId, executorId);

  infos[frameworkId][executorId] = info;

  // Use pipes to determine which child has successfully changed session.
  int pipes[2];
  if (pipe(pipes) < 0) {
    PLOG(FATAL) << "Failed to create a pipe";
  }

  // Set the FD_CLOEXEC flags on these pipes so that they do not leak
  // into the exec'ed launcher.
  Try<Nothing> cloexec = os::cloexec(pipes[0]);
  CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[0]";

  cloexec = os::cloexec(pipes[1]);
  CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[1]";

  // Create the ExecutorLauncher instance before the fork for the
  // child process to use.
  ExecutorLauncher launcher(
      slaveId,
      frameworkId,
      executorId,
      uuid,
      executorInfo.command(),
      frameworkInfo.user(),
      directory,
      flags.work_dir,
      slave,
      flags.frameworks_home,
      flags.hadoop_home,
      !local,
      flags.switch_user,
      frameworkInfo.checkpoint(),
      flags.recovery_timeout);

  // We get the environment map for launching mesos-launcher before
  // the fork, because we have seen deadlock issues with ostringstream
  // in the forked process before it calls exec.
  map<string, string> env = launcher.getLauncherEnvironment();

  pid_t pid;
  if ((pid = fork()) == -1) {
    PLOG(FATAL) << "Failed to fork to launch new executor";
  }

  if (pid > 0) {
    // In parent process.
    os::close(pipes[1]);

    // Get the child's pid via the pipe. The child might have had to
    // fork again (see the setsid() loop below), so the pid written to
    // the pipe is the one that matters, not the one fork() returned.
    if (read(pipes[0], &pid, sizeof(pid)) == -1) {
      PLOG(FATAL) << "Failed to get child PID from pipe";
    }

    os::close(pipes[0]);

    LOG(INFO) << "Forked executor at " << pid;

    // Record the pid (should also be the pgid since we setsid below).
    info->pid = pid;

    reaper.monitor(pid)
      .onAny(defer(PID<ProcessIsolator>(this),
                   &ProcessIsolator::reaped,
                   pid,
                   lambda::_1));

    // Tell the slave this executor has started.
    dispatch(slave, &Slave::executorStarted, frameworkId, executorId, pid);
  } else {
    // In child process, we make cleanup easier by putting process
    // into its own session. DO NOT USE GLOG!
    os::close(pipes[0]);

    // NOTE: We setsid() in a loop because setsid() might fail if another
    // process has the same process group id as the calling process.
    while ((pid = setsid()) == -1) {
      perror("Could not put executor in its own session");

      std::cout << "Forking another process and retrying ..." << std::endl;

      if ((pid = fork()) == -1) {
        perror("Failed to fork to launch executor");
        abort();
      }

      if (pid > 0) {
        // In parent process.
        exit(0);
      }
    }

    if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
      perror("Failed to write PID on pipe");
      abort();
    }

    os::close(pipes[1]);

    // Setup the environment for launcher.
    foreachpair (const string& key, const string& value, env) {
      os::setenv(key, value);
    }

    // Determine path for mesos-launcher.
    Try<string> realpath = os::realpath(
        path::join(flags.launcher_dir, "mesos-launcher"));

    if (realpath.isError()) {
      EXIT(1) << "Failed to determine the canonical path "
              << "for the mesos-launcher '"
              << path::join(flags.launcher_dir, "mesos-launcher")
              << "': " << realpath.error();
    }

    // Bind the path to a name so that the pointer returned by
    // 'c_str()' stays valid until the exec.
    const string& path = realpath.get();

    // Keep the argument vector on the stack: there is no need to heap
    // allocate (and never free) it in the forked child.
    const char* args[2];
    args[0] = path.c_str();
    args[1] = NULL;

    // Execute the mesos-launcher!
    execvp(args[0], const_cast<char* const*>(args));

    // If we get here, the execvp call failed.
    perror("Failed to execvp the mesos-launcher");
    abort();
  }
}
// NOTE: This function can be called by the isolator itself or by the
// slave if it doesn't hear about an executor exit after it sends a
// shutdown message.
void ProcessIsolator::killExecutor(
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
CHECK(initialized) << "Cannot kill executors before initialization!";
if (!infos.contains(frameworkId) ||
!infos[frameworkId].contains(executorId) ||
infos[frameworkId][executorId]->killed) {
LOG(ERROR) << "Asked to kill an unknown/killed executor! " << executorId;
return;
}
const Option<pid_t>& pid = infos[frameworkId][executorId]->pid;
if (pid.isSome()) {
// TODO(vinod): Call killtree on the pid of the actual executor process
// that is running the tasks (stored in the local storage by the
// executor module).
Try<std::list<os::ProcessTree> > trees =
os::killtree(pid.get(), SIGKILL, true, true);
if (trees.isError()) {
LOG(WARNING) << "Failed to kill the process tree rooted at pid "
<< pid.get() << ": " << trees.error();
} else {
LOG(INFO) << "Killed the following process trees:\n"
<< stringify(trees.get());
}
// Also kill all processes that belong to the process group of the executor.
// This is valuable in situations where the top level executor process
// exited and hence killtree is unable to kill any spawned orphans.
// NOTE: This assumes that the process group id of the executor process is
// same as its pid (which is expected to be the case with setsid()).
// TODO(vinod): Also (recursively) kill processes belonging to the
// same session, but have a different process group id.
if (killpg(pid.get(), SIGKILL) == -1 && errno != ESRCH) {
PLOG(WARNING) << "Failed to kill process group " << pid.get();
}
infos[frameworkId][executorId]->killed = true;
}
}
void ProcessIsolator::resourcesChanged(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const Resources& resources)
{
CHECK(initialized) << "Cannot do resourcesChanged before initialization!";
if (!infos.contains(frameworkId) ||
!infos[frameworkId].contains(executorId) ||
infos[frameworkId][executorId]->killed) {
LOG(INFO) << "Asked to update resources for an unknown/killed executor '"
<< executorId << "' of framework " << frameworkId;
return;
}
ProcessInfo* info = CHECK_NOTNULL(infos[frameworkId][executorId]);
info->resources = resources;
// Do nothing; subclasses may override this.
}
// Rebuilds the isolator's bookkeeping from checkpointed slave state.
//
// For every executor whose info and latest run could be recovered,
// and whose latest run is not completed, a ProcessInfo is stored in
// 'infos'. If the run has a forked pid, that pid is also handed to
// the reaper so that 'reaped' gets invoked for it.
//
// Returns Nothing() in all cases; executors that cannot be recovered
// are skipped with a log message.
Future<Nothing> ProcessIsolator::recover(
    const Option<SlaveState>& state)
{
  LOG(INFO) << "Recovering isolator";

  if (state.isNone()) {
    return Nothing();
  }

  foreachvalue (const FrameworkState& framework, state.get().frameworks) {
    foreachvalue (const ExecutorState& executor, framework.executors) {
      LOG(INFO) << "Recovering executor '" << executor.id
                << "' of framework " << framework.id;

      if (executor.info.isNone()) {
        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
                     << "' of framework " << framework.id
                     << " because its info cannot be recovered";
        continue;
      }

      if (executor.latest.isNone()) {
        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
                     << "' of framework " << framework.id
                     << " because its latest run cannot be recovered";
        continue;
      }

      // We are only interested in the latest run of the executor!
      const UUID& uuid = executor.latest.get();
      CHECK(executor.runs.contains(uuid));

      // Hold the run state by value. 'runs.get(uuid)' yields a
      // temporary, and a reference obtained through it is not
      // guaranteed to outlive this statement.
      const RunState run = executor.runs.get(uuid).get();

      if (run.completed) {
        VLOG(1) << "Skipping recovery of executor '" << executor.id
                << "' of framework " << framework.id
                << " because its latest run " << uuid << " is completed";
        continue;
      }

      ProcessInfo* info =
        new ProcessInfo(framework.id, executor.id, run.forkedPid);

      infos[framework.id][executor.id] = info;

      // Add the pid to the reaper to monitor exit status.
      if (run.forkedPid.isSome()) {
        reaper.monitor(run.forkedPid.get())
          .onAny(defer(PID<ProcessIsolator>(this),
                       &ProcessIsolator::reaped,
                       run.forkedPid.get(),
                       lambda::_1));
      }
    }
  }

  return Nothing();
}
// Collects resource statistics for an executor.
//
// The limits are taken from the resources last recorded for the
// executor. RSS and user/system CPU times are read for the executor
// process and summed with those of all of its descendants.
//
// Returns a failed future if the executor is unknown or killed, if
// its pid is not known, if the executor process cannot be inspected
// (or no longer exists), or if its children cannot be determined.
// Descendants that disappear or cannot be inspected are skipped.
Future<ResourceStatistics> ProcessIsolator::usage(
    const FrameworkID& frameworkId,
    const ExecutorID& executorId)
{
  if (!infos.contains(frameworkId) ||
      !infos[frameworkId].contains(executorId) ||
      infos[frameworkId][executorId]->killed) {
    return Future<ResourceStatistics>::failed("Unknown/killed executor");
  }

  ProcessInfo* info = infos[frameworkId][executorId];
  CHECK_NOTNULL(info);

  // An executor that was recovered without a forked pid is tracked
  // without a pid. Report that as a failure instead of aborting the
  // whole process.
  if (info->pid.isNone()) {
    return Future<ResourceStatistics>::failed("Executor pid is unknown");
  }

  const pid_t pid = info->pid.get();

  ResourceStatistics result;

  // Set the resource allocations.
  const Option<Bytes>& mem = info->resources.mem();
  if (mem.isSome()) {
    result.set_mem_limit_bytes(mem.get().bytes());
  }

  const Option<double>& cpus = info->resources.cpus();
  if (cpus.isSome()) {
    result.set_cpus_limit(cpus.get());
  }

  Result<os::Process> process = os::process(pid);

  if (!process.isSome()) {
    return Future<ResourceStatistics>::failed(
        process.isError() ? process.error() : "Process does not exist");
  }

  // Timestamp the sample right after the executor process was inspected.
  result.set_timestamp(Clock::now().secs());

  if (process.get().rss.isSome()) {
    result.set_mem_rss_bytes(process.get().rss.get().bytes());
  }

  // We only show utime and stime when both are available, otherwise
  // we're exposing a partial view of the CPU times.
  if (process.get().utime.isSome() && process.get().stime.isSome()) {
    result.set_cpus_user_time_secs(process.get().utime.get().secs());
    result.set_cpus_system_time_secs(process.get().stime.get().secs());
  }

  // Now aggregate all descendant process usage statistics.
  const Try<set<pid_t> >& children = os::children(pid, true);

  if (children.isError()) {
    return Future<ResourceStatistics>::failed(
        "Failed to get children of " + stringify(pid) + ": " +
        children.error());
  }

  // Aggregate the usage of all child processes.
  foreach (pid_t child, children.get()) {
    process = os::process(child);

    // Skip processes that disappear.
    if (process.isNone()) {
      continue;
    }

    if (process.isError()) {
      LOG(WARNING) << "Failed to get status of descendant process " << child
                   << " of parent " << pid << ": "
                   << process.error();
      continue;
    }

    if (process.get().rss.isSome()) {
      result.set_mem_rss_bytes(
          result.mem_rss_bytes() + process.get().rss.get().bytes());
    }

    // We only show utime and stime when both are available, otherwise
    // we're exposing a partial view of the CPU times.
    if (process.get().utime.isSome() && process.get().stime.isSome()) {
      result.set_cpus_user_time_secs(
          result.cpus_user_time_secs() + process.get().utime.get().secs());
      result.set_cpus_system_time_secs(
          result.cpus_system_time_secs() + process.get().stime.get().secs());
    }
  }

  return result;
}
// Reaper callback for a monitored pid.
//
// Looks up the executor that was recorded with this pid. If one is
// found and the exit status is available, the slave is told about
// the termination, the executor is killed (unless that already
// happened) to clean up after it, and its bookkeeping entry is
// removed and freed. If the status future is not ready, the error is
// logged and the entry is left in place. An unknown pid is ignored.
void ProcessIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
{
  foreachkey (const FrameworkID& frameworkId, infos) {
    foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
      ProcessInfo* info = infos[frameworkId][executorId];

      // Not the executor we are looking for.
      if (info->pid.isNone() || info->pid.get() != pid) {
        continue;
      }

      if (!status.isReady()) {
        LOG(ERROR) << "Failed to get the status for executor '" << executorId
                   << "' of framework " << frameworkId << ": "
                   << (status.isFailed() ? status.failure() : "discarded");
        return;
      }

      LOG(INFO) << "Telling slave of terminated executor '" << executorId
                << "' of framework " << frameworkId;

      dispatch(slave,
               &Slave::executorTerminated,
               frameworkId,
               executorId,
               status.get(),
               false,
               "Executor terminated");

      // Try and cleanup after the executor.
      if (!info->killed) {
        killExecutor(frameworkId, executorId);
      }

      // Drop the framework's entry altogether once its last executor
      // goes away; otherwise only forget this executor.
      if (infos[frameworkId].size() > 1) {
        infos[frameworkId].erase(executorId);
      } else {
        infos.erase(frameworkId);
      }

      delete info;

      return;
    }
  }
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {