// blob: fff0aac504b4283a210f936e00c977fa60d88b3d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "checks/checker.hpp"
#include <cstdint>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
#include <process/future.hpp>
#include <stout/exit.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include <stout/stopwatch.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include "checks/checker_process.hpp"
#include "common/http.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
namespace http = process::http;
using process::Future;
using process::Owned;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
namespace checks {
// Builds a `CheckStatusInfo` whose `type` mirrors the type of the given
// `CheckInfo` and whose matching type-specific sub-message (`command`,
// `http` or `tcp`) is instantiated but otherwise left empty. An `UNKNOWN`
// check type is reported via `LOG(FATAL)`.
static CheckStatusInfo createEmptyCheckStatusInfo(const CheckInfo& checkInfo) {
  const CheckInfo::Type type = checkInfo.type();

  CheckStatusInfo status;
  status.set_type(type);

  // Touching the `mutable_*()` accessor is what marks the corresponding
  // sub-message as set.
  switch (type) {
    case CheckInfo::COMMAND:
      status.mutable_command();
      break;
    case CheckInfo::HTTP:
      status.mutable_http();
      break;
    case CheckInfo::TCP:
      status.mutable_tcp();
      break;
    case CheckInfo::UNKNOWN:
      LOG(FATAL) << "Received UNKNOWN check type";
      break;
  }

  return status;
}
// Factory overload taking the task's pid and the namespaces list.
//
// The supplied `CheckInfo` is validated first; if validation fails, the
// validation error is returned and no `Checker` is constructed. Otherwise
// a new `Checker` is created with the container ID, agent URL and
// authorization header arguments set to `None()` and the trailing boolean
// constructor argument (`_commandCheckViaAgent`) set to `false`.
Try<Owned<Checker>> Checker::create(
    const CheckInfo& check,
    const string& launcherDir,
    const lambda::function<void(const CheckStatusInfo&)>& callback,
    const TaskID& taskId,
    const Option<pid_t>& taskPid,
    const vector<string>& namespaces)
{
  Option<Error> validationError = validation::checkInfo(check);

  if (validationError.isNone()) {
    Checker* checker = new Checker(
        check,
        launcherDir,
        callback,
        taskId,
        taskPid,
        namespaces,
        None(),
        None(),
        None(),
        false);

    return Owned<Checker>(checker);
  }

  // The `CheckInfo` protobuf did not pass validation.
  return validationError.get();
}
// Factory overload taking the task's container ID, the agent URL and an
// optional authorization header.
//
// The supplied `CheckInfo` is validated first; if validation fails, the
// validation error is returned and no `Checker` is constructed. Otherwise
// a new `Checker` is created with the task pid set to `None()`, an empty
// namespaces list, and the trailing boolean constructor argument
// (`_commandCheckViaAgent`) set to `true`.
Try<Owned<Checker>> Checker::create(
    const CheckInfo& check,
    const string& launcherDir,
    const lambda::function<void(const CheckStatusInfo&)>& callback,
    const TaskID& taskId,
    const ContainerID& taskContainerId,
    const http::URL& agentURL,
    const Option<string>& authorizationHeader)
{
  Option<Error> validationError = validation::checkInfo(check);

  if (validationError.isNone()) {
    Checker* checker = new Checker(
        check,
        launcherDir,
        callback,
        taskId,
        None(),
        vector<string>(),
        taskContainerId,
        agentURL,
        authorizationHeader,
        true);

    return Owned<Checker>(checker);
  }

  // The `CheckInfo` protobuf did not pass validation.
  return validationError.get();
}
// Constructs the checker and starts its backing `CheckerProcess`.
//
// The check description, the callback and the task ID are stored; `name`
// is derived from the check type's enum name followed by " check"; and
// `previousCheckStatus` is seeded with an empty status matching the check
// type. A `CheckerProcess` is then created and spawned, receiving
// `Checker::processCheckResult` (bound to this instance) as its callback.
//
// NOTE(review): `name` is initialized from the `check` member, so this
// relies on `check` being declared before `name` in the class — confirm
// against the header.
Checker::Checker(
    const CheckInfo& _check,
    const string& _launcherDir,
    const lambda::function<void(const CheckStatusInfo&)>& _callback,
    const TaskID& _taskId,
    const Option<pid_t>& _taskPid,
    const vector<string>& _namespaces,
    const Option<ContainerID>& _taskContainerId,
    const Option<http::URL>& _agentURL,
    const Option<string>& _authorizationHeader,
    bool _commandCheckViaAgent)
  : check(_check),
    callback(_callback),
    name(CheckInfo::Type_Name(check.type()) + " check"),
    taskId(_taskId),
    previousCheckStatus(createEmptyCheckStatusInfo(_check))
{
  VLOG(1) << "Check configuration for task '" << taskId << "':"
          << " '" << jsonify(JSON::Protobuf(check)) << "'";

  // Results produced by the process are routed back into this object.
  auto onResult = std::bind(&Checker::processCheckResult, this, lambda::_1);

  // NOTE(review): the `None()` passed after the authorization header is
  // interpreted by `CheckerProcess`'s constructor, which is not visible
  // here — confirm its meaning there.
  CheckerProcess* checkerProcess = new CheckerProcess(
      _check,
      _launcherDir,
      onResult,
      _taskId,
      _taskPid,
      _namespaces,
      _taskContainerId,
      _agentURL,
      _authorizationHeader,
      None(),
      name,
      _commandCheckViaAgent);

  process.reset(checkerProcess);
  spawn(process.get());
}
// Terminates the backing `CheckerProcess` and then waits on it before the
// owning pointer is released.
Checker::~Checker()
{
  CheckerProcess* const checkerProcess = process.get();

  terminate(checkerProcess);
  wait(checkerProcess);
}
void Checker::pause()
{
dispatch(process.get(), &CheckerProcess::pause);
}
void Checker::resume()
{
dispatch(process.get(), &CheckerProcess::resume);
}
void Checker::processCheckResult(const Try<CheckStatusInfo>& result) {
CheckStatusInfo checkStatusInfo;
if (result.isError()) {
LOG(WARNING) << name << " for task '" << taskId << "'"
<< " failed: " << result.error();
checkStatusInfo = createEmptyCheckStatusInfo(check);
} else {
checkStatusInfo = result.get();
}
// Trigger the callback if check info changes.
if (checkStatusInfo != previousCheckStatus) {
// We assume this is a local send, i.e., the checker library is not used
// in a binary external to the executor and hence can not exit before
// the data is sent to the executor.
callback(checkStatusInfo);
previousCheckStatus = checkStatusInfo;
}
}
namespace validation {
// Validates a `CheckInfo` protobuf.
//
// Returns an `Error` describing the first problem found, or `None()` if
// the message passes all checks below:
//   - `type` must be set and must not be `UNKNOWN`;
//   - the sub-message matching the type (`command`, `http`, `tcp`) must
//     be set;
//   - for COMMAND checks the nested `CommandInfo` must have a `value` and
//     pass `common::validation::validateCommandInfo`;
//   - for HTTP checks a `path`, if present, must start with '/';
//   - `delay_seconds`, `interval_seconds` and `timeout_seconds`, if
//     present, must not be negative.
Option<Error> checkInfo(const CheckInfo& checkInfo)
{
  if (!checkInfo.has_type()) {
    return Error("CheckInfo must specify 'type'");
  }

  switch (checkInfo.type()) {
    case CheckInfo::COMMAND: {
      if (!checkInfo.has_command()) {
        return Error("Expecting 'command' to be set for COMMAND check");
      }

      const CommandInfo& commandInfo = checkInfo.command().command();

      if (!commandInfo.has_value()) {
        // The wording of the error depends on whether a shell command or
        // an executable path was expected.
        const string missingPart =
          commandInfo.shell() ? "'shell command'" : "'executable path'";

        return Error("Command check must contain " + missingPart);
      }

      Option<Error> commandError =
        common::validation::validateCommandInfo(commandInfo);

      if (commandError.isSome()) {
        return Error(
            "Check's `CommandInfo` is invalid: " + commandError->message);
      }

      // TODO(alexr): Make sure irrelevant fields, e.g., `uris` are not set.
      break;
    }
    case CheckInfo::HTTP: {
      if (!checkInfo.has_http()) {
        return Error("Expecting 'http' to be set for HTTP check");
      }

      const CheckInfo::Http& httpCheck = checkInfo.http();

      if (httpCheck.has_path()) {
        if (!strings::startsWith(httpCheck.path(), '/')) {
          return Error(
              "The path '" + httpCheck.path() +
              "' of HTTP check must start with '/'");
        }
      }

      break;
    }
    case CheckInfo::TCP: {
      if (!checkInfo.has_tcp()) {
        return Error("Expecting 'tcp' to be set for TCP check");
      }

      break;
    }
    case CheckInfo::UNKNOWN: {
      return Error(
          "'" + CheckInfo::Type_Name(checkInfo.type()) + "'"
          " is not a valid check type");
    }
  }

  // Timing parameters are optional, but may not be negative when given.
  if (checkInfo.has_delay_seconds()) {
    if (checkInfo.delay_seconds() < 0.0) {
      return Error("Expecting 'delay_seconds' to be non-negative");
    }
  }

  if (checkInfo.has_interval_seconds()) {
    if (checkInfo.interval_seconds() < 0.0) {
      return Error("Expecting 'interval_seconds' to be non-negative");
    }
  }

  if (checkInfo.has_timeout_seconds()) {
    if (checkInfo.timeout_seconds() < 0.0) {
      return Error("Expecting 'timeout_seconds' to be non-negative");
    }
  }

  return None();
}
// Validates a `CheckStatusInfo` protobuf.
//
// Returns an `Error` if `type` is not set, if the type is `UNKNOWN`, or
// if the sub-message matching the type (`command`, `http`, `tcp`) is not
// set; returns `None()` otherwise.
Option<Error> checkStatusInfo(const CheckStatusInfo& checkStatusInfo)
{
  if (!checkStatusInfo.has_type()) {
    return Error("CheckStatusInfo must specify 'type'");
  }

  switch (checkStatusInfo.type()) {
    case CheckInfo::COMMAND:
      if (checkStatusInfo.has_command()) {
        return None();
      }
      return Error(
          "Expecting 'command' to be set for COMMAND check's status");

    case CheckInfo::HTTP:
      if (checkStatusInfo.has_http()) {
        return None();
      }
      return Error("Expecting 'http' to be set for HTTP check's status");

    case CheckInfo::TCP:
      if (checkStatusInfo.has_tcp()) {
        return None();
      }
      return Error("Expecting 'tcp' to be set for TCP check's status");

    case CheckInfo::UNKNOWN:
      return Error(
          "'" + CheckInfo::Type_Name(checkStatusInfo.type()) + "'"
          " is not a valid check's status type");
  }

  // Reached only for a type value not handled by the switch above.
  return None();
}
} // namespace validation {
} // namespace checks {
} // namespace internal {
} // namespace mesos {