[REEF-1732] Build Metrics System (#1460)
Enhance the capabilities of MetricService by adding the following functionality:
* Let the user specify the type of the metric, as opposed to being restricted to Counters.
* Let the user maintain a timeseries and keep track of the history of updates for a given metric.
* Add example implementations and functional tests to demonstrate usage.
JIRA: [REEF-1732](https://issues.apache.org/jira/browse/REEF-1732)
Closes #1460
diff --git a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs
deleted file mode 100644
index 3973ad2..0000000
--- a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Org.Apache.REEF.Common.Telemetry;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Xunit;
-
-namespace Org.Apache.REEF.Common.Tests.Telemetry
-{
- public class CounterTests
- {
- /// <summary>
- /// Test ICounters and IEvaluatorMetrics API.
- /// </summary>
- [Fact]
- public void TestEvaluatorMetrics()
- {
- var metrics = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
- var counters = metrics.GetMetricsCounters();
- counters.TryRegisterCounter("counter1", "counter1 description");
- counters.TryRegisterCounter("counter2", "counter2 description");
- ValidateCounter(counters, "counter1", 0);
- ValidateCounter(counters, "counter2", 0);
-
- counters.Increment("counter1", 3);
- counters.Increment("counter1", 1);
- counters.Increment("counter2", 2);
- counters.Increment("counter2", 3);
- ValidateCounter(counters, "counter1", 4);
- ValidateCounter(counters, "counter2", 5);
-
- var counterStr = metrics.Serialize();
-
- var metrics2 = new EvaluatorMetrics(counterStr);
- var counters2 = metrics2.GetMetricsCounters();
- ValidateCounter(counters2, "counter1", 4);
- ValidateCounter(counters2, "counter2", 5);
- }
-
- /// <summary>
- /// Test TryRegisterCounter with a duplicated counter name
- /// </summary>
- [Fact]
- public void TestDuplicatedCounters()
- {
- var counters = CreateCounters();
- counters.TryRegisterCounter("counter1", "counter1 description");
- Assert.False(counters.TryRegisterCounter("counter1", "counter1 description"));
- }
-
- /// <summary>
- /// Test Increment for a non-registered counter.
- /// </summary>
- [Fact]
- public void TestNoExistCounter()
- {
- var counters = CreateCounters();
- Action increment = () => counters.Increment("counter1", 2);
- Assert.Throws<ApplicationException>(increment);
- }
-
- private static void ValidateCounter(ICounters counters, string name, int expectedValue)
- {
- ICounter c1;
- counters.TryGetValue(name, out c1);
- Assert.Equal(expectedValue, c1.Value);
- }
-
- private static ICounters CreateCounters()
- {
- var m = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
- var c = m.GetMetricsCounters();
- return c;
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs
new file mode 100644
index 0000000..1701d53
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Xunit;
+
+namespace Org.Apache.REEF.Common.Tests.Telemetry
+{
+    public class MetricsTests
+    {
+        /// <summary>
+        /// Registers a counter metric that keeps no update history, increments it five times, checks its
+        /// value and records, then verifies the counter cannot be retrieved after a serialization round trip.
+        /// </summary>
+        [Fact]
+        public void TestEvaluatorMetricsCountersOnly()
+        {
+            var evalMetrics1 = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            var counter1 = evalMetrics1.CreateAndRegisterMetric<CounterMetric>("counter1", "Counter1 description", false);
+
+            for (int i = 0; i < 5; i++)
+            {
+                counter1.Increment();
+            }
+            var evalMetricsData = evalMetrics1.GetMetricsData();
+            ValidateMetric(evalMetricsData, "counter1", 5);
+
+            foreach (var t in evalMetricsData.GetMetricTrackers())
+            {
+                // Records for counter1 should be null because KeepUpdateHistory is false.
+                Assert.Null(t.GetMetricRecords());
+            }
+
+            var metricsStr = evalMetrics1.Serialize();
+            var evalMetrics2 = new EvaluatorMetrics(metricsStr);
+            Assert.False(evalMetrics2.GetMetricsData().TryGetMetric("counter1", out CounterMetric metricObj));
+            Assert.Null(metricObj);
+        }
+
+        /// <summary>
+        /// Checks that AssignNewValue replaces the default value of an integer and a double metric.
+        /// </summary>
+        [Fact]
+        public void TestMetricSetValue()
+        {
+            var evalMetrics = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            var intMetric = evalMetrics.CreateAndRegisterMetric<IntegerMetric>("IntMetric", "metric of type int", true);
+            var doubleMetric = evalMetrics.CreateAndRegisterMetric<DoubleMetric>("DouMetric", "metric of type double", true);
+
+            var evalMetricsData = evalMetrics.GetMetricsData();
+            ValidateMetric(evalMetricsData, "IntMetric", default(int));
+            ValidateMetric(evalMetricsData, "DouMetric", default(double));
+
+            intMetric.AssignNewValue(3);
+            doubleMetric.AssignNewValue(3.0);
+            ValidateMetric(evalMetricsData, "IntMetric", 3);
+            ValidateMetric(evalMetricsData, "DouMetric", 3.0);
+        }
+
+        /// <summary>
+        /// Verifies that registering a second metric under an already used name throws ArgumentException.
+        /// </summary>
+        [Fact]
+        public void TestDuplicatedNames()
+        {
+            var metrics = CreateMetrics();
+            metrics.RegisterMetric(new CounterMetric("metric1", "metric description"));
+            Assert.Throws<ArgumentException>(
+                () => metrics.RegisterMetric(new CounterMetric("metric1", "duplicate name")));
+        }
+
+        /// <summary>
+        /// Simulates a heartbeat: two metrics are updated, serialized and deserialized, and the deserialized
+        /// records are flushed to the injected IMetricsSink; afterwards no tracker is expected to hold records.
+        /// </summary>
+        [Fact]
+        public void TestMetricsSimulateHeartbeat()
+        {
+            var evalMetrics1 = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            evalMetrics1.CreateAndRegisterMetric<CounterMetric>("counter", "counter with no records", false);
+            evalMetrics1.CreateAndRegisterMetric<IntegerMetric>("iteration", "iteration with records", true);
+            Assert.True(evalMetrics1.TryGetMetric("counter", out CounterMetric counter));
+            Assert.True(evalMetrics1.TryGetMetric("iteration", out IntegerMetric iter));
+            for (int i = 1; i <= 5; i++)
+            {
+                counter.Increment();
+                iter.AssignNewValue(i);
+            }
+
+            var me1Str = evalMetrics1.Serialize();
+            var evalMetrics2 = new EvaluatorMetrics(me1Str);
+            var metricsData2 = evalMetrics2.GetMetricsData();
+
+            var sink = TangFactory.GetTang().NewInjector().GetInstance<IMetricsSink>();
+            sink.Sink(metricsData2.FlushMetricRecords());
+
+            // Flushing is expected to leave every tracker without records.
+            foreach (var t in metricsData2.GetMetricTrackers())
+            {
+                Assert.Empty(t.GetMetricRecords());
+            }
+        }
+
+        /// <summary>
+        /// Asserts that the named metric exists in the set and that its untyped value equals expectedValue.
+        /// </summary>
+        private static void ValidateMetric(IMetricSet metricSet, string name, object expectedValue)
+        {
+            Assert.True(metricSet.TryGetMetric(name, out IMetric metric));
+            Assert.Equal(expectedValue, metric.ValueUntyped);
+        }
+
+        /// <summary>
+        /// Returns the metric set of a freshly injected IEvaluatorMetrics instance.
+        /// </summary>
+        private static IMetricSet CreateMetrics()
+        {
+            return TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>().GetMetricsData();
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs
deleted file mode 100644
index de46bae..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs
+++ /dev/null
@@ -1,133 +0,0 @@
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Runtime.Serialization;
-using Newtonsoft.Json;
-using Org.Apache.REEF.Utilities.Attributes;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
- /// <summary>
- /// Counter implementation
- /// The properties that need to be serialized will be revisited later. We should only serialize minimum data to reduce the network load
- /// For example, the name can be mapped to a unique number (byte) and description should not be serialized.
- /// </summary>
- [Unstable("0.16", "This is a simple counter for evaluator metrics.")]
- [DataContract]
- internal sealed class Counter : ICounter
- {
- /// <summary>
- /// Name of the counter.
- /// </summary>
- private readonly string _name;
-
- /// <summary>
- /// Description of the counter.
- /// </summary>
- private readonly string _description;
-
- /// <summary>
- /// Time that the counter is updated.
- /// </summary>
- private long _timeStamp;
-
- /// <summary>
- /// Value of the counter.
- /// </summary>
- private int _value;
-
- /// <summary>
- /// Constructor to create a new counter.
- /// </summary>
- /// <param name="name"></param>
- /// <param name="description"></param>
- internal Counter(string name, string description)
- {
- _name = name;
- _description = description;
- _timeStamp = DateTime.Now.Ticks;
- _value = 0;
- }
-
- /// <summary>
- /// Constructor to create a counter from a serialized counter string
- /// </summary>
- [JsonConstructor]
- internal Counter(string name, string description, long timeStamp, int value)
- {
- _name = name;
- _description = description;
- _timeStamp = timeStamp;
- _value = value;
- }
-
- /// <summary>
- /// Description of the counter.
- /// </summary>
- [DataMember]
- public string Description
- {
- get
- {
- return _description;
- }
- }
-
- /// <summary>
- /// Name of the counter.
- /// </summary>
- [DataMember]
- public string Name
- {
- get
- {
- return _name;
- }
- }
-
- /// <summary>
- /// Time that the counter is updated in the form of ticks.
- /// </summary>
- [DataMember]
- public long Timestamp
- {
- get
- {
- return _timeStamp;
- }
- }
-
- /// <summary>
- /// Value of the counter.
- /// </summary>
- [DataMember]
- public int Value
- {
- get
- {
- return _value;
- }
- }
-
- /// <summary>
- /// Increase the counter value and update the time stamp.
- /// </summary>
- /// <param name="number"></param>
- public void Increment(int number)
- {
- _value += number;
- _timeStamp = DateTime.Now.Ticks;
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
deleted file mode 100644
index 5f23262..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Collections.Generic;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
- /// <summary>
- /// This class wraps a Counter object and the increment value since last sink
- /// </summary>
- internal sealed class CounterData
- {
- /// <summary>
- /// Counter object
- /// </summary>
- private ICounter _counter;
-
- /// <summary>
- /// Counter increment value since last sink
- /// </summary>
- internal int IncrementSinceLastSink { get; private set; }
-
- /// <summary>
- /// Constructor for CounterData
- /// </summary>
- /// <param name="counter"></param>
- /// <param name="initialValue"></param>
- internal CounterData(ICounter counter, int initialValue)
- {
- _counter = counter;
- IncrementSinceLastSink = initialValue;
- }
-
- /// <summary>
- /// clear the increment since last sink
- /// </summary>
- internal void ResetSinceLastSink()
- {
- IncrementSinceLastSink = 0;
- }
-
- internal void UpdateCounter(ICounter counter)
- {
- IncrementSinceLastSink += counter.Value - _counter.Value;
-
- //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
- //// if evaluator contains the aggregated values, the value will override existing value
- //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
- //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
- //// We also need to consider failure cases.
- _counter = counter;
- }
-
- /// <summary>
- /// Get count name and value as KeyValuePair
- /// </summary>
- /// <returns></returns>
- internal KeyValuePair<string, string> GetKeyValuePair()
- {
- return new KeyValuePair<string, string>(_counter.Name, _counter.Value.ToString());
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs
new file mode 100644
index 0000000..d5c74eb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Integer metric that is changed in steps; every resulting value is passed to the tracker.
+    /// </summary>
+    public sealed class CounterMetric : MetricBase<int>, ICounter
+    {
+        public CounterMetric()
+        {
+        }
+
+        /// <summary>Creates a named counter; keepHistory (default false) is forwarded to the base class.</summary>
+        internal CounterMetric(string name, string description, bool keepHistory = false)
+            : base(name, description, keepHistory)
+        {
+        }
+
+        /// <summary>Adds number to the value with Interlocked.Add and passes the new total to the tracker.</summary>
+        public void Increment(int number = 1) =>
+            _tracker.Track(Interlocked.Add(ref _typedValue, number));
+
+        /// <summary>Subtracts number from the value with Interlocked.Add and passes the new total to the tracker.</summary>
+        public void Decrement(int number = 1) =>
+            _tracker.Track(Interlocked.Add(ref _typedValue, -number));
+
+        /// <summary>Stores the given value with Interlocked.Exchange, then passes it to the tracker.</summary>
+        public override void AssignNewValue(int value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs
deleted file mode 100644
index d681ec6..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Newtonsoft.Json;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
- [Unstable("0.16", "This is to build a collection of counters for evaluator metrics.")]
- internal sealed class Counters : ICounters
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(Counters));
-
- /// <summary>
- /// It contains name and count pairs
- /// </summary>
- private readonly IDictionary<string, ICounter> _counters = new Dictionary<string, ICounter>();
-
- /// <summary>
- /// The lock for counters
- /// </summary>
- private readonly object _counterLock = new object();
-
- [Inject]
- private Counters()
- {
- }
-
- /// <summary>
- /// Deserialize a counters serialized string into a Counters object
- /// </summary>
- /// <param name="serializedCountersString"></param>
- internal Counters(string serializedCountersString)
- {
- var c = JsonConvert.DeserializeObject<IEnumerable<Counter>>(serializedCountersString);
- foreach (var ct in c)
- {
- _counters.Add(ct.Name, ct);
- }
- }
-
- public IEnumerable<ICounter> GetCounters()
- {
- return _counters.Values;
- }
-
- /// <summary>
- /// Register a new counter with a specified name.
- /// If name does not exist, the counter will be added and true will be returned
- /// Otherwise the counter will be not added and false will be returned.
- /// </summary>
- /// <param name="name">Counter name</param>
- /// <param name="description">Counter description</param>
- /// <returns>Returns a boolean to indicate if the counter is added.</returns>
- public bool TryRegisterCounter(string name, string description)
- {
- lock (_counterLock)
- {
- if (_counters.ContainsKey(name))
- {
- Logger.Log(Level.Warning, "The counter [{0}] already exists.", name);
- return false;
- }
- _counters.Add(name, new Counter(name, description));
- }
- return true;
- }
-
- /// <summary>
- /// Get counter for a given name
- /// return false if the counter doesn't exist
- /// </summary>
- /// <param name="name">Name of the counter</param>
- /// <param name="value">Value of the counter returned</param>
- /// <returns>Returns a boolean to indicate if the value is found.</returns>
- public bool TryGetValue(string name, out ICounter value)
- {
- lock (_counterLock)
- {
- return _counters.TryGetValue(name, out value);
- }
- }
-
- /// <summary>
- /// Increase the counter with the given number
- /// </summary>
- /// <param name="name">Name of the counter</param>
- /// <param name="number">number to increase</param>
- public void Increment(string name, int number)
- {
- ICounter counter;
- if (TryGetValue(name, out counter))
- {
- lock (_counterLock)
- {
- counter.Increment(number);
- }
- }
- else
- {
- Logger.Log(Level.Error, "The counter [{0}] has not registered.", name);
- throw new ApplicationException("Counter has not registered:" + name);
- }
- }
-
- /// <summary>
- /// return serialized string of counter data
- /// TODO: [REEF-] use an unique number for the counter name mapping to reduce the data transfer over the wire
- /// TODO: [REEF-] use Avro schema if that can make the serialized string more compact
- /// </summary>
- /// <returns>Returns serialized string of the counters.</returns>
- public string Serialize()
- {
- lock (_counterLock)
- {
- if (_counters.Count > 0)
- {
- return JsonConvert.SerializeObject(_counters.Values);
- }
- return null;
- }
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
deleted file mode 100644
index b8c22c8..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
- /// <summary>
- /// This class maintains a collection of the data for all the counters for metrics service.
- /// When new counter data is received, the data in the collection will be updated.
- /// After the data is processed, the increment since last process will be reset.
- /// </summary>
- internal sealed class CountersData
- {
- private static readonly Logger Logger = Logger.GetLogger(typeof(CountersData));
-
- /// <summary>
- /// Registration of counters
- /// </summary>
- private readonly IDictionary<string, CounterData> _counterMap = new ConcurrentDictionary<string, CounterData>();
-
- [Inject]
- private CountersData()
- {
- }
-
- /// <summary>
- /// Update counters
- /// </summary>
- /// <param name="counters"></param>
- internal void Update(ICounters counters)
- {
- foreach (var counter in counters.GetCounters())
- {
- CounterData counterData;
- if (_counterMap.TryGetValue(counter.Name, out counterData))
- {
- counterData.UpdateCounter(counter);
- }
- else
- {
- _counterMap.Add(counter.Name, new CounterData(counter, counter.Value));
- }
-
- Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}, incrementSinceLastSink: {4}.",
- counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp), _counterMap[counter.Name].IncrementSinceLastSink);
- }
- }
-
- /// <summary>
- /// Reset increment since last sink for each counter
- /// </summary>
- internal void Reset()
- {
- foreach (var c in _counterMap.Values)
- {
- c.ResetSinceLastSink();
- }
- }
-
- /// <summary>
- /// Convert the counter data into ISet for sink
- /// </summary>
- /// <returns></returns>
- internal IEnumerable<KeyValuePair<string, string>> GetCounterData()
- {
- return _counterMap.Select(counter => counter.Value.GetKeyValuePair());
- }
-
- /// <summary>
- /// The condition that triggers the sink. The condition can be modified later.
- /// </summary>
- /// <returns></returns>
- internal bool TriggerSink(int counterSinkThreshold)
- {
- return _counterMap.Values.Sum(e => e.IncrementSinceLastSink) > counterSinkThreshold;
- }
- }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
index 7f4fd95..ef5dbc8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -22,9 +22,8 @@
namespace Org.Apache.REEF.Common.Telemetry
{
/// <summary>
- /// This default IMetricsSink is just an example of IMetricsSink
- /// Here the data is logged in Sink() method
- /// It is more useful in test
+ /// This default IMetricsSink is a simple implementation of IMetricsSink
+ /// that logs the metrics on sink.
/// </summary>
internal sealed class DefaultMetricsSink : IMetricsSink
{
@@ -36,14 +35,14 @@
}
/// <summary>
- /// Simple sink for metrics data
+ /// Simple sink that logs metrics.
/// </summary>
- /// <param name="metrics">A collection of metrics data in Key value pair format.</param>
- public void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
+ /// <param name="metrics">A collection of metrics.</param>
+ public void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metrics)
{
foreach (var m in metrics)
{
- Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value);
+ Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value.Value);
}
}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
index 0f458c0..a15e716 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
@@ -15,12 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ /// <summary>
+ /// Double metric implementation.
+ /// </summary>
+ public class DoubleMetric : MetricBase<double>
{
+ public DoubleMetric()
+ {
+ }
+
+ internal DoubleMetric(string name, string description, bool keepUpdateHistory = true)
+ : base(name, description, keepUpdateHistory)
+ {
+ }
+
+ public override void AssignNewValue(double value)
+ {
+ Interlocked.Exchange(ref _typedValue, value);
+ _tracker.Track(value);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
index 2d634e3..9d41095 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,25 +15,49 @@
// specific language governing permissions and limitations
// under the License.
-using System;
+using StringMetric = Org.Apache.REEF.Common.Telemetry.MetricClass<string>;
namespace Org.Apache.REEF.Common.Telemetry
{
/// <summary>
- /// A simple driver metrics.
- /// It contains system state for now.
- /// It can be extended later to include more driver metrics data.
+ /// Driver metrics implementation that contains the system state.
/// </summary>
public sealed class DriverMetrics : IDriverMetrics
{
- public DriverMetrics(string systemState, DateTime timeUpdated)
+        /// <summary>Name under which the constructor registers the driver state metric. Read-only so callers cannot reassign it.</summary>
+        public static readonly string DriverStateMetric = "DriverState";
+        private readonly MetricsData _metricsData;
+
+        public DriverMetrics()
         {
-            SystemState = systemState;
-            TimeUpdated = timeUpdated;
+            _metricsData = new MetricsData();
+            CreateAndRegisterMetric<StringMetric>(DriverStateMetric, "driver state.", false); // registered up front; the returned handle is not needed here
         }
- public string SystemState { get; private set; }
+        /// <summary>
+        /// Exposes the metric set held by this driver metrics instance.
+        /// </summary>
+        public IMetricSet GetMetricsData() => _metricsData;
- public DateTime TimeUpdated { get; private set; }
+        /// <summary>Creates a metric of type T with the given settings and registers it with this driver's metric set.</summary>
+        /// <returns>The newly created, registered metric.</returns>
+        public T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+            where T : MetricBase, new()
+        {
+            var created = new T();
+            created.Name = name;
+            created.Description = description;
+            created.KeepUpdateHistory = keepUpdateHistory;
+            _metricsData.RegisterMetric(created);
+            return created;
+        }
+
+        /// <summary>Gets the named metric as T; returns false rather than throwing if it is missing or not a T.</summary>
+        public bool TryGetMetric<T>(string name, out T metric) where T : IMetric
+        {
+            var found = _metricsData.TryGetMetric(name, out IMetric me) && me is T;
+            metric = found ? (T)me : default(T);
+            return found;
+        }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
index d7e00e5..5488667 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,8 @@
/// </summary>
public readonly static ConfigurationModule ConfigurationModule = new DriverMetricsObserverConfigurationModule()
.BindSetEntry(GenericType<DriverMetricsObservers>.Class, OnDriverMetrics)
- .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
+ .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(
+ GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
.Build();
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
index 21a3f4a..613e1a5 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
index 38f789c..72625c4 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
@@ -16,19 +16,20 @@
// under the License.
using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.Common.Telemetry
{
- [Unstable("0.16", "This is to build a simple metrics with counters only. More metrics will be added in future.")]
+ /// <summary>
+ /// An evaluator metrics implementation that maintains a collection of metrics.
+ /// </summary>
internal sealed class EvaluatorMetrics : IEvaluatorMetrics
{
- private readonly Counters _counters;
+ private readonly MetricsData _metricsData;
[Inject]
- private EvaluatorMetrics(Counters counters)
+ private EvaluatorMetrics(MetricsData metrics)
{
- _counters = counters;
+ _metricsData = metrics;
}
/// <summary>
@@ -37,29 +38,42 @@
/// <param name="serializedMsg"></param>
internal EvaluatorMetrics(string serializedMsg)
{
- _counters = new Counters(serializedMsg);
+ _metricsData = new MetricsData(serializedMsg);
}
- /// <summary>
- /// Returns counters
- /// </summary>
- /// <returns>Returns counters.</returns>
- public ICounters GetMetricsCounters()
+ public T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+ where T : MetricBase, new()
{
- return _counters;
+ var metric = new T
+ {
+ Name = name,
+ Description = description,
+ KeepUpdateHistory = keepUpdateHistory
+ };
+ _metricsData.RegisterMetric(metric);
+ return metric;
}
- /// <summary>
- /// return serialized string of metrics counters data
- /// </summary>
- /// <returns>Returns serialized string of counters.</returns>
+ public IMetricSet GetMetricsData()
+ {
+ return _metricsData;
+ }
+
public string Serialize()
{
- if (_counters != null)
+ if (_metricsData != null)
{
- return _counters.Serialize();
+ return _metricsData.SerializeAndReset();
}
return null;
}
+
+ public bool TryGetMetric<T>(string name, out T metric)
+ where T : IMetric
+ {
+ var ret = _metricsData.TryGetMetric(name, out IMetric me);
+ metric = (T)me;
+ return ret;
+ }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
index 0f458c0..75f3b84 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
@@ -15,12 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ /// <summary>
+ /// Float Metric implementation.
+ /// </summary>
+ public class FloatMetric : MetricBase<float>
{
+ public FloatMetric()
+ {
+ }
+
+ internal FloatMetric(string name, string description, bool keepUpdateHistory = true)
+ : base(name, description, keepUpdateHistory)
+ {
+ }
+
+ public override void AssignNewValue(float value)
+ {
+ Interlocked.Exchange(ref _typedValue, value);
+ _tracker.Track(value);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
index be2e3c9..9e38974 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
@@ -1,4 +1,8 @@
-// to you under the Apache License, Version 2.0 (the
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
@@ -11,36 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Utilities.Attributes;
-
namespace Org.Apache.REEF.Common.Telemetry
{
- [Unstable("0.16", "This is a simple counter for evaluator metrics.")]
public interface ICounter
{
/// <summary>
- /// Time the counter is updated.
- /// </summary>
- long Timestamp { get; }
-
- /// <summary>
- /// Name of the counter.
- /// </summary>
- string Name { get; }
-
- /// <summary>
- /// The description of the counter.
- /// </summary>
- string Description { get; }
-
- /// <summary>
- /// The value of the counter.
- /// </summary>
- int Value { get; }
-
- /// <summary>
/// Increase the current counter value with the number specified.
/// </summary>
void Increment(int number);
+
+ /// <summary>
+ /// Decrease the current counter value with the number specified.
+ /// </summary>
+ void Decrement(int number);
}
}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs
deleted file mode 100644
index b6f8809..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Collections.Generic;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
- [Unstable("0.16", "This is to build a collection of counters for evaluator metrics.")]
- [DefaultImplementation(typeof(Counters))]
- public interface ICounters
- {
- /// <summary>
- /// Register a new counter with a specified name.
- /// If name does not exist, the counter will be added and true will be returned
- /// Otherwise the counter will be not added and false will be returned.
- /// </summary>
- /// <param name="name">Name of the counter to be registered.</param>
- /// <param name="description">Description of the counter to be registered.</param>
- /// <returns>Returns a boolean to indicate if the counter is added.</returns>
- bool TryRegisterCounter(string name, string description);
-
- /// <summary>
- /// Get counter value for a given counter name
- /// </summary>
- /// <param name="name">Name of the counter</param>
- /// <param name="counter">The counter object returned</param>
- /// <returns>Returns a boolean to indicate if the value is found.</returns>
- bool TryGetValue(string name, out ICounter counter);
-
- /// <summary>
- /// Increase the counter with the given number
- /// </summary>
- /// <param name="name">Name of the counter</param>
- /// <param name="number">number to increase</param>
- void Increment(string name, int number);
-
- /// <summary>
- /// Returns all the counters
- /// </summary>
- /// <returns></returns>
- IEnumerable<ICounter> GetCounters();
-
- /// <summary>
- /// Serialize the counter into a string
- /// </summary>
- /// <returns>Returns serialized string of the counters.</returns>
- string Serialize();
- }
-}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
index 4f2c05d..39c8904 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,24 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-using System;
using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.Common.Telemetry
{
- [Unstable("0.16", "This is to build a simple metrics with system state only. More metrics will be added in future.")]
[DefaultImplementation(typeof(DriverMetrics))]
- public interface IDriverMetrics
+ public interface IDriverMetrics : IMetrics
{
- /// <summary>
- /// System state
- /// </summary>
- string SystemState { get; }
-
- /// <summary>
- /// DateTime that the system state is updated
- /// </summary>
- DateTime TimeUpdated { get; }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
index f4476f5..4fd6dd9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
@@ -16,22 +16,14 @@
// under the License.
using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.Common.Telemetry
{
- [Unstable("0.16", "This is to build a simple metrics with counters only. More metrics will be added in future.")]
[DefaultImplementation(typeof(EvaluatorMetrics))]
- public interface IEvaluatorMetrics
+ public interface IEvaluatorMetrics : IMetrics
{
/// <summary>
- /// Returns metrics counters
- /// </summary>
- /// <returns>Returns ICounters.</returns>
- ICounters GetMetricsCounters();
-
- /// <summary>
- /// Serialize the metrics data into a string
+ /// Serializes the metrics data into a string.
/// </summary>
/// <returns>Returns serialized string of metrics</returns>
string Serialize();
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs
new file mode 100644
index 0000000..f717551
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Metric interface. A generic interface for individual metrics.
+ /// </summary>
+ public interface IMetric
+ {
+ /// <summary>
+ /// Name of the metric.
+ /// </summary>
+ string Name { get; }
+
+ /// <summary>
+ /// Description of the metric.
+ /// </summary>
+ string Description { get; }
+
+ /// <summary>
+ /// Value of the metric, stored as object.
+ /// </summary>
+ object ValueUntyped { get; }
+
+ /// <summary>
+ /// Flag indicating whether a history of updates to the metric is kept.
+ /// </summary>
+ bool KeepUpdateHistory { get; }
+
+ /// <summary>
+ /// Assign a tracker to track the metric.
+ /// </summary>
+ /// <param name="tracker">The metric tracker assigned to track updates.</param>
+ /// <returns></returns>
+ IDisposable Subscribe(ITracker tracker);
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs
new file mode 100644
index 0000000..928ba8f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Interface for a collection of metrics.
+ /// </summary>
+ [DefaultImplementation(typeof(MetricsData))]
+ public interface IMetricSet
+ {
+ /// <summary>
+ /// Add a metric object to this collection.
+ /// </summary>
+ /// <param name="metric">The metric object to add.</param>
+ void RegisterMetric(IMetric metric);
+
+ /// <summary>
+ /// Get the metric object given the metric name.
+ /// </summary>
+ /// <param name="name">Name of the metric.</param>
+ /// <param name="metric">The metric object returned.</param>
+ /// <returns>Returns a boolean to indicate if the metric is found.</returns>
+ bool TryGetMetric<T>(string name, out T metric)
+ where T : IMetric;
+
+ /// <summary>
+ /// Returns all the metric trackers.
+ /// </summary>
+ /// <returns></returns>
+ IEnumerable<MetricTracker> GetMetricTrackers();
+
+ /// <summary>
+ /// Empties the cached records for each metric.
+ /// </summary>
+ /// <returns>Key Value pair of metric name and record.</returns>
+ IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> FlushMetricRecords();
+
+ /// <summary>
+ /// Serializes the metrics data.
+ /// </summary>
+ /// <returns>Serialized string.</returns>
+ string Serialize();
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs
new file mode 100644
index 0000000..c1130f7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ public interface IMetrics
+ {
+ /// <summary>
+ /// Creates a metric and registers it.
+ /// </summary>
+ /// <typeparam name="T">Type of the metric object.</typeparam>
+ /// <param name="name">Name of the metric.</param>
+ /// <param name="description">Description of the metric.</param>
+ /// <param name="keepUpdateHistory">Whether to keep a history of updates on this metric.</param>
+ /// <returns>The newly created metric.</returns>
+ T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+ where T : MetricBase, new();
+
+ /// <summary>
+ /// Method that returns the collection of metric data.
+ /// </summary>
+ /// <returns></returns>
+ IMetricSet GetMetricsData();
+
+ /// <summary>
+ /// Extracts the metric object if it has been registered.
+ /// </summary>
+ /// <param name="name">Name of the metric.</param>
+ /// <param name="metric">The registered metric. null if not found.</param>
+ /// <returns></returns>
+ bool TryGetMetric<T>(string name, out T metric)
+ where T : IMetric;
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
index eef54db..42da671 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -23,11 +23,10 @@
{
/// <summary>
/// Interface for metrics sink.
- /// It is used to output IMRU metrics.
/// </summary>
[DefaultImplementation(typeof(DefaultMetricsSink))]
public interface IMetricsSink : IDisposable
{
- void Sink(IEnumerable<KeyValuePair<string, string>> metrics);
+ void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metrics);
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
similarity index 79%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
index 0f458c0..976b36a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
@@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
-
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ /// <summary>
+ /// Tracker interface to track metrics.
+ /// </summary>
+ public interface ITracker
{
+ void Track(object value);
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
index 0f458c0..7cd0e5a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
@@ -15,12 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ /// <summary>
+ /// Integer metric implementation.
+ /// </summary>
+ public class IntegerMetric : MetricBase<int>
{
+ public IntegerMetric()
+ {
+ }
+
+ internal IntegerMetric(string name, string description, bool keepHistory = true)
+ : base(name, description, keepHistory)
+ {
+ }
+
+ public override void AssignNewValue(int value)
+ {
+ Interlocked.Exchange(ref _typedValue, value);
+ _tracker.Track(value);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
similarity index 61%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
index 0f458c0..08bad2b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
@@ -15,12 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ /// <summary>
+ /// Long metric implementation.
+ /// </summary>
+ public class LongMetric : MetricBase<long>
{
+ public LongMetric()
+ {
+ }
+
+ internal LongMetric(string name, string description, bool keepHistory = true)
+ : base(name, description, keepHistory)
+ {
+ }
+
+ public override void AssignNewValue(long value)
+ {
+ Interlocked.Exchange(ref _typedValue, value);
+ _tracker.Track(value);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
index 6230b65..7dfb5f7 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs
new file mode 100644
index 0000000..7803ac1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Provides a base implementation. Value of metric should be provided in derived classes.
+ /// </summary>
+ public abstract class MetricBase : IMetric
+ {
+ protected ITracker _tracker;
+
+ public string Name
+ {
+ get; internal set;
+ }
+
+ public string Description
+ {
+ get; internal set;
+ }
+
+ public bool KeepUpdateHistory
+ {
+ get; internal set;
+ }
+
+ public abstract object ValueUntyped
+ {
+ get;
+ }
+
+ protected MetricBase()
+ {
+ }
+
+ protected MetricBase(string name, string description, bool keepUpdateHistory = true)
+ {
+ Name = name;
+ Description = description;
+ KeepUpdateHistory = keepUpdateHistory;
+ }
+
+ public IDisposable Subscribe(ITracker tracker)
+ {
+ _tracker = tracker;
+ return new Unsubscriber(tracker);
+ }
+
+ private class Unsubscriber : IDisposable
+ {
+ private ITracker _tracker;
+
+ public Unsubscriber(ITracker tracker)
+ {
+ _tracker = tracker;
+ }
+
+ public void Dispose()
+ {
+ _tracker = null;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Base implementation with a generic value type.
+ /// </summary>
+ /// <typeparam name="T">Type of the metric value.</typeparam>
+ public abstract class MetricBase<T> : MetricBase
+ {
+ protected T _typedValue;
+
+ public override object ValueUntyped
+ {
+ get { return _typedValue; }
+ }
+
+ public T Value
+ {
+ get { return _typedValue; }
+ }
+
+ public MetricBase()
+ {
+ _typedValue = default;
+ }
+
+ protected MetricBase(string name, string description, bool keepUpdateHistory = true)
+ : base(name, description, keepUpdateHistory)
+ {
+ _typedValue = default;
+ }
+
+ /// <summary>
+ /// Assign the new value to the metric and track it.
+ /// In most cases, this method should be overridden in derived classes using Interlocked.
+ /// </summary>
+ /// <param name="value">Value to assign the metric.</param>
+ public abstract void AssignNewValue(T value);
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs
new file mode 100644
index 0000000..15cd302
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Metrics of reference types (such as strings) should inherit from this class.
+ /// </summary>
+ /// <typeparam name="T">The type of the metric should be of reference type.</typeparam>
+ public class MetricClass<T> : MetricBase<T> where T : class
+ {
+ public MetricClass()
+ {
+ }
+
+ internal MetricClass(string name, string description, bool keepHistory = true)
+ : base(name, description, keepHistory)
+ {
+ }
+
+ public override void AssignNewValue(T value)
+ {
+ Interlocked.Exchange(ref _typedValue, value);
+ _tracker.Track(value);
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
similarity index 89%
rename from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
rename to lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
index 0f458c0..c2d592f 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
@@ -19,8 +19,8 @@
namespace Org.Apache.REEF.Common.Telemetry
{
- [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
- public class CounterSinkThreshold : Name<int>
+ [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "MetricSinkThreshold", DefaultValue = "1")]
+ public class MetricSinkThreshold : Name<int>
{
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs
new file mode 100644
index 0000000..f272a13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// MetricTracker tracks a single metric. If KeepUpdateHistory is true, it queues a
+ /// record for every update to the metric. Once the queued records have been flushed
+ /// for processing, the queue (and therefore the record count) is empty again.
+ /// </summary>
+ [JsonObject]
+ public sealed class MetricTracker : ITracker
+ {
+ [JsonProperty]
+ public readonly string MetricName;
+
+ [JsonProperty]
+ internal readonly bool KeepUpdateHistory;
+
+ /// <summary>
+ /// if KeepUpdateHistory is true, keeps a history of updates.
+ /// </summary>
+ [JsonProperty]
+ private ConcurrentQueue<MetricRecord> Records;
+
+ private IMetric _metric;
+
+ private IDisposable _unsubscriber;
+
+ /// <summary>
+ /// Constructor for MetricTracker called when metric is registered. Subscribes to the
+ /// metric and, when update history is kept, queues a record of its current value.
+ /// </summary>
+ /// <param name="metric">The metric to track.</param>
+ internal MetricTracker(IMetric metric)
+ {
+ MetricName = metric.Name;
+ Subscribe(metric);
+ KeepUpdateHistory = metric.KeepUpdateHistory;
+ Records = KeepUpdateHistory ? new ConcurrentQueue<MetricRecord>() : null;
+ Records?.Enqueue(CreateMetricRecord(metric));
+ }
+
+ [JsonConstructor]
+ internal MetricTracker(
+ string metricName,
+ IEnumerable<MetricRecord> records,
+ bool keepUpdateHistory)
+ {
+ MetricName = metricName;
+ Records = new ConcurrentQueue<MetricRecord>(records);
+ KeepUpdateHistory = keepUpdateHistory;
+ }
+
+ /// <summary>
+ /// Flush records currently held in the records queue.
+ /// </summary>
+ /// <returns>A queue containing all the flushed records.</returns>
+ internal IEnumerable<MetricRecord> FlushRecordsCache()
+ {
+ ConcurrentQueue<MetricRecord> records = new ConcurrentQueue<MetricRecord>();
+ if (Records != null)
+ {
+ while (Records.TryDequeue(out MetricRecord record))
+ {
+ records.Enqueue(record);
+ }
+ }
+ else
+ {
+ // Records will be null only on eval side when tracker doesn't keep history.
+ records.Enqueue(CreateMetricRecord(_metric));
+ }
+ return records;
+ }
+
+ /// <summary>
+ /// When new metric data is received, update the value and records so it reflects the
+ /// new data. Called when Driver receives metrics from Evaluator.
+ /// </summary>
+ /// <param name="metric">Metric data received.</param>
+ internal MetricTracker UpdateMetric(MetricTracker metric)
+ {
+ if (metric.GetRecordCount() > 0)
+ {
+ var recordsToAdd = metric.GetMetricRecords();
+ if (KeepUpdateHistory)
+ {
+ foreach (MetricRecord record in recordsToAdd)
+ {
+ Records.Enqueue(record);
+ }
+ }
+ else
+ {
+ Interlocked.Exchange(
+ ref Records,
+ new ConcurrentQueue<MetricRecord>(recordsToAdd));
+ }
+ }
+ return this;
+ }
+
+ /// <summary>
+ /// Get the metric with its most recent value.
+ /// </summary>
+ /// <returns>The metric this tracker is subscribed to; null if it has not subscribed to one.</returns>
+ internal IMetric GetMetric()
+ {
+ return _metric;
+ }
+
+ internal int GetRecordCount()
+ {
+ return Records.Count;
+ }
+
+ /// <summary>
+ /// If KeepUpdateHistory is true, it will return all the records; otherwise, it will
+ /// return one record with the most recent value.
+ /// </summary>
+ /// <returns>The history of the metric records.</returns>
+ internal IEnumerable<MetricRecord> GetMetricRecords()
+ {
+ return Records;
+ }
+
+ /// <summary>
+ /// Subscribes the tracker to a metric object.
+ /// </summary>
+ /// <param name="provider">The metric to track.</param>
+ public void Subscribe(IMetric provider)
+ {
+ _metric = provider;
+ _unsubscriber = provider.Subscribe(this);
+ }
+
+ /// <summary>
+ /// Unsubscribes the tracker from the metric it is tracking.
+ /// </summary>
+ public void Unsubscribe()
+ {
+ _unsubscriber.Dispose();
+ }
+
+ /// <summary>
+ /// Creates and queues a new metric record with a value.
+ /// </summary>
+ /// <param name="value">Value of the new record.</param>
+ public void Track(object value)
+ {
+ Records?.Enqueue(CreateMetricRecord(value));
+ }
+
+ private static MetricRecord CreateMetricRecord(IMetric metric)
+ {
+ return new MetricRecord(metric);
+ }
+
+ private static MetricRecord CreateMetricRecord(object val)
+ {
+ return new MetricRecord(val);
+ }
+
+ [JsonObject]
+ public class MetricRecord
+ {
+ [JsonProperty]
+ public readonly object Value;
+
+ [JsonProperty]
+ public readonly long Timestamp;
+
+ [JsonConstructor]
+ public MetricRecord(object value, long? timestamp = null)
+ {
+ Value = value;
+ Timestamp = timestamp.GetValueOrDefault(DateTime.Now.Ticks);
+ }
+
+ public MetricRecord(IMetric metric)
+ {
+ Value = metric.ValueUntyped;
+ Timestamp = DateTime.Now.Ticks;
+ }
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs
new file mode 100644
index 0000000..f55a448
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// This class maintains a collection of the data for all the metrics for metrics service.
+ /// When new metric data is received, the data in the collection will be updated.
+ /// After the data is processed, the changes accumulated since the last processing pass are reset.
+ /// </summary>
+ public sealed class MetricsData : IMetricSet
+ {
+ private static readonly JsonSerializerSettings settings = new JsonSerializerSettings()
+ {
+ TypeNameHandling = TypeNameHandling.All // NOTE(review): type names are written and honored on read; Json.NET docs advise caution with untrusted input — confirm payloads are trusted
+ };
+
+ /// <summary>
+ /// Registered metrics: maps each metric name to the tracker holding that metric's records.
+ /// </summary>
+ private readonly ConcurrentDictionary<string, MetricTracker> _metricsMap =
+ new ConcurrentDictionary<string, MetricTracker>();
+
+ [Inject]
+ internal MetricsData() // injectable constructor; the set starts with no registered metrics
+ {
+ }
+
+ /// <summary>
+ /// Rebuilds the metric set from a JSON array of trackers; an empty payload yields an empty set.
+ /// </summary>
+ /// <param name="serializedMetricsString">JSON text, as produced by <see cref="Serialize()"/>, to deserialize.</param>
+ [JsonConstructor]
+ internal MetricsData(string serializedMetricsString)
+ {
+ var metrics = JsonConvert.DeserializeObject<IEnumerable<MetricTracker>>(
+ serializedMetricsString,
+ settings);
+ // DeserializeObject yields null for an empty or "null" payload; treat that as "no trackers".
+ foreach (var m in metrics ?? Enumerable.Empty<MetricTracker>())
+ {
+ _metricsMap.TryAdd(m.MetricName, m); // if a name repeats, the first tracker is kept
+ }
+ }
+
+ public void RegisterMetric(IMetric metric)
+ {
+ if (!_metricsMap.TryAdd(metric.Name, new MetricTracker(metric))) // at most one tracker per metric name
+ {
+ throw new ArgumentException(string.Format("The metric [{0}] already exists.", metric.Name)); // format explicitly: the (string, string) overload takes a paramName, not a format argument
+ }
+ }
+
+ public bool TryGetMetric<T>(string name, out T metric)
+ where T : IMetric
+ {
+ IMetric candidate = _metricsMap.TryGetValue(name, out MetricTracker tracker) ? tracker?.GetMetric() : null; // null when the name is unknown or the tracker holds no metric
+ metric = candidate is T ? (T)candidate : default(T); // a metric of another type is reported as "not found" instead of throwing InvalidCastException
+ return candidate is T;
+ }
+
+ public IEnumerable<MetricTracker> GetMetricTrackers()
+ {
+ return _metricsMap.Values; // all registered trackers, one per metric name
+ }
+
+ /// <summary>
+ /// Flushes changes since last sink for each metric, eagerly and exactly once per call.
+ /// Called when Driver is sinking metrics; the returned list can be enumerated repeatedly without flushing again.
+ /// </summary>
+ /// <returns>Key value pairs of metric name and record that was flushed.</returns>
+ public IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> FlushMetricRecords()
+ {
+ // Materialize now: a deferred query would re-run FlushRecordsCache on every enumeration.
+ return _metricsMap.SelectMany(
+ kv => kv.Value.FlushRecordsCache().Select(
+ r => new KeyValuePair<string, MetricTracker.MetricRecord>(kv.Key, r))).ToList();
+ }
+
+ /// <summary>
+ /// Merges the trackers of another <see cref="IMetricSet"/> into this one.
+ /// A tracker whose metric name is not yet registered is added as-is; otherwise the
+ /// registered tracker's UpdateMetric is called with the incoming tracker.
+ /// </summary>
+ /// <param name="metrics">Metric set whose trackers are merged in.</param>
+ internal void Update(IMetricSet metrics)
+ {
+ foreach (var incoming in metrics.GetMetricTrackers())
+ {
+ _metricsMap.AddOrUpdate(
+ incoming.MetricName,
+ incoming,
+ (name, existing) => existing.UpdateMetric(incoming));
+ }
+ }
+
+ /// <summary>
+ /// Flushes the cached records of every tracker into freshly created trackers.
+ /// Called when Evaluator is sending metrics information to Driver.
+ /// </summary>
+ /// <returns>Queue of trackers containing metric records.</returns>
+ internal IEnumerable<MetricTracker> FlushMetricTrackers()
+ {
+ return new ConcurrentQueue<MetricTracker>(_metricsMap.Select(
+ entry => new MetricTracker(
+ entry.Value.MetricName,
+ entry.Value.FlushRecordsCache(),
+ entry.Value.KeepUpdateHistory)));
+ }
+
+ /// <summary>Sums the record counts of all trackers and compares the total with the sink threshold.</summary>
+ /// <param name="metricSinkThreshold">Number of cached records that must be exceeded.</param>
+ /// <returns>True only when the total is strictly greater than the threshold.</returns>
+ /// <remarks>NOTE(review): a total equal to the threshold does not trigger a sink — confirm that is intended.</remarks>
+ internal bool TriggerSink(int metricSinkThreshold)
+ {
+ return _metricsMap.Values.Sum(e => e.GetRecordCount()) > metricSinkThreshold;
+ }
+
+ public string Serialize()
+ {
+ return Serialize(_metricsMap.Values); // serializes every registered tracker; nothing is flushed here
+ }
+
+ internal string Serialize(IEnumerable<MetricTracker> trackers)
+ {
+ return JsonConvert.SerializeObject(trackers, settings); // same settings object the deserializing constructor passes to DeserializeObject
+ }
+
+ internal string SerializeAndReset()
+ {
+ return Serialize(FlushMetricTrackers()); // flushes each tracker's cached records first, then serializes what was flushed
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
index 640c483..81d095c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,15 +18,14 @@
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Common.Telemetry
{
/// <summary>
- /// This class implements IContextMessageSource that is responsible to send context message
+ /// This class implements IContextMessageSource that is responsible to send Evaluator
+ /// metrics as a context message to the Driver.
/// </summary>
- [Unstable("0.16", "The metrics API is in development.")]
internal sealed class MetricsMessageSender : IContextMessageSource
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsMessageSender));
@@ -58,6 +57,7 @@
{
get
{
+ Logger.Log(Level.Info, "Getting context msg for eval metrics.");
var s = _evaluatorMetrics.Serialize();
if (s != null)
{
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 44d2b32..4be778b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,35 +22,31 @@
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.Common.Telemetry
{
/// <summary>
- /// Metrics service. It is also a context message handler.
+ /// Metrics Service that handles metrics from the Evaluator and Driver.
/// </summary>
- [Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
internal sealed class MetricsService : IObserver<IContextMessage>, IObserver<IDriverMetrics>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
/// <summary>
- /// Contains Counters received in the Metrics service
+ /// The set of metrics Metrics Service maintains.
/// </summary>
- private readonly CountersData _countersData;
+ private readonly MetricsData _metricsData;
/// <summary>
- /// A set of metrics sinks
+ /// A set of Metric Sinks.
/// </summary>
private readonly ISet<IMetricsSink> _metricsSinks;
/// <summary>
- /// The threshold that triggers the sinks.
- /// Currently only one threshold is defined for all the counters.
- /// Later, it can be extended to define a threshold per counter.
+ /// The total number of changes that has to be met to trigger the sinks.
/// </summary>
- private readonly int _counterSinkThreshold;
+ private readonly int _metricSinkThreshold;
/// <summary>
/// It can be bound with driver configuration as a context message handler
@@ -58,45 +54,45 @@
[Inject]
private MetricsService(
[Parameter(typeof(MetricSinks))] ISet<IMetricsSink> metricsSinks,
- [Parameter(typeof(CounterSinkThreshold))] int counterSinkThreshold,
- CountersData countersData)
+ [Parameter(typeof(MetricSinkThreshold))] int metricSinkThreshold,
+ MetricsData metricsData)
{
_metricsSinks = metricsSinks;
- _counterSinkThreshold = counterSinkThreshold;
- _countersData = countersData;
+ _metricSinkThreshold = metricSinkThreshold;
+ _metricsData = metricsData;
}
/// <summary>
- /// It is called whenever context message is received
+ /// Called whenever context message is received
/// </summary>
/// <param name="contextMessage">Serialized EvaluatorMetrics</param>
public void OnNext(IContextMessage contextMessage)
{
var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
- var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
+ var evalMetrics = new EvaluatorMetrics(msgReceived);
+ var metricsData = evalMetrics.GetMetricsData();
- Logger.Log(Level.Info, "Received {0} counters with context message: {1}.",
- counters.GetCounters().Count(), msgReceived);
+ Logger.Log(Level.Info, "Received {0} metrics with context message of length {1}",
+ metricsData.GetMetricTrackers().Count(), msgReceived.Length);
- _countersData.Update(counters);
+ _metricsData.Update(metricsData);
- if (_countersData.TriggerSink(_counterSinkThreshold))
+ if (_metricsData.TriggerSink(_metricSinkThreshold))
{
- Sink(_countersData.GetCounterData());
- _countersData.Reset();
+ Sink(_metricsData.FlushMetricRecords());
}
}
/// <summary>
- /// Call each Sink to sink the data in the counters
+ /// Call each Sink to process the cached metric records.
/// </summary>
- private void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
+ private void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metricRecords)
{
foreach (var s in _metricsSinks)
{
try
{
- Task.Run(() => s.Sink(metrics));
+ Task.Run(() => s.Sink(metricRecords));
}
catch (Exception e)
{
@@ -109,9 +105,13 @@
}
}
+ /// <summary>
+ /// Called when task is completed to sink cached metrics.
+ /// </summary>
public void OnCompleted()
{
- Logger.Log(Level.Info, "Completed");
+ Sink(_metricsData.FlushMetricRecords());
+ Logger.Log(Level.Info, "MetricsService completed");
}
public void OnError(Exception error)
@@ -121,17 +121,12 @@
/// <summary>
/// Observer of IDriverMetrics.
- /// When Driver metrics data is changed, this method will be called.
- /// It calls Sink to store/log the metrics data.
+ /// Called when Driver metrics data is changed.
/// </summary>
/// <param name="driverMetrics">driver metrics data.</param>
public void OnNext(IDriverMetrics driverMetrics)
{
- Sink(new Dictionary<string, string>()
- {
- { "SystemState", driverMetrics.SystemState },
- { "TimeUpdated", driverMetrics.TimeUpdated.ToLongTimeString() }
- });
+ Sink(driverMetrics.GetMetricsData().FlushMetricRecords());
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
index 957d505..e77f36f 100644
--- a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -30,7 +30,7 @@
public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
{
public static readonly OptionalImpl<IMetricsSink> OnMetricsSink = new OptionalImpl<IMetricsSink>();
- public static readonly OptionalParameter<int> CounterSinkThreshold = new OptionalParameter<int>();
+ public static readonly OptionalParameter<int> MetricSinkThreshold = new OptionalParameter<int>();
/// <summary>
/// It provides the configuration for MetricsService
@@ -40,7 +40,7 @@
GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
GenericType<MetricsService>.Class)
.BindSetEntry(GenericType<MetricSinks>.Class, OnMetricsSink)
- .BindNamedParameter(GenericType<CounterSinkThreshold>.Class, CounterSinkThreshold)
+ .BindNamedParameter(GenericType<MetricSinkThreshold>.Class, MetricSinkThreshold)
.Build();
}
}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
index 73df585..55cece4 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -30,6 +30,7 @@
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Tests.Functional.Messaging;
using Org.Apache.REEF.Utilities.Logging;
+using StringMetric = Org.Apache.REEF.Common.Telemetry.MetricClass<string>;
namespace Org.Apache.REEF.Tests.Functional.Telemetry
{
@@ -46,6 +47,8 @@
private readonly IEvaluatorRequestor _evaluatorRequestor;
internal const string EventPrefix = "TestState";
+ private IDriverMetrics _driverMetrics;
+
/// <summary>
/// a set of driver metrics observers.
/// </summary>
@@ -53,7 +56,7 @@
/// <summary>
/// This driver inject DriverMetricsObservers and IDriverMetrics.
- /// It keeps updating the driver metrics when receiving events.
+ /// It keeps updating the driver metrics when receiving events.
/// </summary>
/// <param name="evaluatorRequestor"></param>
/// <param name="driverMetricsObservers"></param>
@@ -67,6 +70,7 @@
public void OnNext(IDriverStarted value)
{
+ _driverMetrics = new DriverMetrics();
UpdateMetrics(TestSystemState.DriverStarted);
var request =
@@ -116,7 +120,7 @@
{
Logger.Log(Level.Info, "Received ICompletedTask");
UpdateMetrics(TestSystemState.TaskCompleted);
-
+ FlushMetricsCache();
value.ActiveContext.Dispose();
}
@@ -135,11 +139,20 @@
/// </summary>
private void UpdateMetrics(TestSystemState systemState)
{
- var driverMetrics = new DriverMetrics(EventPrefix + systemState, DateTime.Now);
+ _driverMetrics.TryGetMetric(DriverMetrics.DriverStateMetric, out StringMetric stateMetric);
+ stateMetric.AssignNewValue(EventPrefix + systemState.ToString());
foreach (var metricsObserver in _driverMetricsObservers)
{
- metricsObserver.OnNext(driverMetrics);
+ metricsObserver.OnNext(_driverMetrics);
+ }
+ }
+
+ private void FlushMetricsCache()
+ {
+ foreach(var metricsObserver in _driverMetricsObservers)
+ {
+ metricsObserver.OnCompleted();
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
index 88e9461..b475651 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
@@ -30,26 +30,29 @@
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsTask));
- public const string TestCounter1 = "TestCounter1";
- public const string TestCounter2 = "TestCounter2";
+ public const string TestCounter = "TestCounter";
+ public const string TestIntMetric = "Iterations";
- private readonly ICounters _counters;
+ private readonly IEvaluatorMetrics _evaluatorMetrics;
+
+ public CounterMetric metric1;
+ public IntegerMetric metric2;
[Inject]
private MetricsTask(IEvaluatorMetrics evaluatorMetrics)
{
- _counters = evaluatorMetrics.GetMetricsCounters();
- _counters.TryRegisterCounter(TestCounter1, "This is " + TestCounter1);
- _counters.TryRegisterCounter(TestCounter2, "This is " + TestCounter2);
+ _evaluatorMetrics = evaluatorMetrics;
+ metric1 = _evaluatorMetrics.CreateAndRegisterMetric<CounterMetric>(TestCounter, TestCounter + " description", false);
+ metric2 = _evaluatorMetrics.CreateAndRegisterMetric<IntegerMetric>(TestIntMetric, TestIntMetric + " description", true);
}
public byte[] Call(byte[] memento)
{
- for (int i = 0; i < 100; i++)
+ for (int i = 1; i <= 2000; i++)
{
- _counters.Increment(TestCounter1, 1);
- _counters.Increment(TestCounter2, 2);
- Thread.Sleep(100);
+ metric1.Increment();
+ metric2.AssignNewValue(i);
+ Thread.Sleep(10);
}
return null;
}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index 0f10ac8..c0aa2ab 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -38,7 +38,7 @@
TestRun(DriverConfigurations(), typeof(MetricsDriver), 1, "sendMessages", "local", testFolder);
ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
- var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
+ var receivedCounterMessage = GetMessageCount(lines, "Received 2 metrics with context message of length");
Assert.True(receivedCounterMessage > 1);
var messageCount = GetMessageCount(lines, MetricsDriver.EventPrefix);
@@ -59,7 +59,7 @@
var metricServiceConfig = MetricsServiceConfigurationModule.ConfigurationModule
.Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
- .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
+ .Set(MetricsServiceConfigurationModule.MetricSinkThreshold, "1000")
.Build();
var driverMetricConfig = DriverMetricsObserverConfigurationModule.ConfigurationModule.Build();