[REEF-1735] Define IMetricsSink interface
* Add IMetricsSink interface and default impl
* Add named parameter for MetricsSinks
* Add MetricsSinks to MetricsService
* Modify test cases
JIRA:
[REEF-1735](https://issues.apache.org/jira/browse/REEF-1735)
Pull request
This closes #1262
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 8feb0e1..4bdf723 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -247,12 +247,18 @@
<Compile Include="Tasks\TaskConfigurationOptions.cs" />
<Compile Include="Tasks\TaskMessage.cs" />
<Compile Include="Telemetry\Counter.cs" />
+ <Compile Include="Telemetry\CounterData.cs" />
+ <Compile Include="Telemetry\CountersData.cs" />
<Compile Include="Telemetry\Counters.cs" />
+ <Compile Include="Telemetry\CounterSinkThreshold.cs" />
+ <Compile Include="Telemetry\DefaultMetricsSink.cs" />
<Compile Include="Telemetry\EvaluatorMetrics.cs" />
<Compile Include="Telemetry\ICounter.cs" />
<Compile Include="Telemetry\ICounters.cs" />
<Compile Include="Telemetry\IEvaluatorMetrics.cs" />
+ <Compile Include="Telemetry\IMetricsSink.cs" />
<Compile Include="Telemetry\MessageSenderConfigurationModule.cs" />
+ <Compile Include="Telemetry\MetricSinks.cs" />
<Compile Include="Telemetry\MetricsMessageSender.cs" />
<Compile Include="Telemetry\MetricsService.cs" />
</ItemGroup>
diff --git a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
index 4a8c546..d126a7e 100644
--- a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs
@@ -62,6 +62,12 @@
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" +
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" +
+ "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" +
+ "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" +
+ "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+
// Allow NSubstitute to create proxy implementations
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=002400000480000" +
"0940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a36" +
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
new file mode 100644
index 0000000..5f23262
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// This class wraps a Counter object and the increment value since last sink
+ /// </summary>
+ internal sealed class CounterData
+ {
+ /// <summary>
+ /// Counter object
+ /// </summary>
+ private ICounter _counter;
+
+ /// <summary>
+ /// Counter increment value since last sink
+ /// </summary>
+ internal int IncrementSinceLastSink { get; private set; }
+
+ /// <summary>
+ /// Constructor for CounterData
+ /// </summary>
+ /// <param name="counter"></param>
+ /// <param name="initialValue"></param>
+ internal CounterData(ICounter counter, int initialValue)
+ {
+ _counter = counter;
+ IncrementSinceLastSink = initialValue;
+ }
+
+ /// <summary>
+ /// clear the increment since last sink
+ /// </summary>
+ internal void ResetSinceLastSink()
+ {
+ IncrementSinceLastSink = 0;
+ }
+
+ internal void UpdateCounter(ICounter counter)
+ {
+ IncrementSinceLastSink += counter.Value - _counter.Value;
+
+ //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
+ //// if evaluator contains the aggregated values, the value will override existing value
+ //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
+ //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
+ //// We also need to consider failure cases.
+ _counter = counter;
+ }
+
+ /// <summary>
+ /// Get count name and value as KeyValuePair
+ /// </summary>
+ /// <returns></returns>
+ internal KeyValuePair<string, string> GetKeyValuePair()
+ {
+ return new KeyValuePair<string, string>(_counter.Name, _counter.Value.ToString());
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
new file mode 100644
index 0000000..0f458c0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
+ public class CounterSinkThreshold : Name<int>
+ {
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
new file mode 100644
index 0000000..55393b0
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// This class maintains a collection of the data for all the counters for metrics service.
+ /// When new counter data is received, the data in the collection will be updated.
+ /// After the data is processed, the increment since last process will be reset.
+ /// </summary>
+ internal sealed class CountersData
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(CountersData));
+
+ /// <summary>
+ /// Registration of counters
+ /// </summary>
+ private readonly IDictionary<string, CounterData> _counterMap = new ConcurrentDictionary<string, CounterData>();
+
+ [Inject]
+ private CountersData()
+ {
+ }
+
+ /// <summary>
+ /// Update counters
+ /// </summary>
+ /// <param name="counters"></param>
+ internal void Update(ICounters counters)
+ {
+ foreach (var counter in counters.GetCounters())
+ {
+ CounterData counterData;
+ if (_counterMap.TryGetValue(counter.Name, out counterData))
+ {
+ counterData.UpdateCounter(counter);
+ }
+ else
+ {
+ _counterMap.Add(counter.Name, new CounterData(counter, counter.Value));
+ }
+
+ Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}, incrementSinceLastSink: {4}.",
+ counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp), _counterMap[counter.Name].IncrementSinceLastSink);
+ }
+ }
+
+ /// <summary>
+ /// Reset increment since last sink for each counter
+ /// </summary>
+ internal void Reset()
+ {
+ foreach (var c in _counterMap.Values)
+ {
+ c.ResetSinceLastSink();
+ }
+ }
+
+ /// <summary>
+ /// Convert the counter data into ISet for sink
+ /// </summary>
+ /// <returns></returns>
+ internal ISet<KeyValuePair<string, string>> GetCounterData()
+ {
+ var set = new HashSet<KeyValuePair<string, string>>();
+ foreach (var c in _counterMap)
+ {
+ set.Add(c.Value.GetKeyValuePair());
+ }
+ return set;
+ }
+
+ /// <summary>
+ /// The condition that triggers the sink. The condition can be modified later.
+ /// </summary>
+ /// <returns></returns>
+ internal bool TriggerSink(int counterSinkThreshold)
+ {
+ return _counterMap.Values.Sum(e => e.IncrementSinceLastSink) > counterSinkThreshold;
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
new file mode 100644
index 0000000..d302812
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// This default IMetricsSink is just an example of IMetricsSink
+ /// Here the data is logged in Sink() method
+ /// It is more useful in test
+ /// </summary>
+ internal sealed class DefaultMetricsSink : IMetricsSink
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultMetricsSink));
+
+ [Inject]
+ private DefaultMetricsSink()
+ {
+ }
+
+ /// <summary>
+ /// Simple sink for metrics data
+ /// </summary>
+ /// <param name="metrics"></param>
+ public void Sink(ISet<KeyValuePair<string, string>> metrics)
+ {
+ foreach (var m in metrics)
+ {
+ Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value);
+ }
+ }
+
+ /// <summary>
+ /// This is intentionally empty as we don't have any resource to release in the implementation.
+ /// </summary>
+ public void Dispose()
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
new file mode 100644
index 0000000..b27bd3d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// Interface for metrics sink.
+ /// It is used to output IMRU metrics.
+ /// </summary>
+ [DefaultImplementation(typeof(DefaultMetricsSink))]
+ public interface IMetricsSink : IDisposable
+ {
+ void Sink(ISet<KeyValuePair<string, string>> metrics);
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs
new file mode 100644
index 0000000..09b7598
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinks.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+ /// <summary>
+ /// A named parameter for a set of IMetricsSink.
+ /// </summary>
+ [NamedParameter(DefaultClasses = new Type[] { typeof(DefaultMetricsSink) })]
+ public sealed class MetricSinks : Name<ISet<IMetricsSink>>
+ {
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 7ff3c26..75c8cc2 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -16,9 +16,9 @@
// under the License.
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Threading.Tasks;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
@@ -34,14 +34,35 @@
internal sealed class MetricsService : IObserver<IContextMessage>
{
private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
- private readonly IDictionary<string, ICounter> _counters = new ConcurrentDictionary<string, ICounter>();
+
+ /// <summary>
+ /// Contains Counters received in the Metrics service
+ /// </summary>
+ private readonly CountersData _countersData;
+
+ /// <summary>
+ /// A set of metrics sinks
+ /// </summary>
+ private readonly ISet<IMetricsSink> _metricsSinks;
+
+ /// <summary>
+ /// The threshold that triggers the sinks.
+ /// Currently only one threshold is defined for all the counters. Later, it can be extended to define a threshold per counter.
+ /// </summary>
+ private readonly int _counterSinkThreshold;
/// <summary>
/// It can be bound with driver configuration as a context message handler
/// </summary>
[Inject]
- private MetricsService()
+ private MetricsService(
+ [Parameter(typeof(MetricSinks))] ISet<IMetricsSink> metricsSinks,
+ [Parameter(typeof(CounterSinkThreshold))] int counterSinkThreshold,
+ CountersData countersData)
{
+ _metricsSinks = metricsSinks;
+ _counterSinkThreshold = counterSinkThreshold;
+ _countersData = countersData;
}
/// <summary>
@@ -54,24 +75,34 @@
var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
Logger.Log(Level.Info, "Received {0} counters with context message: {1}.", counters.GetCounters().Count(), msgReceived);
- foreach (var counter in counters.GetCounters())
- {
- ICounter c;
- if (_counters.TryGetValue(counter.Name, out c))
- {
- //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
- //// if evaluator contains the aggregated values, the value will override existing value
- //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
- //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
- //// We also need to consider failure cases.
- _counters[counter.Name] = counter;
- }
- else
- {
- _counters.Add(counter.Name, counter);
- }
+ _countersData.Update(counters);
- Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3}.", counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp));
+ if (_countersData.TriggerSink(_counterSinkThreshold))
+ {
+ Sink(_countersData.GetCounterData());
+ _countersData.Reset();
+ }
+ }
+
+ /// <summary>
+ /// Call each Sink to sink the data in the counters
+ /// </summary>
+ private void Sink(ISet<KeyValuePair<string, string>> set)
+ {
+ foreach (var s in _metricsSinks)
+ {
+ try
+ {
+ Task.Run(() => s.Sink(set));
+ }
+ catch (Exception e)
+ {
+ Logger.Log(Level.Error, "Exception happens during the sink for Sink {0} with Exception: {1}.", s.GetType().AssemblyQualifiedName, e);
+ }
+ finally
+ {
+ s.Dispose();
+ }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
index b7e75d5..957d505 100644
--- a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -24,8 +24,14 @@
namespace Org.Apache.REEF.Driver
{
+ /// <summary>
+ /// Configuration module for MetricsService.
+ /// </summary>
public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
{
+ public static readonly OptionalImpl<IMetricsSink> OnMetricsSink = new OptionalImpl<IMetricsSink>();
+ public static readonly OptionalParameter<int> CounterSinkThreshold = new OptionalParameter<int>();
+
/// <summary>
/// It provides the configuration for MetricsService
/// </summary>
@@ -33,6 +39,8 @@
.BindSetEntry<DriverBridgeConfigurationOptions.ContextMessageHandlers, MetricsService, IObserver<IContextMessage>>(
GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
GenericType<MetricsService>.Class)
+ .BindSetEntry(GenericType<MetricSinks>.Class, OnMetricsSink)
+ .BindNamedParameter(GenericType<CounterSinkThreshold>.Class, CounterSinkThreshold)
.Build();
}
}
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index 12eb9f9..f447f75 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+using Org.Apache.REEF.Common.Telemetry;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Interface;
@@ -51,7 +52,10 @@
.Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build();
- var c2 = MetricsServiceConfigurationModule.ConfigurationModule.Build();
+ var c2 = MetricsServiceConfigurationModule.ConfigurationModule
+ .Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
+ .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
+ .Build();
return Configurations.Merge(c1, c2);
}