blob: e2b2baa5a45f5cdb4bab279edc405e1a0fc9331c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "checks/checker.hpp"
#include <cstdint>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
#include <process/future.hpp>
#include <stout/exit.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include <stout/stopwatch.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include <stout/variant.hpp>
#include "checks/checker_process.hpp"
#include "checks/checks_runtime.hpp"
#include "common/http.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
namespace http = process::http;
using process::Future;
using process::Owned;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
namespace checks {
// Creates a valid instance of `CheckStatusInfo` with the `type` set in
// accordance to the associated `CheckInfo`.
static CheckStatusInfo createEmptyCheckStatusInfo(const CheckInfo& checkInfo) {
CheckStatusInfo checkStatusInfo;
checkStatusInfo.set_type(checkInfo.type());
switch (checkInfo.type()) {
case CheckInfo::COMMAND: {
checkStatusInfo.mutable_command();
break;
}
case CheckInfo::HTTP: {
checkStatusInfo.mutable_http();
break;
}
case CheckInfo::TCP: {
checkStatusInfo.mutable_tcp();
break;
}
case CheckInfo::UNKNOWN: {
LOG(FATAL) << "Received UNKNOWN check type";
break;
}
}
return checkStatusInfo;
}
Try<Owned<Checker>> Checker::create(
const CheckInfo& check,
const string& launcherDir,
const lambda::function<void(const CheckStatusInfo&)>& callback,
const TaskID& taskId,
Variant<runtime::Plain, runtime::Docker, runtime::Nested> runtime)
{
// Validate the `CheckInfo` protobuf.
Option<Error> error = common::validation::validateCheckInfo(check);
if (error.isSome()) {
return error.get();
}
return Owned<Checker>(
new Checker(
check,
launcherDir,
callback,
taskId,
std::move(runtime)));
}
Checker::Checker(
const CheckInfo& _check,
const string& _launcherDir,
const lambda::function<void(const CheckStatusInfo&)>& _callback,
const TaskID& _taskId,
Variant<runtime::Plain, runtime::Docker, runtime::Nested> _runtime)
: check(_check),
callback(_callback),
taskId(_taskId),
name(CheckInfo::Type_Name(check.type()) + " check"),
previousCheckStatus(createEmptyCheckStatusInfo(_check))
{
VLOG(1) << "Check configuration for task '" << taskId << "':"
<< " '" << jsonify(JSON::Protobuf(check)) << "'";
process.reset(
new CheckerProcess(
_check,
_launcherDir,
std::bind(&Checker::processCheckResult, this, lambda::_1),
_taskId,
name,
std::move(_runtime),
None(),
false));
spawn(process.get());
}
Checker::~Checker()
{
terminate(process.get());
wait(process.get());
}
void Checker::pause()
{
dispatch(process.get(), &CheckerProcess::pause);
}
void Checker::resume()
{
dispatch(process.get(), &CheckerProcess::resume);
}
void Checker::processCheckResult(const Try<CheckStatusInfo>& result) {
CheckStatusInfo checkStatusInfo;
if (result.isError()) {
LOG(WARNING) << name << " for task '" << taskId << "'"
<< " failed: " << result.error();
checkStatusInfo = createEmptyCheckStatusInfo(check);
} else {
checkStatusInfo = result.get();
}
// Trigger the callback if check info changes.
if (checkStatusInfo != previousCheckStatus) {
// We assume this is a local send, i.e., the checker library is not used
// in a binary external to the executor and hence can not exit before
// the data is sent to the executor.
callback(checkStatusInfo);
previousCheckStatus = checkStatusInfo;
}
}
} // namespace checks {
} // namespace internal {
} // namespace mesos {