blob: bdf9e214236e0403aac1a69efff09e93cf898917 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System.Linq;
using Org.Apache.REEF.Common.Metrics.Api;
using Org.Apache.REEF.Common.Metrics.MutableMetricsLayer;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Xunit;
namespace Org.Apache.REEF.Common.Tests.Metrics
{
/// <summary>
/// Tests different functionalities of <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
public sealed class DefaultMetricSourceTests
{
/// <summary>
/// Tests <see cref="DefaultMetricsSourceConfiguration"/>
/// </summary>
[Fact]
public void TestMetricsSourceConfiguration()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
Assert.NotNull(metricsSource);
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
metricsSource.GetMetrics(collector, true);
Assert.Equal(collector.CurrentRecordBuilder.Name, recordName);
Assert.Equal(collector.CurrentRecordBuilder.Context, sourceContext);
collector.CurrentRecordBuilder.Validate("TaskOrContextName", taskId);
collector.CurrentRecordBuilder.Validate("EvaluatorId", evalId);
}
/// <summary>
/// Tests creating, accessing and updating counters from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceCounters()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] counterNames = { "cname1", "cname2" };
const long counterIncr = 2;
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create Counters
foreach (var name in counterNames)
{
metricsSource.Counters.Create(name, name);
}
// Update counters.
int counter = 1;
foreach (var name in counterNames)
{
metricsSource.Counters[name].Increment(counter * counterIncr);
counter++;
}
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate counters
counter = 1;
foreach (var name in counterNames)
{
rb.Validate(name, counter * counterIncr);
counter++;
}
}
/// <summary>
/// Tests creating, accessing and updating long gauge from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceLongGauges()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] longGaugeNames = { "lname1", "lname2" };
const long longGaugeIncr = 3;
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create long gauge
foreach (var name in longGaugeNames)
{
metricsSource.LongGauges.Create(name, name);
}
// Update long gauge
int counter = 1;
foreach (var name in longGaugeNames)
{
metricsSource.LongGauges[name].Increment(counter * longGaugeIncr);
counter++;
}
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate long gauges
counter = 1;
foreach (var name in longGaugeNames)
{
rb.Validate(name, counter * longGaugeIncr);
counter++;
}
}
/// <summary>
/// Tests creating, accessing and updating double gauge from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceDoubleGauges()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] doubleGaugeNames = { "dname1", "dname2" };
const double doubleGaugeDecr = 1.2;
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create double gauge
foreach (var name in doubleGaugeNames)
{
metricsSource.DoubleGauges.Create(name, name);
}
// Update double gauge
int counter = 1;
foreach (var name in doubleGaugeNames)
{
metricsSource.DoubleGauges[name].Decrement(counter * doubleGaugeDecr);
counter++;
}
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate double gauges
counter = 1;
foreach (var name in doubleGaugeNames)
{
rb.Validate(name, -counter * doubleGaugeDecr, 1e-10);
counter++;
}
}
/// <summary>
/// Tests creating, accessing and updating rate from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceRate()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
const string rateName = "rname1";
double[] samples = { 2, 3 };
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create rate.
metricsSource.Rates.Create(rateName, rateName);
// Update rate
foreach (var sample in samples)
{
metricsSource.Rates[rateName].Sample(sample);
}
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate rate
rb.Validate(rateName + "-Num", samples.Length);
rb.Validate(rateName + "-RunningAvg", samples.Sum() / samples.Length, 1e-10);
}
/// <summary>
/// Tests creating and accessing tags from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceTags()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] tagNames = { "tname1", "tname2" };
string[] tagValues = { "tvalue1", "tValue2" };
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create tags
for (int i = 0; i < tagNames.Length; i++)
{
metricsSource.AddTag(tagNames[i], tagNames[i], tagValues[i]);
}
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate tags
for (int i = 0; i < tagNames.Length; i++)
{
rb.Validate(tagNames[i], tagValues[i]);
}
// Verify GetTag functionality
var tag = metricsSource.GetTag(tagNames[0]);
Assert.NotNull(tag);
Assert.Equal(tag.Name, tagNames[0]);
Assert.Equal(tag.Value, tagValues[0]);
string inValidName = "invalid";
tag = metricsSource.GetTag(inValidName);
Assert.Null(tag);
}
/// <summary>
/// Tests whether we can externally subscribe metrics to <see cref="DefaultMetricsSourceImpl"/>
/// and then unsubscribe.
/// </summary>
[Fact]
public void TestDefaultMetricsSourceExternalMetricsSubscription()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
const string subscribedCounterName = "subscribedcounter";
const long subscribedCounterValue = 1001;
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
IMetricsFactory factory = TangFactory.GetTang().NewInjector().GetInstance<IMetricsFactory>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
var extraCounter = factory.CreateCounter(subscribedCounterName,
subscribedCounterName,
subscribedCounterValue);
extraCounter.Subscribe(metricsSource);
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
rb.Validate(subscribedCounterName, subscribedCounterValue);
extraCounter.Increment();
extraCounter.OnCompleted(); // This should remove the counter from the metrics source.
metricsSource.GetMetrics(collector, true);
rb = collector.CurrentRecordBuilder;
Assert.False(rb.LongKeyPresent(subscribedCounterName), "The counter should not be present now in the source");
}
/// <summary>
/// Tests whether exceptions are thrown while trying to access not created
/// metrics in <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceInvalidAccessExceptions()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
// Checks that exception is thrown while trying to get metrics that are not created.
string inValidName = "invalid";
Assert.Throws<MetricsException>(() => metricsSource.Counters[inValidName]);
Assert.Throws<MetricsException>(() => metricsSource.DoubleGauges[inValidName]);
Assert.Throws<MetricsException>(() => metricsSource.LongGauges[inValidName]);
Assert.Throws<MetricsException>(() => metricsSource.Rates[inValidName]);
}
/// <summary>
/// Tests whether exceptions are thrown while trying to recreate already
/// existing metrics in <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceRecreationExceptions()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string counterName = "cname1";
string longGaugeName = "lname1";
string doubleGaugeName = "dname1";
const string rateName = "rname1";
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
metricsSource.Counters.Create(counterName, counterName);
metricsSource.LongGauges.Create(longGaugeName, longGaugeName);
metricsSource.DoubleGauges.Create(doubleGaugeName, doubleGaugeName);
metricsSource.Rates.Create(rateName, rateName);
Assert.Throws<MetricsException>(() => metricsSource.Counters.Create(counterName, counterName));
Assert.Throws<MetricsException>(
() => metricsSource.DoubleGauges.Create(doubleGaugeName, doubleGaugeName));
Assert.Throws<MetricsException>(
() => metricsSource.LongGauges.Create(longGaugeName, longGaugeName));
Assert.Throws<MetricsException>(() => metricsSource.Rates.Create(rateName, rateName));
}
/// <summary>
/// Tests whether metrics are properly deleted from <see cref="DefaultMetricsSourceImpl"/>
/// </summary>
[Fact]
public void TestDefaultMetricsSourceMetricDeletion()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string counterName = "cname1";
string longGaugeName = "lname1";
string doubleGaugeName = "dname1";
const string rateName = "rname1";
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
metricsSource.Counters.Create(counterName, counterName);
metricsSource.LongGauges.Create(longGaugeName, longGaugeName);
metricsSource.DoubleGauges.Create(doubleGaugeName, doubleGaugeName);
metricsSource.Rates.Create(rateName, rateName);
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
metricsSource.Counters.Delete(counterName);
Assert.Throws<MetricsException>(() => metricsSource.Counters[counterName]);
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
Assert.False(rb.LongKeyPresent(counterName), "Counter was removed, so it should not be present");
}
/// <summary>
/// Verifies that record builder does not get old stats if the flag is set to false.
/// </summary>
[Fact]
public void TestDefaultMetricsSourceGetMetricsFlag()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] counterNames = { "cname1", "cname2" };
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create Counters
foreach (var name in counterNames)
{
metricsSource.Counters.Create(name, name);
}
metricsSource.GetMetrics(collector, true);
// Do not increment second counter.
metricsSource.Counters[counterNames[0]].Increment();
// Verifies that record builder does not get old stats if the flag is set to false.
// Only first counter should be present.
metricsSource.GetMetrics(collector, false);
var rb = collector.CurrentRecordBuilder;
Assert.True(rb.LongKeyPresent(counterNames[0]), "Counter was updated, so it should be present");
Assert.False(rb.LongKeyPresent(counterNames[1]), "Counter was not updated, so it should not be present");
}
/// <summary>
/// Tests inserting counters, gauges, rates in one go to <see cref="DefaultMetricsSourceImpl"/>
/// to make sure there are no across container bugs.
/// </summary>
[Fact]
public void TestDefaultMetricsSourceHetergeneousFunctionalities()
{
const string evalId = "evalId";
const string taskId = "taskId";
const string sourceContext = "test";
const string recordName = "testrecord";
string[] counterNames = { "cname1", "cname2" };
string[] longGaugeNames = { "lname1", "lname2" };
string[] doubleGaugeNames = { "dname1", "dname2" };
const string rateName = "rname1";
double[] samples = { 2, 3 };
string[] tagNames = { "tname1", "tname2" };
string[] tagValues = { "tvalue1", "tValue2" };
const long counterIncr = 2;
const long longGaugeIncr = 3;
const double doubleGaugeDecr = 1.2;
const string subscribedCounterName = "subscribedcounter";
const long subscribedCounterValue = 1001;
DefaultMetricsSourceImpl metricsSource = TangFactory.GetTang().NewInjector(
GenerateMetricsSourceConfiguration(evalId, taskId, sourceContext, recordName))
.GetInstance<DefaultMetricsSourceImpl>();
IMetricsFactory factory = TangFactory.GetTang().NewInjector().GetInstance<IMetricsFactory>();
MetricTestUtils.MetricsCollectorTestImpl collector = new MetricTestUtils.MetricsCollectorTestImpl();
// Create Counters
foreach (var name in counterNames)
{
metricsSource.Counters.Create(name, name);
}
// Create long gauge
foreach (var name in longGaugeNames)
{
metricsSource.LongGauges.Create(name, name);
}
// Create double gauge
foreach (var name in doubleGaugeNames)
{
metricsSource.DoubleGauges.Create(name, name);
}
// Create rate.
metricsSource.Rates.Create(rateName, rateName);
// Update counters.
int counter = 1;
foreach (var name in counterNames)
{
metricsSource.Counters[name].Increment(counter * counterIncr);
counter++;
}
// Update long gauge
counter = 1;
foreach (var name in longGaugeNames)
{
metricsSource.LongGauges[name].Increment(counter * longGaugeIncr);
counter++;
}
// Update double gauge
counter = 1;
foreach (var name in doubleGaugeNames)
{
metricsSource.DoubleGauges[name].Decrement(counter * doubleGaugeDecr);
counter++;
}
// Update rate
foreach (var sample in samples)
{
metricsSource.Rates[rateName].Sample(sample);
}
// Create tags
for (int i = 0; i < tagNames.Length; i++)
{
metricsSource.AddTag(tagNames[i], tagNames[i], tagValues[i]);
}
var extraCounter = factory.CreateCounter(subscribedCounterName,
subscribedCounterName,
subscribedCounterValue);
extraCounter.Subscribe(metricsSource);
metricsSource.GetMetrics(collector, true);
var rb = collector.CurrentRecordBuilder;
// Validate counters
counter = 1;
foreach (var name in counterNames)
{
rb.Validate(name, counter * counterIncr);
counter++;
}
rb.Validate(subscribedCounterName, subscribedCounterValue);
// Validate long gauges
counter = 1;
foreach (var name in longGaugeNames)
{
rb.Validate(name, counter * longGaugeIncr);
counter++;
}
// Validate double gauges
counter = 1;
foreach (var name in doubleGaugeNames)
{
rb.Validate(name, -counter * doubleGaugeDecr, 1e-10);
counter++;
}
// Validate tags
for (int i = 0; i < tagNames.Length; i++)
{
rb.Validate(tagNames[i], tagValues[i]);
}
// Validate rate
rb.Validate(rateName + "-Num", samples.Length);
rb.Validate(rateName + "-RunningAvg", samples.Sum() / samples.Length, 1e-10);
}
private static IConfiguration GenerateMetricsSourceConfiguration(string evalId, string taskId, string sourceContext, string recordName)
{
return
DefaultMetricsSourceConfiguration.ConfigurationModule.Set(DefaultMetricsSourceConfiguration.EvaluatorId,
evalId)
.Set(DefaultMetricsSourceConfiguration.TaskOrContextId, taskId)
.Set(DefaultMetricsSourceConfiguration.RecordId, recordName)
.Set(DefaultMetricsSourceConfiguration.SourceContext, sourceContext)
.Build();
}
}
}