blob: bb3925914cb37d9ec1d20917a7d22bbc155e2566 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Common.Metrics.Api;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.Common.Metrics.MutableMetricsLayer
{
/// <summary>
/// Default implementation of <see cref="IMetricsSource"/>. Contains user friendly aceess to
/// add and update metrics. Can be extended or totally ignored by the user.
/// </summary>
[Unstable("0.16", "Contract may change.")]
public class DefaultMetricsSourceImpl : IMetricsSource, IDisposable
{
private readonly IMetricContainer<ICounter> _counters;
private readonly IMetricContainer<ILongGauge> _longGauges;
private readonly IMetricContainer<IDoubleGauge> _doubleGauges;
private readonly IMetricContainer<IRate> _rates;
private readonly Dictionary<string, MetricsTag> _tags = new Dictionary<string, MetricsTag>();
private readonly IList<IObserver<SnapshotRequest>> _observers = new List<IObserver<SnapshotRequest>>();
private readonly object _lock = new object();
private readonly string _contextOrTaskName;
private readonly string _evaluatorId;
private readonly string _sourceContext;
private readonly string _recordName;
private readonly IMetricsFactory _metricsFactory;
[Inject]
private DefaultMetricsSourceImpl(
[Parameter(typeof(DefaultMetricsSourceParameters.ContextOrTaskName))] string contextOrTaskName,
[Parameter(typeof(DefaultMetricsSourceParameters.EvaluatorId))] string evaluatorId,
[Parameter(typeof(DefaultMetricsSourceParameters.SourceContext))] string sourceContext,
[Parameter(typeof(DefaultMetricsSourceParameters.RecordName))] string recordName,
IMetricsFactory metricsFactory)
{
_contextOrTaskName = contextOrTaskName;
_evaluatorId = evaluatorId;
_sourceContext = sourceContext;
_recordName = recordName;
_metricsFactory = metricsFactory;
_counters = new MutableMetricContainer<ICounter>(
(name, desc) => metricsFactory.CreateCounter(name, desc),
this);
_longGauges = new MutableMetricContainer<ILongGauge>(
(name, desc) => metricsFactory.CreateLongGauge(name, desc),
this);
_doubleGauges = new MutableMetricContainer<IDoubleGauge>(
(name, desc) => metricsFactory.CreateDoubleGauge(name, desc),
this);
_rates = new MutableMetricContainer<IRate>(
(name, desc) => metricsFactory.CreateRateMetric(name, desc),
this);
}
/// <summary>
/// Returns indexable counters container.
/// </summary>
public IMetricContainer<ICounter> Counters
{
get { return _counters; }
}
/// <summary>
/// Returns indexable long guage container.
/// </summary>
public IMetricContainer<ILongGauge> LongGauges
{
get { return _longGauges; }
}
/// <summary>
/// Returns indexable double gauge container.
/// </summary>
public IMetricContainer<IDoubleGauge> DoubleGauges
{
get { return _doubleGauges; }
}
/// <summary>
/// Returns indexable double gauge container.
/// </summary>
public IMetricContainer<IRate> Rates
{
get { return _rates; }
}
/// <summary>
/// Adds a tag to the source. Replaces old one if it exists.
/// </summary>
/// <param name="name">Name of the tag.</param>
/// <param name="desc">Description of the tag.</param>
/// <param name="value">Value of the tag.</param>
public void AddTag(string name, string desc, string value)
{
lock (_lock)
{
_tags[name] = _metricsFactory.CreateTag(new MetricsInfoImpl(name, desc), value);
}
}
/// <summary>
/// Returns tag by name.
/// </summary>
/// <param name="name">Name of the tag.</param>
/// <returns>Tag if it exists, null otherwise.</returns>
public MetricsTag GetTag(string name)
{
lock (_lock)
{
MetricsTag tag;
_tags.TryGetValue(name, out tag);
return tag;
}
}
/// <summary>
/// Subscribes the <see cref="MutableMetricBase"/> whose snapshot
/// will be taken. This function will be used if user has its own metric types except
/// from the ones used in this class. For example, a ML algorithm can subscribe itself
/// as observer, and directly add metric vlaues like iterations, loss function etc. in the
/// record.
/// </summary>
/// <param name="observer">Observer that takes snapshot of the metric.</param>
/// <returns>A disposable handler used to unsubscribe the observer.</returns>
public IDisposable Subscribe(IObserver<SnapshotRequest> observer)
{
lock (_lock)
{
if (!_observers.Contains(observer))
{
_observers.Add(observer);
}
return new Unsubscriber(_observers, observer, _lock);
}
}
/// <summary>
/// Gets metrics from the source.
/// </summary>
/// <param name="collector">Collector that stores the resulting metrics snapshot as records.</param>
/// <param name="all">If true, gets metric values even if they are unchanged.</param>
public void GetMetrics(IMetricsCollector collector, bool all)
{
lock (_lock)
{
var rb = collector.CreateRecord(_recordName)
.SetContext(_sourceContext)
.AddTag("TaskOrContextName", _contextOrTaskName)
.AddTag("EvaluatorId", _evaluatorId)
.AddTag("SourceType", "DefaultSource");
var request = new SnapshotRequest(rb, all);
foreach (var entry in _observers)
{
entry.OnNext(request);
}
foreach (var entry in _tags)
{
rb.Add(entry.Value);
}
}
}
/// <summary>
/// Diposes the <see cref="DefaultMetricsSourceImpl"/>.
/// Removes all the observers.
/// </summary>
public void Dispose()
{
foreach (var observer in _observers)
{
observer.OnCompleted();
}
}
private class Unsubscriber : IDisposable
{
private readonly IList<IObserver<SnapshotRequest>> _observers;
private readonly IObserver<SnapshotRequest> _observer;
private readonly object _lock;
public Unsubscriber(IList<IObserver<SnapshotRequest>> observers, IObserver<SnapshotRequest> observer, object lockObject)
{
_observers = observers;
_observer = observer;
_lock = lockObject;
}
public void Dispose()
{
lock (_lock)
{
if (_observer != null && _observers.Contains(_observer))
{
_observers.Remove(_observer);
}
}
}
}
}
}