| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| |
| #include <list> |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| #include <process/address.hpp> |
| #include <process/after.hpp> |
| #include <process/clock.hpp> |
| #include <process/collect.hpp> |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/future.hpp> |
| #include <process/http.hpp> |
| #include <process/io.hpp> |
| #include <process/loop.hpp> |
| #include <process/owned.hpp> |
| #include <process/process.hpp> |
| #include <process/reap.hpp> |
| #include <process/shared.hpp> |
| #include <process/socket.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/hashmap.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/path.hpp> |
| #include <stout/recordio.hpp> |
| |
| #include <stout/os/constants.hpp> |
| |
| #ifndef __WINDOWS__ |
| #include <stout/posix/os.hpp> |
| #endif // __WINDOWS__ |
| |
| #include <mesos/http.hpp> |
| #include <mesos/type_utils.hpp> |
| |
| #include <mesos/agent/agent.hpp> |
| |
| #include <mesos/slave/containerizer.hpp> |
| #include <mesos/slave/container_logger.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/recordio.hpp" |
| #include "common/status_utils.hpp" |
| |
| #ifdef __linux__ |
| #include "linux/systemd.hpp" |
| #endif // __linux__ |
| |
| #include "slave/flags.hpp" |
| #include "slave/state.hpp" |
| |
| #include "slave/containerizer/mesos/paths.hpp" |
| |
| #include "slave/containerizer/mesos/io/switchboard.hpp" |
| |
| namespace http = process::http; |
| |
| #ifndef __WINDOWS__ |
| namespace unix = process::network::unix; |
| #endif // __WINDOWS__ |
| |
| using namespace mesos::internal::slave::containerizer::paths; |
| |
| using std::list; |
| using std::map; |
| using std::string; |
| using std::vector; |
| |
| using process::after; |
| using process::await; |
| using process::Break; |
| using process::Continue; |
| using process::ControlFlow; |
| using process::Clock; |
| using process::ErrnoFailure; |
| using process::Failure; |
| using process::Future; |
| using process::loop; |
| using process::Owned; |
| using process::PID; |
| using process::Process; |
| using process::Promise; |
| using process::Shared; |
| using process::Subprocess; |
| |
| using process::network::internal::SocketImpl; |
| |
| using mesos::slave::ContainerConfig; |
| using mesos::slave::ContainerClass; |
| using mesos::slave::ContainerLaunchInfo; |
| using mesos::slave::ContainerLimitation; |
| using mesos::slave::ContainerLogger; |
| using mesos::slave::ContainerIO; |
| using mesos::slave::ContainerState; |
| using mesos::slave::Isolator; |
| |
| namespace mesos { |
| namespace internal { |
| namespace slave { |
| |
| Try<IOSwitchboard*> IOSwitchboard::create( |
| const Flags& flags, |
| bool local) |
| { |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| if (logger.isError()) { |
| return Error("Cannot create container logger: " + logger.error()); |
| } |
| |
| return new IOSwitchboard( |
| flags, |
| local, |
| Owned<ContainerLogger>(logger.get())); |
| } |
| |
| |
| IOSwitchboard::IOSwitchboard( |
| const Flags& _flags, |
| bool _local, |
| Owned<ContainerLogger> _logger) |
| : flags(_flags), |
| local(_local), |
| logger(_logger) {} |
| |
| |
| IOSwitchboard::~IOSwitchboard() {} |
| |
| |
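// NOTE: The io switchboard tracks all of its state at the granularity
// of individual `ContainerID`s, so nested and standalone containers
// can be supported uniformly (see `supportsNesting()` and
// `supportsStandalone()` below).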
| bool IOSwitchboard::supportsNesting() |
| { |
| return true; |
| } |
| |
| |
| bool IOSwitchboard::supportsStandalone() |
| { |
| return true; |
| } |
| |
| |
| Future<Nothing> IOSwitchboard::recover( |
| const vector<ContainerState>& states, |
| const hashset<ContainerID>& orphans) |
| { |
| #ifdef __WINDOWS__ |
| return Nothing(); |
| #else |
| if (local) { |
| return Nothing(); |
| } |
| |
| // Recover any active container's io switchboard info. |
| // |
| // NOTE: If a new agent is started with io switchboard server mode |
| // disabled, we will still recover the io switchboard info for |
| // containers previously launched by an agent with server mode enabled. |
| foreach (const ContainerState& state, states) { |
| const ContainerID& containerId = state.container_id(); |
| |
| const string path = getContainerIOSwitchboardPath( |
| flags.runtime_dir, containerId); |
| |
| // If we don't have a checkpoint directory created for this |
| // container's io switchboard, there is nothing to recover. This |
| // can only happen for containers that were launched with `DEFAULT` |
| // ContainerClass *and* no `TTYInfo` set. |
| if (!os::exists(path)) { |
| continue; |
| } |
| |
| Result<pid_t> pid = getContainerIOSwitchboardPid( |
| flags.runtime_dir, containerId); |
| |
| // For active containers that have an io switchboard directory, |
    // we should *always* have a valid pid file. If we don't, that is
    // an error and we should fail appropriately.
| if (!pid.isSome()) { |
| return Failure("Failed to get I/O switchboard server pid for" |
| " '" + stringify(containerId) + "':" |
| " " + (pid.isError() ? |
| pid.error() : |
| "pid file does not exist")); |
| } |
| |
| infos[containerId] = Owned<Info>(new Info( |
| pid.get(), |
| process::reap(pid.get()).onAny(defer( |
| PID<IOSwitchboard>(this), |
| &IOSwitchboard::reaped, |
| containerId, |
| lambda::_1)))); |
| } |
| |
| // Recover the io switchboards from any orphaned containers. |
| foreach (const ContainerID& orphan, orphans) { |
| const string path = getContainerIOSwitchboardPath( |
| flags.runtime_dir, orphan); |
| |
| // If we don't have a checkpoint directory created for this |
| // container's io switchboard, there is nothing to recover. |
| if (!os::exists(path)) { |
| continue; |
| } |
| |
| Result<pid_t> pid = getContainerIOSwitchboardPid( |
| flags.runtime_dir, orphan); |
| |
| // If we were able to retrieve the checkpointed pid, we simply |
| // populate our info struct and rely on the containerizer to |
| // destroy the orphaned container and call `cleanup()` on us later. |
| if (pid.isSome()) { |
| infos[orphan] = Owned<Info>(new Info( |
| pid.get(), |
| process::reap(pid.get()).onAny(defer( |
| PID<IOSwitchboard>(this), |
| &IOSwitchboard::reaped, |
| orphan, |
| lambda::_1)))); |
| } else { |
| // If we were not able to retrieve the checkpointed pid, we |
| // still need to populate our info struct (but with a pid value |
| // of `None()`). This way when `cleanup()` is called, we still |
| // do whatever cleanup we can (we just don't wait for the pid |
| // to be reaped -- we do it immediately). |
| // |
| // We could enter this case under 4 conditions: |
| // |
| // (1) The io switchboard we are recovering was launched, but |
| // the agent died before checkpointing its pid. |
| // (2) The io switchboard pid file was removed. |
      // (3) There was an error reading the io switchboard pid file.
| // (4) The io switchboard pid file was corrupted. |
| // |
| // We log an error in cases (3) and (4). |
| infos[orphan] = Owned<Info>(new Info( |
| None(), |
| Future<Option<int>>(None()))); |
| |
| if (pid.isError()) { |
| LOG(ERROR) << "Error retrieving the 'IOSwitchboard' pid file" |
| " for orphan '" << orphan << "': " << pid.error(); |
| } |
| } |
| } |
| |
| return Nothing(); |
| #endif // __WINDOWS__ |
| } |
| |
| |
| Future<Option<ContainerLaunchInfo>> IOSwitchboard::prepare( |
| const ContainerID& containerId, |
| const ContainerConfig& containerConfig) |
| { |
  // In local mode, the container will inherit the agent's stdio.
| if (local) { |
| containerIOs[containerId] = ContainerIO(); |
| return None(); |
| } |
| |
| // TODO(jieyu): Currently, if the agent fails over after the |
| // executor is launched, but before its nested containers are |
| // launched, the nested containers launched later might not have |
| // access to the root parent container's ExecutorInfo (i.e., |
| // 'containerConfig.executor_info()' will be empty). |
| return logger->prepare(containerId, containerConfig) |
| .then(defer( |
| PID<IOSwitchboard>(this), |
| &IOSwitchboard::_prepare, |
| containerId, |
| containerConfig, |
| lambda::_1)); |
| } |
| |
| |
| Future<Option<ContainerLaunchInfo>> IOSwitchboard::_prepare( |
| const ContainerID& containerId, |
| const ContainerConfig& containerConfig, |
| const ContainerIO& loggerIO) |
| { |
| bool requiresServer = IOSwitchboard::requiresServer(containerConfig); |
| |
  // On Windows, we do not yet support running an io switchboard
  // server, so we must error out if it is required.
| #ifdef __WINDOWS__ |
| if (requiresServer) { |
| return Failure( |
| "IO Switchboard server is not supported on windows"); |
| } |
| #endif |
| |
| LOG(INFO) << "Container logger module finished preparing container " |
| << containerId << "; IOSwitchboard server is " |
            << (requiresServer ? "" : "not ") << "required";
| |
| bool hasTTY = containerConfig.has_container_info() && |
| containerConfig.container_info().has_tty_info(); |
| |
| if (!requiresServer) { |
| CHECK(!containerIOs.contains(containerId)); |
| containerIOs[containerId] = loggerIO; |
| |
| return ContainerLaunchInfo(); |
| } |
| |
| #ifdef __WINDOWS__ |
  // NOTE: On Windows, both return values of
  // `IOSwitchboard::requiresServer(containerConfig)` are handled
  // above, so this point is never reached.
| UNREACHABLE(); |
| #else |
| // First make sure that we haven't already spawned an io |
| // switchboard server for this container. |
| if (infos.contains(containerId)) { |
| return Failure("Already prepared io switchboard server for container" |
| " '" + stringify(containerId) + "'"); |
| } |
| |
| // We need this so we can return the |
| // `tty_slave_path` if there is one. |
| ContainerLaunchInfo launchInfo; |
| |
| // We assign this variable to an entry in the `containerIOs` hashmap |
| // at the bottom of this function. We declare it here so we can |
| // populate it throughout this function and only store it back to |
| // the hashmap once we know this function has succeeded. |
| ContainerIO containerIO; |
| |
| // Manually construct pipes instead of using `Subprocess::PIPE` |
| // so that the ownership of the FDs is properly represented. The |
| // `Subprocess` spawned below owns one end of each pipe and will |
| // be solely responsible for closing that end. The ownership of |
| // the other end will be passed to the caller of this function |
| // and eventually passed to the container being launched. |
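  //
  // For illustration, the topology set up below is roughly:
  //
  //   non-TTY: three pipes, with the switchboard holding `stdinToFd`,
  //            `stdoutFromFd` and `stderrFromFd`, and the container
  //            receiving the opposite ends via `containerIO`.
  //   TTY:     a single pseudo terminal, with the switchboard holding
  //            the master end (used for all three streams) and the
  //            container receiving the slave end.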
| int stdinToFd = -1; |
| int stdoutFromFd = -1; |
| int stderrFromFd = -1; |
| |
  // The set of file descriptors we've opened so far.
| hashset<int> openedFds = {}; |
| |
  // The set of file descriptors that will be passed to the I/O
  // switchboard. We need to close these file descriptors once the
  // I/O switchboard server has been forked.
| hashset<int> ioSwitchboardFds = {}; |
| |
| // Helper for closing a set of file descriptors. |
| auto close = [](const hashset<int>& fds) { |
| foreach (int fd, fds) { |
| os::close(fd); |
| } |
| }; |
| |
| // Setup a pseudo terminal for the container. |
| if (hasTTY) { |
| // TODO(jieyu): Consider moving all TTY related method to stout. |
| // For instance, 'stout/posix/tty.hpp'. |
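    //
    // The steps below follow the standard POSIX pseudo terminal
    // allocation sequence: posix_openpt() to obtain the master end,
    // then ptsname()/unlockpt()/grantpt() to locate and prepare the
    // slave end, and finally open() on the slave path.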
| |
| // Set flag 'O_NOCTTY' so that the terminal device will not become |
| // the controlling terminal for the process. |
| int master = posix_openpt(O_RDWR | O_NOCTTY | O_CLOEXEC); |
| if (master == -1) { |
| return Failure("Failed to open a master pseudo terminal"); |
| } |
| |
| openedFds.insert(master); |
| |
| Try<string> slavePath = os::ptsname(master); |
| if (slavePath.isError()) { |
| close(openedFds); |
| return Failure("Failed to get the slave pseudo terminal path: " + |
| slavePath.error()); |
| } |
| |
| // Unlock the slave end of the pseudo terminal. |
| if (unlockpt(master) != 0) { |
| close(openedFds); |
| return ErrnoFailure("Failed to unlock the slave pseudo terminal"); |
| } |
| |
    // Set proper permissions and ownership for the device.
| if (grantpt(master) != 0) { |
| close(openedFds); |
| return ErrnoFailure("Failed to grant the slave pseudo terminal"); |
| } |
| |
| if (containerConfig.has_user()) { |
| Try<Nothing> chown = os::chown( |
| containerConfig.user(), |
| slavePath.get(), |
| false); |
| |
| if (chown.isError()) { |
| close(openedFds); |
| return Failure("Failed to chown the slave pseudo terminal: " + |
| chown.error()); |
| } |
| } |
| |
| // Open the slave end of the pseudo terminal. The opened file |
| // descriptor will be dup'ed to stdin/out/err of the container. |
    Try<int> slave = os::open(slavePath.get(), O_RDWR | O_NOCTTY | O_CLOEXEC);
    if (slave.isError()) {
      close(openedFds);
      return Failure("Failed to open the slave pseudo terminal: " +
                     slave.error());
    }
| |
| openedFds.insert(slave.get()); |
| |
| LOG(INFO) << "Allocated pseudo terminal '" << slavePath.get() |
| << "' for container " << containerId; |
| |
| stdinToFd = master; |
| stdoutFromFd = master; |
| stderrFromFd = master; |
| |
| containerIO.in = ContainerIO::IO::FD(slave.get()); |
| containerIO.out = containerIO.in; |
| containerIO.err = containerIO.in; |
| |
| launchInfo.set_tty_slave_path(slavePath.get()); |
| |
| // The command executor requires the `tty_slave_path` |
| // to also be passed as a command line argument. |
| if (containerConfig.has_task_info()) { |
| launchInfo.mutable_command()->add_arguments( |
| "--tty_slave_path=" + slavePath.get()); |
| } |
| } else { |
| Try<std::array<int_fd, 2>> infds_ = os::pipe(); |
| if (infds_.isError()) { |
| close(openedFds); |
| return Failure("Failed to create stdin pipe: " + infds_.error()); |
| } |
| |
| const std::array<int_fd, 2>& infds = infds_.get(); |
| |
| openedFds.insert(infds[0]); |
| openedFds.insert(infds[1]); |
| |
| Try<std::array<int_fd, 2>> outfds_ = os::pipe(); |
| if (outfds_.isError()) { |
| close(openedFds); |
| return Failure("Failed to create stdout pipe: " + outfds_.error()); |
| } |
| |
| const std::array<int_fd, 2>& outfds = outfds_.get(); |
| |
| openedFds.insert(outfds[0]); |
| openedFds.insert(outfds[1]); |
| |
| Try<std::array<int_fd, 2>> errfds_ = os::pipe(); |
| if (errfds_.isError()) { |
| close(openedFds); |
| return Failure("Failed to create stderr pipe: " + errfds_.error()); |
| } |
| |
| const std::array<int_fd, 2>& errfds = errfds_.get(); |
| |
| openedFds.insert(errfds[0]); |
| openedFds.insert(errfds[1]); |
| |
| stdinToFd = infds[1]; |
| stdoutFromFd = outfds[0]; |
| stderrFromFd = errfds[0]; |
| |
| containerIO.in = ContainerIO::IO::FD(infds[0]); |
| containerIO.out = ContainerIO::IO::FD(outfds[1]); |
| containerIO.err = ContainerIO::IO::FD(errfds[1]); |
| } |
| |
| // Make sure all file descriptors opened have CLOEXEC set. |
| foreach (int fd, openedFds) { |
| Try<Nothing> cloexec = os::cloexec(fd); |
| if (cloexec.isError()) { |
| close(openedFds); |
| return Failure("Failed to set cloexec: " + cloexec.error()); |
| } |
| } |
| |
| ioSwitchboardFds.insert(stdinToFd); |
| ioSwitchboardFds.insert(stdoutFromFd); |
| ioSwitchboardFds.insert(stderrFromFd); |
| |
| // Set up our flags to send to the io switchboard server process. |
| IOSwitchboardServer::Flags switchboardFlags; |
| switchboardFlags.tty = hasTTY; |
| switchboardFlags.stdin_to_fd = stdinToFd; |
| switchboardFlags.stdout_from_fd = stdoutFromFd; |
| switchboardFlags.stderr_from_fd = stderrFromFd; |
| switchboardFlags.stdout_to_fd = STDOUT_FILENO; |
| switchboardFlags.stderr_to_fd = STDERR_FILENO; |
| switchboardFlags.heartbeat_interval = flags.http_heartbeat_interval; |
| |
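  // A `DEBUG` container (e.g., one that just runs 'ls' and exits) may
  // finish before a client has a chance to attach, so we tell the
  // server to defer redirecting (and draining) its output until the
  // first connection arrives (see the `startRedirect` promise in
  // `IOSwitchboardServerProcess`).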
| if (containerConfig.container_class() == ContainerClass::DEBUG) { |
| switchboardFlags.wait_for_connection = true; |
| } else { |
| switchboardFlags.wait_for_connection = false; |
| } |
| |
| switchboardFlags.socket_path = path::join( |
| stringify(os::PATH_SEPARATOR), |
| "tmp", |
| "mesos-io-switchboard-" + id::UUID::random().toString()); |
| |
| // Just before launching our io switchboard server, we need to |
| // create a directory to hold checkpointed files related to the |
| // server. The existence of this directory indicates that we |
| // intended to launch an io switchboard server on behalf of a |
  // container. The lack of any expected files in this directory
| // during recovery/cleanup indicates that something went wrong and |
| // we need to take appropriate action. |
| string path = getContainerIOSwitchboardPath(flags.runtime_dir, containerId); |
| |
  Try<Nothing> mkdir = os::mkdir(path);
  if (mkdir.isError()) {
    close(openedFds);
    return Failure("Error creating 'IOSwitchboard' checkpoint directory"
                   " for container '" + stringify(containerId) + "':"
                   " " + mkdir.error());
  }
| |
| // Prepare the environment for the io switchboard server process. |
  // We inherit the agent's environment variables, except those
  // prefixed with LIBPROCESS_ or MESOS_, since the io switchboard
  // server process does not rely on them.
| map<string, string> environment; |
| foreachpair (const string& key, const string& value, os::environment()) { |
| if (!strings::startsWith(key, "LIBPROCESS_") && |
| !strings::startsWith(key, "MESOS_")) { |
| environment.emplace(key, value); |
| } |
| } |
| |
| // TODO(jieyu): This is to make sure the libprocess of the io |
| // switchboard can properly initialize and find the IP. Since we |
| // don't need to use the TCP socket for communication, it's OK to |
  // use a local address. Consider disabling the TCP socket in
  // libprocess if libprocess supports that.
| environment.emplace("LIBPROCESS_IP", "127.0.0.1"); |
| |
| // TODO(jieyu): Consider making this configurable. |
| environment.emplace("LIBPROCESS_NUM_WORKER_THREADS", "8"); |
| |
| VLOG(1) << "Launching '" << IOSwitchboardServer::NAME << "' with flags '" |
| << switchboardFlags << "' for container " << containerId; |
| |
| // If we are on systemd, then extend the life of the process as we |
| // do with the executor. Any grandchildren's lives will also be |
| // extended. |
| vector<Subprocess::ParentHook> parentHooks; |
| |
| #ifdef __linux__ |
| if (systemd::enabled()) { |
| parentHooks.emplace_back(Subprocess::ParentHook( |
| &systemd::mesos::extendLifetime)); |
| } |
| #endif // __linux__ |
| |
| // Launch the io switchboard server process. |
| // We `dup()` the `stdout` and `stderr` passed to us by the |
| // container logger over the `stdout` and `stderr` of the io |
| // switchboard process itself. In this way, the io switchboard |
| // process simply needs to write to its own `stdout` and |
| // `stderr` in order to send output to the logger files. |
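  // NOTE: The final argument to `subprocess()` whitelists `stdinToFd`,
  // `stdoutFromFd` and `stderrFromFd` so that they remain open in the
  // child despite having CLOEXEC set above; the server locates them
  // via the corresponding switchboard flags we set earlier.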
| Try<Subprocess> child = subprocess( |
| path::join(flags.launcher_dir, IOSwitchboardServer::NAME), |
| {IOSwitchboardServer::NAME}, |
| Subprocess::PATH(os::DEV_NULL), |
| loggerIO.out, |
| loggerIO.err, |
| &switchboardFlags, |
| environment, |
| None(), |
| parentHooks, |
| {Subprocess::ChildHook::SETSID()}, |
| {stdinToFd, stdoutFromFd, stderrFromFd}); |
| |
| if (child.isError()) { |
| close(openedFds); |
| return Failure("Failed to create io switchboard" |
| " server process: " + child.error()); |
| } |
| |
| LOG(INFO) << "Created I/O switchboard server (pid: " << child->pid() |
| << ") listening on socket file '" |
| << switchboardFlags.socket_path.get() |
| << "' for container " << containerId; |
| |
| close(ioSwitchboardFds); |
| |
  // We remove the already closed file descriptors from 'openedFds' so
  // that we don't close them multiple times if failures happen below.
| foreach (int fd, ioSwitchboardFds) { |
| openedFds.erase(fd); |
| } |
| |
| // Now that the child has come up, we checkpoint the socket |
| // address we told it to bind to so we can access it later. |
| path = getContainerIOSwitchboardSocketPath(flags.runtime_dir, containerId); |
| |
| Try<Nothing> checkpointed = slave::state::checkpoint( |
| path, switchboardFlags.socket_path.get()); |
| |
| if (checkpointed.isError()) { |
| close(openedFds); |
| return Failure("Failed to checkpoint container's socket path to" |
| " '" + path + "': " + checkpointed.error()); |
| } |
| |
| // We also checkpoint the child's pid. |
| path = getContainerIOSwitchboardPidPath(flags.runtime_dir, containerId); |
| |
| checkpointed = slave::state::checkpoint(path, stringify(child->pid())); |
| |
| if (checkpointed.isError()) { |
| close(openedFds); |
| return Failure("Failed to checkpoint container's io switchboard pid to" |
| " '" + path + "': " + checkpointed.error()); |
| } |
| |
| // Build an info struct for this container. |
| infos[containerId] = Owned<Info>(new Info( |
| child->pid(), |
| process::reap(child->pid()).onAny(defer( |
| PID<IOSwitchboard>(this), |
| &IOSwitchboard::reaped, |
| containerId, |
| lambda::_1)))); |
| |
| // Populate the `containerIOs` hashmap. |
| containerIOs[containerId] = containerIO; |
| |
| return launchInfo; |
| #endif // __WINDOWS__ |
| } |
| |
| |
| Future<http::Connection> IOSwitchboard::connect( |
| const ContainerID& containerId) const |
| { |
| return dispatch(self(), [this, containerId]() { |
| return _connect(containerId); |
| }); |
| } |
| |
| |
| Future<http::Connection> IOSwitchboard::_connect( |
| const ContainerID& containerId) const |
| { |
| #ifdef __WINDOWS__ |
| return Failure("Not supported on Windows"); |
| #else |
| if (local) { |
| return Failure("Not supported in local mode"); |
| } |
| |
| if (!infos.contains(containerId)) { |
| return Failure("I/O switchboard server was disabled for this container"); |
| } |
| |
| // Get the io switchboard address from the `containerId`. |
| Result<unix::Address> address = getContainerIOSwitchboardAddress( |
| flags.runtime_dir, containerId); |
| |
| if (!address.isSome()) { |
| return Failure("Failed to get the io switchboard address" |
| ": " + (address.isError() ? address.error() : "Not found")); |
| } |
| |
| // Wait for the server to create the domain socket file. |
| return loop( |
| self(), |
| []() { |
| return after(Milliseconds(10)); |
| }, |
| [=](const Nothing&) -> ControlFlow<Nothing> { |
| if (infos.contains(containerId) && !os::exists(address->path())) { |
| return Continue(); |
| } |
| return Break(); |
| }) |
| .then(defer(self(), [=]() -> Future<http::Connection> { |
| if (!infos.contains(containerId)) { |
| return Failure("I/O switchboard has shutdown"); |
| } |
| |
| return http::connect(address.get(), http::Scheme::HTTP); |
| })); |
| #endif // __WINDOWS__ |
| } |
| |
| |
| Future<Option<ContainerIO>> IOSwitchboard::extractContainerIO( |
| const ContainerID& containerId) |
| { |
| return dispatch(self(), [this, containerId]() { |
| return _extractContainerIO(containerId); |
| }); |
| } |
| |
| |
| Future<Option<ContainerIO>> IOSwitchboard::_extractContainerIO( |
| const ContainerID& containerId) |
| { |
| if (!containerIOs.contains(containerId)) { |
| return None(); |
| } |
| |
| ContainerIO containerIO = containerIOs[containerId]; |
| containerIOs.erase(containerId); |
| |
| return containerIO; |
| } |
| |
| |
| Future<ContainerLimitation> IOSwitchboard::watch( |
| const ContainerID& containerId) |
| { |
| #ifdef __WINDOWS__ |
| return Future<ContainerLimitation>(); |
| #else |
| if (local) { |
| return Future<ContainerLimitation>(); |
| } |
| |
| // We ignore unknown containers here because legacy containers |
  // without an io switchboard directory will not have an info struct
  // created for them during recovery. Likewise, containers launched
| // by a previous agent with io switchboard server mode disabled will |
| // not have info structs created for them either. In both cases |
| // there is nothing to watch, so we return an unsatisfiable future. |
| if (!infos.contains(containerId)) { |
| return Future<ContainerLimitation>(); |
| } |
| |
| return infos[containerId]->limitation.future(); |
| #endif // __WINDOWS__ |
| } |
| |
| |
| Future<Nothing> IOSwitchboard::cleanup( |
| const ContainerID& containerId) |
| { |
| #ifdef __WINDOWS__ |
| // Since we don't support spawning an io switchboard server on |
| // windows yet, there is nothing to wait for here. |
| return Nothing(); |
| #else |
| if (local) { |
| return Nothing(); |
| } |
| |
| // We ignore unknown containers here because legacy containers |
| // without an io switchboard directory will not have an info struct |
| // created for them during recovery. Likewise, containers launched |
| // by a previous agent with io switchboard server mode disabled will |
| // not have info structs created for them either. In both cases |
  // there is nothing to clean up, so we simply return `Nothing()`.
| if (!infos.contains(containerId)) { |
| return Nothing(); |
| } |
| |
| Option<pid_t> pid = infos[containerId]->pid; |
| Future<Option<int>> status = infos[containerId]->status; |
| |
| // If we have a pid, then we attempt to send it a SIGTERM to have it |
| // shutdown gracefully. This is best effort, as it's likely that the |
  // switchboard has already shut down in the common case.
| // |
| // NOTE: There is an unfortunate race condition here. If the io |
| // switchboard terminates and the pid is reused by some other |
| // process, we might be sending SIGTERM to a random process. This |
| // could be a problem under high load. |
| // |
| // TODO(jieyu): We give the I/O switchboard server a grace period to |
| // wait for the connection from the containerizer. This is for the |
| // case where the container itself is short lived (e.g., a DEBUG |
| // container does an 'ls' and exits). For that case, we still want |
| // the subsequent attach output call to get the output from that |
| // container. |
| // |
| // TODO(klueska): Send a message over the io switchboard server's |
| // domain socket instead of using a signal. |
| if (pid.isSome() && status.isPending()) { |
| Clock::timer(Seconds(5), [pid, status, containerId]() { |
| if (status.isPending()) { |
| LOG(INFO) << "Sending SIGTERM to I/O switchboard server (pid: " |
| << pid.get() << ") since container " << containerId |
| << " is being destroyed"; |
| |
| os::kill(pid.get(), SIGTERM); |
| |
| Clock::timer(Seconds(60), [pid, status, containerId]() { |
| if (status.isPending()) { |
| // If we are here, something really bad must have happened for I/O |
| // switchboard server to not exit after SIGTERM has been sent. We |
| // have seen this happen due to FD leak (see MESOS-9502). We do a |
            // SIGKILL here as a safeguard so that the switchboard server
            // forcefully exits and causes this cleanup future to be
            // completed, thus unblocking the container's cleanup.
| LOG(ERROR) << "Sending SIGKILL to I/O switchboard server (pid: " |
| << pid.get() << ") for container " << containerId |
| << " since the I/O switchboard server did not terminate " |
| << "60 seconds after SIGTERM was sent to it"; |
| |
| os::kill(pid.get(), SIGKILL); |
| } |
| }); |
| } |
| }); |
| } |
| |
| // NOTE: We use 'await' here so that we can handle the FAILED and |
| // DISCARDED cases as well. |
| return await(vector<Future<Option<int>>>{status}).then( |
| defer(self(), [this, containerId]() -> Future<Nothing> { |
| // We need to call `_extractContainerIO` here in case the |
| // `IOSwitchboard` still holds a reference to the container's |
| // `ContainerIO` struct. We don't care about its value at this |
| // point. We just need to extract it out of the hashmap (if |
| // it's in there) so it can drop out of scope and all open |
| // file descriptors will be closed. |
| _extractContainerIO(containerId); |
| |
      // We only remove the 'containerId' from our info struct once
      // we are sure that the I/O switchboard has shut down. If we
| // removed it any earlier, attempts to connect to the I/O |
| // switchboard would fail. |
| // |
| // NOTE: One caveat of this approach is that this lambda will |
| // be invoked multiple times if `cleanup()` is called multiple |
| // times before the first instance of it is triggered. This is |
| // OK for now because the logic below has no side effects. If |
| // the logic below gets more complicated, we may need to |
| // revisit this approach. |
| infos.erase(containerId); |
| |
| // Best effort removal of the unix domain socket file created for |
| // this container's `IOSwitchboardServer`. If it hasn't been |
| // checkpointed yet, or the socket file itself hasn't been created, |
| // we simply continue without error. |
| // |
      // NOTE: As the I/O switchboard creates a unix domain socket using
      // a provisional address before initializing and renaming it, we assume
      // that the absence of the unix socket at the original address means
      // that the I/O switchboard was terminated before renaming.
| Result<unix::Address> address = getContainerIOSwitchboardAddress( |
| flags.runtime_dir, containerId); |
| |
| const string socketPath = address.isSome() |
| ? address->path() |
| : getContainerIOSwitchboardSocketProvisionalPath( |
| flags.runtime_dir, containerId); |
| |
| Try<Nothing> rm = os::rm(socketPath); |
| if (rm.isError()) { |
| LOG(ERROR) << "Failed to remove unix domain socket file" |
| << " '" << socketPath << "' for container" |
| << " '" << containerId << "': " << rm.error(); |
| } |
| |
| return Nothing(); |
| })); |
| #endif // __WINDOWS__ |
| } |
| |
| |
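// A server is required either to back a pseudo terminal for the
// container or to support attaching to a `DEBUG` container's streams;
// a `DEFAULT` container without a TTY simply uses the IO provided by
// the container logger (see `_prepare()` above).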
| bool IOSwitchboard::requiresServer(const ContainerConfig& containerConfig) |
| { |
| if (containerConfig.has_container_info() && |
| containerConfig.container_info().has_tty_info()) { |
| return true; |
| } |
| |
| if (containerConfig.has_container_class() && |
| containerConfig.container_class() == |
| mesos::slave::ContainerClass::DEBUG) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| |
| #ifndef __WINDOWS__ |
| void IOSwitchboard::reaped( |
| const ContainerID& containerId, |
| const Future<Option<int>>& future) |
| { |
| // NOTE: If reaping of the server process failed, we simply |
| // return here because it is unknown to us whether we should |
| // destroy the container or not. |
| if (!future.isReady()) { |
| LOG(ERROR) << "Failed to reap the I/O switchboard server: " |
| << (future.isFailed() ? future.failure() : "discarded"); |
| return; |
| } |
| |
| const Option<int>& status = future.get(); |
| |
  // No need to do anything if the I/O switchboard server terminates
  // normally, or if its termination status is unknown. Only initiate
  // the destruction of the container if we know for sure that the I/O
  // switchboard server terminated unexpectedly.
| if (status.isNone()) { |
| LOG(INFO) << "I/O switchboard server process for container " |
| << containerId << " has terminated (status=N/A)"; |
| return; |
| } else if (WSUCCEEDED(status.get())) { |
| LOG(INFO) << "I/O switchboard server process for container " |
| << containerId << " has terminated (status=0)"; |
| return; |
| } |
| |
| // No need to proceed if the container has or is being destroyed. |
| if (!infos.contains(containerId)) { |
| return; |
| } |
| |
| ContainerLimitation limitation; |
| limitation.set_reason(TaskStatus::REASON_IO_SWITCHBOARD_EXITED); |
| limitation.set_message("'IOSwitchboard' " + WSTRINGIFY(status.get())); |
| |
| infos[containerId]->limitation.set(limitation); |
| |
| LOG(ERROR) << "Unexpected termination of I/O switchboard server: " |
| << limitation.message() << " for container " << containerId; |
| } |
| |
| |
| const char IOSwitchboardServer::NAME[] = "mesos-io-switchboard"; |
| |
| |
| class IOSwitchboardServerProcess : public Process<IOSwitchboardServerProcess> |
| { |
| public: |
| IOSwitchboardServerProcess( |
| bool _tty, |
| int _stdinToFd, |
| int _stdoutFromFd, |
| int _stdoutToFd, |
| int _stderrFromFd, |
| int _stderrToFd, |
| const unix::Socket& _socket, |
| bool waitForConnection, |
| Option<Duration> heartbeatInterval); |
| |
| void finalize() override; |
| |
| Future<Nothing> run(); |
| |
| Future<Nothing> unblock(); |
| |
| private: |
| // TODO(bmahler): Replace this with the common StreamingHttpConnection. |
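  //
  // Responses are streamed to the client in RecordIO framing:
  // `send()` prefixes each serialized `ProcessIO` message with its
  // length in bytes followed by a newline, i.e. roughly
  // "<length>\n<serialized message>".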
| class HttpConnection |
| { |
| public: |
| HttpConnection( |
| const http::Pipe::Writer& _writer, |
| const ContentType& _contentType) |
| : writer(_writer), |
| contentType(_contentType) {} |
| |
| bool send(const agent::ProcessIO& message) |
| { |
| string record = serialize(contentType, message); |
| |
| return writer.write(::recordio::encode(record)); |
| } |
| |
| bool close() |
| { |
| return writer.close(); |
| } |
| |
| process::Future<Nothing> closed() const |
| { |
| return writer.readerClosed(); |
| } |
| |
| private: |
| http::Pipe::Writer writer; |
| ContentType contentType; |
| }; |
| |
| // Sit in a heartbeat loop forever. |
| void heartbeatLoop(); |
| |
| // Sit in an accept loop forever. |
| void acceptLoop(); |
| |
| // Parse the request and look for `ATTACH_CONTAINER_INPUT` and |
| // `ATTACH_CONTAINER_OUTPUT` calls. We call their corresponding |
| // handler functions once we have parsed them. We accept calls as |
| // both `APPLICATION_PROTOBUF` and `APPLICATION_JSON` and respond |
| // with the same format we receive them in. |
| Future<http::Response> handler(const http::Request& request); |
| |
| // Validate `ATTACH_CONTAINER_INPUT` calls. |
| // |
| // TODO(klueska): Move this to `src/slave/validation.hpp` and make |
| // the agent validate all the calls before forwarding them to the |
| // switchboard. |
| Option<Error> validate(const agent::Call::AttachContainerInput& call); |
| |
| // Handle acknowledgment for `ATTACH_CONTAINER_INPUT` call. |
| Future<http::Response> acknowledgeContainerInputResponse(); |
| |
| // Handle `ATTACH_CONTAINER_INPUT` calls. |
| Future<http::Response> attachContainerInput( |
| const Owned<recordio::Reader<agent::Call>>& reader); |
| |
| // Handle `ATTACH_CONTAINER_OUTPUT` calls. |
| Future<http::Response> attachContainerOutput( |
| ContentType acceptType, |
| Option<ContentType> messageAcceptType); |
| |
| // Asynchronously receive data as we read it from our |
| // `stdoutFromFd` and `stderrFromFd` file descriptors. |
| void outputHook( |
| const string& data, |
| const agent::ProcessIO::Data::Type& type); |
| |
| bool tty; |
| int stdinToFd; |
| int stdoutFromFd; |
| int stdoutToFd; |
| int stderrFromFd; |
| int stderrToFd; |
| unix::Socket socket; |
| bool waitForConnection; |
| Option<Duration> heartbeatInterval; |
| bool inputConnected; |
  // Each time the agent receives a response for an `ATTACH_CONTAINER_INPUT`
  // request, it sends an acknowledgment. This counter is used to delay
  // IOSwitchboard termination until all acknowledgments are received.
| size_t numPendingAcknowledgments; |
| Future<unix::Socket> accept; |
| Promise<Nothing> promise; |
| Promise<Nothing> startRedirect; |
| // Set when both stdout and stderr redirects finish. |
| Promise<http::Response> redirectFinished; |
| // The following must be a `std::list` |
| // for proper erase semantics later on. |
| list<HttpConnection> outputConnections; |
| Option<Failure> failure; |
| }; |
| |
| |
| Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create( |
| bool tty, |
| int stdinToFd, |
| int stdoutFromFd, |
| int stdoutToFd, |
| int stderrFromFd, |
| int stderrToFd, |
| const string& socketPath, |
| bool waitForConnection, |
| Option<Duration> heartbeatInterval) |
| { |
| Try<unix::Socket> socket = unix::Socket::create(SocketImpl::Kind::POLL); |
| if (socket.isError()) { |
| return Error("Failed to create socket: " + socket.error()); |
| } |
| |
  // The agent connects to the switchboard once it sees the unix socket.
  // However, the unix socket is not ready to accept connections until
  // `listen()` has been called. Therefore we initialize the unix socket
  // using a provisional path and rename it after `listen()` has been
  // called.
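  //
  // The agent (see `IOSwitchboard::_connect()`) polls for a file at the
  // final `socketPath`, so the rename below effectively publishes the
  // socket only once it is ready to accept connections.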
| const string socketProvisionalPath = |
| getContainerIOSwitchboardSocketProvisionalPath(socketPath); |
| |
| Try<unix::Address> address = unix::Address::create(socketProvisionalPath); |
| if (address.isError()) { |
| return Error("Failed to build address from '" + socketProvisionalPath + "':" |
| " " + address.error()); |
| } |
| |
| Try<unix::Address> bind = socket->bind(address.get()); |
| if (bind.isError()) { |
| return Error("Failed to bind to address '" + socketProvisionalPath + "':" |
| " " + bind.error()); |
| } |
| |
| Try<Nothing> listen = socket->listen(64); |
| if (listen.isError()) { |
| return Error("Failed to listen on socket at address" |
| " '" + socketProvisionalPath + "': " + listen.error()); |
| } |
| |
| Try<Nothing> renameSocket = os::rename(socketProvisionalPath, socketPath); |
| if (renameSocket.isError()) { |
| return Error("Failed to rename socket from '" + socketProvisionalPath + "'" |
| " to '" + socketPath + "': " + renameSocket.error()); |
| } |
| |
| return new IOSwitchboardServer( |
| tty, |
| stdinToFd, |
| stdoutFromFd, |
| stdoutToFd, |
| stderrFromFd, |
| stderrToFd, |
| socket.get(), |
| waitForConnection, |
| heartbeatInterval); |
| } |
| |
| |
| IOSwitchboardServer::IOSwitchboardServer( |
| bool tty, |
| int stdinToFd, |
| int stdoutFromFd, |
| int stdoutToFd, |
| int stderrFromFd, |
| int stderrToFd, |
| const unix::Socket& socket, |
| bool waitForConnection, |
| Option<Duration> heartbeatInterval) |
| : process(new IOSwitchboardServerProcess( |
| tty, |
| stdinToFd, |
| stdoutFromFd, |
| stdoutToFd, |
| stderrFromFd, |
| stderrToFd, |
| socket, |
| waitForConnection, |
| heartbeatInterval)) |
| { |
| spawn(process.get()); |
| } |
| |
| |
| IOSwitchboardServer::~IOSwitchboardServer() |
| { |
| terminate(process.get()); |
| process::wait(process.get()); |
| } |
| |
| |
| Future<Nothing> IOSwitchboardServer::run() |
| { |
| return dispatch(process.get(), &IOSwitchboardServerProcess::run); |
| } |
| |
| |
| Future<Nothing> IOSwitchboardServer::unblock() |
| { |
| return dispatch(process.get(), &IOSwitchboardServerProcess::unblock); |
| } |
| |
| |
| IOSwitchboardServerProcess::IOSwitchboardServerProcess( |
| bool _tty, |
| int _stdinToFd, |
| int _stdoutFromFd, |
| int _stdoutToFd, |
| int _stderrFromFd, |
| int _stderrToFd, |
| const unix::Socket& _socket, |
| bool _waitForConnection, |
| Option<Duration> _heartbeatInterval) |
| : tty(_tty), |
| stdinToFd(_stdinToFd), |
| stdoutFromFd(_stdoutFromFd), |
| stdoutToFd(_stdoutToFd), |
| stderrFromFd(_stderrFromFd), |
| stderrToFd(_stderrToFd), |
| socket(_socket), |
| waitForConnection(_waitForConnection), |
| heartbeatInterval(_heartbeatInterval), |
| inputConnected(false), |
| numPendingAcknowledgments(0) {} |
| |
| |
| Future<Nothing> IOSwitchboardServerProcess::run() |
| { |
| if (!waitForConnection) { |
| startRedirect.set(Nothing()); |
| } |
| |
| startRedirect.future() |
| .then(defer(self(), [this]() { |
| Future<Nothing> stdoutRedirect = process::io::redirect( |
| stdoutFromFd, |
| stdoutToFd, |
| process::io::BUFFERED_READ_SIZE, |
| {defer(self(), |
| &Self::outputHook, |
| lambda::_1, |
| agent::ProcessIO::Data::STDOUT)}); |
| |
      // NOTE: We don't need to redirect stderr if TTY is enabled. If
      // TTY is enabled for the container, stdout and stderr for the
      // container will be redirected to the slave end of the pseudo
      // terminal device, and both will come out of the master end of
      // the pseudo terminal.
| Future<Nothing> stderrRedirect; |
| if (tty) { |
| stderrRedirect = Nothing(); |
| } else { |
| stderrRedirect = process::io::redirect( |
| stderrFromFd, |
| stderrToFd, |
| process::io::BUFFERED_READ_SIZE, |
| {defer(self(), |
| &Self::outputHook, |
| lambda::_1, |
| agent::ProcessIO::Data::STDERR)}); |
| } |
| |
| // Set the future once our IO redirects finish. On failure, |
| // fail the future. |
| // |
      // For now we simply assume that once both `stdoutRedirect`
      // and `stderrRedirect` have completed while no input is
      // connected, it is OK to exit the switchboard process.
| // We assume this because `stdoutRedirect` and `stderrRedirect` |
| // will only complete after both the read end of the `stdout` |
| // stream and the read end of the `stderr` stream have been |
| // drained. Since draining these `fds` represents having |
| // read everything possible from a container's `stdout` and |
| // `stderr` this is likely sufficient termination criteria. |
| // However, there's a non-zero chance that *some* containers may |
| // decide to close their `stdout` and `stderr` while expecting to |
| // continue reading from `stdin`. For now we don't support |
| // containers with this behavior and we will exit out of the |
| // switchboard process early. |
| // |
| // If our IO redirects are finished and there are pending |
| // acknowledgments for `ATTACH_CONTAINER_INPUT` requests, then |
| // we set `redirectFinished` promise which triggers a callback for |
| // `attachContainerInput()`. This callback returns a final `HTTP 200` |
| // response to the client, even if the client has not yet sent the EOF |
| // message. |
| // |
| // NOTE: We always call `terminate()` with `false` to ensure |
| // that our event queue is drained before actually terminating. |
| // Without this, it's possible that we might drop some data we |
| // are trying to write out over any open connections we have. |
| // |
| // TODO(klueska): Add support to asynchronously detect when |
| // `stdinToFd` has become invalid before deciding to terminate. |
| stdoutRedirect |
| .onFailed(defer(self(), [this](const string& message) { |
| failure = Failure("Failed redirecting stdout: " + message); |
| terminate(self(), false); |
| })) |
| .onDiscarded(defer(self(), [this]() { |
| failure = Failure("Redirecting stdout discarded"); |
| terminate(self(), false); |
| })); |
| |
| stderrRedirect |
| .onFailed(defer(self(), [this](const string& message) { |
| failure = Failure("Failed redirecting stderr: " + message); |
| terminate(self(), false); |
| })) |
| .onDiscarded(defer(self(), [this]() { |
| failure = Failure("Redirecting stderr discarded"); |
| terminate(self(), false); |
| })); |
| |
| collect(stdoutRedirect, stderrRedirect) |
| .then(defer(self(), [this]() { |
| if (numPendingAcknowledgments > 0) { |
| redirectFinished.set(http::OK()); |
| } else { |
| terminate(self(), false); |
| } |
| return Nothing(); |
| })); |
| |
| return Nothing(); |
| })); |
| |
| // If we have a heartbeat interval set, send a heartbeat to all of |
| // our outstanding output connections at the proper interval. |
| if (heartbeatInterval.isSome()) { |
| heartbeatLoop(); |
| } |
| |
| acceptLoop(); |
| |
| return promise.future(); |
| } |
| |
| |
| Future<Nothing> IOSwitchboardServerProcess::unblock() |
| { |
| startRedirect.set(Nothing()); |
| return Nothing(); |
| } |
| |
| |
| void IOSwitchboardServerProcess::finalize() |
| { |
| // Discard the server socket's `accept` future so that we do not |
| // maintain a reference to the socket, which would cause a leak. |
| accept.discard(); |
| |
| foreach (HttpConnection& connection, outputConnections) { |
| connection.close(); |
| |
    // It is possible that the read end of the pipe has not yet
    // finished processing the data. We wait here for the reader
    // to signal that it has finished reading.
| connection.closed().await(); |
| } |
| |
| if (failure.isSome()) { |
| promise.fail(failure->message); |
| } else { |
| promise.set(Nothing()); |
| } |
| } |
| |
| |
| void IOSwitchboardServerProcess::heartbeatLoop() |
| { |
| CHECK(heartbeatInterval.isSome()); |
| |
| agent::ProcessIO message; |
| message.set_type(agent::ProcessIO::CONTROL); |
| message.mutable_control()->set_type( |
| agent::ProcessIO::Control::HEARTBEAT); |
| message.mutable_control() |
| ->mutable_heartbeat() |
| ->mutable_interval() |
| ->set_nanoseconds(heartbeatInterval->ns()); |
| |
| foreach (HttpConnection& connection, outputConnections) { |
| connection.send(message); |
| } |
| |
| // Dispatch back to ourselves after the `heartbeatInterval`. |
| delay(heartbeatInterval.get(), |
| self(), |
| &IOSwitchboardServerProcess::heartbeatLoop); |
| } |
| |
| |
| void IOSwitchboardServerProcess::acceptLoop() |
| { |
| // Store the server socket's `accept` future so that we can discard |
| // it during process finalization. Otherwise, we would maintain a |
| // reference to the socket, causing a leak. |
| accept = socket.accept() |
| .onAny(defer(self(), [this](const Future<unix::Socket>& socket) { |
| if (!socket.isReady()) { |
| failure = Failure("Failed trying to accept connection"); |
| terminate(self(), false); |
| return; |
| } |
| |
| // We intentionally ignore errors on the serve path, and assume |
| // that they will eventually be propagated back to the client in |
| // one form or another (e.g. a timeout on the client side). We |
      // explicitly *don't* want to kill the whole server though, just
      // because a single connection fails.
| http::serve( |
| socket.get(), |
| defer(self(), &Self::handler, lambda::_1)); |
| |
| // Use `dispatch` to limit the size of the call stack. |
| dispatch(self(), &Self::acceptLoop); |
| })); |
| } |
| |
| |
| Future<http::Response> IOSwitchboardServerProcess::handler( |
| const http::Request& request) |
| { |
| CHECK_EQ("POST", request.method); |
| |
| if (request.url.path == "/acknowledge_container_input_response") { |
| return acknowledgeContainerInputResponse(); |
| } |
| |
| Option<string> contentType_ = request.headers.get("Content-Type"); |
| CHECK_SOME(contentType_); |
| |
| ContentType contentType; |
| if (contentType_.get() == APPLICATION_JSON) { |
| contentType = ContentType::JSON; |
| } else if (contentType_.get() == APPLICATION_PROTOBUF) { |
| contentType = ContentType::PROTOBUF; |
| } else if (contentType_.get() == APPLICATION_RECORDIO) { |
| contentType = ContentType::RECORDIO; |
| } else { |
| LOG(FATAL) << "Unexpected 'Content-Type' header: " << contentType_.get(); |
| } |
| |
| Option<ContentType> messageContentType; |
| Option<string> messageContentType_ = |
| request.headers.get(MESSAGE_CONTENT_TYPE); |
| |
| if (streamingMediaType(contentType)) { |
| if (messageContentType_.isNone()) { |
| return http::BadRequest( |
| "Expecting '" + stringify(MESSAGE_CONTENT_TYPE) + "' to be" + |
| " set for streaming requests"); |
| } |
| |
| if (messageContentType_.get() == APPLICATION_JSON) { |
| messageContentType = Option<ContentType>(ContentType::JSON); |
| } else if (messageContentType_.get() == APPLICATION_PROTOBUF) { |
| messageContentType = Option<ContentType>(ContentType::PROTOBUF); |
| } else { |
| return http::UnsupportedMediaType( |
| string("Expecting '") + MESSAGE_CONTENT_TYPE + "' of " + |
| APPLICATION_JSON + " or " + APPLICATION_PROTOBUF); |
| } |
| } else { |
| // The 'Message-Content-Type' header should not be set |
| // for non-streaming requests. |
    CHECK_NONE(request.headers.get(MESSAGE_CONTENT_TYPE));
| } |
| |
| ContentType acceptType; |
| if (request.acceptsMediaType(APPLICATION_JSON)) { |
| acceptType = ContentType::JSON; |
| } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { |
| acceptType = ContentType::PROTOBUF; |
| } else if (request.acceptsMediaType(APPLICATION_RECORDIO)) { |
| acceptType = ContentType::RECORDIO; |
| } else { |
| Option<string> acceptType_ = request.headers.get("Accept"); |
| CHECK_SOME(acceptType_); |
| |
| LOG(FATAL) << "Unexpected 'Accept' header: " << acceptType_.get(); |
| } |
| |
| Option<ContentType> messageAcceptType; |
| if (streamingMediaType(acceptType)) { |
| if (request.acceptsMediaType(MESSAGE_ACCEPT, APPLICATION_JSON)) { |
| messageAcceptType = ContentType::JSON; |
| } else if (request.acceptsMediaType(MESSAGE_ACCEPT, APPLICATION_PROTOBUF)) { |
| messageAcceptType = ContentType::PROTOBUF; |
| } else { |
| Option<string> messageAcceptType_ = request.headers.get(MESSAGE_ACCEPT); |
| CHECK_SOME(messageAcceptType_); |
| |
| LOG(FATAL) << "Unexpected '" << MESSAGE_ACCEPT << "' header: " |
| << messageAcceptType_.get(); |
| } |
| } else { |
| // The 'Message-Accept' header should not be set |
| // for a non-streaming response. |
| CHECK_NONE(request.headers.get(MESSAGE_ACCEPT)); |
| } |
| |
| CHECK_EQ(http::Request::PIPE, request.type); |
| CHECK_SOME(request.reader); |
| |
| if (streamingMediaType(contentType)) { |
| CHECK_EQ(ContentType::RECORDIO, contentType); |
| CHECK_SOME(messageContentType); |
| |
| Owned<recordio::Reader<agent::Call>> reader( |
| new recordio::Reader<agent::Call>( |
| lambda::bind( |
| deserialize<agent::Call>, |
| messageContentType.get(), |
| lambda::_1), |
| request.reader.get())); |
| |
| return reader->read() |
| .then(defer( |
| self(), |
| [=](const Result<agent::Call>& call) -> Future<http::Response> { |
| if (call.isNone()) { |
| return http::BadRequest( |
| "IOSwitchboard received EOF while reading request body"); |
| } |
| |
| if (call.isError()) { |
| return Failure(call.error()); |
| } |
| |
| // Should have already been validated by the agent. |
| CHECK(call->has_type()); |
| CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call->type()); |
| CHECK(call->has_attach_container_input()); |
| CHECK_EQ(mesos::agent::Call::AttachContainerInput::CONTAINER_ID, |
| call->attach_container_input().type()); |
| CHECK(call->attach_container_input().has_container_id()); |
| CHECK(call->attach_container_input().container_id().has_value()); |
| |
| return attachContainerInput(reader); |
| })); |
| } else { |
| http::Pipe::Reader reader = request.reader.get(); // Remove const. |
| |
| return reader.readAll() |
| .then(defer( |
| self(), |
| [=](const string& body) -> Future<http::Response> { |
| Try<agent::Call> call = deserialize<agent::Call>(contentType, body); |
| if (call.isError()) { |
| return http::BadRequest(call.error()); |
| } |
| |
| // Should have already been validated by the agent. |
| CHECK(call->has_type()); |
| CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call->type()); |
| |
| return attachContainerOutput(acceptType, messageAcceptType); |
| })); |
| } |
| } |
| |
| |
| Option<Error> IOSwitchboardServerProcess::validate( |
| const agent::Call::AttachContainerInput& call) |
| { |
| switch (call.type()) { |
| case agent::Call::AttachContainerInput::UNKNOWN: |
| case agent::Call::AttachContainerInput::CONTAINER_ID: { |
| return Error( |
| "Expecting 'attach_container_input.type' to be 'PROCESS_IO'" |
| " instead of: '" + stringify(call.type()) + "'"); |
| } |
| case agent::Call::AttachContainerInput::PROCESS_IO: { |
| if (!call.has_process_io()) { |
| return Error( |
| "Expecting 'attach_container_input.process_io' to be present"); |
| } |
| |
| const agent::ProcessIO& message = call.process_io(); |
| |
| if (!message.has_type()) { |
| return Error("Expecting 'process_io.type' to be present"); |
| } |
| |
| switch (message.type()) { |
| case agent::ProcessIO::UNKNOWN: { |
| return Error("'process_io.type' is unknown"); |
| } |
| case agent::ProcessIO::CONTROL: { |
| if (!message.has_control()) { |
| return Error("Expecting 'process_io.control' to be present"); |
| } |
| |
| if (!message.control().has_type()) { |
| return Error("Expecting 'process_io.control.type' to be present"); |
| } |
| |
| switch (message.control().type()) { |
| case agent::ProcessIO::Control::UNKNOWN: { |
| return Error("'process_io.control.type' is unknown"); |
| } |
| case agent::ProcessIO::Control::TTY_INFO: { |
| if (!message.control().has_tty_info()) { |
| return Error( |
| "Expecting 'process_io.control.tty_info' to be present"); |
| } |
| |
| const TTYInfo& ttyInfo = message.control().tty_info(); |
| |
| if (!ttyInfo.has_window_size()) { |
| return Error("Expecting 'tty_info.window_size' to be present"); |
| } |
| |
| return None(); |
| } |
| case agent::ProcessIO::Control::HEARTBEAT: { |
| if (!message.control().has_heartbeat()) { |
| return Error( |
| "Expecting 'process_io.control.heartbeat' to be present"); |
| } |
| |
| return None(); |
| } |
| } |
| |
| UNREACHABLE(); |
| } |
| case agent::ProcessIO::DATA: { |
| if (!message.has_data()) { |
| return Error("Expecting 'process_io.data' to be present"); |
| } |
| |
| if (!message.data().has_type()) { |
| return Error("Expecting 'process_io.data.type' to be present"); |
| } |
| |
| if (message.data().type() != agent::ProcessIO::Data::STDIN) { |
| return Error("Expecting 'process_io.data.type' to be 'STDIN'"); |
| } |
| |
| if (!message.data().has_data()) { |
| return Error("Expecting 'process_io.data.data' to be present"); |
| } |
| |
| return None(); |
| } |
| } |
| } |
| } |
| |
| UNREACHABLE(); |
| } |
| |
| |
| Future<http::Response> |
| IOSwitchboardServerProcess::acknowledgeContainerInputResponse() |
| { |
  // Check if this is an acknowledgment sent by the agent. This
  // acknowledgment means that the response for the `ATTACH_CONTAINER_INPUT`
  // call has been received by the agent.
| CHECK_GT(numPendingAcknowledgments, 0u); |
| if (--numPendingAcknowledgments == 0) { |
| // If IO redirects are finished or writing to `stdin` failed we want to |
| // terminate ourselves (after flushing any outstanding messages from our |
| // message queue). |
| if (!redirectFinished.future().isPending() || failure.isSome()) { |
| terminate(self(), false); |
| } |
| } |
| return http::OK(); |
| } |
| |
| |
| Future<http::Response> IOSwitchboardServerProcess::attachContainerInput( |
| const Owned<recordio::Reader<agent::Call>>& reader) |
| { |
| ++numPendingAcknowledgments; |
| |
| // Only allow a single input connection at a time. |
| if (inputConnected) { |
| return http::Conflict("Multiple input connections are not allowed"); |
| } |
| |
| // We set `inputConnected` to true here and then reset it to false |
| // at the bottom of this function once our asynchronous loop has |
| // terminated. This way another connection can be established once |
| // the current one is complete. |
| inputConnected = true; |
| |
| // Loop through each record and process it. Return a proper |
| // response once the last record has been fully processed. |
| auto readLoop = loop( |
| self(), |
| [=]() { |
| return reader->read(); |
| }, |
| [=](const Result<agent::Call>& record) |
| -> Future<ControlFlow<http::Response>> { |
| if (record.isNone()) { |
| return Break(http::OK()); |
| } |
| |
| if (record.isError()) { |
| return Break(http::BadRequest(record.error())); |
| } |
| |
| // Should have already been validated by the agent. |
| CHECK(record->has_type()); |
| CHECK_EQ(mesos::agent::Call::ATTACH_CONTAINER_INPUT, record->type()); |
| CHECK(record->has_attach_container_input()); |
| |
| // Validate the rest of the `AttachContainerInput` message. |
| Option<Error> error = validate(record->attach_container_input()); |
| if (error.isSome()) { |
| return Break(http::BadRequest(error->message)); |
| } |
| |
| const agent::ProcessIO& message = |
| record->attach_container_input().process_io(); |
| |
| switch (message.type()) { |
| case agent::ProcessIO::CONTROL: { |
| switch (message.control().type()) { |
| case agent::ProcessIO::Control::TTY_INFO: { |
| // TODO(klueska): Return a failure if the container we are |
| // attaching to does not have a tty associated with it. |
| |
| // Update the window size. |
| Try<Nothing> window = os::setWindowSize( |
| stdinToFd, |
| message.control().tty_info().window_size().rows(), |
| message.control().tty_info().window_size().columns()); |
| |
| if (window.isError()) { |
| return Break(http::BadRequest( |
| "Unable to set the window size: " + window.error())); |
| } |
| |
| return Continue(); |
| } |
| case agent::ProcessIO::Control::HEARTBEAT: { |
| // For now, we ignore any interval information |
| // sent along with the heartbeat. |
| return Continue(); |
| } |
| default: { |
| UNREACHABLE(); |
| } |
| } |
| } |
| case agent::ProcessIO::DATA: { |
| // Receiving a `DATA` message with length 0 indicates |
| // `EOF`, so we should close `stdinToFd` if there is no tty. |
| // If tty is enabled, the client is expected to send `EOT` instead. |
| if (!tty && message.data().data().length() == 0) { |
| os::close(stdinToFd); |
| return Continue(); |
| } |
| |
| // Write the STDIN data to `stdinToFd`. If there is a |
| // failure, we set the `failure` member variable and exit |
| // the loop. In the resulting `.then()` callback, we then |
| // terminate the process. We don't terminate the process |
| // here because we want to propagate an `InternalServerError` |
| // back to the client. |
| return process::io::write(stdinToFd, message.data().data()) |
| .then(defer(self(), [=](const Nothing&) |
| -> ControlFlow<http::Response> { |
| return Continue(); |
| })) |
| .recover(defer(self(), [=]( |
| const Future<ControlFlow<http::Response>>& future) |
| -> ControlFlow<http::Response> { |
| failure = Failure( |
| "Failed writing to stdin: " + stringify(future)); |
| return Break(http::InternalServerError(failure->message)); |
| })); |
| } |
| default: { |
| UNREACHABLE(); |
| } |
| } |
| }); |
| |
| // We create a new promise, which is transitioned to `READY` when either |
| // the read loop finishes or IO redirects finish. Once this promise is set, |
| // we return a final response to the client. |
| // |
| // We use `defer(self(), ...)` to use this process as a synchronization point |
| // when changing state of the promise. |
| Owned<Promise<http::Response>> promise(new Promise<http::Response>()); |
| |
| readLoop.onAny( |
| defer(self(), [promise](const Future<http::Response>& response) { |
| promise->set(response); |
| })); |
| |
| // Since IOSwitchboard might receive an acknowledgment for the |
| // `ATTACH_CONTAINER_INPUT` request before reading a final message from |
| // the corresponding connection, we need to give IOSwitchboard a chance to |
| // read the final message. Otherwise, the agent might get `HTTP 500` |
| // "broken pipe" while attempting to write the final message. |
| redirectFinished.future().onAny( |
| defer(self(), [=](const Future<http::Response>& response) { |
| // TODO(abudnik): Ideally, we would have used `process::delay()` to |
| // delay a dispatch of the lambda to this process. |
| after(Seconds(1)) |
| .onAny(defer(self(), [promise, response](const Future<Nothing>&) { |
| promise->set(response); |
| })); |
| })); |
| |
| // We explicitly specify the return type to avoid a type deduction |
| // issue in some versions of clang. See MESOS-2943. |
| return promise->future().then( |
| defer(self(), [=](const http::Response& response) -> http::Response { |
| // Reset `inputConnected` to allow future input connections. |
| inputConnected = false; |
| |
| return response; |
| })); |
| } |
| |
| |
| Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput( |
| ContentType acceptType, |
| Option<ContentType> messageAcceptType) |
| { |
| http::Pipe pipe; |
| http::OK ok; |
| |
| ok.headers["Content-Type"] = stringify(acceptType); |
| |
| // If a client sets the 'Accept' header expecting a streaming response, |
| // `messageAcceptType` would always be set and we use it as the value of |
| // 'Message-Content-Type' response header. |
| ContentType messageContentType = acceptType; |
| if (streamingMediaType(acceptType)) { |
| CHECK_SOME(messageAcceptType); |
| ok.headers[MESSAGE_CONTENT_TYPE] = stringify(messageAcceptType.get()); |
| messageContentType = messageAcceptType.get(); |
| } |
| |
| ok.type = http::Response::PIPE; |
| ok.reader = pipe.reader(); |
| |
| // We store the connection in a list and wait for asynchronous |
| // calls to `receiveOutput()` to actually push data out over the |
| // connection. If we ever detect a connection has been closed, |
| // we remove it from this list. |
| HttpConnection connection(pipe.writer(), messageContentType); |
| auto iterator = outputConnections.insert(outputConnections.end(), connection); |
| |
| // We use the `startRedirect` promise to indicate when we should |
| // begin reading data from our `stdoutFromFd` and `stderrFromFd` |
| // file descriptors. If we were started with the `waitForConnection` |
| // parameter set to `true`, only set this promise here once the |
| // first connection has been established. |
| if (!startRedirect.future().isReady()) { |
| startRedirect.set(Nothing()); |
| } |
| |
| connection.closed() |
| .then(defer(self(), [this, iterator]() { |
| // Erasing from a `std::list` only invalidates the iterator of |
| // the object being erased. All other iterators remain valid. |
| outputConnections.erase(iterator); |
| return Nothing(); |
| })); |
| |
| return ok; |
| } |
| |
| |
| void IOSwitchboardServerProcess::outputHook( |
| const string& data, |
| const agent::ProcessIO::Data::Type& type) |
| { |
| // Break early if there are no connections to send the data to. |
  if (outputConnections.empty()) {
| return; |
| } |
| |
| // Build a `ProcessIO` message from the data. |
| agent::ProcessIO message; |
| message.set_type(agent::ProcessIO::DATA); |
| message.mutable_data()->set_type(type); |
| message.mutable_data()->set_data(data); |
| |
| // Walk through our list of connections and write the message to |
| // them. It's possible that a write might fail if the writer has |
| // been closed. That's OK because we already take care of removing |
| // closed connections from our list via the future returned by |
| // the `HttpConnection::closed()` call above. We might do a few |
| // unnecessary writes if we have a bunch of messages queued up, |
| // but that shouldn't be a problem. |
| foreach (HttpConnection& connection, outputConnections) { |
| connection.send(message); |
| } |
| } |
| #endif // __WINDOWS__ |
| |
| } // namespace slave { |
| } // namespace internal { |
| } // namespace mesos { |