// Source blob: 881275693e67f3c9fb670c7e70cb5014090ed7a5
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <gtest/gtest.h>
#include <map>
#include <string>
#include <vector>
#include <stout/base64.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include <process/authenticator.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/process.hpp>
#include <process/statistics.hpp>
#include <process/time.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/pull_gauge.hpp>
#include <process/metrics/push_gauge.hpp>
#include <process/metrics/timer.hpp>
namespace authentication = process::http::authentication;
namespace http = process::http;
namespace metrics = process::metrics;
using authentication::Authenticator;
using authentication::BasicAuthenticator;
using http::BadRequest;
using http::OK;
using http::Response;
using http::Unauthorized;
using metrics::Counter;
using metrics::PullGauge;
using metrics::PushGauge;
using metrics::Timer;
using process::Clock;
using process::Failure;
using process::Future;
using process::PID;
using process::Process;
using process::Promise;
using process::READONLY_HTTP_AUTHENTICATION_REALM;
using process::Statistics;
using process::UPID;
using std::map;
using std::string;
using std::vector;
// Helper process whose methods serve as the value sources of the pull
// gauges created in the tests below: one yields a plain value, one
// yields a failure, and one yields the future of a promise that
// nothing in this class ever sets.
class PullGaugeProcess : public Process<PullGaugeProcess>
{
public:
  // Backs `pending()`. A promise is used rather than a bare
  // `Future<double>()` so that the future handed out is not an
  // abandoned one.
  Promise<double> promise;

  // Yields the constant 42.
  double get()
  {
    return 42.0;
  }

  // Yields a failure carrying the message "failure".
  Future<double> fail()
  {
    return Failure("failure");
  }

  // Yields the future associated with `promise`.
  Future<double> pending()
  {
    return promise.future();
  }
};
// TODO(greggomann): Move this into a base class in 'mesos.hpp'.
class MetricsTest : public ::testing::Test
{
protected:
Future<Nothing> setAuthenticator(
const string& realm,
process::Owned<Authenticator> authenticator)
{
realms.insert(realm);
return authentication::setAuthenticator(realm, authenticator);
}
void TearDown() override
{
foreach (const string& realm, realms) {
// We need to wait in order to ensure that the operation
// completes before we leave TearDown. Otherwise, we may
// leak a mock object.
AWAIT_READY(authentication::unsetAuthenticator(realm));
}
realms.clear();
}
private:
hashset<string> realms;
};
// Exercises the increment operators, `+=` and `reset()` of `Counter`.
TEST_F(MetricsTest, Counter)
{
  Counter testCounter("test/counter");

  AWAIT_READY(metrics::add(testCounter));

  // The value is zero before any increment.
  AWAIT_EXPECT_EQ(0.0, testCounter.value());

  // Pre-increment.
  ++testCounter;
  AWAIT_EXPECT_EQ(1.0, testCounter.value());

  // Post-increment.
  testCounter++;
  AWAIT_EXPECT_EQ(2.0, testCounter.value());

  // Resetting brings the value back to zero.
  testCounter.reset();
  AWAIT_EXPECT_EQ(0.0, testCounter.value());

  // Compound addition.
  testCounter += 42;
  AWAIT_EXPECT_EQ(42.0, testCounter.value());

  // This counter was created without a time series window and
  // reports no statistics.
  EXPECT_NONE(testCounter.statistics());

  AWAIT_READY(metrics::remove(testCounter));
}
// Checks that a `PullGauge` reports the value, or the failure, produced
// by the deferred call it was constructed with.
TEST_F(MetricsTest, PullGauge)
{
  PullGaugeProcess gaugeProcess;
  PID<PullGaugeProcess> gaugePid = spawn(&gaugeProcess);
  ASSERT_TRUE(gaugePid);

  // PullGauge with a value.
  PullGauge pullGauge(
      "test/gauge",
      defer(gaugePid, &PullGaugeProcess::get));

  AWAIT_READY(metrics::add(pullGauge));
  AWAIT_EXPECT_EQ(42.0, pullGauge.value());
  AWAIT_READY(metrics::remove(pullGauge));

  // Failing gauge. The same variable is reused via assignment.
  pullGauge = PullGauge(
      "test/failedgauge",
      defer(gaugePid, &PullGaugeProcess::fail));

  AWAIT_READY(metrics::add(pullGauge));
  AWAIT_EXPECT_FAILED(pullGauge.value());
  AWAIT_READY(metrics::remove(pullGauge));

  // Shut the helper process down before it goes out of scope.
  terminate(gaugeProcess);
  wait(gaugeProcess);
}
// Exercises increment, decrement, compound arithmetic and direct
// assignment on a `PushGauge`.
TEST_F(MetricsTest, PushGauge)
{
  // Gauge with a value.
  PushGauge pushGauge("test/gauge");

  AWAIT_READY(metrics::add(pushGauge));

  // The value is zero before any modification.
  AWAIT_EXPECT_EQ(0.0, pushGauge.value());

  // Pre-increment.
  ++pushGauge;
  AWAIT_EXPECT_EQ(1.0, pushGauge.value());

  // Compound addition.
  pushGauge += 42;
  AWAIT_EXPECT_EQ(43.0, pushGauge.value());

  // Pre-decrement.
  --pushGauge;
  AWAIT_EXPECT_EQ(42.0, pushGauge.value());

  // Compound subtraction.
  pushGauge -= 42;
  AWAIT_EXPECT_EQ(0.0, pushGauge.value());

  // Direct assignment of a value.
  pushGauge = 42;
  AWAIT_EXPECT_EQ(42.0, pushGauge.value());

  // The gauge reports no statistics.
  EXPECT_NONE(pushGauge.statistics());

  AWAIT_READY(metrics::remove(pushGauge));
}
// Verifies the aggregate statistics reported by a counter that was
// created with a time series window and then incremented ten times,
// with the clock advanced by one second before each increment.
TEST_F(MetricsTest, Statistics)
{
  Counter counter("test/counter", process::TIME_SERIES_WINDOW);

  // We have to pause the clock to ensure the time series
  // entries are unique.
  Clock::pause();

  AWAIT_READY(metrics::add(counter));

  for (size_t i = 0; i < 10; ++i) {
    Clock::advance(Seconds(1));
    ++counter;
  }

  Option<Statistics<double>> statistics = counter.statistics();

  // This has to be a fatal assertion: every check below dereferences
  // `statistics` unconditionally, so continuing with a `None` would
  // take down the whole test binary instead of failing this test.
  ASSERT_SOME(statistics);

  // Eleven samples with values 0 through 10 are expected (presumably
  // the initial value plus one sample per increment).
  EXPECT_EQ(11u, statistics->count);

  EXPECT_DOUBLE_EQ(0.0, statistics->min);
  EXPECT_DOUBLE_EQ(10.0, statistics->max);

  EXPECT_DOUBLE_EQ(5.0, statistics->p50);
  EXPECT_DOUBLE_EQ(9.0, statistics->p90);
  EXPECT_DOUBLE_EQ(9.5, statistics->p95);
  EXPECT_DOUBLE_EQ(9.9, statistics->p99);
  EXPECT_DOUBLE_EQ(9.99, statistics->p999);
  EXPECT_DOUBLE_EQ(9.999, statistics->p9999);

  AWAIT_READY(metrics::remove(counter));
}
// Checks that the snapshot endpoint of the "metrics" process reports
// the values of registered counters and gauges, leaves out a gauge
// whose value fails, and no longer lists metrics once they have been
// removed.
TEST_F(MetricsTest, THREADSAFE_Snapshot)
{
  UPID metricsPid("metrics", process::address());

  Clock::pause();

  // Add a pull-gauge and a counter.
  PullGaugeProcess gaugeProcess;
  PID<PullGaugeProcess> gaugePid = spawn(&gaugeProcess);
  ASSERT_TRUE(gaugePid);

  PullGauge valueGauge(
      "test/gauge",
      defer(gaugePid, &PullGaugeProcess::get));

  PullGauge failingGauge(
      "test/gauge_fail",
      defer(gaugePid, &PullGaugeProcess::fail));

  PullGauge constantGauge("test/gauge_const", []() { return 99.0; });

  Counter zeroCounter("test/counter");

  AWAIT_READY(metrics::add(valueGauge));
  AWAIT_READY(metrics::add(failingGauge));
  AWAIT_READY(metrics::add(constantGauge));
  AWAIT_READY(metrics::add(zeroCounter));

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Get the snapshot.
  Future<Response> snapshot = http::get(metricsPid, "snapshot");
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, snapshot);

  // Parse the response.
  Try<JSON::Object> parsed = JSON::parse<JSON::Object>(snapshot->body);
  ASSERT_SOME(parsed);

  map<string, JSON::Value> entries = parsed->values;

  // The counter has never been incremented.
  EXPECT_EQ(1u, entries.count("test/counter"));
  EXPECT_DOUBLE_EQ(
      0.0, entries["test/counter"].as<JSON::Number>().as<double>());

  // The gauge backed by `PullGaugeProcess::get`.
  EXPECT_EQ(1u, entries.count("test/gauge"));
  EXPECT_DOUBLE_EQ(
      42.0, entries["test/gauge"].as<JSON::Number>().as<double>());

  // The gauge backed by the lambda.
  EXPECT_EQ(1u, entries.count("test/gauge_const"));
  EXPECT_DOUBLE_EQ(
      99.0, entries["test/gauge_const"].as<JSON::Number>().as<double>());

  // The failing gauge must not show up at all.
  EXPECT_EQ(0u, entries.count("test/gauge_fail"));

  // Remove the metrics and ensure they are no longer in the snapshot.
  AWAIT_READY(metrics::remove(valueGauge));
  AWAIT_READY(metrics::remove(failingGauge));
  AWAIT_READY(metrics::remove(constantGauge));
  AWAIT_READY(metrics::remove(zeroCounter));

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Ensure MetricsProcess has removed the metrics.
  Clock::settle();

  snapshot = http::get(metricsPid, "snapshot");
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, snapshot);

  // Parse the response.
  parsed = JSON::parse<JSON::Object>(snapshot->body);
  ASSERT_SOME(parsed);

  entries = parsed->values;

  EXPECT_EQ(0u, entries.count("test/counter"));
  EXPECT_EQ(0u, entries.count("test/gauge"));
  EXPECT_EQ(0u, entries.count("test/gauge_fail"));
  EXPECT_EQ(0u, entries.count("test/gauge_const"));

  // Shut the helper process down before it goes out of scope.
  terminate(gaugeProcess);
  wait(gaugeProcess);
}
// Ensure the response string has the JSON keys sorted
// alphabetically, we do this for easier human consumption.
TEST_F(MetricsTest, SnapshotAlphabetical)
{
  UPID upid("metrics", process::address());

  Clock::pause();

  // The counters are added in reverse alphabetical order.
  vector<Counter> counters = {
    Counter("test/f"),
    Counter("test/e"),
    Counter("test/d"),
    Counter("test/c"),
    Counter("test/b"),
    Counter("test/a"),
  };

  foreach (const Counter& counter, counters) {
    AWAIT_READY(metrics::add(counter));
  }

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Get the snapshot.
  Future<Response> response = http::get(upid, "snapshot");
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);

  // Locate every key in the response body. Each key has to be checked
  // for presence explicitly: `find` yields `string::npos` (the largest
  // `size_t`) for a missing key, and any real offset compares "less
  // than" that, so an ordering check alone would accept a snapshot
  // that lacks a key.
  const vector<string> keys = {
    "test/a",
    "test/b",
    "test/c",
    "test/d",
    "test/e",
    "test/f",
  };

  vector<size_t> offsets;

  foreach (const string& key, keys) {
    const size_t offset = response->body.find(key);

    EXPECT_NE(string::npos, offset)
      << "'" << key << "' is missing from the snapshot";

    offsets.push_back(offset);
  }

  // Ensure the response is ordered alphabetically.
  for (size_t i = 1; i < offsets.size(); ++i) {
    EXPECT_LT(offsets[i - 1], offsets[i])
      << "'" << keys[i - 1] << "' should appear before '" << keys[i] << "'";
  }

  foreach (const Counter& counter, counters) {
    AWAIT_READY(metrics::remove(counter));
  }
}
// Checks the `timeout` query parameter of the snapshot endpoint: an
// unparsable value is answered with `BadRequest`, and a request with a
// valid timeout stays pending while a gauge is pending and is answered
// once the clock has been advanced by the timeout, leaving out the
// failing and the pending gauge.
TEST_F(MetricsTest, THREADSAFE_SnapshotTimeout)
{
  UPID upid("metrics", process::address());

  Clock::pause();

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Ensure the timeout parameter is validated.
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
      BadRequest().status,
      http::get(upid, "snapshot", "timeout=foobar"));

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Add pull-gauges and a counter.
  PullGaugeProcess process;
  PID<PullGaugeProcess> pid = spawn(&process);
  ASSERT_TRUE(pid);

  PullGauge gauge(
      "test/gauge",
      defer(pid, &PullGaugeProcess::get));

  PullGauge gaugeFail(
      "test/gauge_fail",
      defer(pid, &PullGaugeProcess::fail));

  PullGauge gaugeTimeout(
      "test/gauge_timeout",
      defer(pid, &PullGaugeProcess::pending));

  Counter counter(
      "test/counter");

  AWAIT_READY(metrics::add(gauge));
  AWAIT_READY(metrics::add(gaugeFail));
  AWAIT_READY(metrics::add(gaugeTimeout));
  AWAIT_READY(metrics::add(counter));

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Get the snapshot.
  Future<Response> response = http::get(upid, "snapshot", "timeout=2secs");

  // Make sure the request is pending before the timeout is exceeded.
  //
  // TODO(neilc): Replace the `sleep` here with a less flaky
  // synchronization method.
  os::sleep(Milliseconds(10));
  Clock::settle();

  ASSERT_TRUE(response.isPending());

  // Advance the clock to trigger the timeout.
  Clock::advance(Seconds(2));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);

  // Parse the response.
  Try<JSON::Object> responseJSON = JSON::parse<JSON::Object>(response->body);
  ASSERT_SOME(responseJSON);

  // We can't use simple JSON equality testing here as initializing
  // libprocess adds metrics to the system. We want to only check if
  // the metrics from this test are correctly handled.
  map<string, JSON::Value> values = responseJSON->values;

  EXPECT_EQ(1u, values.count("test/counter"));
  EXPECT_DOUBLE_EQ(0.0, values["test/counter"].as<JSON::Number>().as<double>());

  EXPECT_EQ(1u, values.count("test/gauge"));
  EXPECT_DOUBLE_EQ(42.0, values["test/gauge"].as<JSON::Number>().as<double>());

  EXPECT_EQ(0u, values.count("test/gauge_fail"));
  EXPECT_EQ(0u, values.count("test/gauge_timeout"));

  // Remove the metrics and ensure they are no longer in the snapshot.
  AWAIT_READY(metrics::remove(gauge));
  AWAIT_READY(metrics::remove(gaugeFail));
  AWAIT_READY(metrics::remove(gaugeTimeout));
  AWAIT_READY(metrics::remove(counter));

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // Ensure MetricsProcess has removed the metrics.
  Clock::settle();

  response = http::get(upid, "snapshot", "timeout=2secs");

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);

  // Parse the response. The parse result is asserted once, before
  // `responseJSON->values` is read.
  responseJSON = JSON::parse<JSON::Object>(response->body);
  ASSERT_SOME(responseJSON);

  values = responseJSON->values;

  EXPECT_EQ(0u, values.count("test/counter"));
  EXPECT_EQ(0u, values.count("test/gauge"));
  EXPECT_EQ(0u, values.count("test/gauge_fail"));
  EXPECT_EQ(0u, values.count("test/gauge_timeout"));

  // Shut the helper process down before it goes out of scope.
  terminate(process);
  wait(process);
}
// Ensures that the aggregate statistics are correct in the snapshot.
//
// A counter with a time series window is incremented ten times, with
// the clock advanced by one second before each increment; the snapshot
// must then contain the counter's value together with its
// count/min/max/percentile entries.
TEST_F(MetricsTest, SnapshotStatistics)
{
  UPID upid("metrics", process::address());

  Clock::pause();

  Counter counter("test/counter", process::TIME_SERIES_WINDOW);

  AWAIT_READY(metrics::add(counter));

  for (size_t i = 0; i < 10; ++i) {
    Clock::advance(Seconds(1));
    ++counter;
  }

  // We can't use simple JSON equality testing here as initializing
  // libprocess adds metrics to the system. We want to only check if
  // the metrics from this test are correctly handled.
  hashmap<string, double> expected;

  expected["test/counter"] = 10.0;

  expected["test/counter/count"] = 11;

  expected["test/counter/min"] = 0.0;
  expected["test/counter/max"] = 10.0;

  expected["test/counter/p50"] = 5.0;
  expected["test/counter/p90"] = 9.0;
  expected["test/counter/p95"] = 9.5;
  expected["test/counter/p99"] = 9.9;
  expected["test/counter/p999"] = 9.99;
  expected["test/counter/p9999"] = 9.999;

  Future<Response> response = http::get(upid, "snapshot");

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);

  Try<JSON::Object> responseJSON = JSON::parse<JSON::Object>(response->body);
  ASSERT_SOME(responseJSON);

  // Pull the response values into a map.
  hashmap<string, double> responseValues;
  foreachpair (const string& key,
               const JSON::Value& value,
               responseJSON->values) {
    if (value.is<JSON::Number>()) {
      // "test/counter/count" is an integer, everything else is a double.
      JSON::Number number = value.as<JSON::Number>();
      responseValues[key] = number.as<double>();
    }
  }

  // Ensure the expected keys are in the response and that the values match
  // expectations. Presence is checked explicitly because looking up a
  // missing key with `operator[]` inserts 0.0, which would silently
  // satisfy an expectation of 0.0 (e.g. "test/counter/min").
  foreachpair (const string& key, const double& expectedValue, expected) {
    EXPECT_EQ(1u, responseValues.count(key))
      << "'" << key << "' is missing from the snapshot";

    EXPECT_DOUBLE_EQ(expectedValue, responseValues[key]);
  }

  AWAIT_READY(metrics::remove(counter));
}
// Checks the name suffix of a nanosecond timer, that the timer reports
// the amount of (paused) clock time advanced between `start()` and
// `stop()`, and that extra `stop()` calls are harmless.
TEST_F(MetricsTest, Timer)
{
  metrics::Timer<Nanoseconds> timer("test/timer");

  // The reported name carries a "_ns" suffix.
  EXPECT_EQ("test/timer_ns", timer.name());

  AWAIT_READY(metrics::add(timer));

  // It is not an error to stop a timer that hasn't been started.
  timer.stop();

  // Time a no-op.
  Clock::pause();
  timer.start();
  Clock::advance(Microseconds(1));
  timer.stop();

  Future<double> value = timer.value();
  AWAIT_READY(value);

  // Expected value first, actual value second, matching the argument
  // order of the other assertions in this file.
  EXPECT_DOUBLE_EQ(static_cast<double>(Microseconds(1).ns()), value.get());

  // It is not an error to stop a timer that has already been stopped.
  timer.stop();

  AWAIT_READY(metrics::remove(timer));
}
// Advances the clock by one second as a side effect, then hands back
// a future constructed directly from the value 42.
static Future<int> advanceAndReturn()
{
  Clock::advance(Seconds(1));

  return Future<int>(42);
}
// Checks the name suffix of a microsecond timer and that `time()`
// measures the future it is given rather than the call that produced
// that future.
TEST_F(MetricsTest, AsyncTimer)
{
  metrics::Timer<Microseconds> t("test/timer");

  // The reported name carries a "_us" suffix.
  EXPECT_EQ("test/timer_us", t.name());

  AWAIT_READY(metrics::add(t));

  // Time a Future that returns immediately. Even though the method
  // advances the clock and we advance the clock here, the Future
  // should be timed as if it takes 0 time. Ie, we're not timing the
  // method call but the Future.
  Clock::pause();
  Future<int> result = t.time(advanceAndReturn());
  Clock::advance(Seconds(1));

  AWAIT_READY(result);
  EXPECT_EQ(42, result.get());

  // The future should have taken zero time. The value future is held
  // in a variable so that the future that is awaited is the very same
  // one that is read afterwards.
  Future<double> value = t.value();
  AWAIT_READY(value);
  EXPECT_DOUBLE_EQ(0.0, value.get());

  AWAIT_READY(metrics::remove(t));
}
// Tests that the `/metrics/snapshot` endpoint rejects unauthenticated requests
// when HTTP authentication is enabled.
TEST_F(MetricsTest, THREADSAFE_SnapshotAuthenticationEnabled)
{
  // Install a basic authenticator for the read-only realm. It goes
  // through the fixture's `setAuthenticator`, so the realm is recorded
  // for cleanup in `TearDown`.
  process::Owned<Authenticator> basicAuthenticator(
      new BasicAuthenticator(
          READONLY_HTTP_AUTHENTICATION_REALM, {{"foo", "bar"}}));

  AWAIT_READY(setAuthenticator(
      READONLY_HTTP_AUTHENTICATION_REALM,
      basicAuthenticator));

  UPID metricsPid("metrics", process::address());

  Clock::pause();

  // Advance the clock to avoid rate limit.
  Clock::advance(Seconds(1));

  // A request with no authentication header.
  Future<Response> unauthenticated = http::get(metricsPid, "snapshot");

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Unauthorized({}).status, unauthenticated);
}