Optimize `getMetricMaps` in `MetricWriteStep`
**Why/What changes?**
In `MetricWriteStep.getMetricMaps()` the DataFrame was converted to a JSON Dataset, collected, and then parsed record by record into `Seq[Map]`.
This was inelegant and hard to follow. A more efficient approach is to collect the rows first and transform them into `Seq[Map]` directly via `Row.getValuesMap`.
We have tested this with our DQ cases, and it works well.
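As a rough illustration of the new approach: Spark's `Row.getValuesMap(columns)` pairs each column name with that row's value, which is equivalent to zipping the column names with the row values. The sketch below mimics that semantics with plain Scala collections (the `Row` alias and sample metric names here are hypothetical stand-ins, not part of the actual patch):

```scala
object GetValuesMapSketch {
  // Stand-in for a collected org.apache.spark.sql.Row: just the row's values.
  type Row = Seq[Any]

  // What Row.getValuesMap(columns) effectively does for one row:
  // pair each column name with the corresponding value.
  def rowToMap(columns: Seq[String], row: Row): Map[String, Any] =
    columns.zip(row).toMap

  def main(args: Array[String]): Unit = {
    val columns = Seq("metric", "value")            // hypothetical column names
    val rows: Seq[Row] = Seq(Seq("accuracy", 0.98), // hypothetical collected rows
                             Seq("total", 100))

    // Mirrors the patched code: rows.map(_.getValuesMap(columns))
    val maps: Seq[Map[String, Any]] = rows.map(rowToMap(columns, _))
    maps.foreach(println)
  }
}
```

This skips the JSON round-trip (`toJSON.collect()` plus `JsonUtil.toAnyMap`) entirely, which is where the simplification in the diff below comes from.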
Author: yuxiaoyu <yuxiaoyu@bytedance.com>
Closes #566 from XiaoyuBD/optimizeMetricWriteGetMaps.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index d43a265..cdf337b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -82,16 +82,10 @@
private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
try {
val pdf = context.sparkSession.table(s"`$inputName`")
- val records = pdf.toJSON.collect()
- if (records.length > 0) {
- records.flatMap { rec =>
- try {
- val value = JsonUtil.toAnyMap(rec)
- Some(value)
- } catch {
- case _: Throwable => None
- }
- }.toSeq
+ val rows = pdf.collect()
+ val columns = pdf.columns
+ if (rows.size > 0) {
+ rows.map(_.getValuesMap(columns))
} else Nil
} catch {
case e: Throwable =>