| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <list> |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| #include <process/check.hpp> |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/io.hpp> |
| #include <process/reap.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/fs.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/os.hpp> |
| |
| #include "common/status_utils.hpp" |
| |
| #ifdef __linux__ |
| #include "linux/cgroups.hpp" |
| #endif // __linux__ |
| |
| #include "slave/paths.hpp" |
| #include "slave/slave.hpp" |
| |
| #include "slave/containerizer/containerizer.hpp" |
| #include "slave/containerizer/docker.hpp" |
| #include "slave/containerizer/fetcher.hpp" |
| |
| #include "slave/containerizer/isolators/cgroups/constants.hpp" |
| |
| #include "usage/usage.hpp" |
| |
| |
| using std::list; |
| using std::map; |
| using std::string; |
| using std::vector; |
| |
| using namespace process; |
| |
| namespace mesos { |
| namespace internal { |
| namespace slave { |
| |
| using state::SlaveState; |
| using state::FrameworkState; |
| using state::ExecutorState; |
| using state::RunState; |
| |
| |
// Declared in header, see explanation there.
const string DOCKER_NAME_PREFIX = "mesos-";

// Declared in header, see explanation there.
// NOTE: "SEPERATOR" is a (historical) misspelling kept to match the
// declaration in the header; renaming it would break that linkage.
const string DOCKER_NAME_SEPERATOR = ".";

// Declared in header, see explanation there.
const string DOCKER_SYMLINK_DIRECTORY = "docker/links";
| |
| // Parse the ContainerID from a Docker container and return None if |
| // the container was not launched from Mesos. |
| Option<ContainerID> parse(const Docker::Container& container) |
| { |
| Option<string> name = None(); |
| |
| if (strings::startsWith(container.name, DOCKER_NAME_PREFIX)) { |
| name = strings::remove( |
| container.name, DOCKER_NAME_PREFIX, strings::PREFIX); |
| } else if (strings::startsWith(container.name, "/" + DOCKER_NAME_PREFIX)) { |
| name = strings::remove( |
| container.name, "/" + DOCKER_NAME_PREFIX, strings::PREFIX); |
| } |
| |
| if (name.isSome()) { |
| // For Mesos version < 0.23.0, the docker container name format |
| // was DOCKER_NAME_PREFIX + containerId, and starting with 0.23.0 |
| // it is changed to DOCKER_NAME_PREFIX + slaveId + |
| // DOCKER_NAME_SEPERATOR + containerId. |
| // To be backward compatible during upgrade, we still have to |
| // support the previous format. |
| // TODO(tnachen): Remove this check after deprecation cycle. |
| if (!strings::contains(name.get(), DOCKER_NAME_SEPERATOR)) { |
| ContainerID id; |
| id.set_value(name.get()); |
| return id; |
| } |
| |
| vector<string> parts = strings::split(name.get(), DOCKER_NAME_SEPERATOR); |
| if (parts.size() == 2 || parts.size() == 3) { |
| ContainerID id; |
| id.set_value(parts[1]); |
| return id; |
| } |
| } |
| |
| return None(); |
| } |
| |
| |
| Try<DockerContainerizer*> DockerContainerizer::create( |
| const Flags& flags, |
| Fetcher* fetcher) |
| { |
| Try<Docker*> create = Docker::create(flags.docker); |
| if (create.isError()) { |
| return Error("Failed to create docker: " + create.error()); |
| } |
| |
| Shared<Docker> docker(create.get()); |
| |
| if (flags.docker_mesos_image.isSome()) { |
| Try<Nothing> validateResult = docker->validateVersion(Version(1, 5, 0)); |
| if (validateResult.isError()) { |
| string message = "Docker with mesos images requires docker 1.5+"; |
| message += validateResult.error(); |
| return Error(message); |
| } |
| } |
| |
| return new DockerContainerizer(flags, fetcher, docker); |
| } |
| |
| |
// Constructs a containerizer around an already-created process
// (e.g., injected by tests) and starts its libprocess actor.
DockerContainerizer::DockerContainerizer(
    const Owned<DockerContainerizerProcess>& _process)
  : process(_process)
{
  spawn(process.get());
}
| |
| |
// Constructs the underlying DockerContainerizerProcess from the
// slave flags, fetcher, and Docker handle, then starts its actor.
DockerContainerizer::DockerContainerizer(
    const Flags& flags,
    Fetcher* fetcher,
    Shared<Docker> docker)
  : process(new DockerContainerizerProcess(flags, fetcher, docker))
{
  spawn(process.get());
}
| |
| |
// Terminates the actor and blocks until it has fully exited so the
// Owned process is safe to destroy.
DockerContainerizer::~DockerContainerizer()
{
  terminate(process.get());
  process::wait(process.get());
}
| |
| |
| docker::Flags dockerFlags( |
| const Flags& flags, |
| const string& name, |
| const string& directory) |
| { |
| docker::Flags dockerFlags; |
| dockerFlags.container = name; |
| dockerFlags.docker = flags.docker; |
| dockerFlags.sandbox_directory = directory; |
| dockerFlags.mapped_directory = flags.sandbox_directory; |
| dockerFlags.stop_timeout = flags.docker_stop_timeout; |
| dockerFlags.launcher_dir = flags.launcher_dir; |
| return dockerFlags; |
| } |
| |
| |
// Factory for Container: prepares the sandbox (stdout/stderr files,
// ownership, a colon-free symlink when needed) and — when the slave
// itself runs from a Docker image (--docker_mesos_image) and this is
// a task launch — rewrites ContainerInfo/CommandInfo so that the
// executor itself is launched inside a Docker container.
Try<DockerContainerizerProcess::Container*>
DockerContainerizerProcess::Container::create(
    const ContainerID& id,
    const Option<TaskInfo>& taskInfo,
    const ExecutorInfo& executorInfo,
    const string& directory,
    const Option<string>& user,
    const SlaveID& slaveId,
    const PID<Slave>& slavePid,
    bool checkpoint,
    const Flags& flags)
{
  // Before we do anything else we first make sure the stdout/stderr
  // files exist and have the right file ownership.
  Try<Nothing> touch = os::touch(path::join(directory, "stdout"));

  if (touch.isError()) {
    return Error("Failed to touch 'stdout': " + touch.error());
  }

  touch = os::touch(path::join(directory, "stderr"));

  if (touch.isError()) {
    return Error("Failed to touch 'stderr': " + touch.error());
  }

  if (user.isSome()) {
    // Hand the sandbox (including the just-created stdout/stderr)
    // over to the task user.
    Try<Nothing> chown = os::chown(user.get(), directory);

    if (chown.isError()) {
      return Error("Failed to chown: " + chown.error());
    }
  }

  // Per-slave directory under which colon-containing sandbox paths
  // are symlinked (see below).
  string dockerSymlinkPath = path::join(
      paths::getSlavePath(flags.work_dir, slaveId),
      DOCKER_SYMLINK_DIRECTORY);

  if (!os::exists(dockerSymlinkPath)) {
    Try<Nothing> mkdir = os::mkdir(dockerSymlinkPath);
    if (mkdir.isError()) {
      return Error("Unable to create symlink folder for docker " +
                   dockerSymlinkPath + ": " + mkdir.error());
    }
  }

  bool symlinked = false;
  string containerWorkdir = directory;
  // We need to symlink the sandbox directory if the directory
  // path has a colon, as Docker CLI uses the colon as a separator
  // (e.g., in '-v host:container' volume mappings).
  if (strings::contains(directory, ":")) {
    containerWorkdir = path::join(dockerSymlinkPath, id.value());

    Try<Nothing> symlink = ::fs::symlink(directory, containerWorkdir);

    if (symlink.isError()) {
      return Error("Failed to symlink directory '" + directory +
                   "' to '" + containerWorkdir + "': " + symlink.error());
    }

    // Remember that we created the link so it can be cleaned up later.
    symlinked = true;
  }

  Option<ContainerInfo> containerInfo = None();
  Option<CommandInfo> commandInfo = None();
  Option<std::map<string, string>> environment = None();
  bool launchesExecutorContainer = false;
  if (taskInfo.isSome() && flags.docker_mesos_image.isSome()) {
    // Override the container and command to launch an executor
    // in a docker container.
    ContainerInfo newContainerInfo;

    // Mounting in the docker socket so the executor can communicate to
    // the host docker daemon. We are assuming the current instance is
    // launching docker containers to the host daemon as well.
    Volume* dockerSockVolume = newContainerInfo.add_volumes();
    dockerSockVolume->set_host_path(flags.docker_socket);
    dockerSockVolume->set_container_path(flags.docker_socket);
    dockerSockVolume->set_mode(Volume::RO);

    // Mounting in sandbox so the logs from the executor can be
    // persisted over container failures.
    Volume* sandboxVolume = newContainerInfo.add_volumes();
    sandboxVolume->set_host_path(containerWorkdir);
    sandboxVolume->set_container_path(containerWorkdir);
    sandboxVolume->set_mode(Volume::RW);

    // The executor container runs from the slave's own image.
    ContainerInfo::DockerInfo dockerInfo;
    dockerInfo.set_image(flags.docker_mesos_image.get());

    newContainerInfo.mutable_docker()->CopyFrom(dockerInfo);

    // Flags for the mesos-docker-executor that will run inside the
    // executor container; the name ties it back to this container.
    docker::Flags dockerExecutorFlags = dockerFlags(
        flags,
        Container::name(slaveId, stringify(id)),
        containerWorkdir);

    CommandInfo newCommandInfo;
    // TODO(tnachen): Pass flags directly into docker run.
    newCommandInfo.set_value(
        path::join(flags.launcher_dir, "mesos-docker-executor") +
        " " + stringify(dockerExecutorFlags));

    newCommandInfo.set_shell(true);

    containerInfo = newContainerInfo;
    commandInfo = newCommandInfo;
    environment = executorEnvironment(
        executorInfo,
        containerWorkdir,
        slaveId,
        slavePid,
        checkpoint,
        flags,
        false);  // NOTE(review): final bool's meaning not visible here
                 // — confirm against executorEnvironment's signature.
    launchesExecutorContainer = true;
  }

  return new Container(
      id,
      taskInfo,
      executorInfo,
      containerWorkdir,
      user,
      slaveId,
      slavePid,
      checkpoint,
      symlinked,
      flags,
      commandInfo,
      containerInfo,
      environment,
      launchesExecutorContainer);
}
| |
| |
// Fetches the container's command URIs into its sandbox directory via
// the shared Fetcher. Callers must only invoke this for a container
// that is present in 'containers_'.
Future<Nothing> DockerContainerizerProcess::fetch(
    const ContainerID& containerId,
    const SlaveID& slaveId)
{
  CHECK(containers_.contains(containerId));
  Container* container = containers_[containerId];

  return fetcher->fetch(
      containerId,
      container->command,
      container->directory,
      None(),  // NOTE(review): presumably the fetch user — confirm
               // against Fetcher::fetch's signature.
      slaveId,
      flags);
}
| |
| |
| Future<Nothing> DockerContainerizerProcess::pull( |
| const ContainerID& containerId) |
| { |
| if (!containers_.contains(containerId)) { |
| return Failure("Container is already destroyed"); |
| } |
| |
| Container* container = containers_[containerId]; |
| container->state = Container::PULLING; |
| |
| string image = container->image(); |
| |
| Future<Docker::Image> future = docker->pull( |
| container->directory, |
| image, |
| container->forcePullImage()); |
| |
| containers_[containerId]->pull = future; |
| |
| return future.then(defer(self(), [=]() { |
| VLOG(1) << "Docker pull " << image << " completed"; |
| return Nothing(); |
| })); |
| } |
| |
| |
| Try<Nothing> DockerContainerizerProcess::checkpoint( |
| const ContainerID& containerId, |
| pid_t pid) |
| { |
| CHECK(containers_.contains(containerId)); |
| |
| Container* container = containers_[containerId]; |
| |
| container->executorPid = pid; |
| |
| if (container->checkpoint) { |
| const string& path = |
| slave::paths::getForkedPidPath( |
| slave::paths::getMetaRootDir(flags.work_dir), |
| container->slaveId, |
| container->executor.framework_id(), |
| container->executor.executor_id(), |
| containerId); |
| |
| LOG(INFO) << "Checkpointing pid " << pid << " to '" << path << "'"; |
| |
| return slave::state::checkpoint(path, stringify(pid)); |
| } |
| |
| return Nothing(); |
| } |
| |
| |
// Dispatches recovery onto the containerizer's actor.
Future<Nothing> DockerContainerizer::recover(
    const Option<SlaveState>& state)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::recover,
      state);
}
| |
| |
// Executor-only launch (no TaskInfo): dispatches onto the actor with
// None() for the task so the process-side launch takes the custom
// executor path.
Future<bool> DockerContainerizer::launch(
    const ContainerID& containerId,
    const ExecutorInfo& executorInfo,
    const string& directory,
    const Option<string>& user,
    const SlaveID& slaveId,
    const PID<Slave>& slavePid,
    bool checkpoint)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::launch,
      containerId,
      None(),  // No TaskInfo for a custom-executor launch.
      executorInfo,
      directory,
      user,
      slaveId,
      slavePid,
      checkpoint);
}
| |
| |
// Task launch (command task): dispatches onto the actor with the
// TaskInfo included.
Future<bool> DockerContainerizer::launch(
    const ContainerID& containerId,
    const TaskInfo& taskInfo,
    const ExecutorInfo& executorInfo,
    const string& directory,
    const Option<string>& user,
    const SlaveID& slaveId,
    const PID<Slave>& slavePid,
    bool checkpoint)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::launch,
      containerId,
      taskInfo,
      executorInfo,
      directory,
      user,
      slaveId,
      slavePid,
      checkpoint);
}
| |
| |
// Dispatches a resource update onto the actor.
Future<Nothing> DockerContainerizer::update(
    const ContainerID& containerId,
    const Resources& resources)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::update,
      containerId,
      resources);
}
| |
| |
// Dispatches a usage (resource statistics) query onto the actor.
Future<ResourceStatistics> DockerContainerizer::usage(
    const ContainerID& containerId)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::usage,
      containerId);
}
| |
| |
// Dispatches a wait-for-termination onto the actor.
Future<containerizer::Termination> DockerContainerizer::wait(
    const ContainerID& containerId)
{
  return dispatch(
      process.get(),
      &DockerContainerizerProcess::wait,
      containerId);
}
| |
| |
// Fire-and-forget destroy: dispatches onto the actor.
// NOTE(review): the trailing 'true' is the second argument to
// DockerContainerizerProcess::destroy — presumably a 'killed' flag;
// confirm against the declaration in the header.
void DockerContainerizer::destroy(const ContainerID& containerId)
{
  dispatch(
      process.get(),
      &DockerContainerizerProcess::destroy,
      containerId, true);
}
| |
| |
// Dispatches a query for the set of known container IDs onto the actor.
Future<hashset<ContainerID>> DockerContainerizer::containers()
{
  return dispatch(process.get(), &DockerContainerizerProcess::containers);
}
| |
| |
// A Subprocess async-safe "setup" helper used by
// DockerContainerizerProcess when launching the mesos-docker-executor
// that does a 'setsid' and then synchronizes with the parent.
// Runs in the child between fork and exec, so it must only use
// async-signal-safe calls (here: setsid, chdir, read). Returns 0 on
// success or errno on failure.
static int setup(const string& directory)
{
  // Put child into its own process session to prevent slave suicide
  // on child process SIGKILL/SIGTERM.
  if (::setsid() == -1) {
    return errno;
  }

  // Run the process in the specified directory.
  if (!directory.empty()) {
    if (::chdir(directory.c_str()) == -1) {
      return errno;
    }
  }

  // Synchronize with parent process by reading a byte from stdin.
  // EINTR is retried; any other short read means the parent is gone.
  char c;
  ssize_t length;
  while ((length = read(STDIN_FILENO, &c, sizeof(c))) == -1 && errno == EINTR);

  if (length != sizeof(c)) {
    // This will occur if the slave terminates during executor launch.
    // There's a reasonable probability this will occur during slave
    // restarts across a large/busy cluster.
    ABORT("Failed to synchronize with slave (it has probably exited)");
  }

  return 0;
}
| |
| |
// Entry point for recovery on the actor. With checkpointed slave
// state, lists all Docker containers (running and exited) whose names
// carry this slave's prefix and reconciles them in _recover; without
// state there is nothing to recover.
Future<Nothing> DockerContainerizerProcess::recover(
    const Option<SlaveState>& state)
{
  LOG(INFO) << "Recovering Docker containers";

  if (state.isSome()) {
    // Get the list of all Docker containers (running and exited) in
    // order to remove any orphans and reconcile checkpointed executors.
    // TODO(tnachen): Remove this when we expect users to have already
    // upgraded to 0.23.
    return docker->ps(true, DOCKER_NAME_PREFIX + state.get().id.value())
      .then(defer(self(), &Self::_recover, state.get(), lambda::_1));
  }

  return Nothing();
}
| |
| |
// Reconciles checkpointed executor state against the Docker
// containers reported by 'docker ps': rebuilds the in-memory
// 'containers_' map for recoverable executors, starts reaping their
// pids, and (optionally) hands off to __recover to kill orphans.
Future<Nothing> DockerContainerizerProcess::_recover(
    const SlaveState& state,
    const list<Docker::Container>& _containers)
{
  // Although the slave checkpoints executor pids, before 0.23
  // docker containers without custom executors didn't record the
  // container type in the executor info, therefore the Docker
  // containerizer doesn't know if it should recover that container
  // as it could be launched from another containerizer. The
  // workaround is to reconcile running Docker containers and see
  // if we can find an known container that matches the
  // checkpointed container id.
  // TODO(tnachen): Remove this explicit reconciliation 0.24.
  hashset<ContainerID> existingContainers;
  // Tracks all the task containers that launched an executor in
  // a docker container.
  hashset<ContainerID> executorContainers;
  foreach (const Docker::Container& container, _containers) {
    Option<ContainerID> id = parse(container);
    if (id.isSome()) {
      existingContainers.insert(id.get());
      // Executor containers are distinguished by an ".executor"
      // component in their Docker name.
      if (strings::contains(container.name, ".executor")) {
        executorContainers.insert(id.get());
      }
    }
  }

  // Collection of pids that we've started reaping in order to
  // detect very unlikely duplicate scenario (see below).
  hashmap<ContainerID, pid_t> pids;

  foreachvalue (const FrameworkState& framework, state.frameworks) {
    foreachvalue (const ExecutorState& executor, framework.executors) {
      if (executor.info.isNone()) {
        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
                     << "' of framework " << framework.id
                     << " because its info could not be recovered";
        continue;
      }

      if (executor.latest.isNone()) {
        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
                     << "' of framework " << framework.id
                     << " because its latest run could not be recovered";
        continue;
      }

      // We are only interested in the latest run of the executor!
      const ContainerID& containerId = executor.latest.get();
      Option<RunState> run = executor.runs.get(containerId);
      CHECK_SOME(run);
      CHECK_SOME(run.get().id);
      CHECK_EQ(containerId, run.get().id.get());

      // We need the pid so the reaper can monitor the executor so
      // skip this executor if it's not present. This is not an
      // error because the slave will try to wait on the container
      // which will return a failed Termination and everything will
      // get cleaned up.
      if (!run.get().forkedPid.isSome()) {
        continue;
      }

      if (run.get().completed) {
        VLOG(1) << "Skipping recovery of executor '" << executor.id
                << "' of framework " << framework.id
                << " because its latest run "
                << containerId << " is completed";
        continue;
      }

      // Executors explicitly marked as non-Docker belong to another
      // containerizer.
      const ExecutorInfo executorInfo = executor.info.get();
      if (executorInfo.has_container() &&
          executorInfo.container().type() != ContainerInfo::DOCKER) {
        LOG(INFO) << "Skipping recovery of executor '" << executor.id
                  << "' of framework " << framework.id
                  << " because it was not launched from docker containerizer";
        continue;
      }

      if (!executorInfo.has_container() &&
          !existingContainers.contains(containerId)) {
        LOG(INFO) << "Skipping recovery of executor '" << executor.id
                  << "' of framework " << framework.id
                  << " because its executor is not marked as docker "
                  << "and the docker container doesn't exist";
        continue;
      }

      LOG(INFO) << "Recovering container '" << containerId
                << "' for executor '" << executor.id
                << "' of framework " << framework.id;

      // Create and store a container.
      Container* container = new Container(containerId);
      containers_[containerId] = container;
      container->slaveId = state.id;
      container->state = Container::RUNNING;
      container->launchesExecutorContainer =
        executorContainers.contains(containerId);

      pid_t pid = run.get().forkedPid.get();

      // Resume monitoring the (still running) executor.
      container->status.set(process::reap(pid));

      container->status.future().get()
        .onAny(defer(self(), &Self::reaped, containerId));

      if (pids.containsValue(pid)) {
        // This should (almost) never occur. There is the
        // possibility that a new executor is launched with the same
        // pid as one that just exited (highly unlikely) and the
        // slave dies after the new executor is launched but before
        // it hears about the termination of the earlier executor
        // (also unlikely).
        return Failure(
            "Detected duplicate pid " + stringify(pid) +
            " for container " + stringify(containerId));
      }

      pids.put(containerId, pid);
    }
  }

  // Optionally kill any Mesos-named Docker containers we are not
  // tracking (controlled by --docker_kill_orphans).
  if (flags.docker_kill_orphans) {
    return __recover(_containers);
  }

  return Nothing();
}
| |
| |
| Future<Nothing> DockerContainerizerProcess::__recover( |
| const list<Docker::Container>& _containers) |
| { |
| foreach (const Docker::Container& container, _containers) { |
| VLOG(1) << "Checking if Docker container named '" |
| << container.name << "' was started by Mesos"; |
| |
| Option<ContainerID> id = parse(container); |
| |
| // Ignore containers that Mesos didn't start. |
| if (id.isNone()) { |
| continue; |
| } |
| |
| VLOG(1) << "Checking if Mesos container with ID '" |
| << stringify(id.get()) << "' has been orphaned"; |
| |
| // Check if we're watching an executor for this container ID and |
| // if not, rm -f the Docker container. |
| if (!containers_.contains(id.get())) { |
| // TODO(tnachen): Consider using executor_shutdown_grace_period. |
| docker->stop(container.id, flags.docker_stop_timeout, true); |
| } |
| } |
| |
| return Nothing(); |
| } |
| |
| |
// Actor-side launch. Returns false when this is not a Docker
// container (so the slave can try another containerizer), otherwise
// creates the Container and chains fetch -> pull -> launch -> reap.
// Two launch paths exist: forking mesos-docker-executor locally, or
// starting the executor inside its own Docker container.
Future<bool> DockerContainerizerProcess::launch(
    const ContainerID& containerId,
    const Option<TaskInfo>& taskInfo,
    const ExecutorInfo& executorInfo,
    const string& directory,
    const Option<string>& user,
    const SlaveID& slaveId,
    const PID<Slave>& slavePid,
    bool checkpoint)
{
  if (containers_.contains(containerId)) {
    return Failure("Container already started");
  }

  // Prefer the task's ContainerInfo over the executor's.
  Option<ContainerInfo> containerInfo;

  if (taskInfo.isSome() && taskInfo.get().has_container()) {
    containerInfo = taskInfo.get().container();
  } else if (executorInfo.has_container()) {
    containerInfo = executorInfo.container();
  }

  if (containerInfo.isNone()) {
    LOG(INFO) << "No container info found, skipping launch";
    return false;
  }

  if (containerInfo.get().type() != ContainerInfo::DOCKER) {
    LOG(INFO) << "Skipping non-docker container";
    return false;
  }

  Try<Container*> container = Container::create(
      containerId,
      taskInfo,
      executorInfo,
      directory,
      user,
      slaveId,
      slavePid,
      checkpoint,
      flags);

  if (container.isError()) {
    return Failure("Failed to create container: " + container.error());
  }

  containers_[containerId] = container.get();

  if (taskInfo.isSome()) {
    LOG(INFO) << "Starting container '" << containerId
              << "' for task '" << taskInfo.get().task_id()
              << "' (and executor '" << executorInfo.executor_id()
              << "') of framework '" << executorInfo.framework_id() << "'";
  } else {
    LOG(INFO) << "Starting container '" << containerId
              << "' for executor '" << executorInfo.executor_id()
              << "' and framework '" << executorInfo.framework_id() << "'";
  }

  if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {
    // Launching task by forking a subprocess to run docker executor.
    return container.get()->launch = fetch(containerId, slaveId)
      .then(defer(self(), [=]() { return pull(containerId); }))
      .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
      .then(defer(self(), [=](pid_t pid) {
        return reapExecutor(containerId, pid);
      }));
  }

  string containerName = container.get()->name();

  if (container.get()->executorName().isSome()) {
    // Launch the container with the executor name as we expect the
    // executor will launch the docker container.
    containerName = container.get()->executorName().get();
  }

  // Launching task or executor by launching a separate docker
  // container to run the executor.
  // We need to do so for launching a task because as the slave
  // is running in a container (via docker_mesos_image flag)
  // we want the executor to keep running when the slave container
  // dies.
  return container.get()->launch = fetch(containerId, slaveId)
    .then(defer(self(), [=]() { return pull(containerId); }))
    .then(defer(self(), [=]() {
      return launchExecutorContainer(containerId, containerName);
    }))
    .then(defer(self(), [=](const Docker::Container& dockerContainer) {
      return checkpointExecutor(containerId, dockerContainer);
    }))
    .then(defer(self(), [=](pid_t pid) {
      return reapExecutor(containerId, pid);
    }));
}
| |
| |
// Starts the executor in its own Docker container via 'docker run'
// and resolves to the inspected container. The run future's failure
// is preferred over inspect's so schedulers see the real launch error.
Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
    const ContainerID& containerId,
    const string& containerName)
{
  if (!containers_.contains(containerId)) {
    return Failure("Container is already destroyed");
  }

  Container* container = containers_[containerId];
  container->state = Container::RUNNING;

  // Start the executor in a Docker container.
  // This executor could either be a custom executor specified by an
  // ExecutorInfo, or the docker executor.
  Future<Nothing> run = docker->run(
      container->container,
      container->command,
      containerName,
      container->directory,
      flags.sandbox_directory,
      container->resources,
      container->environment,
      path::join(container->directory, "stdout"),
      path::join(container->directory, "stderr"));

  Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
  // We like to propogate the run failure when run fails so slave can
  // send this failure back to the scheduler. Otherwise we return
  // inspect's result or its failure, which should not fail when
  // the container isn't launched.
  Future<Docker::Container> inspect =
    docker->inspect(containerName, slave::DOCKER_INSPECT_DELAY)
    .onAny([=](Future<Docker::Container> f) {
      // We cannot associate the promise outside of the callback
      // because we like to propagate run's failure when
      // available.
      promise->associate(f);
    });

  // If 'docker run' itself fails, abandon the inspect and surface the
  // run error instead.
  run.onFailed([=](const string& failure) mutable {
    inspect.discard();
    promise->fail(failure);
  });

  return promise->future();
}
| |
| |
// Forks a local mesos-docker-executor subprocess (stdout/stderr into
// the sandbox), checkpoints its pid if required, then releases the
// child — which blocks in 'setup' reading stdin — by writing one byte.
// Resolves to the executor's pid.
Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
    const ContainerID& containerId)
{
  if (!containers_.contains(containerId)) {
    return Failure("Container is already destroyed");
  }

  Container* container = containers_[containerId];
  container->state = Container::RUNNING;

  // Prepare environment variables for the executor.
  map<string, string> environment = executorEnvironment(
      container->executor,
      container->directory,
      container->slaveId,
      container->slavePid,
      container->checkpoint,
      flags,
      false);

  // Include any environment variables from ExecutorInfo.
  foreach (const Environment::Variable& variable,
           container->executor.command().environment().variables()) {
    environment[variable.name()] = variable.value();
  }

  // Pass GLOG flag to the executor.
  const Option<string> glog = os::getenv("GLOG_v");
  if (glog.isSome()) {
    environment["GLOG_v"] = glog.get();
  }

  vector<string> argv;
  argv.push_back("mesos-docker-executor");

  // Construct the mesos-docker-executor using the "name" we gave the
  // container (to distinguish it from Docker containers not created
  // by Mesos).
  Try<Subprocess> s = subprocess(
      path::join(flags.launcher_dir, "mesos-docker-executor"),
      argv,
      Subprocess::PIPE(),
      Subprocess::PATH(path::join(container->directory, "stdout")),
      Subprocess::PATH(path::join(container->directory, "stderr")),
      dockerFlags(flags, container->name(), container->directory),
      environment,
      lambda::bind(&setup, container->directory));

  if (s.isError()) {
    return Failure("Failed to fork executor: " + s.error());
  }

  // Checkpoint the executor's pid (if necessary).
  Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid());

  if (checkpointed.isError()) {
    // Close the subprocess's stdin so that it aborts.
    CHECK_SOME(s.get().in());
    os::close(s.get().in().get());

    return Failure(
        "Failed to checkpoint executor's pid: " + checkpointed.error());
  }

  // Checkpointing complete, now synchronize with the process so that
  // it can continue to execute. Write one byte, retrying on EINTR.
  CHECK_SOME(s.get().in());
  char c;
  ssize_t length;
  while ((length = write(s.get().in().get(), &c, sizeof(c))) == -1 &&
         errno == EINTR);

  if (length != sizeof(c)) {
    string error = string(strerror(errno));
    os::close(s.get().in().get());
    return Failure("Failed to synchronize with child process: " + error);
  }

  return s.get().pid();
}
| |
| |
| Future<pid_t> DockerContainerizerProcess::checkpointExecutor( |
| const ContainerID& containerId, |
| const Docker::Container& dockerContainer) |
| { |
| // After we do Docker::run we shouldn't remove a container until |
| // after we set Container::status. |
| CHECK(containers_.contains(containerId)); |
| |
| Option<int> pid = dockerContainer.pid; |
| |
| if (!pid.isSome()) { |
| return Failure("Unable to get executor pid after launch"); |
| } |
| |
| Try<Nothing> checkpointed = checkpoint(containerId, pid.get()); |
| |
| if (checkpointed.isError()) { |
| return Failure( |
| "Failed to checkpoint executor's pid: " + checkpointed.error()); |
| } |
| |
| return pid.get(); |
| } |
| |
| |
// Final launch step: starts reaping the executor pid so 'reaped' runs
// when it exits, and returns true to signal the launch was handled by
// this containerizer.
Future<bool> DockerContainerizerProcess::reapExecutor(
    const ContainerID& containerId,
    pid_t pid)
{
  // After we do Docker::run we shouldn't remove a container until
  // after we set 'status', which we do in this function.
  CHECK(containers_.contains(containerId));

  Container* container = containers_[containerId];

  // And finally watch for when the container gets reaped.
  container->status.set(process::reap(pid));

  container->status.future().get()
    .onAny(defer(self(), &Self::reaped, containerId));

  return true;
}
| |
| |
// Applies a resource update to a running container. On Linux this
// adjusts the container's cgroups (via _update/__update), inspecting
// the Docker container first if we don't yet know its pid; elsewhere
// it is a no-op. Unknown, destroyed, or unchanged containers are
// ignored.
Future<Nothing> DockerContainerizerProcess::update(
    const ContainerID& containerId,
    const Resources& _resources)
{
  if (!containers_.contains(containerId)) {
    LOG(WARNING) << "Ignoring updating unknown container: "
                 << containerId;
    return Nothing();
  }

  Container* container = containers_[containerId];

  if (container->state == Container::DESTROYING)  {
    LOG(INFO) << "Ignoring updating container '" << containerId
              << "' that is being destroyed";
    return Nothing();
  }

  if (container->resources == _resources) {
    LOG(INFO) << "Ignoring updating container '" << containerId
              << "' with resources passed to update is identical to "
              << "existing resources";
    return Nothing();
  }


  // Store the resources for usage().
  container->resources = _resources;

#ifdef __linux__
  if (!_resources.cpus().isSome() && !_resources.mem().isSome()) {
    LOG(WARNING) << "Ignoring update as no supported resources are present";
    return Nothing();
  }

  // Skip inspecting the docker container if we already have the pid.
  if (container->pid.isSome()) {
    return __update(containerId, _resources, container->pid.get());
  }

  // Inspect to learn the pid, then continue in _update.
  return docker->inspect(containers_[containerId]->name())
    .then(defer(self(), &Self::_update, containerId, _resources, lambda::_1));
#else
  return Nothing();
#endif // __linux__
}
| |
| |
| Future<Nothing> DockerContainerizerProcess::_update( |
| const ContainerID& containerId, |
| const Resources& _resources, |
| const Docker::Container& container) |
| { |
| if (container.pid.isNone()) { |
| return Nothing(); |
| } |
| |
| if (!containers_.contains(containerId)) { |
| LOG(INFO) << "Container has been removed after docker inspect, " |
| << "skipping update"; |
| return Nothing(); |
| } |
| |
| containers_[containerId]->pid = container.pid.get(); |
| |
| return __update(containerId, _resources, container.pid.get()); |
| } |
| |
| |
// Applies the given resources to the cgroups that the container's
// root process is a member of. Only the 'cpu' shares and 'memory'
// limits are adjusted; on non-Linux platforms this is a no-op.
Future<Nothing> DockerContainerizerProcess::__update(
    const ContainerID& containerId,
    const Resources& _resources,
    pid_t pid)
{
#ifdef __linux__
  // Determine the cgroups hierarchies where the 'cpu' and
  // 'memory' subsystems are mounted (they may be the same). Note that
  // we make these static so we can reuse the result for subsequent
  // calls.
  static Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
  static Result<string> memoryHierarchy = cgroups::hierarchy("memory");

  if (cpuHierarchy.isError()) {
    return Failure("Failed to determine the cgroup hierarchy "
                   "where the 'cpu' subsystem is mounted: " +
                   cpuHierarchy.error());
  }

  if (memoryHierarchy.isError()) {
    return Failure("Failed to determine the cgroup hierarchy "
                   "where the 'memory' subsystem is mounted: " +
                   memoryHierarchy.error());
  }

  // We need to find the cgroup(s) this container is currently running
  // in for both the hierarchy with the 'cpu' subsystem attached and
  // the hierarchy with the 'memory' subsystem attached so we can
  // update the proper cgroup control files.

  // Determine the cgroup for the 'cpu' subsystem (based on the
  // container's pid).
  Result<string> cpuCgroup = cgroups::cpu::cgroup(pid);

  if (cpuCgroup.isError()) {
    return Failure("Failed to determine cgroup for the 'cpu' subsystem: " +
                   cpuCgroup.error());
  } else if (cpuCgroup.isNone()) {
    // Not fatal: the cpu update below is simply skipped.
    LOG(WARNING) << "Container " << containerId
                 << " does not appear to be a member of a cgroup "
                 << "where the 'cpu' subsystem is mounted";
  }

  // And update the CPU shares (if applicable).
  if (cpuHierarchy.isSome() &&
      cpuCgroup.isSome() &&
      _resources.cpus().isSome()) {
    double cpuShares = _resources.cpus().get();

    // Enforce a lower bound so that very small cpu allocations still
    // receive a minimal share.
    uint64_t shares =
      std::max((uint64_t) (CPU_SHARES_PER_CPU * cpuShares), MIN_CPU_SHARES);

    Try<Nothing> write =
      cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get(), shares);

    if (write.isError()) {
      return Failure("Failed to update 'cpu.shares': " + write.error());
    }

    LOG(INFO) << "Updated 'cpu.shares' to " << shares
              << " at " << path::join(cpuHierarchy.get(), cpuCgroup.get())
              << " for container " << containerId;
  }

  // Now determine the cgroup for the 'memory' subsystem.
  Result<string> memoryCgroup = cgroups::memory::cgroup(pid);

  if (memoryCgroup.isError()) {
    return Failure("Failed to determine cgroup for the 'memory' subsystem: " +
                   memoryCgroup.error());
  } else if (memoryCgroup.isNone()) {
    // Not fatal: the memory update below is simply skipped.
    LOG(WARNING) << "Container " << containerId
                 << " does not appear to be a member of a cgroup "
                 << "where the 'memory' subsystem is mounted";
  }

  // And update the memory limits (if applicable).
  if (memoryHierarchy.isSome() &&
      memoryCgroup.isSome() &&
      _resources.mem().isSome()) {
    // TODO(tnachen): investigate and handle OOM with docker.
    Bytes mem = _resources.mem().get();
    Bytes limit = std::max(mem, MIN_MEMORY);

    // Always set the soft limit.
    Try<Nothing> write =
      cgroups::memory::soft_limit_in_bytes(
          memoryHierarchy.get(), memoryCgroup.get(), limit);

    if (write.isError()) {
      return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
                     write.error());
    }

    LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << limit
              << " for container " << containerId;

    // Read the existing limit.
    Try<Bytes> currentLimit =
      cgroups::memory::limit_in_bytes(
          memoryHierarchy.get(), memoryCgroup.get());

    if (currentLimit.isError()) {
      return Failure("Failed to read 'memory.limit_in_bytes': " +
                     currentLimit.error());
    }

    // Only update if new limit is higher — presumably to avoid the
    // kernel OOM-killing a running container when shrinking; the
    // shrink is reflected via the soft limit set above instead.
    // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
    // discrepancy between usage and soft limit and introduces a
    // "manual oom" if necessary.
    if (limit > currentLimit.get()) {
      write = cgroups::memory::limit_in_bytes(
          memoryHierarchy.get(), memoryCgroup.get(), limit);

      if (write.isError()) {
        return Failure("Failed to set 'memory.limit_in_bytes': " +
                       write.error());
      }

      LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit << " at "
                << path::join(memoryHierarchy.get(), memoryCgroup.get())
                << " for container " << containerId;
    }
  }
#endif // __linux__

  return Nothing();
}
| |
| |
| Future<ResourceStatistics> DockerContainerizerProcess::usage( |
| const ContainerID& containerId) |
| { |
| #ifndef __linux__ |
| return Failure("Does not support usage() on non-linux platform"); |
| #else |
| if (!containers_.contains(containerId)) { |
| return Failure("Unknown container: " + stringify(containerId)); |
| } |
| |
| Container* container = containers_[containerId]; |
| |
| if (container->state == Container::DESTROYING) { |
| return Failure("Container is being removed: " + stringify(containerId)); |
| } |
| |
| auto collectUsage = [this, containerId]( |
| pid_t pid) -> Future<ResourceStatistics> { |
| // First make sure container is still there. |
| if (!containers_.contains(containerId)) { |
| return Failure("Container has been destroyed: " + stringify(containerId)); |
| } |
| |
| Container* container = containers_[containerId]; |
| |
| if (container->state == Container::DESTROYING) { |
| return Failure("Container is being removed: " + stringify(containerId)); |
| } |
| |
| const Try<ResourceStatistics> cgroupStats = cgroupsStatistics(pid); |
| if (cgroupStats.isError()) { |
| return Failure("Failed to collect cgroup stats: " + cgroupStats.error()); |
| } |
| |
| ResourceStatistics result = cgroupStats.get(); |
| |
| // Set the resource allocations. |
| const Resources& resource = container->resources; |
| const Option<Bytes> mem = resource.mem(); |
| if (mem.isSome()) { |
| result.set_mem_limit_bytes(mem.get().bytes()); |
| } |
| |
| const Option<double> cpus = resource.cpus(); |
| if (cpus.isSome()) { |
| result.set_cpus_limit(cpus.get()); |
| } |
| |
| return result; |
| }; |
| |
| // Skip inspecting the docker container if we already have the pid. |
| if (container->pid.isSome()) { |
| return collectUsage(container->pid.get()); |
| } |
| |
| return docker->inspect(container->name()) |
| .then(defer( |
| self(), |
| [this, containerId, collectUsage] |
| (const Docker::Container& _container) -> Future<ResourceStatistics> { |
| const Option<pid_t> pid = _container.pid; |
| if (pid.isNone()) { |
| return Failure("Container is not running"); |
| } |
| |
| if (!containers_.contains(containerId)) { |
| return Failure( |
| "Container has been destroyed:" + stringify(containerId)); |
| } |
| |
| Container* container = containers_[containerId]; |
| |
| // Update the container's pid now. We ran inspect because we didn't have |
| // a pid for the container. |
| container->pid = pid; |
| |
| return collectUsage(pid.get()); |
| })); |
| #endif // __linux__ |
| } |
| |
| |
// Reads cpu time accounting (from the 'cpuacct' subsystem) and rss
// memory usage (from the 'memory' subsystem) for the cgroups that
// 'pid' belongs to, returning them as a ResourceStatistics. Errors
// out if either subsystem hierarchy or cgroup cannot be resolved.
Try<ResourceStatistics> DockerContainerizerProcess::cgroupsStatistics(
    pid_t pid) const
{
#ifndef __linux__
  return Error("Does not support cgroups on non-linux platform");
#else
  // NOTE: cpu time accounting lives in the 'cpuacct' subsystem (not
  // 'cpu', which only holds scheduling knobs like shares).
  const Result<string> cpuHierarchy = cgroups::hierarchy("cpuacct");
  const Result<string> memHierarchy = cgroups::hierarchy("memory");

  if (cpuHierarchy.isError()) {
    return Error(
        "Failed to determine the cgroup 'cpu' subsystem hierarchy: " +
        cpuHierarchy.error());
  }

  if (memHierarchy.isError()) {
    return Error(
        "Failed to determine the cgroup 'memory' subsystem hierarchy: " +
        memHierarchy.error());
  }

  // Resolve the cgroups the pid is currently a member of; unlike
  // __update() we treat a missing membership as an error here since
  // no statistics can be produced without it.
  const Result<string> cpuCgroup = cgroups::cpuacct::cgroup(pid);
  if (cpuCgroup.isError()) {
    return Error(
        "Failed to determine cgroup for the 'cpu' subsystem: " +
        cpuCgroup.error());
  } else if (cpuCgroup.isNone()) {
    return Error("Unable to find 'cpu' cgroup subsystem");
  }

  const Result<string> memCgroup = cgroups::memory::cgroup(pid);
  if (memCgroup.isError()) {
    return Error(
        "Failed to determine cgroup for the 'memory' subsystem: " +
        memCgroup.error());
  } else if (memCgroup.isNone()) {
    return Error("Unable to find 'memory' cgroup subsystem");
  }

  // User and system cpu time accumulated by the cgroup.
  const Try<cgroups::cpuacct::Stats> cpuAcctStat =
    cgroups::cpuacct::stat(cpuHierarchy.get(), cpuCgroup.get());

  if (cpuAcctStat.isError()) {
    return Error("Failed to get cpu.stat: " + cpuAcctStat.error());
  }

  // 'memory.stat' is a key/value file; we only consume 'rss' below.
  const Try<hashmap<string, uint64_t>> memStats =
    cgroups::stat(memHierarchy.get(), memCgroup.get(), "memory.stat");

  if (memStats.isError()) {
    return Error(
        "Error getting memory statistics from cgroups memory subsystem: " +
        memStats.error());
  }

  if (!memStats.get().contains("rss")) {
    return Error("cgroups memory stats does not contain 'rss' data");
  }

  ResourceStatistics result;
  result.set_cpus_system_time_secs(cpuAcctStat.get().system.secs());
  result.set_cpus_user_time_secs(cpuAcctStat.get().user.secs());
  result.set_mem_rss_bytes(memStats.get().at("rss"));

  return result;
#endif // __linux__
}
| |
| |
| Future<containerizer::Termination> DockerContainerizerProcess::wait( |
| const ContainerID& containerId) |
| { |
| if (!containers_.contains(containerId)) { |
| return Failure("Unknown container: " + stringify(containerId)); |
| } |
| |
| return containers_[containerId]->termination.future(); |
| } |
| |
| |
// Initiates destruction of a container. Depending on the container's
// current state this either finishes synchronously (failed launch,
// FETCHING, PULLING) or transitions to DESTROYING and continues
// asynchronously in _destroy once Docker::run's status is set.
// 'killed' records whether the destruction was requested (vs. the
// container terminating on its own) and is propagated into the
// Termination delivered to wait().
void DockerContainerizerProcess::destroy(
    const ContainerID& containerId,
    bool killed)
{
  if (!containers_.contains(containerId)) {
    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
    return;
  }

  Container* container = containers_[containerId];

  if (container->launch.isFailed()) {
    VLOG(1) << "Container '" << containerId << "' launch failed";

    // This means we failed to launch the container and we're trying to
    // cleanup.
    CHECK_PENDING(container->status.future());
    containerizer::Termination termination;
    termination.set_killed(killed);
    termination.set_message(
        "Failed to launch container: " + container->launch.failure());
    container->termination.set(termination);

    // Erase before delete so no other code path can observe a
    // dangling Container pointer through containers_.
    containers_.erase(containerId);
    delete container;

    return;
  }

  if (container->state == Container::DESTROYING) {
    // Destroy has already been initiated.
    return;
  }

  LOG(INFO) << "Destroying container '" << containerId << "'";

  // It's possible that destroy is getting called before
  // DockerContainerizer::launch has completed (i.e., after we've
  // returned a future but before we've completed the fetching of the
  // URIs, or the Docker::run, or the wait, etc.).
  //
  // If we're FETCHING, we want to stop the fetching and then
  // cleanup. Note, we need to make sure that we deal with the race
  // with trying to terminate the fetcher so that even if the fetcher
  // returns successfully we won't try to do a Docker::run.
  //
  // If we're PULLING, we want to terminate the 'docker pull' and then
  // cleanup. Just as above, we'll need to deal with the race with
  // 'docker pull' returning successfully.
  //
  // If we're RUNNING, we want to wait for the status to get set, then
  // do a Docker::kill, then wait for the status to complete, then
  // cleanup.

  if (container->state == Container::FETCHING) {
    LOG(INFO) << "Destroying Container '"
              << containerId << "' in FETCHING state";

    fetcher->kill(containerId);

    containerizer::Termination termination;
    termination.set_killed(killed);
    termination.set_message("Container destroyed while fetching");
    container->termination.set(termination);

    // Even if the fetch succeeded just before we did the killtree,
    // removing the container here means that we won't proceed with
    // the Docker::run.
    containers_.erase(containerId);
    delete container;

    return;
  }

  if (container->state == Container::PULLING) {
    LOG(INFO) << "Destroying Container '"
              << containerId << "' in PULLING state";

    container->pull.discard();

    containerizer::Termination termination;
    termination.set_killed(killed);
    termination.set_message("Container destroyed while pulling image");
    container->termination.set(termination);

    containers_.erase(containerId);
    delete container;

    return;
  }

  CHECK(container->state == Container::RUNNING);

  container->state = Container::DESTROYING;

  if (killed && container->executorPid.isSome()) {
    LOG(INFO) << "Sending SIGTERM to executor with pid: "
              << container->executorPid.get();
    // We need to clean up the executor as the executor might not have
    // received run task due to a failed containerizer update.
    // We also kill the executor first since container->status below
    // is waiting for the executor to finish.
    Try<std::list<os::ProcessTree>> kill =
      os::killtree(container->executorPid.get(), SIGTERM);

    if (kill.isError()) {
      // Ignoring the error from killing executor as it can already
      // have exited.
      VLOG(1) << "Ignoring error when killing executor pid "
              << container->executorPid.get() << " in destroy, error: "
              << kill.error();
    }
  }

  // Otherwise, wait for Docker::run to succeed, in which case we'll
  // continue in _destroy (calling Docker::kill) or for Docker::run to
  // fail, in which case we'll re-execute this function and cleanup
  // above.
  container->status.future()
    .onAny(defer(self(), &Self::_destroy, containerId, killed));
}
| |
| |
| void DockerContainerizerProcess::_destroy( |
| const ContainerID& containerId, |
| bool killed) |
| { |
| CHECK(containers_.contains(containerId)); |
| |
| Container* container = containers_[containerId]; |
| |
| CHECK(container->state == Container::DESTROYING); |
| |
| // Do a 'docker stop' which we'll then find out about in '_destroy' |
| // after we've reaped either the container's root process (in the |
| // event that we had just launched a container for an executor) or |
| // the mesos-docker-executor (in the case we launched a container |
| // for a task). |
| LOG(INFO) << "Running docker stop on container '" << containerId << "'"; |
| |
| if (killed) { |
| docker->stop(container->name(), flags.docker_stop_timeout) |
| .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1)); |
| } else { |
| __destroy(containerId, killed, Nothing()); |
| } |
| } |
| |
| |
// Continuation of _destroy after the 'docker stop' (or the immediate
// Nothing() for a non-kill destroy): either fails the termination if
// the kill did not take and the container never produced a status, or
// waits on the reaped exit status and continues in ___destroy.
void DockerContainerizerProcess::__destroy(
    const ContainerID& containerId,
    bool killed,
    const Future<Nothing>& kill)
{
  CHECK(containers_.contains(containerId));

  Container* container = containers_[containerId];

  if (!kill.isReady() && !container->status.future().isReady()) {
    // TODO(benh): This means we've failed to do a Docker::kill, which
    // means it's possible that the container is still going to be
    // running after we return! We either need to have a periodic
    // "garbage collector", or we need to retry the Docker::kill
    // indefinitely until it has been sucessful.
    container->termination.fail(
        "Failed to kill the Docker container: " +
        (kill.isFailed() ? kill.failure() : "discarded future"));

    // Erase before the delayed removal/delete so no other code path
    // can observe a dangling Container pointer through containers_.
    containers_.erase(containerId);

    // Schedule the 'docker rm' for later; note that name() and
    // executorName() are captured by value here, before the
    // container object is deleted below.
    delay(
        flags.docker_remove_delay,
        self(),
        &Self::remove,
        container->name(),
        container->executorName());

    delete container;

    return;
  }

  // Status must be ready since we did a Docker::kill.
  CHECK_READY(container->status.future());

  container->status.future().get()
    .onAny(defer(self(), &Self::___destroy, containerId, killed, lambda::_1));
}
| |
| |
| void DockerContainerizerProcess::___destroy( |
| const ContainerID& containerId, |
| bool killed, |
| const Future<Option<int>>& status) |
| { |
| CHECK(containers_.contains(containerId)); |
| |
| Container* container = containers_[containerId]; |
| |
| containerizer::Termination termination; |
| termination.set_killed(killed); |
| |
| if (status.isReady() && status.get().isSome()) { |
| termination.set_status(status.get().get()); |
| } |
| |
| termination.set_message( |
| killed ? "Container killed" : "Container terminated"); |
| |
| container->termination.set(termination); |
| |
| containers_.erase(containerId); |
| |
| delay( |
| flags.docker_remove_delay, |
| self(), |
| &Self::remove, |
| container->name(), |
| container->executorName()); |
| |
| delete container; |
| } |
| |
| |
| Future<hashset<ContainerID>> DockerContainerizerProcess::containers() |
| { |
| return containers_.keys(); |
| } |
| |
| |
| void DockerContainerizerProcess::reaped(const ContainerID& containerId) |
| { |
| if (!containers_.contains(containerId)) { |
| return; |
| } |
| |
| LOG(INFO) << "Executor for container '" << containerId << "' has exited"; |
| |
| // The executor has exited so destroy the container. |
| destroy(containerId, false); |
| } |
| |
| |
| void DockerContainerizerProcess::remove( |
| const string& containerName, |
| const Option<string>& executor) |
| { |
| docker->rm(containerName, true); |
| if (executor.isSome()) { |
| docker->rm(executor.get(), true); |
| } |
| } |
| |
| |
| } // namespace slave { |
| } // namespace internal { |
| } // namespace mesos { |