[REEF-1732] Build Metrics System (#1460)

Enhance the capabilities of MetricService by adding the following functionality:
   * Let the user specify the type of the metric, as opposed to being restricted to Counters.
   * Let the user maintain a timeseries and keep track of the history of updates for a given metric.
   * Add example implementations and functional tests to demonstrate usage.

JIRA: [REEF-1732](https://issues.apache.org/jira/browse/REEF-1732)

Closes #1460 
diff --git a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs
deleted file mode 100644
index 3973ad2..0000000
--- a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/CounterTests.cs
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Org.Apache.REEF.Common.Telemetry;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Xunit;
-
-namespace Org.Apache.REEF.Common.Tests.Telemetry
-{
-    public class CounterTests
-    {
-        /// <summary>
-        /// Test ICounters and IEvaluatorMetrics API.
-        /// </summary>
-        [Fact]
-        public void TestEvaluatorMetrics()
-        {
-            var metrics = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
-            var counters = metrics.GetMetricsCounters();
-            counters.TryRegisterCounter("counter1", "counter1 description");
-            counters.TryRegisterCounter("counter2", "counter2 description");
-            ValidateCounter(counters, "counter1", 0);
-            ValidateCounter(counters, "counter2", 0);
-
-            counters.Increment("counter1", 3);
-            counters.Increment("counter1", 1);
-            counters.Increment("counter2", 2);
-            counters.Increment("counter2", 3);
-            ValidateCounter(counters, "counter1", 4);
-            ValidateCounter(counters, "counter2", 5);
-
-            var counterStr = metrics.Serialize();
-
-            var metrics2 = new EvaluatorMetrics(counterStr);
-            var counters2 = metrics2.GetMetricsCounters();
-            ValidateCounter(counters2, "counter1", 4);
-            ValidateCounter(counters2, "counter2", 5);
-        }
-
-        /// <summary>
-        /// Test TryRegisterCounter with a duplicated counter name
-        /// </summary>
-        [Fact]
-        public void TestDuplicatedCounters()
-        {
-            var counters = CreateCounters();
-            counters.TryRegisterCounter("counter1", "counter1 description");
-            Assert.False(counters.TryRegisterCounter("counter1", "counter1 description"));
-        }
-
-        /// <summary>
-        /// Test Increment for a non-registered counter.
-        /// </summary>
-        [Fact]
-        public void TestNoExistCounter()
-        {
-            var counters = CreateCounters();
-            Action increment = () => counters.Increment("counter1", 2);
-            Assert.Throws<ApplicationException>(increment);
-        }
-
-        private static void ValidateCounter(ICounters counters, string name, int expectedValue)
-        {
-            ICounter c1;
-            counters.TryGetValue(name, out c1);
-            Assert.Equal(expectedValue, c1.Value);
-        }
-
-        private static ICounters CreateCounters()
-        {
-            var m = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
-            var c = m.GetMetricsCounters();
-            return c;
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs
new file mode 100644
index 0000000..1701d53
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common.Tests/Telemetry/MetricTests.cs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Telemetry;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Xunit;
+
+namespace Org.Apache.REEF.Common.Tests.Telemetry
+{
+    public class MetricsTests
+    {
+        /// <summary>
+        /// Creates and registers a counter metric without tracking to the Evaluator metrics,
+        /// alters the values, then serializes and deserializes.
+        /// </summary>
+        [Fact]
+        public void TestEvaluatorMetricsCountersOnly()
+        {
+            var evalMetrics1 = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            var counter1 = evalMetrics1.CreateAndRegisterMetric<CounterMetric>(
+                "counter1",
+                "Counter1 description",
+                false);
+
+            for (int i = 0; i < 5; i++)
+            {
+                counter1.Increment();
+            }
+            var evalMetricsData = evalMetrics1.GetMetricsData();
+            ValidateMetric(evalMetricsData, "counter1", 5);
+
+            foreach (var t in evalMetricsData.GetMetricTrackers())
+            {
+                // Records for counter1 should be null because KeepUpdateHistory is false.
+                Assert.Null(t.GetMetricRecords());
+            }
+
+            var metricsStr = evalMetrics1.Serialize();
+            var evalMetrics2 = new EvaluatorMetrics(metricsStr);
+            Assert.False(evalMetrics2.GetMetricsData().TryGetMetric("counter1", out CounterMetric metricObj));
+            Assert.Null(metricObj);
+        }
+
+        /// <summary>
+        /// Tests updating metric value.
+        /// </summary>
+        [Fact]
+        public void TestMetricSetValue()
+        {
+            var evalMetrics = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            var intMetric = evalMetrics.CreateAndRegisterMetric<IntegerMetric>(
+                "IntMetric",
+                "metric of type int",
+                true);
+            var doubleMetric = evalMetrics.CreateAndRegisterMetric<DoubleMetric>(
+                "DouMetric",
+                "metric of type double",
+                true);
+
+            var evalMetricsData = evalMetrics.GetMetricsData();
+            ValidateMetric(evalMetricsData, "IntMetric", default(int));
+            ValidateMetric(evalMetricsData, "DouMetric", default(double));
+
+            intMetric.AssignNewValue(3);
+            doubleMetric.AssignNewValue(3.0);
+            ValidateMetric(evalMetricsData, "IntMetric", 3);
+            ValidateMetric(evalMetricsData, "DouMetric", 3.0);
+        }
+
+        /// <summary>
+        /// Test TryRegisterCounter with a duplicated counter name
+        /// </summary>
+        [Fact]
+        public void TestDuplicatedNames()
+        {
+            var metrics = CreateMetrics();
+            metrics.RegisterMetric(new CounterMetric("metric1", "metric description"));
+            Assert.Throws<ArgumentException>(
+                () => metrics.RegisterMetric(new CounterMetric("metric1", "duplicate name")));
+        }
+
+        [Fact]
+        public void TestMetricsSimulateHeartbeat()
+        {
+            var evalMetrics1 = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            evalMetrics1.CreateAndRegisterMetric<CounterMetric>("counter", "counter with no records", false);
+            evalMetrics1.CreateAndRegisterMetric<IntegerMetric>("iteration", "iteration with records", true);
+            Assert.True(evalMetrics1.TryGetMetric("counter", out CounterMetric counter));
+            Assert.True(evalMetrics1.TryGetMetric("iteration", out IntegerMetric iter));
+            for (int i = 1; i <= 5; i++)
+            {
+                counter.Increment();
+                iter.AssignNewValue(i);
+            }
+
+            var me1Str = evalMetrics1.Serialize();
+            var evalMetrics2 = new EvaluatorMetrics(me1Str);
+            var metricsData2 = evalMetrics2.GetMetricsData();
+
+            var sink = TangFactory.GetTang().NewInjector().GetInstance<IMetricsSink>();
+            sink.Sink(metricsData2.FlushMetricRecords());
+
+            foreach (var t in metricsData2.GetMetricTrackers())
+            {
+                Assert.Equal(0, t.GetMetricRecords().ToList().Count);
+            }
+        }
+
+        private static void ValidateMetric(IMetricSet metricSet, string name, object expectedValue)
+        {
+            Assert.True(metricSet.TryGetMetric(name, out IMetric metric));
+            Assert.Equal(expectedValue, metric.ValueUntyped);
+        }
+
+        private static IMetricSet CreateMetrics()
+        {
+            var m = TangFactory.GetTang().NewInjector().GetInstance<IEvaluatorMetrics>();
+            var c = m.GetMetricsData();
+            return c;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs
deleted file mode 100644
index de46bae..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counter.cs
+++ /dev/null
@@ -1,133 +0,0 @@
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Runtime.Serialization;
-using Newtonsoft.Json;
-using Org.Apache.REEF.Utilities.Attributes;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
-    /// <summary>
-    /// Counter implementation
-    /// The properties that need to be serialized will be revisited later. We should only serialize minimum data to reduce the network load
-    /// For example, the name can be mapped to a unique number (byte) and description should not be serialized.
-    /// </summary>
-    [Unstable("0.16", "This is a simple counter for evaluator metrics.")]
-    [DataContract]
-    internal sealed class Counter : ICounter
-    {
-        /// <summary>
-        /// Name of the counter.
-        /// </summary>
-        private readonly string _name;
-
-        /// <summary>
-        /// Description of the counter.
-        /// </summary>
-        private readonly string _description;
-
-        /// <summary>
-        /// Time that the counter is updated.
-        /// </summary>
-        private long _timeStamp;
-
-        /// <summary>
-        /// Value of the counter.
-        /// </summary>
-        private int _value;
-
-        /// <summary>
-        /// Constructor to create a new counter.
-        /// </summary>
-        /// <param name="name"></param>
-        /// <param name="description"></param>
-        internal Counter(string name, string description)
-        {
-            _name = name;
-            _description = description;
-            _timeStamp = DateTime.Now.Ticks;
-            _value = 0;
-        }
-
-        /// <summary>
-        /// Constructor to create a counter from a serialized counter string
-        /// </summary>
-        [JsonConstructor]
-        internal Counter(string name, string description, long timeStamp, int value)
-        {
-            _name = name;
-            _description = description;
-            _timeStamp = timeStamp;
-            _value = value;
-        }
-
-        /// <summary>
-        /// Description of the counter.
-        /// </summary>
-        [DataMember]
-        public string Description
-        {
-            get
-            {
-                return _description;
-            }
-        }
-
-        /// <summary>
-        /// Name of the counter.
-        /// </summary>
-        [DataMember]
-        public string Name
-        {
-            get
-            {
-                return _name;
-            }
-        }
-
-        /// <summary>
-        /// Time that the counter is updated in the form of ticks.
-        /// </summary>
-        [DataMember]
-        public long Timestamp
-        {
-            get
-            {
-                return _timeStamp;
-            }
-        }
-
-        /// <summary>
-        /// Value of the counter.
-        /// </summary>
-        [DataMember]
-        public int Value
-        {
-            get
-            {
-                return _value;
-            }
-        }
-
-        /// <summary>
-        /// Increase the counter value and update the time stamp.
-        /// </summary>
-        /// <param name="number"></param>
-        public void Increment(int number)
-        {
-            _value += number;
-            _timeStamp = DateTime.Now.Ticks;
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
deleted file mode 100644
index 5f23262..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterData.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Collections.Generic;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
-    /// <summary>
-    /// This class wraps a Counter object and the increment value since last sink
-    /// </summary>
-    internal sealed class CounterData
-    {
-        /// <summary>
-        /// Counter object
-        /// </summary>
-        private ICounter _counter;
-
-        /// <summary>
-        /// Counter increment value since last sink
-        /// </summary>
-        internal int IncrementSinceLastSink { get; private set; }
-
-        /// <summary>
-        /// Constructor for CounterData
-        /// </summary>
-        /// <param name="counter"></param>
-        /// <param name="initialValue"></param>
-        internal CounterData(ICounter counter, int initialValue)
-        {
-            _counter = counter;
-            IncrementSinceLastSink = initialValue;
-        }
-
-        /// <summary>
-        /// clear the increment since last sink
-        /// </summary>
-        internal void ResetSinceLastSink()
-        {
-            IncrementSinceLastSink = 0;
-        }
-
-        internal void UpdateCounter(ICounter counter)
-        {
-            IncrementSinceLastSink += counter.Value - _counter.Value;
-
-            //// TODO: [REEF-1748] The following cases need to be considered in determine how to update the counter:
-            //// if evaluator contains the aggregated values, the value will override existing value
-            //// if evaluator only keep delta, the value should be added at here. But the value in the evaluator should be reset after message is sent
-            //// For the counters from multiple evaluators with the same counter name, the value should be aggregated here
-            //// We also need to consider failure cases.  
-            _counter = counter;
-        }
-
-        /// <summary>
-        /// Get count name and value as KeyValuePair
-        /// </summary>
-        /// <returns></returns>
-        internal KeyValuePair<string, string> GetKeyValuePair()
-        {
-            return new KeyValuePair<string, string>(_counter.Name, _counter.Value.ToString());
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs
new file mode 100644
index 0000000..d5c74eb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterMetric.cs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Counter metric implementation.
+    /// </summary>
+    public sealed class CounterMetric : MetricBase<int>, ICounter
+    {
+        public CounterMetric()
+        {
+        }
+
+        internal CounterMetric(string name, string description, bool keepHistory = false)
+            : base(name, description, keepHistory)
+        {
+        }
+
+        public void Increment(int number = 1)
+        {
+            _tracker.Track(Interlocked.Add(ref _typedValue, number));
+        }
+
+        public void Decrement(int number = 1)
+        {
+            _tracker.Track(Interlocked.Add(ref _typedValue, -number));
+        }
+
+        public override void AssignNewValue(int value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs
deleted file mode 100644
index d681ec6..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/Counters.cs
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using Newtonsoft.Json;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
-    [Unstable("0.16", "This is to build a collection of counters for evaluator metrics.")]
-    internal sealed class Counters : ICounters
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(Counters));
-
-        /// <summary>
-        /// It contains name and count pairs
-        /// </summary>
-        private readonly IDictionary<string, ICounter> _counters = new Dictionary<string, ICounter>();
-
-        /// <summary>
-        /// The lock for counters
-        /// </summary>
-        private readonly object _counterLock = new object();
-
-        [Inject]
-        private Counters()
-        {
-        }
-
-        /// <summary>
-        /// Deserialize a counters serialized string into a Counters object
-        /// </summary>
-        /// <param name="serializedCountersString"></param>
-        internal Counters(string serializedCountersString)
-        {
-            var c = JsonConvert.DeserializeObject<IEnumerable<Counter>>(serializedCountersString);
-            foreach (var ct in c)
-            {
-                _counters.Add(ct.Name, ct);
-            }
-        }
-
-        public IEnumerable<ICounter> GetCounters()
-        {
-            return _counters.Values;
-        }
-
-        /// <summary>
-        /// Register a new counter with a specified name.
-        /// If name does not exist, the counter will be added and true will be returned
-        /// Otherwise the counter will be not added and false will be returned. 
-        /// </summary>
-        /// <param name="name">Counter name</param>
-        /// <param name="description">Counter description</param>
-        /// <returns>Returns a boolean to indicate if the counter is added.</returns>
-        public bool TryRegisterCounter(string name, string description)
-        {
-            lock (_counterLock)
-            {
-                if (_counters.ContainsKey(name))
-                {
-                    Logger.Log(Level.Warning, "The counter [{0}] already exists.", name);
-                    return false;
-                }
-                _counters.Add(name, new Counter(name, description));
-            }
-            return true;
-        }
-
-        /// <summary>
-        /// Get counter for a given name
-        /// return false if the counter doesn't exist
-        /// </summary>
-        /// <param name="name">Name of the counter</param>
-        /// <param name="value">Value of the counter returned</param>
-        /// <returns>Returns a boolean to indicate if the value is found.</returns>
-        public bool TryGetValue(string name, out ICounter value)
-        {
-            lock (_counterLock)
-            {
-                return _counters.TryGetValue(name, out value);
-            }
-        }
-
-        /// <summary>
-        /// Increase the counter with the given number
-        /// </summary>
-        /// <param name="name">Name of the counter</param>
-        /// <param name="number">number to increase</param>
-        public void Increment(string name, int number)
-        {
-            ICounter counter;
-            if (TryGetValue(name, out counter))
-            {
-                lock (_counterLock)
-                {
-                    counter.Increment(number);
-                }
-            }
-            else
-            {
-                Logger.Log(Level.Error, "The counter [{0}]  has not registered.", name);
-                throw new ApplicationException("Counter has not registered:" + name);
-            }
-        }
-
-        /// <summary>
-        /// return serialized string of counter data
-        /// TODO: [REEF-] use an unique number for the counter name mapping to reduce the data transfer over the wire
-        /// TODO: [REEF-] use Avro schema if that can make the serialized string more compact
-        /// </summary>
-        /// <returns>Returns serialized string of the counters.</returns>
-        public string Serialize()
-        {
-            lock (_counterLock)
-            {
-                if (_counters.Count > 0)
-                {
-                    return JsonConvert.SerializeObject(_counters.Values);
-                }
-                return null;
-            }
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
deleted file mode 100644
index b8c22c8..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CountersData.cs
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
-    /// <summary>
-    /// This class maintains a collection of the data for all the counters for metrics service. 
-    /// When new counter data is received, the data in the collection will be updated.
-    /// After the data is processed, the increment since last process will be reset.
-    /// </summary>
-    internal sealed class CountersData
-    {
-        private static readonly Logger Logger = Logger.GetLogger(typeof(CountersData));
-
-        /// <summary>
-        /// Registration of counters
-        /// </summary>
-        private readonly IDictionary<string, CounterData> _counterMap = new ConcurrentDictionary<string, CounterData>();
-
-        [Inject]
-        private CountersData()
-        {            
-        }
-
-        /// <summary>
-        /// Update counters 
-        /// </summary>
-        /// <param name="counters"></param>
-        internal void Update(ICounters counters)
-        {
-            foreach (var counter in counters.GetCounters())
-            {
-                CounterData counterData;
-                if (_counterMap.TryGetValue(counter.Name, out counterData))
-                {
-                    counterData.UpdateCounter(counter);
-                }
-                else
-                {
-                    _counterMap.Add(counter.Name, new CounterData(counter, counter.Value));
-                }
-
-                Logger.Log(Level.Verbose, "Counter name: {0}, value: {1}, description: {2}, time: {3},  incrementSinceLastSink: {4}.",
-                    counter.Name, counter.Value, counter.Description, new DateTime(counter.Timestamp), _counterMap[counter.Name].IncrementSinceLastSink);
-            }
-        }
-
-        /// <summary>
-        /// Reset increment since last sink for each counter
-        /// </summary>
-        internal void Reset()
-        {
-            foreach (var c in _counterMap.Values)
-            {
-                c.ResetSinceLastSink();
-            }
-        }
-
-        /// <summary>
-        /// Convert the counter data into ISet for sink
-        /// </summary>
-        /// <returns></returns>
-        internal IEnumerable<KeyValuePair<string, string>> GetCounterData()
-        {
-            return _counterMap.Select(counter => counter.Value.GetKeyValuePair());
-        }
-
-        /// <summary>
-        /// The condition that triggers the sink. The condition can be modified later.
-        /// </summary>
-        /// <returns></returns>
-        internal bool TriggerSink(int counterSinkThreshold)
-        {
-            return _counterMap.Values.Sum(e => e.IncrementSinceLastSink) > counterSinkThreshold;
-        }
-    }
-}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
index 7f4fd95..ef5dbc8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DefaultMetricsSink.cs
@@ -22,9 +22,8 @@
 namespace Org.Apache.REEF.Common.Telemetry
 {
     /// <summary>
-    /// This default IMetricsSink is just an example of IMetricsSink
-    /// Here the data is logged in Sink() method
-    /// It is more useful in test
+    /// This default IMetricsSink is a simple implementation of IMetricsSink
+    /// that logs the metrics on sink.
     /// </summary>
     internal sealed class DefaultMetricsSink : IMetricsSink
     {
@@ -36,14 +35,14 @@
         }
 
         /// <summary>
-        /// Simple sink for metrics data
+        /// Simple sink that logs metrics.
         /// </summary>
-        /// <param name="metrics">A collection of metrics data in Key value pair format.</param>
-        public void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
+        /// <param name="metrics">A collection of metrics.</param>
+        public void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metrics)
         {
             foreach (var m in metrics)
             {
-                Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value);
+                Logger.Log(Level.Info, "Metrics - Name:{0}, Value:{1}.", m.Key, m.Value.Value);
             }
         }
 
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
index 0f458c0..a15e716 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DoubleMetric.cs
@@ -15,12 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    /// <summary>
+    /// Double metric implementation.
+    /// </summary>
+    public class DoubleMetric : MetricBase<double>
     {
+        public DoubleMetric()
+        {
+        }
+
+        internal DoubleMetric(string name, string description, bool keepUpdateHistory = true)
+            : base(name, description, keepUpdateHistory)
+        {
+        }
+
+        public override void AssignNewValue(double value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
index 2d634e3..9d41095 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetrics.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,25 +15,49 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
+using StringMetric = Org.Apache.REEF.Common.Telemetry.MetricClass<string>;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
     /// <summary>
-    /// A simple driver metrics.
-    /// It contains system state for now.
-    /// It can be extended later to include more driver metrics data.
+    /// Driver metrics implementation that contains the system state.
     /// </summary>
     public sealed class DriverMetrics : IDriverMetrics
     {
-        public DriverMetrics(string systemState, DateTime timeUpdated)
+        private readonly MetricsData _metricsData;
+
+        public static string DriverStateMetric = "DriverState";
+
+        public DriverMetrics()
         {
-            SystemState = systemState;
-            TimeUpdated = timeUpdated;
+            _metricsData = new MetricsData();
+            var stateMetric = CreateAndRegisterMetric<StringMetric>(DriverStateMetric, "driver state.", false);
         }
 
-        public string SystemState { get; private set; }
+        public IMetricSet GetMetricsData()
+        {
+            return _metricsData;
+        }
 
-        public DateTime TimeUpdated { get; private set; }
+        public T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+            where T : MetricBase, new()
+        {
+            var metric = new T
+            {
+                Name = name,
+                Description = description,
+                KeepUpdateHistory = keepUpdateHistory
+            };
+            _metricsData.RegisterMetric(metric);
+            return metric;
+        }
+
+        public bool TryGetMetric<T>(string name, out T metric)
+            where T : IMetric
+        {
+            var ret = _metricsData.TryGetMetric(name, out IMetric me);
+            metric = (T)me;
+            return ret;
+        }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
index d7e00e5..5488667 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObserverConfigurationModule.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,8 @@
         /// </summary>
         public readonly static ConfigurationModule ConfigurationModule = new DriverMetricsObserverConfigurationModule()
             .BindSetEntry(GenericType<DriverMetricsObservers>.Class, OnDriverMetrics)
-            .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
+            .BindSetEntry<DriverMetricsObservers, MetricsService, IObserver<IDriverMetrics>>(
+                GenericType<DriverMetricsObservers>.Class, GenericType<MetricsService>.Class)
             .Build();
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
index 21a3f4a..613e1a5 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/DriverMetricsObservers.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
index 38f789c..72625c4 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/EvaluatorMetrics.cs
@@ -16,19 +16,20 @@
 // under the License.
 
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [Unstable("0.16", "This is to build a simple metrics with counters only. More metrics will be added in future.")]
+    /// <summary>
+    /// An evaluator metrics implementation that maintains a collection of metrics.
+    /// </summary>
     internal sealed class EvaluatorMetrics : IEvaluatorMetrics
     {
-        private readonly Counters _counters;
+        private readonly MetricsData _metricsData;
 
         [Inject]
-        private EvaluatorMetrics(Counters counters)
+        private EvaluatorMetrics(MetricsData metrics)
         {
-            _counters = counters;
+            _metricsData = metrics;
         }
 
         /// <summary>
@@ -37,29 +38,42 @@
         /// <param name="serializedMsg"></param>
         internal EvaluatorMetrics(string serializedMsg)
         {
-            _counters = new Counters(serializedMsg);
+            _metricsData = new MetricsData(serializedMsg);
         }
 
-        /// <summary>
-        /// Returns counters
-        /// </summary>
-        /// <returns>Returns counters.</returns>
-        public ICounters GetMetricsCounters()
+        public T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+            where T : MetricBase, new()
         {
-            return _counters;
+            var metric = new T
+            {
+                Name = name,
+                Description = description,
+                KeepUpdateHistory = keepUpdateHistory
+            };
+            _metricsData.RegisterMetric(metric);
+            return metric;
         }
 
-        /// <summary>
-        /// return serialized string of metrics counters data
-        /// </summary>
-        /// <returns>Returns serialized string of counters.</returns>
+        public IMetricSet GetMetricsData()
+        {
+            return _metricsData;
+        }
+
         public string Serialize()
         {
-            if (_counters != null)
+            if (_metricsData != null)
             {
-                return _counters.Serialize();
+                return _metricsData.SerializeAndReset();
             }
             return null;
         }
+
+        public bool TryGetMetric<T>(string name, out T metric)
+            where T : IMetric
+        {
+            var ret = _metricsData.TryGetMetric(name, out IMetric me);
+            metric = (T)me;
+            return ret;
+        }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
index 0f458c0..75f3b84 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/FloatMetric.cs
@@ -15,12 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    /// <summary>
+    /// Float Metric implementation.
+    /// </summary>
+    public class FloatMetric : MetricBase<float>
     {
+        public FloatMetric()
+        {
+        }
+
+        internal FloatMetric(string name, string description, bool keepUpdateHistory = true)
+            : base(name, description, keepUpdateHistory)
+        {
+        }
+
+        public override void AssignNewValue(float value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
index be2e3c9..9e38974 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounter.cs
@@ -1,4 +1,8 @@
-// to you under the Apache License, Version 2.0 (the
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
 //
@@ -11,36 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Utilities.Attributes;
-
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [Unstable("0.16", "This is a simple counter for evaluator metrics.")]
     public interface ICounter
     {
         /// <summary>
-        /// Time the counter is updated.
-        /// </summary>
-        long Timestamp { get; }
-
-        /// <summary>
-        /// Name of the counter.
-        /// </summary>
-        string Name { get; }
-
-        /// <summary>
-        /// The description of the counter.
-        /// </summary>
-        string Description { get; }
-
-        /// <summary>
-        /// The value of the counter.
-        /// </summary>
-        int Value { get; }
-
-        /// <summary>
         /// Increase the current counter value with the number specified.
         /// </summary>
         void Increment(int number);
+
+        /// <summary>
+        /// Decrease the current counter value with the number specified.
+        /// </summary>
+        void Decrement(int number);
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs
deleted file mode 100644
index b6f8809..0000000
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/ICounters.cs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Collections.Generic;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
-
-namespace Org.Apache.REEF.Common.Telemetry
-{
-    [Unstable("0.16", "This is to build a collection of counters for evaluator metrics.")]
-    [DefaultImplementation(typeof(Counters))]
-    public interface ICounters
-    {
-        /// <summary>
-        /// Register a new counter with a specified name.
-        /// If name does not exist, the counter will be added and true will be returned
-        /// Otherwise the counter will be not added and false will be returned. 
-        /// </summary>
-        /// <param name="name">Name of the counter to be registered.</param>
-        /// <param name="description">Description of the counter to be registered.</param>
-        /// <returns>Returns a boolean to indicate if the counter is added.</returns>
-        bool TryRegisterCounter(string name, string description);
-
-        /// <summary>
-        /// Get counter value for a given counter name
-        /// </summary>
-        /// <param name="name">Name of the counter</param>
-        /// <param name="counter">The counter object returned</param>
-        /// <returns>Returns a boolean to indicate if the value is found.</returns>
-        bool TryGetValue(string name, out ICounter counter);
-
-        /// <summary>
-        /// Increase the counter with the given number
-        /// </summary>
-        /// <param name="name">Name of the counter</param>
-        /// <param name="number">number to increase</param>
-        void Increment(string name, int number);
-
-        /// <summary>
-        /// Returns all the counters
-        /// </summary>
-        /// <returns></returns>
-        IEnumerable<ICounter> GetCounters();
-
-        /// <summary>
-        /// Serialize the  counter into a string
-        /// </summary>
-        /// <returns>Returns serialized string of the counters.</returns>
-        string Serialize();
-    }
-}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
index 4f2c05d..39c8904 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IDriverMetrics.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -15,24 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [Unstable("0.16", "This is to build a simple metrics with system state only. More metrics will be added in future.")]
     [DefaultImplementation(typeof(DriverMetrics))]
-    public interface IDriverMetrics
+    public interface IDriverMetrics : IMetrics
     {
-        /// <summary>
-        /// System state
-        /// </summary>
-        string SystemState { get; }
-
-        /// <summary>
-        /// DateTime that the system state is updated
-        /// </summary>
-        DateTime TimeUpdated { get; }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
index f4476f5..4fd6dd9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IEvaluatorMetrics.cs
@@ -16,22 +16,14 @@
 // under the License.
 
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Utilities.Attributes;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [Unstable("0.16", "This is to build a simple metrics with counters only. More metrics will be added in future.")]
     [DefaultImplementation(typeof(EvaluatorMetrics))]
-    public interface IEvaluatorMetrics
+    public interface IEvaluatorMetrics : IMetrics
     {
         /// <summary>
-        /// Returns metrics counters
-        /// </summary>
-        /// <returns>Returns ICounters.</returns>
-        ICounters GetMetricsCounters();
-
-        /// <summary>
-        /// Serialize the metrics data into a string
+        /// Serializes the metrics data into a string.
         /// </summary>
         /// <returns>Returns serialized string of metrics</returns>
         string Serialize();
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs
new file mode 100644
index 0000000..f717551
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetric.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Metric interface. A generic interface for individual metrics.
+    /// </summary>
+    public interface IMetric
+    {
+        /// <summary>
+        /// Name of the metric.
+        /// </summary>
+        string Name { get; }
+
+        /// <summary>
+        /// Description of the metric.
+        /// </summary>
+        string Description { get; }
+
+        /// <summary>
+        /// Value of the metric, stored as object.
+        /// </summary>
+        object ValueUntyped { get; }
+
+        /// <summary>
+        /// Flag for the immutability of the metric.
+        /// </summary>
+        bool KeepUpdateHistory { get; }
+
+        /// <summary>
+        /// Assign a tracker to track the metric.
+        /// </summary>
+        /// <param name="tracker">The metric tracker assigned to track updates.</param>
+        /// <returns></returns>
+        IDisposable Subscribe(ITracker tracker);
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs
new file mode 100644
index 0000000..928ba8f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricSet.cs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Interface for a collection of metrics.
+    /// </summary>
+    [DefaultImplementation(typeof(MetricsData))]
+    public interface IMetricSet
+    {
+        /// <summary>
+        /// Add a metric object to this collection.
+        /// </summary>
+        /// <param name="metric">The metric object to add.</param>
+        void RegisterMetric(IMetric metric);
+
+        /// <summary>
+        /// Get metric value given the metric name.
+        /// </summary>
+        /// <param name="name">Name of the metric</param>
+        /// <param name="metric">The metric object returned</param>
+        /// <returns>Returns a boolean to indicate if the value is found.</returns>
+        bool TryGetMetric<T>(string name, out T metric)
+            where T : IMetric;
+
+        /// <summary>
+        /// Returns all the metric trackers.
+        /// </summary>
+        /// <returns></returns>
+        IEnumerable<MetricTracker> GetMetricTrackers();
+
+        /// <summary>
+        /// Empties the cached records for each metric.
+        /// </summary>
+        /// <returns>Key Value pair of metric name and record.</returns>
+        IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> FlushMetricRecords();
+
+        /// <summary>
+        /// Serializes the metrics data.
+        /// </summary>
+        /// <returns>Serialized string.</returns>
+        string Serialize();
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs
new file mode 100644
index 0000000..c1130f7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetrics.cs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    public interface IMetrics
+    {
+        /// <summary>
+        /// Creates a system metric.
+        /// </summary>
+        /// <typeparam name="T">Type of the metric object.</typeparam>
+        /// <param name="name">Name of the metric.</param>
+        /// <param name="description">Description of the metric.</param>
+        /// <param name="keepUpdateHistory">whether to keep a history of updates on this metric.</param>
+        /// <returns></returns>
+        T CreateAndRegisterMetric<T>(string name, string description, bool keepUpdateHistory)
+            where T : MetricBase, new();
+
+        /// <summary>
+        /// Method that returns the collection of metric data.
+        /// </summary>
+        /// <returns></returns>
+        IMetricSet GetMetricsData();
+
+        /// <summary>
+        /// Extracts the metric object if it has been registered.
+        /// </summary>
+        /// <param name="name">Name of the metric.</param>
+        /// <param name="metric">The registered metric. null if not found.</param>
+        /// <returns></returns>
+        bool TryGetMetric<T>(string name, out T metric)
+            where T : IMetric;
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
index eef54db..42da671 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IMetricsSink.cs
@@ -23,11 +23,10 @@
 {
     /// <summary>
     /// Interface for metrics sink.
-    /// It is used to output IMRU metrics.
     /// </summary>
     [DefaultImplementation(typeof(DefaultMetricsSink))]
     public interface IMetricsSink : IDisposable
     {
-        void Sink(IEnumerable<KeyValuePair<string, string>> metrics);
+        void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metrics);
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
similarity index 79%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
index 0f458c0..976b36a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/ITracker.cs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
-
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    /// <summary>
+    /// Tracker interface to track metrics.
+    /// </summary>
+    public interface ITracker
     {
+        void Track(object value);
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
similarity index 60%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
index 0f458c0..7cd0e5a 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/IntegerMetric.cs
@@ -15,12 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    /// <summary>
+    /// Integer metric implementation.
+    /// </summary>
+    public class IntegerMetric : MetricBase<int>
     {
+        public IntegerMetric()
+        {
+        }
+
+        internal IntegerMetric(string name, string description, bool keepHistory = true)
+            : base(name, description, keepHistory)
+        {
+        }
+
+        public override void AssignNewValue(int value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
similarity index 61%
copy from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
copy to lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
index 0f458c0..08bad2b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/LongMetric.cs
@@ -15,12 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using Org.Apache.REEF.Tang.Annotations;
+using System.Threading;
+using Newtonsoft.Json;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    /// <summary>
+    /// Long metric implementation.
+    /// </summary>
+    public class LongMetric : MetricBase<long>
     {
+        public LongMetric()
+        {
+        }
+
+        internal LongMetric(string name, string description, bool keepHistory = true)
+            : base(name, description, keepHistory)
+        {
+        }
+
+        public override void AssignNewValue(long value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
index 6230b65..7dfb5f7 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MessageSenderConfigurationModule.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs
new file mode 100644
index 0000000..7803ac1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricBase.cs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Provides a base implementation. Value of metric should be provided in derived classes.
+    /// </summary>
+    public abstract class MetricBase : IMetric
+    {
+        protected ITracker _tracker;
+
+        public string Name
+        {
+            get; internal set;
+        }
+
+        public string Description
+        {
+            get; internal set;
+        }
+
+        public bool KeepUpdateHistory
+        {
+            get; internal set;
+        }
+
+        public abstract object ValueUntyped
+        {
+            get;
+        }
+
+        protected MetricBase()
+        {
+        }
+
+        protected MetricBase(string name, string description, bool keepUpdateHistory = true)
+        {
+            Name = name;
+            Description = description;
+            KeepUpdateHistory = keepUpdateHistory;
+        }
+
+        public IDisposable Subscribe(ITracker tracker)
+        {
+            _tracker = tracker;
+            return new Unsubscriber(tracker);
+        }
+
+        private class Unsubscriber : IDisposable
+        {
+            private ITracker _tracker;
+
+            public Unsubscriber(ITracker tracker)
+            {
+                _tracker = tracker;
+            }
+
+            public void Dispose()
+            {
+                _tracker = null;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Base implementation with a generic value type.
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    public abstract class MetricBase<T> : MetricBase
+    {
+        protected T _typedValue;
+
+        public override object ValueUntyped
+        {
+            get { return _typedValue; }
+        }
+
+        public T Value
+        {
+            get { return _typedValue; }
+        }
+
+        public MetricBase()
+        {
+            _typedValue = default;
+        }
+
+        protected MetricBase(string name, string description, bool keepUpdateHistory = true)
+            : base(name, description, keepUpdateHistory)
+        {
+            _typedValue = default;
+        }
+
+        /// <summary>
+        /// Assign and track the new value to metric.
+        /// In most cases, this method should be overridden in derived classes using Interlocked.
+        /// </summary>
+        /// <param name="value">Value to assign the metric.</param>
+        public abstract void AssignNewValue(T value);
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs
new file mode 100644
index 0000000..15cd302
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricClass.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// Metrics of reference types (such as strings) should inherit from this class.
+    /// </summary>
+    /// <typeparam name="T">The type of the metric should be of reference type.</typeparam>
+    public class MetricClass<T> : MetricBase<T> where T : class
+    {
+        public MetricClass()
+        {
+        }
+
+        internal MetricClass(string name, string description, bool keepHistory = true)
+            : base(name, description, keepHistory)
+        {
+        }
+
+        public override void AssignNewValue(T value)
+        {
+            Interlocked.Exchange(ref _typedValue, value);
+            _tracker.Track(value);
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
similarity index 89%
rename from lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
rename to lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
index 0f458c0..c2d592f 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/CounterSinkThreshold.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricSinkThreshold.cs
@@ -19,8 +19,8 @@
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
-    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "CounterSinkThreshold", DefaultValue = "1")]
-    public class CounterSinkThreshold : Name<int>
+    [NamedParameter(Documentation = "Threshold to trigger the sink.", ShortName = "MetricSinkThreshold", DefaultValue = "1")]
+    public class MetricSinkThreshold : Name<int>
     {
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs
new file mode 100644
index 0000000..f272a13
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricTracker.cs
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// MetricData class maintains the current value of a single metric and keeps count of the
+    /// number of times this metric has been updated. If the metric is immutable, it keeps a
+    /// record of updates. Once the data has been processed, the records and count will reset.
+    /// </summary>
+    [JsonObject]
+    public sealed class MetricTracker : ITracker
+    {
+        [JsonProperty]
+        public readonly string MetricName;
+
+        [JsonProperty]
+        internal readonly bool KeepUpdateHistory;
+
+        /// <summary>
+        /// if KeepUpdateHistory is true, keeps a history of updates.
+        /// </summary>
+        [JsonProperty]
+        private ConcurrentQueue<MetricRecord> Records;
+
+        private IMetric _metric;
+
+        private IDisposable _unsubscriber;
+
+        /// <summary>
+        /// Constructor for MetricData called when metric is registered.
+        /// </summary>
+        /// <param name="metric"></param>
+        /// <param name="initialValue"></param>
+        internal MetricTracker(IMetric metric)
+        {
+            MetricName = metric.Name;
+            Subscribe(metric);
+            KeepUpdateHistory = metric.KeepUpdateHistory;
+            Records = KeepUpdateHistory ? new ConcurrentQueue<MetricRecord>() : null;
+            Records?.Enqueue(CreateMetricRecord(metric));
+        }
+
+        [JsonConstructor]
+        internal MetricTracker(
+            string metricName,
+            IEnumerable<MetricRecord> records,
+            bool keepUpdateHistory)
+        {
+            MetricName = metricName;
+            Records = new ConcurrentQueue<MetricRecord>(records);
+            KeepUpdateHistory = keepUpdateHistory;
+        }
+
+        /// <summary>
+        /// Flush records currently held in the records queue.
+        /// </summary>
+        /// <returns>A queue containing all the flushed records.</returns>
+        internal IEnumerable<MetricRecord> FlushRecordsCache()
+        {
+            ConcurrentQueue<MetricRecord> records = new ConcurrentQueue<MetricRecord>();
+            if (Records != null)
+            {
+                while (Records.TryDequeue(out MetricRecord record))
+                {
+                    records.Enqueue(record);
+                }
+            }
+            else
+            {
+                // Records will be empty only on eval side when tracker doesn't keep history.
+                records.Enqueue(CreateMetricRecord(_metric));
+            }
+            return records;
+        }
+
+        /// <summary>
+        /// When new metric data is received, update the value and records so it reflects the
+        /// new data. Called when Driver receives metrics from Evaluator.
+        /// </summary>
+        /// <param name="metric">Metric data received.</param>
+        internal MetricTracker UpdateMetric(MetricTracker metric)
+        {
+            if (metric.GetRecordCount() > 0)
+            {
+                var recordsToAdd = metric.GetMetricRecords();
+                if (KeepUpdateHistory)
+                {
+                    foreach (MetricRecord record in recordsToAdd)
+                    {
+                        Records.Enqueue(record);
+                    }
+                }
+                else
+                {
+                    Interlocked.Exchange(
+                        ref Records,
+                        new ConcurrentQueue<MetricRecord>(recordsToAdd));
+                }
+            }
+            return this;
+        }
+
+        /// <summary>
+        /// Get the metric with its most recent value.
+        /// </summary>
+        /// <return></returns>
+        internal IMetric GetMetric()
+        {
+            return _metric;
+        }
+
+        internal int GetRecordCount()
+        {
+            return Records.Count;
+        }
+
+        /// <summary>
+        /// If KeepUpdateHistory is true, it will return all the records; otherwise, it will
+        /// return one record with the most recent value.
+        /// </summary>
+        /// <returns>The history of the metric records.</returns>
+        internal IEnumerable<MetricRecord> GetMetricRecords()
+        {
+            return Records;
+        }
+
+        /// <summary>
+        /// Subscribes the tracker to a metric object.
+        /// </summary>
+        /// <param name="provider">The metric to track.</param>
+        public void Subscribe(IMetric provider)
+        {
+            _metric = provider;
+            _unsubscriber = provider.Subscribe(this);
+        }
+
+        /// <summary>
+        /// Unsubscribes the tracker from the metric it is tracking.
+        /// </summary>
+        public void Unsubscribe()
+        {
+            _unsubscriber.Dispose();
+        }
+
+        /// <summary>
+        /// Creates and queues a new metric record with a value.
+        /// </summary>
+        /// <param name="value">Value of the new record.</param>
+        public void Track(object value)
+        {
+            Records?.Enqueue(CreateMetricRecord(value));
+        }
+
+        private static MetricRecord CreateMetricRecord(IMetric metric)
+        {
+            return new MetricRecord(metric);
+        }
+
+        private static MetricRecord CreateMetricRecord(object val)
+        {
+            return new MetricRecord(val);
+        }
+
+        [JsonObject]
+        public class MetricRecord
+        {
+            [JsonProperty]
+            public readonly object Value;
+
+            [JsonProperty]
+            public readonly long Timestamp;
+
+            [JsonConstructor]
+            public MetricRecord(object value, long? timestamp = null)
+            {
+                Value = value;
+                Timestamp = timestamp.GetValueOrDefault(DateTime.Now.Ticks);
+            }
+
+            public MetricRecord(IMetric metric)
+            {
+                Value = metric.ValueUntyped;
+                Timestamp = DateTime.Now.Ticks;
+            }
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs
new file mode 100644
index 0000000..f55a448
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsData.cs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Tang.Annotations;
+using Newtonsoft.Json;
+
+namespace Org.Apache.REEF.Common.Telemetry
+{
+    /// <summary>
+    /// This class maintains a collection of the data for all the metrics for metrics service.
+    /// When new metric data is received, the data in the collection will be updated.
+    /// After the data is processed, the changes since last process will be reset.
+    /// </summary>
+    public sealed class MetricsData : IMetricSet
+    {
+        private static readonly JsonSerializerSettings settings = new JsonSerializerSettings()
+        {
+            TypeNameHandling = TypeNameHandling.All
+        };
+
+        /// <summary>
+        /// Registration of metrics
+        /// </summary>
+        private readonly ConcurrentDictionary<string, MetricTracker> _metricsMap =
+            new ConcurrentDictionary<string, MetricTracker>();
+
+        [Inject]
+        internal MetricsData()
+        {
+        }
+
+        /// <summary>
+        /// Deserialization.
+        /// </summary>
+        /// <param name="serializedMetricsString">string to Deserialize.</param>
+        [JsonConstructor]
+        internal MetricsData(string serializedMetricsString)
+        {
+            var metrics = JsonConvert.DeserializeObject<IEnumerable<MetricTracker>>(
+                serializedMetricsString,
+                settings);
+
+            foreach (var m in metrics)
+            {
+                _metricsMap.TryAdd(m.MetricName, m);
+            }
+        }
+
+        public void RegisterMetric(IMetric metric)
+        {
+            if (!_metricsMap.TryAdd(metric.Name, new MetricTracker(metric)))
+            {
+                throw new ArgumentException("The metric [{0}] already exists.", metric.Name);
+            }
+        }
+
+        public bool TryGetMetric<T>(string name, out T metric)
+            where T : IMetric
+        {
+            bool success = _metricsMap.TryGetValue(name, out MetricTracker tracker);
+            metric = (T)tracker?.GetMetric();
+            return success && metric != null;
+        }
+
+        public IEnumerable<MetricTracker> GetMetricTrackers()
+        {
+            return _metricsMap.Values;
+        }
+
+        /// <summary>
+        /// Flushes changes since last sink for each metric.
+        /// Called when Driver is sinking metrics.
+        /// </summary>
+        /// <returns>Key value pairs of metric name and record that was flushed.</returns>
+        public IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> FlushMetricRecords()
+        {
+            // for each metric, flush the records and create key value pairs
+            return _metricsMap.SelectMany(
+                kv => kv.Value.FlushRecordsCache().Select(
+                    r => new KeyValuePair<string, MetricTracker.MetricRecord>(kv.Key, r)));
+        }
+
+        /// <summary>
+        /// Updates metrics given another <see cref="MetricsData"/> object.
+        /// For every metric in the new set, if it is registered then update the value,
+        /// if it is not then add it to the registration.
+        /// </summary>
+        /// <param name="metrics">New metric values to be updated.</param>
+        internal void Update(IMetricSet metrics)
+        {
+            foreach (var tracker in metrics.GetMetricTrackers())
+            {
+                _metricsMap.AddOrUpdate(
+                    tracker.MetricName,
+                    tracker,
+                    (k, v) => v.UpdateMetric(tracker));
+            }
+        }
+
+        /// <summary>
+        /// Flushes that trackers contained in the queue.
+        /// Called when Evaluator is sending metrics information to Driver.
+        /// </summary>
+        /// <returns>Queue of trackers containing metric records.</returns>
+        internal IEnumerable<MetricTracker> FlushMetricTrackers()
+        {
+            return new ConcurrentQueue<MetricTracker>(_metricsMap.Select(
+                kv => new MetricTracker(
+                    kv.Value.MetricName,
+                    kv.Value.FlushRecordsCache(),
+                    kv.Value.KeepUpdateHistory)));
+        }
+
+        /// <summary>
+        /// Sums up the total changes to metrics to see if it has reached the sink threshold.
+        /// </summary>
+        /// <returns>Returns whether the sink threshold has been met.</returns>
+        internal bool TriggerSink(int metricSinkThreshold)
+        {
+            return _metricsMap.Values.Sum(e => e.GetRecordCount()) > metricSinkThreshold;
+        }
+
+        public string Serialize()
+        {
+            return Serialize(_metricsMap.Values);
+        }
+
+        internal string Serialize(IEnumerable<MetricTracker> trackers)
+        {
+            return JsonConvert.SerializeObject(trackers, settings);
+        }
+
+        internal string SerializeAndReset()
+        {
+            return Serialize(FlushMetricTrackers());
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
index 640c483..81d095c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsMessageSender.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,15 +18,14 @@
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
     /// <summary>
-    /// This class implements IContextMessageSource that is responsible to send context message
+    /// This class implements IContextMessageSource that is responsible to send Evaluator
+    /// metrics as a context message to the Driver.
     /// </summary>
-    [Unstable("0.16", "The metrics API is in development.")]
     internal sealed class MetricsMessageSender : IContextMessageSource
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsMessageSender));
@@ -58,6 +57,7 @@
         {
             get
             {
+                Logger.Log(Level.Info, "Getting context msg for eval metrics.");
                 var s = _evaluatorMetrics.Serialize();
                 if (s != null)
                 {
diff --git a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
index 44d2b32..4be778b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Telemetry/MetricsService.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,35 +22,31 @@
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Telemetry
 {
     /// <summary>
-    /// Metrics service. It is also a context message handler.
+    /// Metrics Service that handles metrics from the Evaluator and Driver.
     /// </summary>
-    [Unstable("0.16", "This is a simple MetricsService. More functionalities will be added.")]
     internal sealed class MetricsService : IObserver<IContextMessage>, IObserver<IDriverMetrics>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsService));
 
         /// <summary>
-        /// Contains Counters received in the Metrics service
+        /// The set of metrics Metrics Service maintains.
         /// </summary>
-        private readonly CountersData _countersData;
+        private readonly MetricsData _metricsData;
 
         /// <summary>
-        /// A set of metrics sinks
+        /// A set of Metric Sinks.
         /// </summary>
         private readonly ISet<IMetricsSink> _metricsSinks;
 
         /// <summary>
-        /// The threshold that triggers the sinks. 
-        /// Currently only one threshold is defined for all the counters.
-        /// Later, it can be extended to define a threshold per counter.
+        /// The total number of changes that has to be met to trigger the sinks.
         /// </summary>
-        private readonly int _counterSinkThreshold;
+        private readonly int _metricSinkThreshold;
 
         /// <summary>
         /// It can be bound with driver configuration as a context message handler
@@ -58,45 +54,45 @@
         [Inject]
         private MetricsService(
             [Parameter(typeof(MetricSinks))] ISet<IMetricsSink> metricsSinks,
-            [Parameter(typeof(CounterSinkThreshold))] int counterSinkThreshold,
-            CountersData countersData)
+            [Parameter(typeof(MetricSinkThreshold))] int metricSinkThreshold,
+            MetricsData metricsData)
         {
             _metricsSinks = metricsSinks;
-            _counterSinkThreshold = counterSinkThreshold;
-            _countersData = countersData;
+            _metricSinkThreshold = metricSinkThreshold;
+            _metricsData = metricsData;
         }
 
         /// <summary>
-        /// It is called whenever context message is received
+        /// Called whenever context message is received
         /// </summary>
         /// <param name="contextMessage">Serialized EvaluatorMetrics</param>
         public void OnNext(IContextMessage contextMessage)
         {
             var msgReceived = ByteUtilities.ByteArraysToString(contextMessage.Message);
-            var counters = new EvaluatorMetrics(msgReceived).GetMetricsCounters();
+            var evalMetrics = new EvaluatorMetrics(msgReceived);
+            var metricsData = evalMetrics.GetMetricsData();
 
-            Logger.Log(Level.Info, "Received {0} counters with context message: {1}.",
-                counters.GetCounters().Count(), msgReceived);
+            Logger.Log(Level.Info, "Received {0} metrics with context message of length {1}",
+                metricsData.GetMetricTrackers().Count(), msgReceived.Length);
 
-            _countersData.Update(counters);
+            _metricsData.Update(metricsData);
 
-            if (_countersData.TriggerSink(_counterSinkThreshold))
+            if (_metricsData.TriggerSink(_metricSinkThreshold))
             {
-                Sink(_countersData.GetCounterData());
-                _countersData.Reset();
+                Sink(_metricsData.FlushMetricRecords());
             }
         }
 
         /// <summary>
-        /// Call each Sink to sink the data in the counters
+        /// Call each Sink to process the cached metric records.
         /// </summary>
-        private void Sink(IEnumerable<KeyValuePair<string, string>> metrics)
+        private void Sink(IEnumerable<KeyValuePair<string, MetricTracker.MetricRecord>> metricRecords)
         {
             foreach (var s in _metricsSinks)
             {
                 try
                 {
-                    Task.Run(() => s.Sink(metrics));
+                    Task.Run(() => s.Sink(metricRecords));
                 }
                 catch (Exception e)
                 {
@@ -109,9 +105,13 @@
             }
         }
 
+        /// <summary>
+        /// Called when task is completed to sink cached metrics.
+        /// </summary>
         public void OnCompleted()
         {
-            Logger.Log(Level.Info, "Completed");
+            Sink(_metricsData.FlushMetricRecords());
+            Logger.Log(Level.Info, "MetricsService completed");
         }
 
         public void OnError(Exception error)
@@ -121,17 +121,12 @@
 
         /// <summary>
         /// Observer of IDriverMetrics.
-        /// When Driver metrics data is changed, this method will be called.
-        /// It calls Sink to store/log the metrics data.
+        /// Called when Driver metrics data is changed.
         /// </summary>
         /// <param name="driverMetrics">driver metrics data.</param>
         public void OnNext(IDriverMetrics driverMetrics)
         {
-            Sink(new Dictionary<string, string>()
-            {
-                { "SystemState", driverMetrics.SystemState },
-                { "TimeUpdated", driverMetrics.TimeUpdated.ToLongTimeString() }
-            });
+            Sink(driverMetrics.GetMetricsData().FlushMetricRecords());
         }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
index 957d505..e77f36f 100644
--- a/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/MetricsServiceConfigurationModule.cs
@@ -30,7 +30,7 @@
     public sealed class MetricsServiceConfigurationModule : ConfigurationModuleBuilder
     {
         public static readonly OptionalImpl<IMetricsSink> OnMetricsSink = new OptionalImpl<IMetricsSink>();
-        public static readonly OptionalParameter<int> CounterSinkThreshold = new OptionalParameter<int>();
+        public static readonly OptionalParameter<int> MetricSinkThreshold = new OptionalParameter<int>();
 
         /// <summary>
         /// It provides the configuration for MetricsService
@@ -40,7 +40,7 @@
                 GenericType<DriverBridgeConfigurationOptions.ContextMessageHandlers>.Class,
                 GenericType<MetricsService>.Class)
             .BindSetEntry(GenericType<MetricSinks>.Class, OnMetricsSink)
-            .BindNamedParameter(GenericType<CounterSinkThreshold>.Class, CounterSinkThreshold)
+            .BindNamedParameter(GenericType<MetricSinkThreshold>.Class, MetricSinkThreshold)
             .Build();
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
index 73df585..55cece4 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsDriver.cs
@@ -30,6 +30,7 @@
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Tests.Functional.Messaging;
 using Org.Apache.REEF.Utilities.Logging;
+using StringMetric = Org.Apache.REEF.Common.Telemetry.MetricClass<string>;
 
 namespace Org.Apache.REEF.Tests.Functional.Telemetry
 {
@@ -46,6 +47,8 @@
         private readonly IEvaluatorRequestor _evaluatorRequestor;
         internal const string EventPrefix = "TestState";
 
+        private IDriverMetrics _driverMetrics;
+
         /// <summary>
         /// a set of driver metrics observers.
         /// </summary>
@@ -53,7 +56,7 @@
 
         /// <summary>
         /// This driver inject DriverMetricsObservers and IDriverMetrics.
-        /// It keeps updating the driver metrics when receiving events. 
+        /// It keeps updating the driver metrics when receiving events.
         /// </summary>
         /// <param name="evaluatorRequestor"></param>
         /// <param name="driverMetricsObservers"></param>
@@ -67,6 +70,7 @@
 
         public void OnNext(IDriverStarted value)
         {
+            _driverMetrics = new DriverMetrics();
             UpdateMetrics(TestSystemState.DriverStarted);
 
             var request =
@@ -116,7 +120,7 @@
         {
             Logger.Log(Level.Info, "Received ICompletedTask");
             UpdateMetrics(TestSystemState.TaskCompleted);
-
+            FlushMetricsCache();
             value.ActiveContext.Dispose();
         }
 
@@ -135,11 +139,20 @@
         /// </summary>
         private void UpdateMetrics(TestSystemState systemState)
         {
-            var driverMetrics = new DriverMetrics(EventPrefix + systemState, DateTime.Now);
+            _driverMetrics.TryGetMetric(DriverMetrics.DriverStateMetric, out StringMetric stateMetric);
+            stateMetric.AssignNewValue(EventPrefix + systemState.ToString());
 
             foreach (var metricsObserver in _driverMetricsObservers)
             {
-                metricsObserver.OnNext(driverMetrics);
+                metricsObserver.OnNext(_driverMetrics);
+            }
+        }
+
+        private void FlushMetricsCache()
+        {
+            foreach(var metricsObserver in _driverMetricsObservers)
+            {
+                metricsObserver.OnCompleted();
             }
         }
     }
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
index 88e9461..b475651 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/MetricsTask.cs
@@ -30,26 +30,29 @@
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof(MetricsTask));
 
-        public const string TestCounter1 = "TestCounter1";
-        public const string TestCounter2 = "TestCounter2";
+        public const string TestCounter = "TestCounter";
+        public const string TestIntMetric = "Iterations";
 
-        private readonly ICounters _counters;
+        private readonly IEvaluatorMetrics _evaluatorMetrics;
+
+        public CounterMetric metric1;
+        public IntegerMetric metric2;
 
         [Inject]
         private MetricsTask(IEvaluatorMetrics evaluatorMetrics)
         {
-            _counters = evaluatorMetrics.GetMetricsCounters();
-            _counters.TryRegisterCounter(TestCounter1, "This is " + TestCounter1);
-            _counters.TryRegisterCounter(TestCounter2, "This is " + TestCounter2);
+            _evaluatorMetrics = evaluatorMetrics;
+            metric1 = _evaluatorMetrics.CreateAndRegisterMetric<CounterMetric>(TestCounter, TestCounter + " description", false);
+            metric2 = _evaluatorMetrics.CreateAndRegisterMetric<IntegerMetric>(TestIntMetric, TestIntMetric + " description", true);
         }
 
         public byte[] Call(byte[] memento)
         {
-            for (int i = 0; i < 100; i++)
+            for (int i = 1; i <= 2000; i++)
             {
-                _counters.Increment(TestCounter1, 1);
-                _counters.Increment(TestCounter2, 2);
-                Thread.Sleep(100);
+                metric1.Increment();
+                metric2.AssignNewValue(i);
+                Thread.Sleep(10);
             }
             return null;
         }
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
index 0f10ac8..c0aa2ab 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Telemetry/TestMetricsMessage.cs
@@ -38,7 +38,7 @@
             TestRun(DriverConfigurations(), typeof(MetricsDriver), 1, "sendMessages", "local", testFolder);
             ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
             string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
-            var receivedCounterMessage = GetMessageCount(lines, "Received 2 counters with context message:");
+            var receivedCounterMessage = GetMessageCount(lines, "Received 2 metrics with context message of length");
             Assert.True(receivedCounterMessage > 1);
 
             var messageCount = GetMessageCount(lines, MetricsDriver.EventPrefix);
@@ -59,7 +59,7 @@
 
             var metricServiceConfig = MetricsServiceConfigurationModule.ConfigurationModule
                 .Set(MetricsServiceConfigurationModule.OnMetricsSink, GenericType<DefaultMetricsSink>.Class)
-                .Set(MetricsServiceConfigurationModule.CounterSinkThreshold, "5")
+                .Set(MetricsServiceConfigurationModule.MetricSinkThreshold, "1000")
                 .Build();
 
             var driverMetricConfig = DriverMetricsObserverConfigurationModule.ConfigurationModule.Build();