// blob: 463376e296b3e2761e967185917291320db8e9d8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/allocator/mesos/metrics.hpp"
#include <string>
#include <mesos/quota/quota.hpp>
#include <process/metrics/pull_gauge.hpp>
#include <process/metrics/push_gauge.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/hashmap.hpp>
#include "common/protobuf_utils.hpp"
#include "master/metrics.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
using std::string;
using process::metrics::PullGauge;
using process::metrics::PushGauge;
namespace mesos {
namespace internal {
namespace master {
namespace allocator {
namespace internal {
template<class Quantities>
void QuotaMetrics::update(const string& role, const Quantities& quantities)
{
hashmap<string, PushGauge>& gauges = metrics[role];
hashset<string> namesToRemove = gauges.keys();
for (const auto& quantity : quantities) {
const string& name = quantity.first;
const double value = quantity.second.value();
namesToRemove.erase(name);
if (gauges.contains(name)) {
gauges.at(name) = value;
continue;
}
PushGauge newGauge(
"allocator/mesos/quota/roles/" + role + "/resources/" + name + suffix);
newGauge = value;
process::metrics::add(newGauge);
gauges.put(name, newGauge);
}
for (const string& name : namesToRemove) {
process::metrics::remove(gauges.at(name));
gauges.erase(name);
}
if (gauges.empty()) {
metrics.erase(role);
}
}
// Unregisters every gauge still tracked in `metrics`, for all roles.
QuotaMetrics::~QuotaMetrics()
{
  for (const auto& perRole : metrics) {
    for (const auto& perResource : perRole.second) {
      process::metrics::remove(perResource.second);
    }
  }
}
// Sets up and registers the allocator-wide metrics.
//
// The event queue dispatch gauge is published under two keys
// ("allocator/mesos/event_queue_dispatches" and
// "allocator/event_queue_dispatches"), both backed by the same allocator
// callback. In addition, the allocation run metrics and one
// total / offered-or-allocated gauge pair per standard scalar resource
// are registered. All pull gauges are evaluated by deferring to the
// allocator process identified by `_allocator.self()`.
Metrics::Metrics(const HierarchicalAllocatorProcess& _allocator)
  : allocator(_allocator.self()),
    event_queue_dispatches(
        "allocator/mesos/event_queue_dispatches",
        process::defer(
            allocator,
            &HierarchicalAllocatorProcess::_event_queue_dispatches)),
    event_queue_dispatches_(
        "allocator/event_queue_dispatches",
        process::defer(
            allocator,
            &HierarchicalAllocatorProcess::_event_queue_dispatches)),
    allocation_runs("allocator/mesos/allocation_runs"),
    allocation_run("allocator/mesos/allocation_run", Hours(1)),
    allocation_run_latency("allocator/mesos/allocation_run_latency", Hours(1))
{
  process::metrics::add(event_queue_dispatches);
  process::metrics::add(event_queue_dispatches_);
  process::metrics::add(allocation_runs);
  process::metrics::add(allocation_run);
  process::metrics::add(allocation_run_latency);

  // Create and install gauges for the total and allocated
  // amount of standard scalar resources.
  //
  // TODO(bbannier) Add support for more than just scalar resources.
  // TODO(bbannier) Simplify this once MESOS-3214 is fixed.
  // TODO(dhamon): Set these up dynamically when adding a slave based on the
  // resources the slave exposes.
  const string scalarNames[] = {"cpus", "mem", "disk"};

  for (const string& resource : scalarNames) {
    const string keyPrefix = "allocator/mesos/resources/" + resource;

    PullGauge total(
        keyPrefix + "/total",
        defer(allocator,
              &HierarchicalAllocatorProcess::_resources_total,
              resource));

    PullGauge offered_or_allocated(
        keyPrefix + "/offered_or_allocated",
        defer(allocator,
              &HierarchicalAllocatorProcess::_resources_offered_or_allocated,
              resource));

    // Remember each gauge so the destructor can unregister it; the
    // "total" gauge is still registered before its counterpart.
    resources_total.push_back(total);
    process::metrics::add(total);

    resources_offered_or_allocated.push_back(offered_or_allocated);
    process::metrics::add(offered_or_allocated);
  }
}
// Unregisters every metric this object registered: the fixed allocator
// metrics, the per-resource gauges, the per-role quota gauges and the
// per-role offer filter gauges.
Metrics::~Metrics()
{
  process::metrics::remove(event_queue_dispatches);
  process::metrics::remove(event_queue_dispatches_);
  process::metrics::remove(allocation_runs);
  process::metrics::remove(allocation_run);
  process::metrics::remove(allocation_run_latency);

  for (const PullGauge& total : resources_total) {
    process::metrics::remove(total);
  }

  for (const PullGauge& offered : resources_offered_or_allocated) {
    process::metrics::remove(offered);
  }

  // `quota_allocated` is keyed by role, then by resource name.
  for (const auto& perRole : quota_allocated) {
    for (const auto& perResource : perRole.second) {
      process::metrics::remove(perResource.second);
    }
  }

  for (const auto& perRole : offer_filters_active) {
    process::metrics::remove(perRole.second);
  }
}
void Metrics::updateQuota(const string& role, const Quota& quota)
{
quotaGuarantees.update(role, quota.guarantees);
quotaLimits.update(role, quota.limits);
// First remove the existing metrics.
foreachvalue (const PullGauge& gauge, quota_allocated[role]) {
process::metrics::remove(gauge);
}
quota_allocated.erase(role);
// This is the original quota "remove" case where the
// role's quota is set to the default. We used to not
// show the quota allocated_or_offered metric in this
// case, so we keep this behavior for now.
//
// TODO(asekretenko): move offered_or_allocated to PushGauge-based
// QuotaMetrics. This also solves the problem of exposing consumption of
// roles with a default quota.
if (quota == DEFAULT_QUOTA) {
return;
}
hashset<string> names;
foreach (auto& quantity, quota.guarantees) {
names.insert(quantity.first);
}
foreach (auto& quantity, quota.limits) {
names.insert(quantity.first);
}
foreach (const string& name, names) {
PullGauge offered_or_allocated(
"allocator/mesos/quota"
"/roles/" + role +
"/resources/" + name +
"/offered_or_allocated",
defer(allocator,
&HierarchicalAllocatorProcess::_quota_offered_or_allocated,
role,
name));
process::metrics::add(offered_or_allocated);
quota_allocated[role].put(name, offered_or_allocated);
}
}
// Publishes the `consumed` quantities of `role` by forwarding them
// unchanged to `quotaConsumed.update()`.
void Metrics::updateConsumed(
const string& role, const ResourceQuantities& consumed)
{
quotaConsumed.update(role, consumed);
}
// Creates and registers the "offer filters active" gauge of `role`.
//
// The role must not have such a gauge yet; this is enforced with a CHECK.
void Metrics::addRole(const string& role)
{
  CHECK(!offer_filters_active.contains(role));

  const string key =
    "allocator/mesos/offer_filters/roles/" + role + "/active";

  // The gauge value is pulled from the allocator process on demand.
  PullGauge gauge(
      key,
      defer(allocator,
            &HierarchicalAllocatorProcess::_offer_filters_active,
            role));

  offer_filters_active.put(role, gauge);
  process::metrics::add(gauge);
}
void Metrics::removeRole(const string& role)
{
Option<PullGauge> gauge = offer_filters_active.get(role);
CHECK_SOME(gauge);
offer_filters_active.erase(role);
process::metrics::remove(gauge.get());
}
// Stores the framework info and the publishing flag, then sets up a
// "suppressed" gauge for each role returned by `getRoles()` for this
// framework.
FrameworkMetrics::FrameworkMetrics(
    const FrameworkInfo& _frameworkInfo,
    const bool _publishPerFrameworkMetrics)
  : frameworkInfo(_frameworkInfo),
    publishPerFrameworkMetrics(_publishPerFrameworkMetrics)
{
  // TODO(greggomann): Calling `getRoles` below copies the roles from the
  // framework info, which could become expensive if the number of roles grows
  // large. Consider optimizing this.
  const auto subscribedRoles = protobuf::framework::getRoles(frameworkInfo);

  for (const string& role : subscribedRoles) {
    addSubscribedRole(role);
  }
}
// Removes the "suppressed" gauge of every role that is still tracked.
FrameworkMetrics::~FrameworkMetrics()
{
  // `removeSubscribedRole()` erases from `suppressed`, so walk over a
  // snapshot of the keys rather than over the map itself.
  const hashset<string> trackedRoles = suppressed.keys();

  for (const string& role : trackedRoles) {
    removeSubscribedRole(role);
  }

  CHECK(suppressed.empty());
}
void FrameworkMetrics::reviveRole(const string& role)
{
auto iter = suppressed.find(role);
CHECK(iter != suppressed.end());
iter->second = 0;
}
void FrameworkMetrics::suppressRole(const string& role)
{
auto iter = suppressed.find(role);
CHECK(iter != suppressed.end());
iter->second = 1;
}
void FrameworkMetrics::addSubscribedRole(const string& role)
{
auto result = suppressed.emplace(
role,
PushGauge(
getFrameworkMetricPrefix(frameworkInfo) + "roles/" +
role + "/suppressed"));
CHECK(result.second);
addMetric(result.first->second);
}
void FrameworkMetrics::removeSubscribedRole(const string& role)
{
auto iter = suppressed.find(role);
CHECK(iter != suppressed.end());
removeMetric(iter->second);
suppressed.erase(iter);
}
// Registers `metric` with the metrics library, but only when
// `publishPerFrameworkMetrics` is set; otherwise this is a no-op.
template <typename T>
void FrameworkMetrics::addMetric(const T& metric)
{
  if (!publishPerFrameworkMetrics) {
    return;
  }

  process::metrics::add(metric);
}
// Unregisters `metric` from the metrics library, but only when
// `publishPerFrameworkMetrics` is set; otherwise this is a no-op.
template <typename T>
void FrameworkMetrics::removeMetric(const T& metric)
{
  if (!publishPerFrameworkMetrics) {
    return;
  }

  process::metrics::remove(metric);
}
} // namespace internal {
} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {