[GRIFFIN-234] Add applicationId to MetricWrapper
In order to store more detailed information about certain metric the YARN application id associated with that metric has been added to MetricWrapper's flush method.
Author: dershovGD <dershov@griddynamics.com>
Closes #483 from dershovGD/GRIFFIN-233-modify-ElasticSearchSink.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index b0759c5..2fdf409 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -44,7 +44,7 @@
val dataFrameCache: DataFrameCache = DataFrameCache()
- val metricWrapper: MetricWrapper = MetricWrapper(name)
+ val metricWrapper: MetricWrapper = MetricWrapper(name, sparkSession.sparkContext.applicationId)
val writeMode = WriteMode.defaultMode(procType)
val dataSourceNames: Seq[String] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
index cec737f..df162a7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -23,11 +23,12 @@
/**
* wrap metrics into one, each calculation produces one metric map
*/
-case class MetricWrapper(name: String) extends Serializable {
+case class MetricWrapper(name: String, applicationId: String) extends Serializable {
val _Name = "name"
val _Timestamp = "tmst"
val _Value = "value"
+ val _ApplicationId = "applicationId"
val metrics: MutableMap[Long, Map[String, Any]] = MutableMap()
@@ -45,7 +46,8 @@
(timestamp, Map[String, Any](
(_Name -> name),
(_Timestamp -> timestamp),
- (_Value -> value)
+ (_Value -> value),
+ (_ApplicationId -> applicationId)
))
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index c835611..8ad7d5d 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -23,19 +23,20 @@
class MetricWrapperTest extends FlatSpec with Matchers {
"metric wrapper" should "flush empty if no metric inserted" in {
- val metricWrapper = MetricWrapper("name")
+ val metricWrapper = MetricWrapper("name", "appId")
metricWrapper.flush should be (Map[Long, Map[String, Any]]())
}
it should "flush all metrics inserted" in {
- val metricWrapper = MetricWrapper("test")
+ val metricWrapper = MetricWrapper("test", "appId")
metricWrapper.insertMetric(1, Map("total" -> 10, "miss"-> 2))
metricWrapper.insertMetric(1, Map("match" -> 8))
metricWrapper.insertMetric(2, Map("total" -> 20))
metricWrapper.insertMetric(2, Map("miss" -> 4))
metricWrapper.flush should be (Map(
- 1L -> Map("name" -> "test", "tmst" -> 1, "value" -> Map("total" -> 10, "miss"-> 2, "match" -> 8)),
- 2L -> Map("name" -> "test", "tmst" -> 2, "value" -> Map("total" -> 20, "miss"-> 4))
+ 1L -> Map("name" -> "test", "tmst" -> 1, "value" -> Map("total" -> 10, "miss"-> 2, "match" -> 8),
+ "applicationId" -> "appId"),
+ 2L -> Map("name" -> "test", "tmst" -> 2, "value" -> Map("total" -> 20, "miss"-> 4), "applicationId" -> "appId")
))
}