// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cmath>
#include <map>
#include <mutex>
#include <utility>
#include <vector>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include <stout/stringify.hpp>
#include <stout/os/constants.hpp>
#include <stout/os/killtree.hpp>
#include <stout/os/read.hpp>
#include <stout/os/write.hpp>
#ifdef __WINDOWS__
#include <stout/os/windows/jobobject.hpp>
#endif // __WINDOWS__
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/io.hpp>
#ifdef __WINDOWS__
#include <process/windows/jobobject.hpp>
#endif // __WINDOWS__
#include "common/status_utils.hpp"
#include "docker/docker.hpp"
#ifdef __linux__
#include "linux/cgroups.hpp"
#endif // __linux__
#include "slave/containerizer/mesos/utils.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
#include "slave/constants.hpp"
using namespace mesos;
using namespace mesos::internal::slave;
using namespace process;
using std::map;
using std::mutex;
using std::pair;
using std::shared_ptr;
using std::string;
using std::vector;
using mesos::internal::ContainerDNSInfo;
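// Helper used as a continuation to turn a failed docker CLI invocation
// into a `Failure`, including the command, its exit status and the
// captured stderr output.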
template <typename T>
static Future<T> failure(
const string& cmd,
int status,
const string& err)
{
return Failure(
"Failed to run '" + cmd + "': " + WSTRINGIFY(status) +
"; stderr='" + err + "'");
}
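// Continuation for `checkError` below: once the subprocess has terminated,
// returns a failure (including stderr) if the exit status is missing or
// non-zero.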
static Future<Nothing> _checkError(const string& cmd, const Subprocess& s)
{
Option<int> status = s.status().get();
if (status.isNone()) {
return Failure("No status found for '" + cmd + "'");
}
if (status.get() != 0) {
// TODO(tnachen): Consider returning stdout as well.
CHECK_SOME(s.err());
return io::read(s.err().get())
.then(lambda::bind(failure<Nothing>, cmd, status.get(), lambda::_1));
}
return Nothing();
}
// Returns a failure if the subprocess has no exit status or exited with a
// non-zero status.
static Future<Nothing> checkError(const string& cmd, const Subprocess& s)
{
return s.status()
.then(lambda::bind(_checkError, cmd, s));
}
Try<Owned<Docker>> Docker::create(
const string& path,
const string& socket,
bool validate,
const Option<JSON::Object>& config)
{
#ifndef __WINDOWS__
// TODO(hausdorff): Currently, `path::absolute` does not handle all the edge
// cases of Windows. Revisit this when MESOS-3442 is resolved.
//
// NOTE: When we do come back and fix this bug, it is also worth noting that
// on Windows an empty value of `socket` is frequently used to connect to the
// Docker host (i.e., the user wants to connect via 'npipes://' with an empty
// socket path). A full solution should accommodate this.
if (!path::is_absolute(socket)) {
return Error("Invalid Docker socket path: " + socket);
}
#endif // __WINDOWS__
Owned<Docker> docker(new Docker(path, socket, config));
if (!validate) {
return docker;
}
#ifdef __linux__
// Make sure that cgroups are mounted, and at least the 'cpu'
// subsystem is attached.
Result<string> hierarchy = cgroups::hierarchy("cpu");
if (hierarchy.isNone()) {
return Error("Failed to find a mounted cgroups hierarchy "
"for the 'cpu' subsystem; you probably need "
"to mount cgroups manually");
}
#endif // __linux__
Try<Nothing> validateVersion = docker->validateVersion(Version(1, 8, 0));
if (validateVersion.isError()) {
return Error(validateVersion.error());
}
return docker;
}
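// Invoked when a future tied to a docker CLI subprocess is discarded;
// kills the subprocess if it is still running.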
void commandDiscarded(const Subprocess& s, const string& cmd)
{
if (s.status().isPending()) {
VLOG(1) << "'" << cmd << "' is being discarded";
os::kill(s.pid(), SIGKILL);
}
}
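// Parent hooks attached to every docker CLI subprocess we spawn. On Windows
// the process is wrapped in a job object so that `commandDiscarded` can tear
// down the whole process tree; on other platforms no hooks are needed.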
vector<Subprocess::ParentHook> createParentHooks()
{
return {
#ifdef __WINDOWS__
// To correctly discard the docker cli process tree in `commandDiscarded`,
// we need to wrap the process in a job object.
Subprocess::ParentHook::CREATE_JOB(),
Subprocess::ParentHook(&os::set_job_kill_on_close_limit),
#endif // __WINDOWS__
};
}
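// Shells out to `docker --version` and asynchronously parses the reported
// version (see `_version` and `__version` below).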
Future<Version> Docker::version() const
{
string cmd = path + " -H " + socket + " --version";
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
return s->status()
.then(lambda::bind(&Docker::_version, cmd, s.get()));
}
Future<Version> Docker::_version(const string& cmd, const Subprocess& s)
{
const Option<int>& status = s.status().get();
if (status.isNone() || status.get() != 0) {
string msg = "Failed to execute '" + cmd + "': ";
if (status.isSome()) {
msg += WSTRINGIFY(status.get());
} else {
msg += "unknown exit status";
}
return Failure(msg);
}
CHECK_SOME(s.out());
return io::read(s.out().get())
.then(lambda::bind(&Docker::__version, lambda::_1));
}
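// Extracts the version number from the `docker --version` output: the last
// space-separated token of the text before the first comma.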
Future<Version> Docker::__version(const Future<string>& output)
{
vector<string> parts = strings::split(output.get(), ",");
if (!parts.empty()) {
vector<string> subParts = strings::split(parts.front(), " ");
if (!subParts.empty()) {
// Docker version output in Fedora 22 is "x.x.x.fc22", which does not match
// the Semantic Versioning specification (<major>[.<minor>[.<patch>]]). We
// remove the extra components here before parsing the docker version
// output into a Version struct.
string versionString = subParts.back();
vector<string> components = strings::split(versionString, ".");
if (components.size() > 3) {
components.erase(components.begin() + 3, components.end());
}
versionString = strings::join(".", components);
Try<Version> version = Version::parse(versionString);
if (version.isError()) {
return Failure("Failed to parse docker version: " +
version.error());
}
return version;
}
}
return Failure("Unable to find docker version in output");
}
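// Blocks (up to DOCKER_VERSION_WAIT_TIMEOUT) waiting for the Docker version
// and returns an error if it cannot be determined or is older than
// `minVersion`.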
Try<Nothing> Docker::validateVersion(const Version& minVersion) const
{
// Validate the version (and that we can use Docker at all).
Future<Version> version = this->version();
if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
return Error("Timed out getting docker version");
}
if (version.isFailed()) {
return Error("Failed to get docker version: " + version.failure());
}
if (version.get() < minVersion) {
string msg = "Insufficient version '" + stringify(version.get()) +
"' of Docker. Please upgrade to >=' " +
stringify(minVersion) + "'";
return Error(msg);
}
return Nothing();
}
// TODO(josephw): Parse this string with a protobuf.
Try<Docker::Container> Docker::Container::create(const string& output)
{
Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
if (parse.isError()) {
return Error("Failed to parse JSON: " + parse.error());
}
// TODO(benh): Handle the case where the short container ID was
// not sufficiently unique and 'array.values.size() > 1'.
JSON::Array array = parse.get();
if (array.values.size() != 1) {
return Error("Failed to find container");
}
CHECK(array.values.front().is<JSON::Object>());
JSON::Object json = array.values.front().as<JSON::Object>();
Result<JSON::String> idValue = json.find<JSON::String>("Id");
if (idValue.isNone()) {
return Error("Unable to find Id in container");
} else if (idValue.isError()) {
return Error("Error finding Id in container: " + idValue.error());
}
string id = idValue->value;
Result<JSON::String> nameValue = json.find<JSON::String>("Name");
if (nameValue.isNone()) {
return Error("Unable to find Name in container");
} else if (nameValue.isError()) {
return Error("Error finding Name in container: " + nameValue.error());
}
string name = nameValue->value;
Result<JSON::Object> stateValue = json.find<JSON::Object>("State");
if (stateValue.isNone()) {
return Error("Unable to find State in container");
} else if (stateValue.isError()) {
return Error("Error finding State in container: " + stateValue.error());
}
Result<JSON::Number> pidValue = stateValue->find<JSON::Number>("Pid");
if (pidValue.isNone()) {
return Error("Unable to find Pid in State");
} else if (pidValue.isError()) {
return Error("Error finding Pid in State: " + pidValue.error());
}
pid_t pid = pid_t(pidValue->as<int64_t>());
Option<pid_t> optionalPid;
if (pid != 0) {
optionalPid = pid;
}
Result<JSON::String> startedAtValue =
stateValue->find<JSON::String>("StartedAt");
if (startedAtValue.isNone()) {
return Error("Unable to find StartedAt in State");
} else if (startedAtValue.isError()) {
return Error("Error finding StartedAt in State: " + startedAtValue.error());
}
bool started = startedAtValue->value != "0001-01-01T00:00:00Z";
Option<string> ipAddress;
Option<string> ip6Address;
bool findDeprecatedIP = false;
Result<JSON::String> networkMode =
json.find<JSON::String>("HostConfig.NetworkMode");
if (!networkMode.isSome()) {
// We need to fall back to the old field since Docker only added
// NetworkMode in Docker remote API 1.15.
VLOG(1) << "Unable to detect HostConfig.NetworkMode, "
<< "attempting deprecated IP field";
findDeprecatedIP = true;
} else {
// We currently rely on the fact that we always set --net when
// we shell out to docker run, and therefore the network mode
// matches what --net is. Without --net, the network mode would be set
// to 'default' and we would not be able to find the IP address, as
// it would be under the 'Networks.bridge' key.
string addressLocation = "NetworkSettings.Networks." +
networkMode->value + ".IPAddress";
Result<JSON::String> ipAddressValue =
json.find<JSON::String>(addressLocation);
if (!ipAddressValue.isSome()) {
// We also need to fall back to the old field since the IP address
// field location also changed in Docker remote API 1.20.
VLOG(1) << "Unable to detect IP Address at '" << addressLocation << "',"
<< " attempting deprecated field";
findDeprecatedIP = true;
} else if (!ipAddressValue->value.empty()) {
ipAddress = ipAddressValue->value;
}
// Check if the container has an IPv6 address.
//
// NOTE: For IPv6 we don't need to worry about the old method of
// looking at the deprecated "NetworkSettings.IPAddress" since we
// want to support IPv6 addresses for docker versions that support
// USER mode networking, which is a relatively recent feature.
string address6Location = "NetworkSettings.Networks." +
networkMode->value + ".GlobalIPv6Address";
Result<JSON::String> ip6AddressValue =
json.find<JSON::String>(address6Location);
if (ip6AddressValue.isSome() && !ip6AddressValue->value.empty()) {
ip6Address = ip6AddressValue->value;
}
}
if (findDeprecatedIP) {
Result<JSON::String> ipAddressValue =
json.find<JSON::String>("NetworkSettings.IPAddress");
if (ipAddressValue.isNone()) {
return Error("Unable to find NetworkSettings.IPAddress in container");
} else if (ipAddressValue.isError()) {
return Error(
"Error finding NetworkSettings.IPAddress in container: " +
ipAddressValue.error());
} else if (!ipAddressValue->value.empty()) {
ipAddress = ipAddressValue->value;
}
}
vector<Device> devices;
Result<JSON::Array> devicesArray =
json.find<JSON::Array>("HostConfig.Devices");
if (devicesArray.isError()) {
return Error("Failed to parse HostConfig.Devices: " + devicesArray.error());
}
if (devicesArray.isSome()) {
foreach (const JSON::Value& entry, devicesArray->values) {
if (!entry.is<JSON::Object>()) {
return Error("Malformed HostConfig.Devices"
" entry '" + stringify(entry) + "'");
}
JSON::Object object = entry.as<JSON::Object>();
Result<JSON::String> hostPath =
object.at<JSON::String>("PathOnHost");
Result<JSON::String> containerPath =
object.at<JSON::String>("PathInContainer");
Result<JSON::String> permissions =
object.at<JSON::String>("CgroupPermissions");
if (!hostPath.isSome() ||
!containerPath.isSome() ||
!permissions.isSome()) {
return Error("Malformed HostConfig.Devices entry"
" '" + stringify(object) + "'");
}
Device device;
device.hostPath = Path(hostPath->value);
device.containerPath = Path(containerPath->value);
device.access.read = strings::contains(permissions->value, "r");
device.access.write = strings::contains(permissions->value, "w");
device.access.mknod = strings::contains(permissions->value, "m");
devices.push_back(device);
}
}
vector<string> dns;
Result<JSON::Array> dnsArray =
json.find<JSON::Array>("HostConfig.Dns");
if (dnsArray.isError()) {
return Error("Failed to parse HostConfig.Dns: " + dnsArray.error());
}
if (dnsArray.isSome()) {
foreach (const JSON::Value& entry, dnsArray->values) {
if (!entry.is<JSON::String>()) {
return Error("Malformed HostConfig.Dns"
" entry '" + stringify(entry) + "'");
}
dns.push_back(entry.as<JSON::String>().value);
}
}
vector<string> dnsOptions;
Result<JSON::Array> dnsOptionArray =
json.find<JSON::Array>("HostConfig.DnsOptions");
if (dnsOptionArray.isError()) {
return Error("Failed to parse HostConfig.DnsOptions: " +
dnsOptionArray.error());
}
if (dnsOptionArray.isSome()) {
foreach (const JSON::Value& entry, dnsOptionArray->values) {
if (!entry.is<JSON::String>()) {
return Error("Malformed HostConfig.DnsOptions"
" entry '" + stringify(entry) + "'");
}
dnsOptions.push_back(entry.as<JSON::String>().value);
}
}
vector<string> dnsSearch;
Result<JSON::Array> dnsSearchArray =
json.find<JSON::Array>("HostConfig.DnsSearch");
if (dnsSearchArray.isError()) {
return Error("Failed to parse HostConfig.DnsSearch: " +
dnsSearchArray.error());
}
if (dnsSearchArray.isSome()) {
foreach (const JSON::Value& entry, dnsSearchArray->values) {
if (!entry.is<JSON::String>()) {
return Error("Malformed HostConfig.DnsSearch"
" entry '" + stringify(entry) + "'");
}
dnsSearch.push_back(entry.as<JSON::String>().value);
}
}
return Container(
output,
id,
name,
optionalPid,
started,
ipAddress,
ip6Address,
devices,
dns,
dnsOptions,
dnsSearch);
}
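// Extracts the 'ContainerConfig.Entrypoint' and 'ContainerConfig.Env' fields
// from a `docker inspect` image JSON object.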
Try<Docker::Image> Docker::Image::create(const JSON::Object& json)
{
Result<JSON::Value> entrypoint =
json.find<JSON::Value>("ContainerConfig.Entrypoint");
if (entrypoint.isError()) {
return Error("Failed to find 'ContainerConfig.Entrypoint': " +
entrypoint.error());
} else if (entrypoint.isNone()) {
return Error("Unable to find 'ContainerConfig.Entrypoint'");
}
Option<vector<string>> entrypointOption = None();
if (!entrypoint->is<JSON::Null>()) {
if (!entrypoint->is<JSON::Array>()) {
return Error("Unexpected type found for 'ContainerConfig.Entrypoint'");
}
const vector<JSON::Value>& values = entrypoint->as<JSON::Array>().values;
if (values.size() != 0) {
vector<string> result;
foreach (const JSON::Value& value, values) {
if (!value.is<JSON::String>()) {
return Error("Expecting entrypoint value to be type string");
}
result.push_back(value.as<JSON::String>().value);
}
entrypointOption = result;
}
}
Result<JSON::Value> env =
json.find<JSON::Value>("ContainerConfig.Env");
if (env.isError()) {
return Error("Failed to find 'ContainerConfig.Env': " +
env.error());
} else if (env.isNone()) {
return Error("Unable to find 'ContainerConfig.Env'");
}
Option<map<string, string>> envOption = None();
if (!env->is<JSON::Null>()) {
if (!env->is<JSON::Array>()) {
return Error("Unexpected type found for 'ContainerConfig.Env'");
}
const vector<JSON::Value>& values = env->as<JSON::Array>().values;
if (values.size() != 0) {
map<string, string> result;
foreach (const JSON::Value& value, values) {
if (!value.is<JSON::String>()) {
return Error("Expecting environment value to be type string");
}
const vector<string> tokens =
strings::split(value.as<JSON::String>().value, "=", 2);
if (tokens.size() != 2) {
return Error("Unexpected Env format for 'ContainerConfig.Env'");
}
if (result.count(tokens[0]) > 0) {
return Error("Unexpected duplicate environment variables '"
+ tokens[0] + "'");
}
result[tokens[0]] = tokens[1];
}
envOption = result;
}
}
return Docker::Image(entrypointOption, envOption);
}
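// Translates the task's ContainerInfo/CommandInfo and resources into the
// options passed to `docker run`: cgroup limits, environment, volumes,
// network mode, port mappings, devices, DNS settings and the
// entrypoint/arguments.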
Try<Docker::RunOptions> Docker::RunOptions::create(
const ContainerInfo& containerInfo,
const CommandInfo& commandInfo,
const string& name,
const string& sandboxDirectory,
const string& mappedDirectory,
const Option<Resources>& resourceRequests,
bool enableCfsQuota,
const Option<map<string, string>>& env,
const Option<vector<Device>>& devices,
const Option<ContainerDNSInfo>& defaultContainerDNS,
const Option<google::protobuf::Map<string, Value::Scalar>>& resourceLimits)
{
if (!containerInfo.has_docker()) {
return Error("No docker info found in container info");
}
const ContainerInfo::DockerInfo& dockerInfo = containerInfo.docker();
RunOptions options;
options.privileged = dockerInfo.privileged();
Option<double> cpuRequest, cpuLimit, memLimit;
Option<Bytes> memRequest;
if (resourceRequests.isSome()) {
// TODO(yifan): Support other resources (e.g. disk).
cpuRequest = resourceRequests->cpus();
memRequest = resourceRequests->mem();
}
if (resourceLimits.isSome()) {
foreach (auto&& limit, resourceLimits.get()) {
if (limit.first == "cpus") {
cpuLimit = limit.second.value();
} else if (limit.first == "mem") {
memLimit = limit.second.value();
}
}
}
if (cpuRequest.isSome()) {
options.cpuShares = std::max(
static_cast<uint64_t>(CPU_SHARES_PER_CPU * cpuRequest.get()),
MIN_CPU_SHARES);
}
// Set the `--cpu-quota` option to the CPU limit (if it is not an infinite
// value), or to the CPU request if the `--cgroups_enable_cfs` flag is true.
// If the CPU limit is infinite, `--cpu-quota` will not be set at all, which
// means the Docker container will run without any CPU quota.
if (cpuLimit.isSome()) {
if (!std::isinf(cpuLimit.get())) {
const Duration quota =
std::max(CPU_CFS_PERIOD * cpuLimit.get(), MIN_CPU_CFS_QUOTA);
options.cpuQuota = static_cast<uint64_t>(quota.us());
}
} else if (enableCfsQuota && cpuRequest.isSome()) {
const Duration quota =
std::max(CPU_CFS_PERIOD * cpuRequest.get(), MIN_CPU_CFS_QUOTA);
options.cpuQuota = static_cast<uint64_t>(quota.us());
}
// Set the `--memory` option to the memory limit (if it is not an infinite
// value) or to the memory request. If the memory limit is infinite,
// `--memory` will not be set at all, which means the Docker container will
// run without any memory limit.
if (memLimit.isSome()) {
if (!std::isinf(memLimit.get())) {
options.memory =
std::max(Megabytes(static_cast<uint64_t>(memLimit.get())), MIN_MEMORY);
}
if (memRequest.isSome()) {
options.memoryReservation = std::max(memRequest.get(), MIN_MEMORY);
if (std::isinf(memLimit.get()) ||
memRequest.get() < Megabytes(static_cast<uint64_t>(memLimit.get()))) {
Try<int> oomScoreAdj = calculateOOMScoreAdj(memRequest.get());
if (oomScoreAdj.isError()) {
return Error(
"Failed to calculate OOM score adjustment: " +
oomScoreAdj.error());
}
options.oomScoreAdj = oomScoreAdj.get();
}
}
} else if (memRequest.isSome()) {
options.memory = std::max(memRequest.get(), MIN_MEMORY);
}
if (env.isSome()) {
foreachpair (const string& key, const string& value, env.get()) {
options.env[key] = value;
}
}
foreach (const Environment::Variable& variable,
commandInfo.environment().variables()) {
if (env.isSome() &&
env->find(variable.name()) != env->end()) {
// Skip to avoid duplicate environment variables.
continue;
}
options.env[variable.name()] = variable.value();
}
options.env["MESOS_SANDBOX"] = mappedDirectory;
options.env["MESOS_CONTAINER_NAME"] = name;
if (resourceRequests.isSome()) {
// Set the `MESOS_ALLOCATION_ROLE` environment variable. Please note
// that tasks and executors are not allowed to mix resources allocated
// to different roles, see MESOS-6636.
const Resource resource = *resourceRequests->begin();
options.env["MESOS_ALLOCATION_ROLE"] = resource.allocation_info().role();
}
Option<string> volumeDriver;
foreach (const Volume& volume, containerInfo.volumes()) {
// The 'container_path' can be either an absolute path or a
// relative path. If it is a relative path, it would be prefixed
// with the container sandbox directory.
string volumeConfig = path::is_absolute(volume.container_path())
? volume.container_path()
: path::join(mappedDirectory, volume.container_path());
// TODO(gyliu513): Set `host_path` as source.
if (volume.has_host_path()) {
// If both 'host_path' and 'container_path' are relative paths,
// return a failure because the user can just directly access the
// volume in the sandbox.
if (!path::is_absolute(volume.host_path()) &&
!path::is_absolute(volume.container_path())) {
return Error(
"Both host_path '" + volume.host_path() + "' " +
"and container_path '" + volume.container_path() + "' " +
"of a volume are relative");
}
if (!path::is_absolute(volume.host_path()) &&
!dockerInfo.has_volume_driver()) {
// When the volume driver is empty and the host path is a relative path,
// map the host path from the sandbox.
volumeConfig =
path::join(sandboxDirectory, volume.host_path()) + ":" + volumeConfig;
} else {
volumeConfig = volume.host_path() + ":" + volumeConfig;
}
switch (volume.mode()) {
case Volume::RW: volumeConfig += ":rw"; break;
case Volume::RO: volumeConfig += ":ro"; break;
default: return Error("Unsupported volume mode");
}
} else if (volume.has_source()) {
if (volume.source().type() != Volume::Source::DOCKER_VOLUME) {
VLOG(1) << "Ignored volume type '" << volume.source().type()
<< "' for container '" << name << "' as only "
<< "'DOCKER_VOLUME' was supported by docker";
continue;
}
volumeConfig = volume.source().docker_volume().name() +
":" + volumeConfig;
if (volume.source().docker_volume().has_driver()) {
const string& currentDriver = volume.source().docker_volume().driver();
if (volumeDriver.isSome() &&
volumeDriver.get() != currentDriver) {
return Error("Only one volume driver is supported");
}
volumeDriver = currentDriver;
}
switch (volume.mode()) {
case Volume::RW: volumeConfig += ":rw"; break;
case Volume::RO: volumeConfig += ":ro"; break;
default: return Error("Unsupported volume mode");
}
} else {
return Error("Host path or volume source is required");
}
options.volumes.push_back(volumeConfig);
}
// Mapping sandbox directory into the container mapped directory.
options.volumes.push_back(sandboxDirectory + ":" + mappedDirectory);
// TODO(gyliu513): Deprecate this after the release cycle of 1.0.
// It will be replaced by Volume.Source.DockerVolume.driver.
if (dockerInfo.has_volume_driver()) {
if (volumeDriver.isSome() &&
volumeDriver.get() != dockerInfo.volume_driver()) {
return Error("Only one volume driver per task is supported");
}
volumeDriver = dockerInfo.volume_driver();
}
options.volumeDriver = volumeDriver;
ContainerInfo::DockerInfo::Network network;
if (dockerInfo.has_network()) {
network = dockerInfo.network();
} else {
// If no network was given, then use the OS specific default.
#ifdef __WINDOWS__
network = ContainerInfo::DockerInfo::BRIDGE;
#else
network = ContainerInfo::DockerInfo::HOST;
#endif // __WINDOWS__
}
// See https://docs.microsoft.com/en-us/virtualization/windowscontainers/manage-containers/container-networking // NOLINT(whitespace/line_length)
// and https://docs.docker.com/engine/userguide/networking/ on what network
// modes are supported for Windows and Linux docker respectively.
switch (network) {
case ContainerInfo::DockerInfo::HOST: {
#ifdef __WINDOWS__
return Error("Unsupported Network mode: " + stringify(network));
#else
options.network = "host";
break;
#endif // __WINDOWS__
}
case ContainerInfo::DockerInfo::BRIDGE: {
#ifdef __WINDOWS__
// Windows "nat" network mode is equivalent to Linux "bridge" mode.
options.network = "nat";
#else
options.network = "bridge";
#endif // __WINDOWS__
break;
}
case ContainerInfo::DockerInfo::NONE: {
options.network = "none";
break;
}
case ContainerInfo::DockerInfo::USER: {
if (containerInfo.network_infos_size() == 0) {
return Error("No network info found in container info");
}
if (containerInfo.network_infos_size() > 1) {
return Error("Only a single network can be defined in Docker run");
}
const NetworkInfo& networkInfo = containerInfo.network_infos(0);
if (!networkInfo.has_name()) {
return Error("No network name found in network info");
}
options.network = networkInfo.name();
break;
}
default: return Error("Unsupported Network mode: " + stringify(network));
}
if (containerInfo.has_hostname()) {
if (options.network.isSome() && options.network.get() == "host") {
return Error("Unable to set hostname with host network mode");
}
options.hostname = containerInfo.hostname();
}
if (dockerInfo.port_mappings().size() > 0) {
if (options.network.isSome() &&
(options.network.get() == "host" || options.network.get() == "none")) {
return Error("Port mappings are only supported for bridge and "
"user-defined networks");
}
if (!resourceRequests.isSome()) {
return Error("Port mappings require resources");
}
Option<Value::Ranges> portRanges = resourceRequests->ports();
if (!portRanges.isSome()) {
return Error("Port mappings require port resources");
}
foreach (const ContainerInfo::DockerInfo::PortMapping& mapping,
dockerInfo.port_mappings()) {
bool found = false;
foreach (const Value::Range& range, portRanges->range()) {
if (mapping.host_port() >= range.begin() &&
mapping.host_port() <= range.end()) {
found = true;
break;
}
}
if (!found) {
return Error("Port [" + stringify(mapping.host_port()) + "] not " +
"included in resources");
}
Docker::PortMapping portMapping;
portMapping.hostPort = mapping.host_port();
portMapping.containerPort = mapping.container_port();
if (mapping.has_protocol()) {
portMapping.protocol = mapping.protocol();
}
options.portMappings.push_back(portMapping);
}
}
if (devices.isSome()) {
options.devices = devices.get();
}
options.name = name;
bool dnsSpecified = false;
foreach (const Parameter& parameter, dockerInfo.parameters()) {
options.additionalOptions.push_back(
"--" + parameter.key() + "=" + parameter.value());
// In Docker 1.13.0, `--dns-option` was added and `--dns-opt` was hidden
// (but it can still be used), so here we need to check both of them.
if (!dnsSpecified &&
(parameter.key() == "dns" ||
parameter.key() == "dns-search" ||
parameter.key() == "dns-opt" ||
parameter.key() == "dns-option")) {
dnsSpecified = true;
}
}
if (!dnsSpecified && defaultContainerDNS.isSome()) {
Option<ContainerDNSInfo::DockerInfo> bridgeDNS;
Option<ContainerDNSInfo::DockerInfo> defaultUserDNS;
hashmap<string, ContainerDNSInfo::DockerInfo> userDNSMap;
foreach (const ContainerDNSInfo::DockerInfo& dnsInfo,
defaultContainerDNS->docker()) {
// Currently we only support setting DNS for containers which join
// the Docker bridge network or a user-defined network.
if (dnsInfo.network_mode() == ContainerDNSInfo::DockerInfo::BRIDGE) {
bridgeDNS = dnsInfo;
} else if (dnsInfo.network_mode() == ContainerDNSInfo::DockerInfo::USER) {
if (!dnsInfo.has_network_name()) {
// The DNS info which has network mode set to `USER` and has no
// network name set is considered the default DNS for all
// user-defined networks. It applies to a Docker container which
// joins a user-defined network that cannot be found in
// `defaultContainerDNS`.
defaultUserDNS = dnsInfo;
} else {
userDNSMap[dnsInfo.network_name()] = dnsInfo;
}
}
}
auto setDNSInfo = [&](const ContainerDNSInfo::DockerInfo& dnsInfo) {
options.dns.assign(
dnsInfo.dns().nameservers().begin(),
dnsInfo.dns().nameservers().end());
options.dnsSearch.assign(
dnsInfo.dns().search().begin(),
dnsInfo.dns().search().end());
options.dnsOpt.assign(
dnsInfo.dns().options().begin(),
dnsInfo.dns().options().end());
};
if (dockerInfo.network() == ContainerInfo::DockerInfo::BRIDGE &&
bridgeDNS.isSome()) {
setDNSInfo(bridgeDNS.get());
} else if (dockerInfo.network() == ContainerInfo::DockerInfo::USER) {
if (userDNSMap.contains(options.network.get())) {
setDNSInfo(userDNSMap.at(options.network.get()));
} else if (defaultUserDNS.isSome()) {
setDNSInfo(defaultUserDNS.get());
}
}
}
options.image = dockerInfo.image();
if (commandInfo.shell()) {
// We override the entrypoint if shell is enabled because we
// assume the user intends to run the command within a shell
// and not the default entrypoint of the image. See MESOS-1770
// for more details.
#ifdef __WINDOWS__
options.entrypoint = "cmd";
#else
options.entrypoint = "/bin/sh";
#endif // __WINDOWS__
}
if (commandInfo.shell()) {
if (!commandInfo.has_value()) {
return Error("Shell specified but no command value provided");
}
// The Docker CLI only supports a single word for overriding the
// entrypoint, so we must specify `-c` (or `/c` on Windows)
// for the other parts of the command.
#ifdef __WINDOWS__
options.arguments.push_back("/c");
#else
options.arguments.push_back("-c");
#endif // __WINDOWS__
options.arguments.push_back(commandInfo.value());
} else {
if (commandInfo.has_value()) {
options.arguments.push_back(commandInfo.value());
}
foreach (const string& argument, commandInfo.arguments()) {
options.arguments.push_back(argument);
}
}
return options;
}
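// Assembles the `docker run` command line from the given RunOptions and
// executes it, returning the eventual exit status of the docker CLI process.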
Future<Option<int>> Docker::run(
const Docker::RunOptions& options,
const process::Subprocess::IO& _stdout,
const process::Subprocess::IO& _stderr) const
{
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("run");
if (options.privileged) {
argv.push_back("--privileged");
}
if (options.cpuShares.isSome()) {
argv.push_back("--cpu-shares");
argv.push_back(stringify(options.cpuShares.get()));
}
if (options.cpuQuota.isSome()) {
argv.push_back("--cpu-quota");
argv.push_back(stringify(options.cpuQuota.get()));
}
if (options.memoryReservation.isSome()) {
argv.push_back("--memory-reservation");
argv.push_back(stringify(options.memoryReservation->bytes()));
}
if (options.memory.isSome()) {
argv.push_back("--memory");
argv.push_back(stringify(options.memory->bytes()));
}
if (options.oomScoreAdj.isSome()) {
argv.push_back("--oom-score-adj");
argv.push_back(stringify(options.oomScoreAdj.get()));
}
foreachpair(const string& key, const string& value, options.env) {
argv.push_back("-e");
argv.push_back(key + "=" + value);
}
foreach(const string& volume, options.volumes) {
argv.push_back("-v");
argv.push_back(volume);
}
if (options.volumeDriver.isSome()) {
argv.push_back("--volume-driver=" + options.volumeDriver.get());
}
if (options.network.isSome()) {
const string& network = options.network.get();
argv.push_back("--net");
argv.push_back(network);
if (network != "host" &&
network != "bridge" &&
network != "none") {
// User defined networks require Docker version >= 1.9.0.
Try<Nothing> validateVer = validateVersion(Version(1, 9, 0));
if (validateVer.isError()) {
return Failure("User defined networks require Docker "
"version 1.9.0 or higher");
}
}
if (network == "host" && !options.dns.empty()) {
// `--dns` option with host network requires Docker version >= 1.12.0,
// see https://github.com/moby/moby/pull/22408 for details.
Try<Nothing> validateVer = validateVersion(Version(1, 12, 0));
if (validateVer.isError()) {
return Failure("--dns option with host network requires Docker "
"version 1.12.0 or higher");
}
}
}
foreach (const string& dns, options.dns) {
argv.push_back("--dns");
argv.push_back(dns);
}
foreach (const string& search, options.dnsSearch) {
argv.push_back("--dns-search");
argv.push_back(search);
}
if (!options.dnsOpt.empty()) {
// `--dns-opt` option requires Docker version >= 1.9.0,
// see https://github.com/moby/moby/pull/16031 for details.
Try<Nothing> validateVer = validateVersion(Version(1, 9, 0));
if (validateVer.isError()) {
return Failure("--dns-opt option requires Docker "
"version 1.9.0 or higher");
}
}
foreach (const string& opt, options.dnsOpt) {
argv.push_back("--dns-opt");
argv.push_back(opt);
}
if (options.hostname.isSome()) {
argv.push_back("--hostname");
argv.push_back(options.hostname.get());
}
foreach (const Docker::PortMapping& mapping, options.portMappings) {
argv.push_back("-p");
string portMapping = stringify(mapping.hostPort) + ":" +
stringify(mapping.containerPort);
if (mapping.protocol.isSome()) {
portMapping += "/" + strings::lower(mapping.protocol.get());
}
argv.push_back(portMapping);
}
foreach (const Device& device, options.devices) {
if (!device.hostPath.is_absolute()) {
return Failure("Device path '" + device.hostPath.string() + "'"
" is not an absolute path");
}
string permissions;
permissions += device.access.read ? "r" : "";
permissions += device.access.write ? "w" : "";
permissions += device.access.mknod ? "m" : "";
// Docker doesn't handle this case (it fails by saying
// that an absolute path is not being provided).
if (permissions.empty()) {
return Failure("At least one access required for --devices:"
" none specified for"
" '" + device.hostPath.string() + "'");
}
// Note that docker silently does not handle default devices
// passed in with restricted permissions (e.g. /dev/null), so
// we don't bother checking this case either.
argv.push_back(
"--device=" +
device.hostPath.string() + ":" +
device.containerPath.string() + ":" +
permissions);
}
if (options.entrypoint.isSome()) {
argv.push_back("--entrypoint");
argv.push_back(options.entrypoint.get());
}
if (options.name.isSome()) {
argv.push_back("--name");
argv.push_back(options.name.get());
}
foreach (const string& option, options.additionalOptions) {
argv.push_back(option);
}
argv.push_back(options.image);
foreach(const string& argument, options.arguments) {
argv.push_back(argument);
}
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
_stdout,
_stderr,
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + path + "': " + s.error());
}
s->status().onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd));
// Ideally we could capture the stderr when docker itself fails,
// however due to the stderr redirection used here we cannot.
//
// TODO(bmahler): Determine a way to redirect stderr while still
// capturing the stderr when 'docker run' itself fails. E.g. we
// could use 'docker logs' in conjunction with a "detached" form
// of 'docker run' to isolate 'docker run' failure messages from
// the container stderr.
return s->status();
}
// NOTE: A known issue in Docker 1.12/1.13 sometimes leaks its mount
// namespace, causing `docker rm` to fail. As a workaround, we do a
// best-effort `docker rm` and log the error instead of returning a
// failure when `remove` is set to true (MESOS-7777).
Future<Nothing> Docker::stop(
const string& containerName,
const Duration& timeout,
bool remove) const
{
int timeoutSecs = (int) timeout.secs();
if (timeoutSecs < 0) {
return Failure("A negative timeout cannot be applied to docker stop: " +
stringify(timeoutSecs));
}
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("stop");
argv.push_back("-t");
argv.push_back(stringify(timeoutSecs));
argv.push_back(containerName);
const string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
return s->status()
.then(lambda::bind(
&Docker::_stop,
*this,
containerName,
cmd,
s.get(),
remove))
.onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd));
}
Future<Nothing> Docker::_stop(
const Docker& docker,
const string& containerName,
const string& cmd,
const Subprocess& s,
bool remove)
{
Option<int> status = s.status().get();
if (remove) {
bool force = !status.isSome() || status.get() != 0;
return docker.rm(containerName, force)
.repair([=](const Future<Nothing>& future) {
LOG(ERROR) << "Unable to remove Docker container '"
<< containerName + "': " << future.failure();
return Nothing();
});
}
return checkError(cmd, s);
}
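// Sends the given signal to the container via `docker kill --signal`.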
Future<Nothing> Docker::kill(
const string& containerName,
int signal) const
{
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("kill");
argv.push_back("--signal=" + stringify(signal));
argv.push_back(containerName);
const string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
return checkError(cmd, s.get());
}
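// Removes the container (and, via the `-v` flag below, its volumes) with
// `docker rm`, forcefully if `force` is true.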
Future<Nothing> Docker::rm(
const string& containerName,
bool force) const
{
// The `-v` flag removes Docker volumes that may be present.
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("rm");
if (force) {
argv.push_back("-f");
}
argv.push_back("-v");
argv.push_back(containerName);
const string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
return checkError(cmd, s.get());
}
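// Runs `docker inspect` on the container. If `retryInterval` is set, the
// inspect is retried until the command succeeds and the container is
// reported as started (see `__inspect` and `___inspect` below).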
Future<Docker::Container> Docker::inspect(
const string& containerName,
const Option<Duration>& retryInterval) const
{
Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
// Holds a callback used for cleanup in case this call to 'docker inspect' is
// discarded, and a mutex to control access to the callback.
auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>();
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("inspect");
argv.push_back("--type=container");
argv.push_back(containerName);
_inspect(argv, promise, retryInterval, callback);
return promise->future()
.onDiscard([callback]() {
synchronized (callback->second) {
callback->first();
}
});
}
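// Launches the `docker inspect` subprocess and registers the cleanup
// callback used when the caller discards the returned future.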
void Docker::_inspect(
const vector<string>& argv,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
return;
}
const string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
argv[0],
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
Subprocess::PIPE(),
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
promise->fail("Failed to create subprocess '" + cmd + "': " + s.error());
return;
}
// Set the `onDiscard` callback which will clean up the subprocess if the
// caller discards the `Future` that we returned.
synchronized (callback->second) {
// It's possible that the caller has discarded their future while we were
// creating a new subprocess, so we clean up here if necessary.
if (promise->future().hasDiscard()) {
commandDiscarded(s.get(), cmd);
return;
}
callback->first = [promise, s, cmd]() {
promise->discard();
CHECK_SOME(s);
commandDiscarded(s.get(), cmd);
};
}
// Start reading from stdout so that writing to the pipe won't block
// in cases where the output is larger than the pipe capacity.
const Future<string> output = io::read(s->out().get());
s->status()
.onAny([=]() {
__inspect(argv, promise, retryInterval, output, s.get(), callback);
});
}
void Docker::__inspect(
const vector<string>& argv,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
Future<string> output,
const Subprocess& s,
shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
return;
}
// Check the exit status of 'docker inspect'.
CHECK_READY(s.status());
Option<int> status = s.status().get();
const string cmd = strings::join(" ", argv);
if (!status.isSome()) {
promise->fail("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
output.discard();
if (retryInterval.isSome()) {
VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
[=]() { _inspect(argv, promise, retryInterval, callback); });
return;
}
CHECK_SOME(s.err());
io::read(s.err().get())
.then(lambda::bind(
failure<Nothing>,
cmd,
status.get(),
lambda::_1))
.onAny([=](const Future<Nothing>& future) {
CHECK_FAILED(future);
promise->fail(future.failure());
});
return;
}
// Read to EOF.
CHECK_SOME(s.out());
output
.onAny([=](const Future<string>& output) {
___inspect(argv, promise, retryInterval, output, callback);
});
}
void Docker::___inspect(
const vector<string>& argv,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
const Future<string>& output,
shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
return;
}
if (!output.isReady()) {
promise->fail(output.isFailed() ? output.failure() : "future discarded");
return;
}
Try<Docker::Container> container = Docker::Container::create(
output.get());
if (container.isError()) {
promise->fail("Unable to create container: " + container.error());
return;
}
const string cmd = strings::join(" ", argv);
if (retryInterval.isSome() && !container->started) {
VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
[=]() { _inspect(argv, promise, retryInterval, callback); });
return;
}
promise->set(container.get());
}
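// Lists containers via `docker ps` and returns the result of inspecting
// each one, optionally filtered by a container name prefix.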
Future<vector<Docker::Container>> Docker::ps(
bool all,
const Option<string>& prefix) const
{
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("ps");
if (all) {
argv.push_back("-a");
}
const string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
cmd,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
// Start reading from stdout so that writing to the pipe won't block
// in cases where the output is larger than the pipe capacity.
const Future<string>& output = io::read(s->out().get());
return s->status()
.then(lambda::bind(&Docker::_ps, *this, cmd, s.get(), prefix, output));
}
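// Continuation for `ps`: checks the exit status of `docker ps` and forwards
// its output to `__ps` for parsing.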
Future<vector<Docker::Container>> Docker::_ps(
const Docker& docker,
const string& cmd,
const Subprocess& s,
const Option<string>& prefix,
Future<string> output)
{
Option<int> status = s.status().get();
if (!status.isSome()) {
output.discard();
return Failure("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
output.discard();
CHECK_SOME(s.err());
return io::read(s.err().get())
.then(lambda::bind(
failure<vector<Docker::Container>>,
cmd,
status.get(),
lambda::_1));
}
// Read to EOF.
return output.then(lambda::bind(&Docker::__ps, docker, prefix, lambda::_1));
}
Future<vector<Docker::Container>> Docker::__ps(
const Docker& docker,
const Option<string>& prefix,
const string& output)
{
Owned<vector<string>> lines(new vector<string>());
*lines = strings::tokenize(output, "\n");
// Skip the header.
CHECK(!lines->empty());
lines->erase(lines->begin());
Owned<vector<Docker::Container>> containers(new vector<Docker::Container>());
Owned<Promise<vector<Docker::Container>>> promise(
new Promise<vector<Docker::Container>>());
// Limit the number of parallel calls to docker inspect at once to prevent
// reaching the system's open file descriptor limit.
inspectBatches(containers, lines, promise, docker, prefix);
return promise->future();
}
// TODO(chenlily): Generalize functionality into a concurrency limiter
// within libprocess.
void Docker::inspectBatches(
Owned<vector<Docker::Container>> containers,
Owned<vector<string>> lines,
Owned<Promise<vector<Docker::Container>>> promise,
const Docker& docker,
const Option<string>& prefix)
{
vector<Future<Docker::Container>> batch =
createInspectBatch(lines, docker, prefix);
collect(batch).onAny([=](const Future<vector<Docker::Container>>& c) {
if (c.isReady()) {
foreach (const Docker::Container& container, c.get()) {
containers->push_back(container);
}
if (lines->empty()) {
promise->set(*containers);
}
else {
inspectBatches(containers, lines, promise, docker, prefix);
}
} else {
if (c.isFailed()) {
promise->fail("Docker ps batch failed " + c.failure());
}
else {
promise->fail("Docker ps batch discarded");
}
}
});
}
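// Pops up to DOCKER_PS_MAX_INSPECT_CALLS container names off `lines` and
// issues a `docker inspect` for each one matching the optional prefix.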
vector<Future<Docker::Container>> Docker::createInspectBatch(
Owned<vector<string>> lines,
const Docker& docker,
const Option<string>& prefix)
{
vector<Future<Docker::Container>> batch;
while (!lines->empty() && batch.size() < DOCKER_PS_MAX_INSPECT_CALLS) {
string line = lines->back();
lines->pop_back();
// Inspect the containers that we are interested in depending on
// whether or not a 'prefix' was specified.
vector<string> columns = strings::split(strings::trim(line), " ");
// We expect the name column to be the last column from ps.
string name = columns[columns.size() - 1];
if (prefix.isNone() || strings::startsWith(name, prefix.get())) {
batch.push_back(docker.inspect(name));
}
}
return batch;
}
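// Pulls the image unless it is already available locally: we first run
// `docker inspect` on the image and only fall through to `docker pull`
// (in `__pull`) if the inspect fails or `force` is set.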
Future<Docker::Image> Docker::pull(
const string& directory,
const string& image,
bool force) const
{
vector<string> argv;
string dockerImage = image;
// Check if the specified image has a tag. Also split on "/" in case
// the user specified a registry server (i.e., localhost:5000/image)
// to get the actual image name. If no tag was given, we add a
// 'latest' tag to avoid pulling down the whole repository.
vector<string> parts = strings::split(image, "/");
if (!strings::contains(parts.back(), ":")) {
dockerImage += ":latest";
}
if (force) {
// Skip the inspect and directly pull the image.
return Docker::__pull(*this, directory, image, path, socket, config);
}
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("inspect");
argv.push_back(dockerImage);
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
Try<Subprocess> s = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
Subprocess::PIPE(),
nullptr,
None(),
None(),
createParentHooks());
if (s.isError()) {
return Failure("Failed to create subprocess '" + cmd + "': " + s.error());
}
// Start reading from stdout so that writing to the pipe won't block
// in cases where the output is larger than the pipe capacity.
const Future<string> output = io::read(s->out().get());
// We assume docker inspect exits quickly and does not need to be
// discarded.
return s->status()
.then(lambda::bind(
&Docker::_pull,
*this,
s.get(),
directory,
dockerImage,
path,
socket,
config,
output))
.onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd));
}
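// Continuation for `pull`: if the initial `docker inspect` succeeded, parse
// its output; otherwise fall through to `__pull` to actually pull the image.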
Future<Docker::Image> Docker::_pull(
const Docker& docker,
const Subprocess& s,
const string& directory,
const string& image,
const string& path,
const string& socket,
const Option<JSON::Object>& config,
Future<string> output)
{
Option<int> status = s.status().get();
if (status.isSome() && status.get() == 0) {
return output
.then(lambda::bind(&Docker::____pull, lambda::_1));
}
output.discard();
return Docker::__pull(docker, directory, image, path, socket, config);
}
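// Invokes `docker pull`, pointing HOME at a directory that contains the
// docker config file (either downloaded into the sandbox or written from
// the agent's '--docker_config' flag) so the CLI can pick up registry
// credentials.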
Future<Docker::Image> Docker::__pull(
const Docker& docker,
const string& directory,
const string& image,
const string& path,
const string& socket,
const Option<JSON::Object>& config)
{
vector<string> argv;
argv.push_back(path);
argv.push_back("-H");
argv.push_back(socket);
argv.push_back("pull");
argv.push_back(image);
string cmd = strings::join(" ", argv);
VLOG(1) << "Running " << cmd;
// Set the HOME path where the docker config file is located.
Option<string> home;
if (config.isSome()) {
Try<string> _home = os::mkdtemp();
if (_home.isError()) {
return Failure("Failed to create temporary directory for docker config"
"file: " + _home.error());
}
home = _home.get();
Result<JSON::Object> auths = config->find<JSON::Object>("auths");
if (auths.isError()) {
return Failure("Failed to find 'auths' in docker config file: " +
auths.error());
}
const string path = auths.isSome()
? path::join(home.get(), ".docker")
: home.get();
Try<Nothing> mkdir = os::mkdir(path);
if (mkdir.isError()) {
return Failure("Failed to create path '" + path + "': " + mkdir.error());
}
const string file = path::join(path, auths.isSome()
? "config.json"
: ".dockercfg");
Try<Nothing> write = os::write(file, stringify(config.get()));
if (write.isError()) {
return Failure("Failed to write docker config file to '" +
file + "': " + write.error());
}
}
// Currently the Docker CLI picks up .docker/config.json (or the old
// .dockercfg) by looking for the config file in the $HOME
// directory. The docker config file can either be specified by
// the agent flag '--docker_config', or by one of the provided URIs
// that is a docker config file; in the latter case we want docker
// to pick it up from the sandbox directory where we store all the
// URI downloads.
//
// NOTE: On Windows, Docker uses $USERPROFILE instead of $HOME.
// See MESOS-8619 for more details.
//
// TODO(gilbert): Deprecate fetching the docker config file
// specified as a URI in the 0.30.0 release.
#ifdef __WINDOWS__
const std::string HOME = "USERPROFILE";
#else
const std::string HOME = "HOME";
#endif // __WINDOWS__
map<string, string> environment = os::environment();
environment[HOME] = directory;
bool configExisted =
os::exists(path::join(directory, ".docker", "config.json")) ||
os::exists(path::join(directory, ".dockercfg"));
// We always set the sandbox as the 'HOME' directory, unless
// there is no docker config file downloaded in the sandbox
// and another docker config file is specified using the
// '--docker_config' agent flag.
if (!configExisted && home.isSome()) {
environment[HOME] = home.get();
}
Try<Subprocess> s_ = subprocess(
path,
argv,
Subprocess::PATH(os::DEV_NULL),
Subprocess::PATH(os::DEV_NULL),
Subprocess::PIPE(),
nullptr,
environment,
None(),
createParentHooks());
if (s_.isError()) {
return Failure("Failed to execute '" + cmd + "': " + s_.error());
}
// Docker pull can run for a long time due to large images, so
// we allow the future to be discarded, which will kill the pull
// process.
return s_->status()
.then(lambda::bind(
&Docker::___pull,
docker,
s_.get(),
cmd,
directory,
image))
.onDiscard(lambda::bind(&commandDiscarded, s_.get(), cmd))
.onAny([home]() {
if (home.isSome()) {
Try<Nothing> rmdir = os::rmdir(home.get());
if (rmdir.isError()) {
LOG(WARNING) << "Failed to remove docker config file temporary"
<< "'HOME' directory '" << home.get() << "': "
<< rmdir.error();
}
}
});
}
Future<Docker::Image> Docker::___pull(
const Docker& docker,
const Subprocess& s,
const string& cmd,
const string& directory,
const string& image)
{
Option<int> status = s.status().get();
if (!status.isSome()) {
return Failure("No status found from '" + cmd + "'");
} else if (status.get() != 0) {
return io::read(s.err().get())
.then(lambda::bind(&failure<Image>, cmd, status.get(), lambda::_1));
}
// We re-invoke Docker::pull in order to now do an 'inspect' since
// the image should be present (see Docker::pull).
// TODO(benh): Factor out inspect code from Docker::pull to be
// reused rather than this (potentially infinite) recursive call.
return docker.pull(directory, image);
}
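// Parses the `docker inspect` output into a `Docker::Image`, expecting
// exactly one matching image.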
Future<Docker::Image> Docker::____pull(
const string& output)
{
Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
if (parse.isError()) {
return Failure("Failed to parse JSON: " + parse.error());
}
JSON::Array array = parse.get();
// Only return if exactly one image was identified by the name.
if (array.values.size() == 1) {
CHECK(array.values.front().is<JSON::Object>());
Try<Docker::Image> image =
Docker::Image::create(array.values.front().as<JSON::Object>());
if (image.isError()) {
return Failure("Unable to create image: " + image.error());
}
return image.get();
}
// TODO(tnachen): Handle the case where the short image ID was
// not sufficiently unique and 'array.values.size() > 1'.
return Failure("Failed to find image");
}