// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <vector>
#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include <stout/os/read.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/io.hpp>
#include "common/status_utils.hpp"
#include "docker/docker.hpp"
#ifdef __linux__
#include "linux/cgroups.hpp"
#endif // __linux__
#include "slave/containerizer/mesos/isolators/cgroups/cpushare.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/mem.hpp"
#include "slave/constants.hpp"
using namespace mesos;
using namespace mesos::internal::slave;
using namespace process;
using std::list;
using std::map;
using std::string;
using std::vector;
Nothing _nothing() { return Nothing(); }
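// Helper for formatting a failure message from a command's exit
// status and its captured stderr output.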
template <typename T>
static Future<T> failure(
const string& cmd,
int status,
const string& err)
{
return Failure(
"Failed to '" + cmd + "': exit status = " +
WSTRINGIFY(status) + " stderr = " + err);
}
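// Continuation for checkError() below: inspects the subprocess exit
// status and, on failure, reads stderr to build the error message.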
static Future<Nothing> _checkError(const string& cmd, const Subprocess& s)
{
Option<int> status = s.status().get();
if (status.isNone()) {
return Failure("No status found for '" + cmd + "'");
}
if (status.get() != 0) {
// TODO(tnachen): Consider returning stdout as well.
CHECK_SOME(s.err());
return io::read(s.err().get())
.then(lambda::bind(failure<Nothing>, cmd, status.get(), lambda::_1));
}
return Nothing();
}
// Returns a failure if the subprocess returned no exit status or a
// non-zero exit status.
static Future<Nothing> checkError(const string& cmd, const Subprocess& s)
{
return s.status()
.then(lambda::bind(_checkError, cmd, s));
}
Try<Owned<Docker>> Docker::create(
const string& path,
const string& socket,
bool validate)
{
if (!strings::startsWith(socket, "/")) {
return Error("Invalid Docker socket path: " + socket);
}
Owned<Docker> docker(new Docker(path, socket));
if (!validate) {
return docker;
}
#ifdef __linux__
// Make sure that cgroups are mounted, and at least the 'cpu'
// subsystem is attached.
Result<string> hierarchy = cgroups::hierarchy("cpu");
if (hierarchy.isNone()) {
return Error("Failed to find a mounted cgroups hierarchy "
"for the 'cpu' subsystem; you probably need "
"to mount cgroups manually");
}
#endif // __linux__
Try<Nothing> validateVersion = docker->validateVersion(Version(1, 0, 0));
if (validateVersion.isError()) {
return Error(validateVersion.error());
}
return docker;
}
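// Discard handler for commands: kills the entire process tree of the
// subprocess when the associated future is discarded.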
void commandDiscarded(const Subprocess& s, const string& cmd)
{
VLOG(1) << "'" << cmd << "' is being discarded";
os::killtree(s.pid(), SIGKILL);
}
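// Queries the Docker version by running 'docker -H <socket> --version'.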
Future<Version> Docker::version() const
{
string cmd = path + " -H " + socket + " --version";
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH("/dev/null"),
Subprocess::PIPE(),
Subprocess::PIPE());
if (s.isError()) {
return Failure(s.error());
}
return s.get().status()
.then(lambda::bind(&Docker::_version, cmd, s.get()));
}
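// Continuation of version(): checks the exit status of the
// '--version' invocation and reads its stdout.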
Future<Version> Docker::_version(const string& cmd, const Subprocess& s)
{
const Option<int>& status = s.status().get();
if (status.isNone() || status.get() != 0) {
string msg = "Failed to execute '" + cmd + "': ";
if (status.isSome()) {
msg += WSTRINGIFY(status.get());
} else {
msg += "unknown exit status";
}
return Failure(msg);
}
CHECK_SOME(s.out());
return io::read(s.out().get())
.then(lambda::bind(&Docker::__version, lambda::_1));
}
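// Extracts the version number from the '--version' output, which is
// expected to look like "Docker version <x.y.z...>, build <hash>".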
Future<Version> Docker::__version(const Future<string>& output)
{
vector<string> parts = strings::split(output.get(), ",");
if (!parts.empty()) {
vector<string> subParts = strings::split(parts.front(), " ");
if (!subParts.empty()) {
// The Docker version output on Fedora 22 is "x.x.x.fc22", which does
// not match the Semantic Versioning specification
// (<major>[.<minor>[.<patch>]]). We remove the extra components here
// before parsing the docker version output into a Version struct.
string versionString = subParts.back();
vector<string> components = strings::split(versionString, ".");
if (components.size() > 3) {
components.erase(components.begin() + 3, components.end());
}
versionString = strings::join(".", components);
Try<Version> version = Version::parse(versionString);
if (version.isError()) {
return Failure("Failed to parse docker version: " +
version.error());
}
return version;
}
}
return Failure("Unable to find docker version in output");
}
Try<Nothing> Docker::validateVersion(const Version& minVersion) const
{
// Validate the version (and that we can use Docker at all).
Future<Version> version = this->version();
if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
return Error("Timed out getting docker version");
}
if (version.isFailed()) {
return Error("Failed to get docker version: " + version.failure());
}
if (version.get() < minVersion) {
string msg = "Insufficient version '" + stringify(version.get()) +
"' of Docker. Please upgrade to >= '" +
stringify(minVersion) + "'";
return Error(msg);
}
return Nothing();
}
// TODO(josephw): Parse this string with a protobuf.
Try<Docker::Container> Docker::Container::create(const string& output)
{
Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
if (parse.isError()) {
return Error("Failed to parse JSON: " + parse.error());
}
// TODO(benh): Handle the case where the short container ID was
// not sufficiently unique and 'array.values.size() > 1'.
JSON::Array array = parse.get();
if (array.values.size() != 1) {
return Error("Failed to find container");
}
CHECK(array.values.front().is<JSON::Object>());
JSON::Object json = array.values.front().as<JSON::Object>();
Result<JSON::String> idValue = json.find<JSON::String>("Id");
if (idValue.isNone()) {
return Error("Unable to find Id in container");
} else if (idValue.isError()) {
return Error("Error finding Id in container: " + idValue.error());
}
string id = idValue.get().value;
Result<JSON::String> nameValue = json.find<JSON::String>("Name");
if (nameValue.isNone()) {
return Error("Unable to find Name in container");
} else if (nameValue.isError()) {
return Error("Error finding Name in container: " + nameValue.error());
}
string name = nameValue.get().value;
Result<JSON::Object> stateValue = json.find<JSON::Object>("State");
if (stateValue.isNone()) {
return Error("Unable to find State in container");
} else if (stateValue.isError()) {
return Error("Error finding State in container: " + stateValue.error());
}
Result<JSON::Number> pidValue = stateValue.get().find<JSON::Number>("Pid");
if (pidValue.isNone()) {
return Error("Unable to find Pid in State");
} else if (pidValue.isError()) {
return Error("Error finding Pid in State: " + pidValue.error());
}
pid_t pid = pid_t(pidValue.get().as<int64_t>());
Option<pid_t> optionalPid;
if (pid != 0) {
optionalPid = pid;
}
Result<JSON::String> startedAtValue =
stateValue.get().find<JSON::String>("StartedAt");
if (startedAtValue.isNone()) {
return Error("Unable to find StartedAt in State");
} else if (startedAtValue.isError()) {
return Error("Error finding StartedAt in State: " + startedAtValue.error());
}
bool started = startedAtValue.get().value != "0001-01-01T00:00:00Z";
Option<string> ipAddress;
bool findDeprecatedIP = false;
Result<JSON::String> networkMode =
json.find<JSON::String>("HostConfig.NetworkMode");
if (!networkMode.isSome()) {
// We need to fall back to the deprecated field since Docker only
// added NetworkMode in Docker remote API 1.15.
VLOG(1) << "Unable to detect HostConfig.NetworkMode, "
<< "attempting deprecated IP field";
findDeprecatedIP = true;
} else {
// We currently rely on the fact that we always pass --net when we
// shell out to 'docker run', so the reported network mode matches
// the --net value. Without --net, the network mode would be set to
// 'default' and we would not be able to find the IP address, as it
// would be under the 'Networks.bridge' key.
string addressLocation = "NetworkSettings.Networks." +
networkMode->value + ".IPAddress";
Result<JSON::String> ipAddressValue =
json.find<JSON::String>(addressLocation);
if (!ipAddressValue.isSome()) {
// We also need to fall back to the deprecated field since the IP
// address field location changed in Docker remote API 1.20.
VLOG(1) << "Unable to detect IP Address at '" << addressLocation << "',"
<< " attempting deprecated field";
findDeprecatedIP = true;
} else if (!ipAddressValue->value.empty()) {
ipAddress = ipAddressValue->value;
}
}
if (findDeprecatedIP) {
Result<JSON::String> ipAddressValue =
json.find<JSON::String>("NetworkSettings.IPAddress");
if (ipAddressValue.isNone()) {
return Error("Unable to find NetworkSettings.IPAddress in container");
} else if (ipAddressValue.isError()) {
return Error(
"Error finding NetworkSettings.IPAddress in container: " +
ipAddressValue.error());
} else if (!ipAddressValue->value.empty()) {
ipAddress = ipAddressValue->value;
}
}
return Docker::Container(output, id, name, optionalPid, started, ipAddress);
}
Try<Docker::Image> Docker::Image::create(const JSON::Object& json)
{
Result<JSON::Value> entrypoint =
json.find<JSON::Value>("ContainerConfig.Entrypoint");
if (entrypoint.isError()) {
return Error("Failed to find 'ContainerConfig.Entrypoint': " +
entrypoint.error());
} else if (entrypoint.isNone()) {
return Error("Unable to find 'ContainerConfig.Entrypoint'");
}
Option<vector<string>> entrypointOption = None();
if (!entrypoint.get().is<JSON::Null>()) {
if (!entrypoint.get().is<JSON::Array>()) {
return Error("Unexpected type found for 'ContainerConfig.Entrypoint'");
}
const vector<JSON::Value>& values =
entrypoint.get().as<JSON::Array>().values;
if (values.size() != 0) {
vector<string> result;
foreach (const JSON::Value& value, values) {
if (!value.is<JSON::String>()) {
return Error("Expecting entrypoint value to be type string");
}
result.push_back(value.as<JSON::String>().value);
}
entrypointOption = result;
}
}
Result<JSON::Value> env =
json.find<JSON::Value>("ContainerConfig.Env");
if (env.isError()) {
return Error("Failed to find 'ContainerConfig.Env': " +
env.error());
} else if (env.isNone()) {
return Error("Unable to find 'ContainerConfig.Env'");
}
Option<map<string, string>> envOption = None();
if (!env.get().is<JSON::Null>()) {
if (!env.get().is<JSON::Array>()) {
return Error("Unexpected type found for 'ContainerConfig.Env'");
}
const vector<JSON::Value>& values = env.get().as<JSON::Array>().values;
if (values.size() != 0) {
map<string, string> result;
foreach (const JSON::Value& value, values) {
if (!value.is<JSON::String>()) {
return Error("Expecting environment value to be type string");
}
const vector<string> tokens =
strings::split(value.as<JSON::String>().value, "=", 2);
if (tokens.size() != 2) {
return Error("Unexpected Env format for 'ContainerConfig.Env'");
}
if (result.count(tokens[0])) {
return Error("Unexpected duplicate environment variables '"
+ tokens[0] + "'");
}
result[tokens[0]] = tokens[1];
}
envOption = result;
}
}
return Docker::Image(entrypointOption, envOption);
}
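// Constructs and runs a 'docker run' command line from the given
// container and command info, e.g. (illustrative only; the exact
// flags depend on the supplied ContainerInfo, resources and
// environment):
//
//   docker -H <socket> run --cpu-shares <shares> --memory <bytes> \
//     -e MESOS_SANDBOX=<mappedDirectory> -e MESOS_CONTAINER_NAME=<name> \
//     -v <sandboxDirectory>:<mappedDirectory> --net <mode> \
//     --name <name> <image> [<command>...]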
Future<Nothing> Docker::run(
const ContainerInfo& containerInfo,
const CommandInfo& commandInfo,
const string& name,
const string& sandboxDirectory,
const string& mappedDirectory,
const Option<Resources>& resources,
const Option<map<string, string>>& env,
const process::Subprocess::IO& stdout,
const process::Subprocess::IO& stderr) const
{
if (!containerInfo.has_docker()) {
return Failure("No docker info found in container info");
}
const ContainerInfo::DockerInfo& dockerInfo = containerInfo.docker();
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("run");
if (dockerInfo.privileged()) {
argv.push_back("--privileged");
}
if (resources.isSome()) {
// TODO(yifan): Support other resources (e.g. disk).
Option<double> cpus = resources.get().cpus();
if (cpus.isSome()) {
uint64_t cpuShare =
std::max((uint64_t) (CPU_SHARES_PER_CPU * cpus.get()), MIN_CPU_SHARES);
argv.push_back("--cpu-shares");
argv.push_back(stringify(cpuShare));
}
Option<Bytes> mem = resources.get().mem();
if (mem.isSome()) {
Bytes memLimit = std::max(mem.get(), MIN_MEMORY);
argv.push_back("--memory");
argv.push_back(stringify(memLimit.bytes()));
}
}
if (env.isSome()) {
foreachpair (string key, string value, env.get()) {
argv.push_back("-e");
argv.push_back(key + "=" + value);
}
}
foreach (const Environment::Variable& variable,
commandInfo.environment().variables()) {
if (env.isSome() &&
env.get().find(variable.name()) != env.get().end()) {
// Skip to avoid duplicate environment variables.
continue;
}
argv.push_back("-e");
argv.push_back(variable.name() + "=" + variable.value());
}
argv.push_back("-e");
argv.push_back("MESOS_SANDBOX=" + mappedDirectory);
argv.push_back("-e");
argv.push_back("MESOS_CONTAINER_NAME=" + name);
foreach (const Volume& volume, containerInfo.volumes()) {
string volumeConfig = volume.container_path();
if (volume.has_host_path()) {
if (!strings::startsWith(volume.host_path(), "/") &&
!dockerInfo.has_volume_driver()) {
// When the volume driver is empty and the host path is relative,
// resolve the host path against the sandbox directory.
volumeConfig =
path::join(sandboxDirectory, volume.host_path()) + ":" + volumeConfig;
} else {
volumeConfig = volume.host_path() + ":" + volumeConfig;
}
if (volume.has_mode()) {
switch (volume.mode()) {
case Volume::RW: volumeConfig += ":rw"; break;
case Volume::RO: volumeConfig += ":ro"; break;
default: return Failure("Unsupported volume mode");
}
}
} else if (volume.has_mode() && !volume.has_host_path()) {
return Failure("Host path is required with mode");
}
argv.push_back("-v");
argv.push_back(volumeConfig);
}
// Map the sandbox directory into the container's mapped directory.
argv.push_back("-v");
argv.push_back(sandboxDirectory + ":" + mappedDirectory);
if (dockerInfo.has_volume_driver()) {
argv.push_back("--volume-driver=" + dockerInfo.volume_driver());
}
const string& image = dockerInfo.image();
argv.push_back("--net");
string network;
switch (dockerInfo.network()) {
case ContainerInfo::DockerInfo::HOST: network = "host"; break;
case ContainerInfo::DockerInfo::BRIDGE: network = "bridge"; break;
case ContainerInfo::DockerInfo::NONE: network = "none"; break;
default: return Failure("Unsupported Network mode: " +
stringify(dockerInfo.network()));
}
argv.push_back(network);
if (containerInfo.has_hostname()) {
if (network == "host") {
return Failure("Unable to set hostname with host network mode");
}
argv.push_back("--hostname");
argv.push_back(containerInfo.hostname());
}
foreach (const Parameter& parameter, dockerInfo.parameters()) {
argv.push_back("--" + parameter.key() + "=" + parameter.value());
}
if (dockerInfo.port_mappings().size() > 0) {
if (network != "bridge") {
return Failure("Port mappings are only supported for bridge network");
}
if (!resources.isSome()) {
return Failure("Port mappings require resources");
}
Option<Value::Ranges> portRanges = resources.get().ports();
if (!portRanges.isSome()) {
return Failure("Port mappings require port resources");
}
foreach (const ContainerInfo::DockerInfo::PortMapping& mapping,
dockerInfo.port_mappings()) {
bool found = false;
foreach (const Value::Range& range, portRanges.get().range()) {
if (mapping.host_port() >= range.begin() &&
mapping.host_port() <= range.end()) {
found = true;
break;
}
}
if (!found) {
return Failure("Port [" + stringify(mapping.host_port()) + "] not " +
"included in resources");
}
string portMapping = stringify(mapping.host_port()) + ":" +
stringify(mapping.container_port());
if (mapping.has_protocol()) {
portMapping += "/" + strings::lower(mapping.protocol());
}
argv.push_back("-p");
argv.push_back(portMapping);
}
}
if (commandInfo.shell()) {
// We override the entrypoint if shell is enabled because we assume
// the user intends to run the command within /bin/sh and not the
// default entrypoint of the image. See MESOS-1770 for more details.
argv.push_back("--entrypoint");
argv.push_back("/bin/sh");
}
argv.push_back("--name");
argv.push_back(name);
argv.push_back(image);
if (commandInfo.shell()) {
if (!commandInfo.has_value()) {
return Failure("Shell specified but no command value provided");
}
// Add '-c' here because the Docker CLI only supports a single word
// for overriding the entrypoint, so the '-c' flag for /bin/sh has
// to be passed as part of the command.
argv.push_back("-c");
argv.push_back(commandInfo.value());
} else {
if (commandInfo.has_value()) {
argv.push_back(commandInfo.value());
}
foreach (const string& argument, commandInfo.arguments()) {
argv.push_back(argument);
}
}
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
map<string, string> environment = os::environment();
// Currently the Docker CLI picks up the docker config file by
// looking in the $HOME directory. If one of the URIs provided is a
// docker config file, we want Docker to be able to pick it up from
// the sandbox directory where we store all the URI downloads.
environment["HOME"] = sandboxDirectory;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH("/dev/null"),
stdout,
stderr,
None(),
environment);
if (s.isError()) {
return Failure(s.error());
}
// We don't call checkError here in order to avoid printing the
// stderr of the docker container task, since 'docker run' in
// attached mode forwards the container's stderr to the client's
// stderr.
return s.get().status()
.then(lambda::bind(
&Docker::_run,
lambda::_1))
.onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd));
}
Future<Nothing> Docker::_run(const Option<int>& status)
{
if (status.isNone()) {
return Failure("Failed to get exit status");
} else if (status.get() != 0) {
return Failure("Container exited on error: " + WSTRINGIFY(status.get()));
}
return Nothing();
}
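// Stops a container by running 'docker stop -t <timeout> <container>'
// and, if 'remove' is set, removes it afterwards (forcefully if the
// stop did not succeed).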
Future<Nothing> Docker::stop(
const string& containerName,
const Duration& timeout,
bool remove) const
{
int timeoutSecs = (int) timeout.secs();
if (timeoutSecs < 0) {
return Failure("A negative timeout cannot be applied to docker stop: " +
stringify(timeoutSecs));
}
string cmd = path + " -H " + socket + " stop -t " + stringify(timeoutSecs) +
" " + containerName;
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH("/dev/null"),
Subprocess::PATH("/dev/null"),
Subprocess::PIPE());
if (s.isError()) {
return Failure(s.error());
}
return s.get().status()
.then(lambda::bind(
&Docker::_stop,
*this,
containerName,
cmd,
s.get(),
remove));
}
Future<Nothing> Docker::_stop(
const Docker& docker,
const string& containerName,
const string& cmd,
const Subprocess& s,
bool remove)
{
Option<int> status = s.status().get();
if (remove) {
bool force = !status.isSome() || status.get() != 0;
return docker.rm(containerName, force);
}
return checkError(cmd, s);
}
Future<Nothing> Docker::rm(
const string& containerName,
bool force) const
{
// The `-v` flag removes Docker volumes that may be present.
const string cmd =
path + " -H " + socket +
(force ? " rm -f -v " : " rm -v ") + containerName;
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH("/dev/null"),
Subprocess::PATH("/dev/null"),
Subprocess::PIPE());
if (s.isError()) {
return Failure(s.error());
}
return checkError(cmd, s.get());
}
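// Runs 'docker inspect <container>'. If 'retryInterval' is provided,
// the inspect is retried until the command succeeds and the container
// has started.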
Future<Docker::Container> Docker::inspect(
const string& containerName,
const Option<Duration>& retryInterval) const
{
Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
const string cmd = path + " -H " + socket + " inspect " + containerName;
_inspect(cmd, promise, retryInterval);
return promise->future();
}
void Docker::_inspect(
const string& cmd,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval)
{
if (promise->future().hasDiscard()) {
promise->discard();
return;
}
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH("/dev/null"),
Subprocess::PIPE(),
Subprocess::PIPE());
if (s.isError()) {
promise->fail(s.error());
return;
}
// Start reading from stdout so that writing to the pipe does not
// block in case the output is larger than the pipe capacity.
const Future<string> output = io::read(s.get().out().get());
s.get().status()
.onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); });
}
void Docker::__inspect(
const string& cmd,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
Future<string> output,
const Subprocess& s)
{
if (promise->future().hasDiscard()) {
promise->discard();
output.discard();
return;
}
// Check the exit status of 'docker inspect'.
CHECK_READY(s.status());
Option<int> status = s.status().get();
if (!status.isSome()) {
promise->fail("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
output.discard();
if (retryInterval.isSome()) {
VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
[=]() { _inspect(cmd, promise, retryInterval); } );
return;
}
CHECK_SOME(s.err());
io::read(s.err().get())
.then(lambda::bind(
failure<Nothing>,
cmd,
status.get(),
lambda::_1))
.onAny([=](const Future<Nothing>& future) {
CHECK_FAILED(future);
promise->fail(future.failure());
});
return;
}
// Read to EOF.
CHECK_SOME(s.out());
output
.onAny([=](const Future<string>& output) {
___inspect(cmd, promise, retryInterval, output);
});
}
void Docker::___inspect(
const string& cmd,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
const Future<string>& output)
{
if (promise->future().hasDiscard()) {
promise->discard();
return;
}
if (!output.isReady()) {
promise->fail(output.isFailed() ? output.failure() : "future discarded");
return;
}
Try<Docker::Container> container = Docker::Container::create(
output.get());
if (container.isError()) {
promise->fail("Unable to create container: " + container.error());
return;
}
if (retryInterval.isSome() && !container.get().started) {
VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
[=]() { _inspect(cmd, promise, retryInterval); } );
return;
}
promise->set(container.get());
}
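// Lists containers via 'docker ps' (optionally including stopped
// containers) and inspects each one, optionally filtering by a
// name 'prefix'.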
Future<list<Docker::Container>> Docker::ps(
bool all,
const Option<string>& prefix) const
{
string cmd = path + " -H " + socket + (all ? " ps -a" : " ps");
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH("/dev/null"),
Subprocess::PIPE(),
Subprocess::PIPE());
if (s.isError()) {
return Failure(s.error());
}
// Start reading from stdout so that writing to the pipe does not
// block in case the output is larger than the pipe capacity.
const Future<string>& output = io::read(s.get().out().get());
return s.get().status()
.then(lambda::bind(&Docker::_ps, *this, cmd, s.get(), prefix, output));
}
Future<list<Docker::Container>> Docker::_ps(
const Docker& docker,
const string& cmd,
const Subprocess& s,
const Option<string>& prefix,
Future<string> output)
{
Option<int> status = s.status().get();
if (!status.isSome()) {
output.discard();
return Failure("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
output.discard();
CHECK_SOME(s.err());
return io::read(s.err().get())
.then(lambda::bind(
failure<list<Docker::Container>>,
cmd,
status.get(),
lambda::_1));
}
// Read to EOF.
return output.then(lambda::bind(&Docker::__ps, docker, prefix, lambda::_1));
}
Future<list<Docker::Container>> Docker::__ps(
const Docker& docker,
const Option<string>& prefix,
const string& output)
{
Owned<vector<string>> lines(new vector<string>());
*lines = strings::tokenize(output, "\n");
// Skip the header.
CHECK(!lines->empty());
lines->erase(lines->begin());
Owned<list<Docker::Container>> containers(new list<Docker::Container>());
Owned<Promise<list<Docker::Container>>> promise(
new Promise<list<Docker::Container>>());
// Limit the number of parallel 'docker inspect' calls at a time to
// avoid hitting the system's open file descriptor limit.
inspectBatches(containers, lines, promise, docker, prefix);
return promise->future();
}
// TODO(chenlily): Generalize functionality into a concurrency limiter
// within libprocess.
void Docker::inspectBatches(
Owned<list<Docker::Container>> containers,
Owned<vector<string>> lines,
Owned<Promise<list<Docker::Container>>> promise,
const Docker& docker,
const Option<string>& prefix)
{
list<Future<Docker::Container>> batch =
createInspectBatch(lines, docker, prefix);
collect(batch).onAny([=](const Future<list<Docker::Container>>& c) {
if (c.isReady()) {
foreach (const Docker::Container& container, c.get()) {
containers->push_back(container);
}
if (lines->empty()) {
promise->set(*containers);
}
else {
inspectBatches(containers, lines, promise, docker, prefix);
}
} else {
if (c.isFailed()) {
promise->fail("Docker ps batch failed " + c.failure());
}
else {
promise->fail("Docker ps batch discarded");
}
}
});
}
list<Future<Docker::Container>> Docker::createInspectBatch(
Owned<vector<string>> lines,
const Docker& docker,
const Option<string>& prefix)
{
list<Future<Docker::Container>> batch;
while (!lines->empty() && batch.size() < DOCKER_PS_MAX_INSPECT_CALLS) {
string line = lines->back();
lines->pop_back();
// Inspect the containers that we are interested in depending on
// whether or not a 'prefix' was specified.
vector<string> columns = strings::split(strings::trim(line), " ");
// We expect the name column to be the last column from ps.
string name = columns[columns.size() - 1];
if (prefix.isNone() || strings::startsWith(name, prefix.get())) {
batch.push_back(docker.inspect(name));
}
}
return batch;
}
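// Retrieves image metadata, first via 'docker inspect' and, if the
// image is not available locally (or 'force' is set), via
// 'docker pull' followed by another inspect.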
Future<Docker::Image> Docker::pull(
const string& directory,
const string& image,
bool force) const
{
vector<string> argv;
string dockerImage = image;
// Check if the specified image has a tag. Also split on "/" in case
// the user specified a registry server (e.g. localhost:5000/image)
// to get the actual image name. If no tag was given, we add the
// 'latest' tag to avoid pulling down the whole repository.
vector<string> parts = strings::split(image, "/");
if (!strings::contains(parts.back(), ":")) {
dockerImage += ":latest";
}
if (force) {
// Skip inspect and docker pull the image.
return Docker::__pull(*this, directory, image, path, socket);
}
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("inspect");
argv.push_back(dockerImage);
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH("/dev/null"),
Subprocess::PIPE(),
Subprocess::PIPE(),
None());
if (s.isError()) {
return Failure("Failed to execute '" + cmd + "': " + s.error());
}
// Start reading from stdout so that writing to the pipe does not
// block in case the output is larger than the pipe capacity.
const Future<string> output = io::read(s.get().out().get());
// We assume 'docker inspect' exits quickly, so its future does not
// need to be discardable.
return s.get().status()
.then(lambda::bind(
&Docker::_pull,
*this,
s.get(),
directory,
dockerImage,
path,
socket,
output));
}
Future<Docker::Image> Docker::_pull(
const Docker& docker,
const Subprocess& s,
const string& directory,
const string& image,
const string& path,
const string& socket,
Future<string> output)
{
Option<int> status = s.status().get();
if (status.isSome() && status.get() == 0) {
return output
.then(lambda::bind(&Docker::____pull, lambda::_1));
}
output.discard();
return Docker::__pull(docker, directory, image, path, socket);
}
Future<Docker::Image> Docker::__pull(
const Docker& docker,
const string& directory,
const string& image,
const string& path,
const string& socket)
{
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("pull");
argv.push_back(image);
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
// Set HOME variable to pick up .dockercfg.
map<string, string> environment = os::environment();
environment["HOME"] = directory;
Try<Subprocess> s_ = subprocess(
path,
argv,
Subprocess::PATH("/dev/null"),
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
environment);
if (s_.isError()) {
return Failure("Failed to execute '" + cmd + "': " + s_.error());
}
// 'docker pull' can run for a long time for large images, so we
// allow the returned future to be discarded, which will kill the
// pull process.
return s_.get().status()
.then(lambda::bind(
&Docker::___pull,
docker,
s_.get(),
cmd,
directory,
image))
.onDiscard(lambda::bind(&commandDiscarded, s_.get(), cmd));
}
Future<Docker::Image> Docker::___pull(
const Docker& docker,
const Subprocess& s,
const string& cmd,
const string& directory,
const string& image)
{
Option<int> status = s.status().get();
if (!status.isSome()) {
return Failure("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
return io::read(s.err().get())
.then(lambda::bind(&failure<Image>, cmd, status.get(), lambda::_1));
}
// We re-invoke Docker::pull in order to now do an 'inspect' since
// the image should be present (see Docker::pull).
// TODO(benh): Factor out inspect code from Docker::pull to be
// reused rather than this (potentially infinite) recursive call.
return docker.pull(directory, image);
}
Future<Docker::Image> Docker::____pull(
const string& output)
{
Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
if (parse.isError()) {
return Failure("Failed to parse JSON: " + parse.error());
}
JSON::Array array = parse.get();
// Only return an image if exactly one image was identified by name.
if (array.values.size() == 1) {
CHECK(array.values.front().is<JSON::Object>());
Try<Docker::Image> image =
Docker::Image::create(array.values.front().as<JSON::Object>());
if (image.isError()) {
return Failure("Unable to create image: " + image.error());
}
return image.get();
}
// TODO(tnachen): Handle the case where the short image ID was
// not sufficiently unique and 'array.values.size() > 1'.
return Failure("Failed to find image");
}