| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "checks/checker_process.hpp" |
| |
| #include <cstdint> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <tuple> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include <mesos/mesos.hpp> |
| #include <mesos/type_utils.hpp> |
| |
| #include <mesos/agent/agent.hpp> |
| |
| #include <process/collect.hpp> |
| #include <process/defer.hpp> |
| #include <process/delay.hpp> |
| #include <process/future.hpp> |
| #include <process/io.hpp> |
| #include <process/protobuf.hpp> |
| #include <process/subprocess.hpp> |
| #include <process/time.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/duration.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/jsonify.hpp> |
| #include <stout/nothing.hpp> |
| #include <stout/option.hpp> |
| #include <stout/protobuf.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/strings.hpp> |
| #include <stout/try.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include <stout/os/environment.hpp> |
| #include <stout/os/killtree.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/protobuf_utils.hpp" |
| #include "common/status_utils.hpp" |
| |
| #include "internal/evolve.hpp" |
| |
| #ifdef __linux__ |
| #include "linux/ns.hpp" |
| #endif |
| |
| namespace http = process::http; |
| |
| using process::Failure; |
| using process::Future; |
| using process::Owned; |
| using process::Promise; |
| using process::Subprocess; |
| |
| using std::map; |
| using std::shared_ptr; |
| using std::string; |
| using std::tuple; |
| using std::vector; |
| |
| namespace mesos { |
| namespace internal { |
| namespace checks { |
| |
// Names of the helper executables used to perform HTTP and TCP checks.
// The Windows variants carry the `.exe` suffix.
#ifdef __WINDOWS__
constexpr char HTTP_CHECK_COMMAND[] = "curl.exe";
constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect.exe";
#else
constexpr char HTTP_CHECK_COMMAND[] = "curl";
constexpr char TCP_CHECK_COMMAND[] = "mesos-tcp-connect";
#endif // __WINDOWS__

// Scheme used for HTTP checks when no scheme was given to the checker.
static const string DEFAULT_HTTP_SCHEME = "http";

// Checks target '127.0.0.1' rather than 'localhost', because the host
// file in some container images may not contain 'localhost'.
constexpr char DEFAULT_DOMAIN[] = "127.0.0.1";
| |
| |
| #ifdef __linux__ |
| // TODO(alexr): Instead of defining this ad-hoc clone function, provide a |
| // general solution for entering namespace in child processes, see MESOS-6184. |
| static pid_t cloneWithSetns( |
| const lambda::function<int()>& func, |
| const Option<pid_t>& taskPid, |
| const vector<string>& namespaces) |
| { |
| auto child = [=]() -> int { |
| if (taskPid.isSome()) { |
| foreach (const string& ns, namespaces) { |
| Try<Nothing> setns = ns::setns(taskPid.get(), ns); |
| if (setns.isError()) { |
| // This effectively aborts the check. |
| LOG(FATAL) << "Failed to enter the " << ns << " namespace of task" |
| << " (pid: " << taskPid.get() << "): " << setns.error(); |
| } |
| |
| VLOG(1) << "Entered the " << ns << " namespace of task" |
| << " (pid: " << taskPid.get() << ") successfully"; |
| } |
| } |
| |
| return func(); |
| }; |
| |
| pid_t pid = ::fork(); |
| if (pid == -1) { |
| return -1; |
| } else if (pid == 0) { |
| // Child. |
| ::exit(child()); |
| UNREACHABLE(); |
| } else { |
| // Parent. |
| return pid; |
| } |
| } |
| #endif |
| |
| |
| CheckerProcess::CheckerProcess( |
| const CheckInfo& _check, |
| const string& _launcherDir, |
| const lambda::function<void(const Try<CheckStatusInfo>&)>& _callback, |
| const TaskID& _taskId, |
| const Option<pid_t>& _taskPid, |
| const vector<string>& _namespaces, |
| const Option<ContainerID>& _taskContainerId, |
| const Option<http::URL>& _agentURL, |
| const Option<string>& _authorizationHeader, |
| const Option<string>& _scheme, |
| const std::string& _name, |
| bool _commandCheckViaAgent) |
| : ProcessBase(process::ID::generate("checker")), |
| check(_check), |
| launcherDir(_launcherDir), |
| updateCallback(_callback), |
| taskId(_taskId), |
| taskPid(_taskPid), |
| namespaces(_namespaces), |
| taskContainerId(_taskContainerId), |
| agentURL(_agentURL), |
| authorizationHeader(_authorizationHeader), |
| scheme(_scheme), |
| commandCheckViaAgent(_commandCheckViaAgent), |
| name(_name), |
| paused(false) |
| { |
| Try<Duration> create = Duration::create(check.delay_seconds()); |
| CHECK_SOME(create); |
| checkDelay = create.get(); |
| |
| create = Duration::create(check.interval_seconds()); |
| CHECK_SOME(create); |
| checkInterval = create.get(); |
| |
| // Zero value means infinite timeout. |
| create = Duration::create(check.timeout_seconds()); |
| CHECK_SOME(create); |
| checkTimeout = |
| (create.get() > Duration::zero()) ? create.get() : Duration::max(); |
| |
| #ifdef __linux__ |
| if (!namespaces.empty()) { |
| clone = lambda::bind(&cloneWithSetns, lambda::_1, taskPid, namespaces); |
| } |
| #endif |
| } |
| |
| |
| void CheckerProcess::initialize() |
| { |
| scheduleNext(checkDelay); |
| } |
| |
| |
| void CheckerProcess::finalize() |
| { |
| LOG(INFO) << "Stopped " << name << " for task '" << taskId << "'"; |
| } |
| |
| |
| void CheckerProcess::performCheck() |
| { |
| if (paused) { |
| return; |
| } |
| |
| Stopwatch stopwatch; |
| stopwatch.start(); |
| |
| switch (check.type()) { |
| case CheckInfo::COMMAND: { |
| Future<int> future = commandCheckViaAgent ? nestedCommandCheck() |
| : commandCheck(); |
| future.onAny(defer( |
| self(), |
| &Self::processCommandCheckResult, stopwatch, lambda::_1)); |
| break; |
| } |
| |
| case CheckInfo::HTTP: { |
| httpCheck().onAny(defer( |
| self(), |
| &Self::processHttpCheckResult, stopwatch, lambda::_1)); |
| break; |
| } |
| |
| case CheckInfo::TCP: { |
| tcpCheck().onAny(defer( |
| self(), |
| &Self::processTcpCheckResult, stopwatch, lambda::_1)); |
| break; |
| } |
| |
| case CheckInfo::UNKNOWN: { |
| LOG(FATAL) << "Received UNKNOWN check type"; |
| break; |
| } |
| } |
| } |
| |
| |
| void CheckerProcess::scheduleNext(const Duration& duration) |
| { |
| CHECK(!paused); |
| |
| VLOG(1) << "Scheduling " << name << " for task '" << taskId << "' in " |
| << duration; |
| |
| delay(duration, self(), &Self::performCheck); |
| } |
| |
| |
| void CheckerProcess::pause() |
| { |
| if (!paused) { |
| VLOG(1) << "Paused " << name << " for task '" << taskId << "'"; |
| |
| paused = true; |
| } |
| } |
| |
| |
| void CheckerProcess::resume() |
| { |
| if (paused) { |
| VLOG(1) << "Resumed " << name << " for task '" << taskId << "'"; |
| |
| paused = false; |
| |
| // Schedule a check immediately. |
| scheduleNext(Duration::zero()); |
| } |
| } |
| |
| |
| void CheckerProcess::processCheckResult( |
| const Stopwatch& stopwatch, |
| const Result<CheckStatusInfo>& result) |
| { |
| // `Checker` might have been paused while performing the check. |
| if (paused) { |
| LOG(INFO) << "Ignoring " << name << " result for" |
| << " task '" << taskId << "': checking is paused"; |
| return; |
| } |
| |
| // `result` will be: |
| // |
| // 1. `Some(CheckStatusInfo)` if it was possible to perform the check. |
| // 2. An `Error` if the check failed due to a non-transient error, |
| // e.g., timed out. |
| // 3. `None` if the check failed due to a transient error - this kind of |
| // failure will be silently ignored. |
| if (result.isSome()) { |
| // It was possible to perform the check. |
| VLOG(1) << "Performed " << name << " for task '" << taskId << "' in " |
| << stopwatch.elapsed(); |
| |
| updateCallback(result.get()); |
| } else if (result.isError()) { |
| // The check failed due to a non-transient error. |
| updateCallback(Error(result.error())); |
| } else { |
| // The check failed due to a transient error. |
| LOG(INFO) << name << " for task '" << taskId << "' is not available"; |
| } |
| |
| scheduleNext(checkInterval); |
| } |
| |
| |
| Future<int> CheckerProcess::commandCheck() |
| { |
| CHECK_EQ(CheckInfo::COMMAND, check.type()); |
| CHECK(check.has_command()); |
| |
| const CommandInfo& command = check.command().command(); |
| |
| map<string, string> environment = os::environment(); |
| |
| foreach (const Environment::Variable& variable, |
| command.environment().variables()) { |
| environment[variable.name()] = variable.value(); |
| } |
| |
| // Launch the subprocess. |
| Try<Subprocess> s = Error("Not launched"); |
| |
| if (command.shell()) { |
| // Use the shell variant. |
| VLOG(1) << "Launching " << name << " '" << command.value() << "'" |
| << " for task '" << taskId << "'"; |
| |
| s = process::subprocess( |
| command.value(), |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::FD(STDERR_FILENO), |
| Subprocess::FD(STDERR_FILENO), |
| environment, |
| clone); |
| } else { |
| // Use the exec variant. |
| vector<string> argv( |
| std::begin(command.arguments()), std::end(command.arguments())); |
| |
| VLOG(1) << "Launching " << name << " [" << command.value() << ", " |
| << strings::join(", ", argv) << "] for task '" << taskId << "'"; |
| |
| s = process::subprocess( |
| command.value(), |
| argv, |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::FD(STDERR_FILENO), |
| Subprocess::FD(STDERR_FILENO), |
| nullptr, |
| environment, |
| clone); |
| } |
| |
| if (s.isError()) { |
| return Failure("Failed to create subprocess: " + s.error()); |
| } |
| |
| // TODO(alexr): Use lambda named captures for |
| // these cached values once it is available. |
| const pid_t commandPid = s->pid(); |
| const string _name = name; |
| const Duration timeout = checkTimeout; |
| const TaskID _taskId = taskId; |
| |
| return s->status() |
| .after( |
| timeout, |
| [timeout, commandPid, _name, _taskId](Future<Option<int>> future) |
| { |
| future.discard(); |
| |
| if (commandPid != -1) { |
| // Cleanup the external command process. |
| VLOG(1) << "Killing the " << _name << " process '" << commandPid |
| << "' for task '" << _taskId << "'"; |
| |
| os::killtree(commandPid, SIGKILL); |
| } |
| |
| return Failure("Command timed out after " + stringify(timeout)); |
| }) |
| .then([](const Option<int>& exitCode) -> Future<int> { |
| if (exitCode.isNone()) { |
| return Failure("Failed to reap the command process"); |
| } |
| |
| return exitCode.get(); |
| }); |
| } |
| |
| |
| Future<int> CheckerProcess::nestedCommandCheck() |
| { |
| CHECK_EQ(CheckInfo::COMMAND, check.type()); |
| CHECK(check.has_command()); |
| CHECK_SOME(taskContainerId); |
| CHECK_SOME(agentURL); |
| |
| VLOG(1) << "Launching " << name << " for task '" << taskId << "'"; |
| |
| // We don't want recoverable errors, e.g., the agent responding with |
| // HTTP status code 503, to trigger a check failure. |
| // |
| // The future returned by this method represents the result of a |
| // check. It will be set to the exit status of the check command if it |
| // succeeded, to a `Failure` if there was a non-transient error, and |
| // discarded if there was a transient error. |
| auto promise = std::make_shared<Promise<int>>(); |
| |
| if (previousCheckContainerId.isSome()) { |
| agent::Call call; |
| call.set_type(agent::Call::REMOVE_NESTED_CONTAINER); |
| |
| mesos::ContainerID previousId = previousCheckContainerId.get(); |
| |
| agent::Call::RemoveNestedContainer* removeContainer = |
| call.mutable_remove_nested_container(); |
| |
| removeContainer->mutable_container_id()->CopyFrom( |
| previousCheckContainerId.get()); |
| |
| http::Request request; |
| request.method = "POST"; |
| request.url = agentURL.get(); |
| request.body = serialize(ContentType::PROTOBUF, evolve(call)); |
| request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, |
| {"Content-Type", stringify(ContentType::PROTOBUF)}}; |
| |
| if (authorizationHeader.isSome()) { |
| request.headers["Authorization"] = authorizationHeader.get(); |
| } |
| |
| http::request(request, false) |
| .onFailed(defer(self(), |
| [this, promise, previousId](const string& failure) { |
| LOG(WARNING) << "Connection to remove the nested container '" |
| << previousId << "' used for the " << name << " for" |
| << " task '" << taskId << "' failed: " << failure; |
| |
| // Something went wrong while sending the request, we treat this |
| // as a transient failure and discard the promise. |
| promise->discard(); |
| })) |
| .onReady(defer(self(), [this, promise, previousId]( |
| const http::Response& response) { |
| if (response.code != http::Status::OK) { |
| // The agent was unable to remove the check container, we |
| // treat this as a transient failure and discard the promise. |
| LOG(WARNING) << "Received '" << response.status << "' (" |
| << response.body << ") while removing the nested" |
| << " container '" << previousId << "' used for" |
| << " the " << name << " for task '" << taskId << "'"; |
| |
| promise->discard(); |
| } else { |
| previousCheckContainerId = None(); |
| _nestedCommandCheck(promise); |
| } |
| })); |
| } else { |
| _nestedCommandCheck(promise); |
| } |
| |
| return promise->future(); |
| } |
| |
| |
| void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise) |
| { |
| // TODO(alexr): Use lambda named captures for |
| // these cached values once they are available. |
| const TaskID _taskId = taskId; |
| const string _name = name; |
| |
| http::connect(agentURL.get()) |
| .onFailed(defer(self(), [_taskId, _name, promise](const string& failure) { |
| LOG(WARNING) << "Unable to establish connection with the agent to launch " |
| << _name << " for task '" << _taskId << "'" |
| << ": " << failure; |
| |
| // We treat this as a transient failure. |
| promise->discard(); |
| })) |
| .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1)); |
| } |
| |
| |
| void CheckerProcess::__nestedCommandCheck( |
| shared_ptr<Promise<int>> promise, |
| http::Connection connection) |
| { |
| ContainerID checkContainerId; |
| checkContainerId.set_value("check-" + UUID::random().toString()); |
| checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); |
| |
| previousCheckContainerId = checkContainerId; |
| |
| CommandInfo command(check.command().command()); |
| |
| agent::Call call; |
| call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); |
| |
| agent::Call::LaunchNestedContainerSession* launch = |
| call.mutable_launch_nested_container_session(); |
| |
| launch->mutable_container_id()->CopyFrom(checkContainerId); |
| launch->mutable_command()->CopyFrom(command); |
| |
| http::Request request; |
| request.method = "POST"; |
| request.url = agentURL.get(); |
| request.body = serialize(ContentType::PROTOBUF, evolve(call)); |
| request.headers = {{"Accept", stringify(ContentType::RECORDIO)}, |
| {"Message-Accept", stringify(ContentType::PROTOBUF)}, |
| {"Content-Type", stringify(ContentType::PROTOBUF)}}; |
| |
| if (authorizationHeader.isSome()) { |
| request.headers["Authorization"] = authorizationHeader.get(); |
| } |
| |
| // TODO(alexr): Use a lambda named capture for |
| // this cached value once it is available. |
| const Duration timeout = checkTimeout; |
| |
| auto checkTimedOut = std::make_shared<bool>(false); |
| |
| // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with |
| // the output of the container. The agent will close the stream once |
| // the container has exited, or kill the container if the client |
| // closes the connection. |
| // |
| // We're calling `Connection::send` with `streamed = false`, so that |
| // it returns an HTTP response of type 'BODY' once the entire response |
| // is received. |
| // |
| // This means that this future will not be completed until after the |
| // check command has finished or the connection has been closed. |
| connection.send(request, false) |
| .after(checkTimeout, |
| defer(self(), |
| [timeout, checkTimedOut](Future<http::Response> future) { |
| future.discard(); |
| |
| *checkTimedOut = true; |
| |
| return Failure("Command timed out after " + stringify(timeout)); |
| })) |
| .onFailed(defer(self(), |
| &Self::nestedCommandCheckFailure, |
| promise, |
| connection, |
| checkContainerId, |
| checkTimedOut, |
| lambda::_1)) |
| .onReady(defer(self(), |
| &Self::___nestedCommandCheck, |
| promise, |
| checkContainerId, |
| lambda::_1)); |
| } |
| |
| |
| void CheckerProcess::___nestedCommandCheck( |
| shared_ptr<Promise<int>> promise, |
| const ContainerID& checkContainerId, |
| const http::Response& launchResponse) |
| { |
| if (launchResponse.code != http::Status::OK) { |
| // The agent was unable to launch the check container, |
| // we treat this as a transient failure. |
| LOG(WARNING) << "Received '" << launchResponse.status << "' (" |
| << launchResponse.body << ") while launching " << name |
| << " for task '" << taskId << "'"; |
| |
| // We'll try to remove the container created for the check at the |
| // beginning of the next check. In order to prevent a failure, the |
| // promise should only be completed once we're sure that the |
| // container has terminated. |
| waitNestedContainer(checkContainerId) |
| .onAny([promise](const Future<Option<int>>&) { |
| // We assume that once `WaitNestedContainer` returns, |
| // irrespective of whether the response contains a failure, the |
| // container will be in a terminal state, and that it will be |
| // possible to remove it. |
| promise->discard(); |
| }); |
| |
| return; |
| } |
| |
| waitNestedContainer(checkContainerId) |
| .onFailed([promise](const string& failure) { |
| promise->fail( |
| "Unable to get the exit code: " + failure); |
| }) |
| .onReady([promise](const Option<int>& status) -> void { |
| if (status.isNone()) { |
| promise->fail("Unable to get the exit code"); |
| // TODO(gkleiman): Make sure that the following block works on Windows. |
| } else if (WIFSIGNALED(status.get()) && |
| WTERMSIG(status.get()) == SIGKILL) { |
| // The check container was signaled, probably because the task |
| // finished while the check was still in-flight, so we discard |
| // the result. |
| promise->discard(); |
| } else { |
| promise->set(status.get()); |
| } |
| }); |
| } |
| |
| |
| void CheckerProcess::nestedCommandCheckFailure( |
| shared_ptr<Promise<int>> promise, |
| http::Connection connection, |
| const ContainerID& checkContainerId, |
| shared_ptr<bool> checkTimedOut, |
| const string& failure) |
| { |
| if (*checkTimedOut) { |
| // The check timed out, closing the connection will make the agent |
| // kill the container. |
| connection.disconnect(); |
| |
| // If the check delay interval is zero, we'll try to perform another |
| // check right after we finish processing the current timeout. |
| // |
| // We'll try to remove the container created for the check at the |
| // beginning of the next check. In order to prevent a failure, the |
| // promise should only be completed once we're sure that the |
| // container has terminated. |
| waitNestedContainer(checkContainerId) |
| .onAny([failure, promise](const Future<Option<int>>&) { |
| // We assume that once `WaitNestedContainer` returns, |
| // irrespective of whether the response contains a failure, the |
| // container will be in a terminal state, and that it will be |
| // possible to remove it. |
| // |
| // This means that we don't need to retry the `WaitNestedContainer` |
| // call. |
| promise->fail(failure); |
| }); |
| } else { |
| // The agent was not able to complete the request, discarding the |
| // promise signals the checker that it should retry the check. |
| // |
| // This will allow us to recover from a blip. The executor will |
| // pause the checker when it detects that the agent is not |
| // available. Here we do not need to wait the check container since |
| // the agent may have been unavailable, and when the agent is back, |
| // it will destroy the check container as orphan container, and we |
| // will eventually remove it in `nestedCommandCheck()`. |
| LOG(WARNING) << "Connection to the agent to launch " << name |
| << " for task '" << taskId << "' failed: " << failure; |
| |
| promise->discard(); |
| } |
| } |
| |
| |
| Future<Option<int>> CheckerProcess::waitNestedContainer( |
| const ContainerID& containerId) |
| { |
| agent::Call call; |
| call.set_type(agent::Call::WAIT_NESTED_CONTAINER); |
| |
| agent::Call::WaitNestedContainer* containerWait = |
| call.mutable_wait_nested_container(); |
| |
| containerWait->mutable_container_id()->CopyFrom(containerId); |
| |
| http::Request request; |
| request.method = "POST"; |
| request.url = agentURL.get(); |
| request.body = serialize(ContentType::PROTOBUF, evolve(call)); |
| request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, |
| {"Content-Type", stringify(ContentType::PROTOBUF)}}; |
| |
| if (authorizationHeader.isSome()) { |
| request.headers["Authorization"] = authorizationHeader.get(); |
| } |
| |
| // TODO(alexr): Use a lambda named capture for |
| // this cached value once it is available. |
| const string _name = name; |
| |
| return http::request(request, false) |
| .repair([containerId, _name](const Future<http::Response>& future) { |
| return Failure( |
| "Connection to wait for " + _name + " container '" + |
| stringify(containerId) + "' failed: " + future.failure()); |
| }) |
| .then(defer(self(), |
| &Self::_waitNestedContainer, containerId, lambda::_1)); |
| } |
| |
| |
| Future<Option<int>> CheckerProcess::_waitNestedContainer( |
| const ContainerID& containerId, |
| const http::Response& httpResponse) |
| { |
| if (httpResponse.code != http::Status::OK) { |
| return Failure( |
| "Received '" + httpResponse.status + "' (" + httpResponse.body + |
| ") while waiting on " + name + " container '" + |
| stringify(containerId) + "'"); |
| } |
| |
| Try<agent::Response> response = |
| deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body); |
| CHECK_SOME(response); |
| |
| CHECK(response->has_wait_nested_container()); |
| |
| return ( |
| response->wait_nested_container().has_exit_status() |
| ? Option<int>(response->wait_nested_container().exit_status()) |
| : Option<int>::none()); |
| } |
| |
| |
| void CheckerProcess::processCommandCheckResult( |
| const Stopwatch& stopwatch, |
| const Future<int>& future) |
| { |
| CHECK(!future.isPending()); |
| |
| Result<CheckStatusInfo> result = None(); |
| |
| // On Posix, `future` corresponds to termination information in the |
| // `stat_loc` area. On Windows, `status` is obtained via calling the |
| // `GetExitCodeProcess()` function. |
| // |
| // TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows, |
| // see MESOS-7242. |
| if (future.isReady() && WIFEXITED(future.get())) { |
| const int exitCode = WEXITSTATUS(future.get()); |
| VLOG(1) << name << " for task '" << taskId << "' returned: " << exitCode; |
| |
| CheckStatusInfo checkStatusInfo; |
| checkStatusInfo.set_type(check.type()); |
| checkStatusInfo.mutable_command()->set_exit_code( |
| static_cast<int32_t>(exitCode)); |
| |
| result = Result<CheckStatusInfo>(checkStatusInfo); |
| } else if (future.isDiscarded()) { |
| // Check's status is currently not available due to a transient error, |
| // e.g., due to the agent failover, no `CheckStatusInfo` message should |
| // be sent to the callback. |
| result = None(); |
| } else { |
| result = Result<CheckStatusInfo>(Error(future.failure())); |
| } |
| |
| processCheckResult(stopwatch, result); |
| } |
| |
| |
| Future<int> CheckerProcess::httpCheck() |
| { |
| CHECK_EQ(CheckInfo::HTTP, check.type()); |
| CHECK(check.has_http()); |
| |
| const CheckInfo::Http& http = check.http(); |
| |
| const string _scheme = scheme.isSome() ? scheme.get() : DEFAULT_HTTP_SCHEME; |
| const string path = http.has_path() ? http.path() : ""; |
| const string url = _scheme + "://" + DEFAULT_DOMAIN + ":" + |
| stringify(http.port()) + path; |
| |
| VLOG(1) << "Launching " << name << " '" << url << "'" |
| << " for task '" << taskId << "'"; |
| |
| const vector<string> argv = { |
| HTTP_CHECK_COMMAND, |
| "-s", // Don't show progress meter or error messages. |
| "-S", // Makes curl show an error message if it fails. |
| "-L", // Follows HTTP 3xx redirects. |
| "-k", // Ignores SSL validation when scheme is https. |
| "-w", "%{http_code}", // Displays HTTP response code on stdout. |
| "-o", os::DEV_NULL, // Ignores output. |
| url |
| }; |
| |
| // TODO(alexr): Consider launching the helper binary once per task lifetime, |
| // see MESOS-6766. |
| Try<Subprocess> s = process::subprocess( |
| HTTP_CHECK_COMMAND, |
| argv, |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| nullptr, |
| None(), |
| clone); |
| |
| if (s.isError()) { |
| return Failure( |
| "Failed to create the " + string(HTTP_CHECK_COMMAND) + |
| " subprocess: " + s.error()); |
| } |
| |
| // TODO(alexr): Use lambda named captures for |
| // these cached values once it is available. |
| const pid_t curlPid = s->pid(); |
| const string _name = name; |
| const Duration timeout = checkTimeout; |
| const TaskID _taskId = taskId; |
| |
| return await( |
| s->status(), |
| process::io::read(s->out().get()), |
| process::io::read(s->err().get())) |
| .after( |
| timeout, |
| [timeout, curlPid, _name, _taskId](Future<tuple<Future<Option<int>>, |
| Future<string>, |
| Future<string>>> future) |
| { |
| future.discard(); |
| |
| if (curlPid != -1) { |
| // Cleanup the HTTP_CHECK_COMMAND process. |
| VLOG(1) << "Killing the " << _name << " process " << curlPid |
| << " for task '" << _taskId << "'"; |
| |
| os::killtree(curlPid, SIGKILL); |
| } |
| |
| return Failure( |
| string(HTTP_CHECK_COMMAND) + " timed out after " + |
| stringify(timeout)); |
| }) |
| .then(defer(self(), &Self::_httpCheck, lambda::_1)); |
| } |
| |
| |
| Future<int> CheckerProcess::_httpCheck( |
| const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) |
| { |
| const Future<Option<int>>& status = std::get<0>(t); |
| if (!status.isReady()) { |
| return Failure( |
| "Failed to get the exit status of the " + string(HTTP_CHECK_COMMAND) + |
| " process: " + (status.isFailed() ? status.failure() : "discarded")); |
| } |
| |
| if (status->isNone()) { |
| return Failure( |
| "Failed to reap the " + string(HTTP_CHECK_COMMAND) + " process"); |
| } |
| |
| int exitCode = status->get(); |
| if (exitCode != 0) { |
| const Future<string>& error = std::get<2>(t); |
| if (!error.isReady()) { |
| return Failure( |
| string(HTTP_CHECK_COMMAND) + " " + WSTRINGIFY(exitCode) + |
| "; reading stderr failed: " + |
| (error.isFailed() ? error.failure() : "discarded")); |
| } |
| |
| return Failure( |
| string(HTTP_CHECK_COMMAND) + " " + WSTRINGIFY(exitCode) + ": " + |
| error.get()); |
| } |
| |
| const Future<string>& output = std::get<1>(t); |
| if (!output.isReady()) { |
| return Failure( |
| "Failed to read stdout from " + string(HTTP_CHECK_COMMAND) + ": " + |
| (output.isFailed() ? output.failure() : "discarded")); |
| } |
| |
| // Parse the output and get the HTTP status code. |
| Try<int> statusCode = numify<int>(output.get()); |
| if (statusCode.isError()) { |
| return Failure( |
| "Unexpected output from " + string(HTTP_CHECK_COMMAND) + ": " + |
| output.get()); |
| } |
| |
| return statusCode.get(); |
| } |
| |
| |
| void CheckerProcess::processHttpCheckResult( |
| const Stopwatch& stopwatch, |
| const Future<int>& future) |
| { |
| CHECK(!future.isPending()); |
| |
| Result<CheckStatusInfo> result = None(); |
| |
| if (future.isReady()) { |
| VLOG(1) << name << " for task '" << taskId << "'" |
| << " returned: " << future.get(); |
| |
| CheckStatusInfo checkStatusInfo; |
| checkStatusInfo.set_type(check.type()); |
| checkStatusInfo.mutable_http()->set_status_code( |
| static_cast<uint32_t>(future.get())); |
| |
| result = Result<CheckStatusInfo>(checkStatusInfo); |
| } else if (future.isDiscarded()) { |
| // Check's status is currently not available due to a transient error, |
| // e.g., due to the agent failover, no `CheckStatusInfo` message should |
| // be sent to the callback. |
| result = None(); |
| } else { |
| result = Result<CheckStatusInfo>(Error(future.failure())); |
| } |
| |
| processCheckResult(stopwatch, result); |
| } |
| |
| |
| Future<bool> CheckerProcess::tcpCheck() |
| { |
| CHECK_EQ(CheckInfo::TCP, check.type()); |
| CHECK(check.has_tcp()); |
| |
| // TCP_CHECK_COMMAND should be reachable. |
| CHECK(os::exists(launcherDir)); |
| |
| const CheckInfo::Tcp& tcp = check.tcp(); |
| |
| VLOG(1) << "Launching " << name << " for task '" << taskId << "'" |
| << " at port " << tcp.port(); |
| |
| const string command = path::join(launcherDir, TCP_CHECK_COMMAND); |
| |
| const vector<string> argv = { |
| command, |
| "--ip=" + stringify(DEFAULT_DOMAIN), |
| "--port=" + stringify(tcp.port()) |
| }; |
| |
| // TODO(alexr): Consider launching the helper binary once per task lifetime, |
| // see MESOS-6766. |
| Try<Subprocess> s = subprocess( |
| command, |
| argv, |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| nullptr, |
| None(), |
| clone); |
| |
| if (s.isError()) { |
| return Failure( |
| "Failed to create the " + command + " subprocess: " + s.error()); |
| } |
| |
| // TODO(alexr): Use lambda named captures for |
| // these cached values once they are available. |
| pid_t commandPid = s->pid(); |
| const string _name = name; |
| const Duration timeout = checkTimeout; |
| const TaskID _taskId = taskId; |
| |
| return await( |
| s->status(), |
| process::io::read(s->out().get()), |
| process::io::read(s->err().get())) |
| .after( |
| timeout, [timeout, commandPid, _name, _taskId]( |
| Future<tuple<Future<Option<int>>, |
| Future<string>, |
| Future<string>>> future) |
| { |
| future.discard(); |
| |
| if (commandPid != -1) { |
| // Cleanup the TCP_CHECK_COMMAND process. |
| VLOG(1) << "Killing the " << _name << " process " << commandPid |
| << " for task '" << _taskId << "'"; |
| |
| os::killtree(commandPid, SIGKILL); |
| } |
| |
| return Failure( |
| string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout)); |
| }) |
| .then(defer(self(), &Self::_tcpCheck, lambda::_1)); |
| } |
| |
| |
| Future<bool> CheckerProcess::_tcpCheck( |
| const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) |
| { |
| const Future<Option<int>>& status = std::get<0>(t); |
| if (!status.isReady()) { |
| return Failure( |
| "Failed to get the exit status of the " + string(TCP_CHECK_COMMAND) + |
| " process: " + (status.isFailed() ? status.failure() : "discarded")); |
| } |
| |
| if (status->isNone()) { |
| return Failure( |
| "Failed to reap the " + string(TCP_CHECK_COMMAND) + " process"); |
| } |
| |
| int exitCode = status->get(); |
| |
| const Future<string>& commandOutput = std::get<1>(t); |
| if (commandOutput.isReady()) { |
| VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandOutput.get(); |
| } |
| |
| if (exitCode != 0) { |
| const Future<string>& commandError = std::get<2>(t); |
| if (commandError.isReady()) { |
| VLOG(1) << string(TCP_CHECK_COMMAND) << ": " << commandError.get(); |
| } |
| } |
| |
| // Non-zero exit code of TCP_CHECK_COMMAND can mean configuration problem |
| // (e.g., bad command flag), system error (e.g., a socket cannot be |
| // created), or actually a failed connection. We cannot distinguish between |
| // these cases, hence treat all of them as connection failure. |
| return (exitCode == 0 ? true : false); |
| } |
| |
| |
| void CheckerProcess::processTcpCheckResult( |
| const Stopwatch& stopwatch, |
| const Future<bool>& future) |
| { |
| CHECK(!future.isPending()); |
| |
| Result<CheckStatusInfo> result = None(); |
| |
| if (future.isReady()) { |
| VLOG(1) << name << " for task '" << taskId << "'" |
| << " returned: " << future.get(); |
| |
| CheckStatusInfo checkStatusInfo; |
| checkStatusInfo.set_type(check.type()); |
| checkStatusInfo.mutable_tcp()->set_succeeded(future.get()); |
| |
| result = Result<CheckStatusInfo>(checkStatusInfo); |
| } else if (future.isDiscarded()) { |
| // Check's status is currently not available due to a transient error, |
| // e.g., due to the agent failover, no `CheckStatusInfo` message should |
| // be sent to the callback. |
| result = None(); |
| } else { |
| result = Result<CheckStatusInfo>(Error(future.failure())); |
| } |
| |
| processCheckResult(stopwatch, result); |
| } |
| |
| } // namespace checks { |
| } // namespace internal { |
| } // namespace mesos { |