[GRIFFIN-358] New Profiling Measure
diff --git a/measure/src/main/resources/config-batch-preproc.json b/measure/src/main/resources/config-batch-preproc.json
index 6de4b4f..c895241 100644
--- a/measure/src/main/resources/config-batch-preproc.json
+++ b/measure/src/main/resources/config-batch-preproc.json
@@ -15,7 +15,7 @@
         },
         "pre.proc": [
           "select split(value, ',') as part from this",
-          "select part[0] as user_id, part[1] as first_name, part[2] as last_name, part[3] as address, part[4] as email, part[5] as phone, part[6] as post_code from this"
+          "select cast(part[0] as long) as user_id, part[1] as first_name, part[2] as last_name, part[3] as address, part[4] as email, part[5] as phone, part[6] as post_code from this"
         ]
       }
     }
@@ -35,6 +35,26 @@
           "flatten": "map"
         }
       ]
+    },
+    {
+      "name": "profiling_measure",
+      "type": "profiling",
+      "data.source": "source",
+      "config": {
+        "expr": [
+          "user_id",
+          "post_code"
+        ],
+        "approx.distinct.count": true,
+        "round.scale": 2
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "prof_metric",
+          "flatten": "default"
+        }
+      ]
     }
   ],
   "sinks": [
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
index ee2bd28..8d73e1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
@@ -25,7 +25,7 @@
 
   type MeasureType = Value
 
-  val Completeness, SparkSQL = Value
+  val Completeness, Profiling, SparkSQL = Value
 
   override def withNameWithDefault(name: String): MeasureType = {
     super.withNameWithDefault(name)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
index c503d6a..30b1738 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -52,4 +52,6 @@
     }
   }
 
+  def clear(): Unit = metrics.clear()
+
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index c8d4149..a327f0e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -37,27 +37,31 @@
   val supportsRecordWrite: Boolean
   val supportsMetricWrite: Boolean
 
-  final val measureViolationsColName = s"${MeasureColPrefix}_${measureParam.getName}"
+  final val valueColumn = s"${MeasureColPrefix}_${measureParam.getName}"
 
   def getFromConfig[T: ClassTag](key: String, defValue: T): T = {
     measureParam.getConfig.getAnyRef[T](key, defValue)
   }
 
   def preProcessMetrics(input: DataFrame): DataFrame = {
-    val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
+    if (supportsMetricWrite) {
+      val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
 
-    input
-      .withColumn(MeasureName, typedLit[String](measureParam.getName))
-      .withColumn(MeasureType, typedLit[String](measureType))
-      .withColumn("value", col(measureViolationsColName))
-      .withColumn("data_source", typedLit[String](measureParam.getDataSource))
-      .select(MeasureName, MeasureType, "data_source", "value")
+      input
+        .withColumn(MeasureName, typedLit[String](measureParam.getName))
+        .withColumn(MeasureType, typedLit[String](measureType))
+        .withColumn("value", col(valueColumn))
+        .withColumn("data_source", typedLit[String](measureParam.getDataSource))
+        .select(MeasureName, MeasureType, "data_source", "value")
+    } else input
   }
 
   def preProcessRecords(input: DataFrame): DataFrame = {
-    input
-      .withColumn(Status, when(col(measureViolationsColName) === 0, "good").otherwise("bad"))
-      .drop(measureViolationsColName)
+    if (supportsRecordWrite) {
+      input
+        .withColumn(Status, when(col(valueColumn) === 0, "good").otherwise("bad"))
+        .drop(valueColumn)
+    } else input
   }
 
   def impl(sparkSession: SparkSession): (DataFrame, DataFrame)
@@ -65,13 +69,16 @@
   def execute(sparkSession: SparkSession, batchId: Option[Long]): (DataFrame, DataFrame) = {
     val (badRecordsDf, metricDf) = impl(sparkSession)
 
+    val processedRecordDf = preProcessRecords(badRecordsDf)
+    val processedMetricDf = preProcessMetrics(metricDf)
+
     var batchDetailsOpt = StringUtils.EMPTY
     val res = batchId match {
       case Some(batchId) =>
         implicit val bId: Long = batchId
         batchDetailsOpt = s"for batch id $bId"
-        (appendBatchIdIfAvailable(badRecordsDf), appendBatchIdIfAvailable(metricDf))
-      case None => (badRecordsDf, metricDf)
+        (appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
+      case None => (processedRecordDf, processedMetricDf)
     }
 
     info(
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 7f6eed7..115cc1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -80,6 +80,7 @@
   private def createMeasure(measureParam: MeasureParam): Measure = {
     measureParam.getType match {
       case MeasureTypes.Completeness => CompletenessMeasure(measureParam)
+      case MeasureTypes.Profiling => ProfilingMeasure(measureParam)
       case _ =>
         val errorMsg = s"Measure type '${measureParam.getType}' is not supported."
         val exception = new NotImplementedError(errorMsg)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 1f201f4..b1178f8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -49,19 +49,16 @@
     }
 
     val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
-    val metricColumn: Column = map(selectCols: _*).as(measureViolationsColName)
+    val metricColumn: Column = map(selectCols: _*).as(valueColumn)
 
     val input = sparkSession.read.table(measureParam.getDataSource)
-    val resultDf = input.withColumn(measureViolationsColName, column)
+    val badRecordsDf = input.withColumn(valueColumn, column)
 
-    val metricDf = preProcessMetrics(
-      resultDf
-        .withColumn(Total, lit(1))
-        .agg(sum(Total).as(Total), sum(measureViolationsColName).as(InComplete))
-        .withColumn(Complete, col(Total) - col(InComplete))
-        .select(metricColumn))
-
-    val badRecordsDf = preProcessRecords(resultDf)
+    val metricDf = badRecordsDf
+      .withColumn(Total, lit(1))
+      .agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
+      .withColumn(Complete, col(Total) - col(InComplete))
+      .select(metricColumn)
 
     (badRecordsDf, metricDf)
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
new file mode 100644
index 0000000..ef6ed73
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -0,0 +1,152 @@
+package org.apache.griffin.measure.execution.impl
+
+import java.util.Locale
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+case class ProfilingMeasure(measureParam: MeasureParam) extends Measure {
+
+  import Measure._
+  import ProfilingMeasure._
+
+  override val supportsRecordWrite: Boolean = false
+
+  override val supportsMetricWrite: Boolean = true
+
+  private val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
+  private val approxDistinctCount: Boolean =
+    getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
+
+  override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
+    val input = sparkSession.read.table(measureParam.getDataSource)
+    val profilingColNames =
+      getFromConfig[Seq[String]](Expression, input.columns)
+        .map(_.trim.toLowerCase(Locale.ROOT))
+        .toSet
+
+    assert(
+      profilingColNames.nonEmpty,
+      s"Invalid columns [${profilingColNames.mkString(", ")}] were provided for profiling")
+
+    val profilingCols =
+      input.schema.fields.filter(f =>
+        profilingColNames.contains(f.name) && !f.name.equalsIgnoreCase(ConstantColumns.tmst))
+
+    val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
+      val colName = field.name
+      val profilingExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
+
+      exprList.:+(
+        map(lit(colName).as(ColName), struct(profilingExprs: _*))
+          .as(s"$DetailsPrefix$colName"))
+    })
+
+    val aggregateDf = profilingCols
+      .foldLeft(input)((df, field) => {
+        val colName = field.name
+        val column = col(colName)
+
+        val lengthColName = lengthColFn(colName)
+        val nullColName = nullsInColFn(colName)
+
+        df.withColumn(lengthColName, length(column))
+          .withColumn(nullColName, when(isnull(column).or(isnan(column)), 1L).otherwise(0L))
+      })
+      .agg(count(lit(1L)).as(TotalCount), profilingExprs: _*)
+
+    val detailCols = aggregateDf.columns.filter(_.startsWith(DetailsPrefix)).map(col)
+
+    val metricDf = aggregateDf
+      .withColumn(ColumnDetails, array(detailCols: _*))
+      .select(TotalCount, ColumnDetails)
+      .select(to_json(struct(AllColumns)).as(valueColumn))
+
+    (sparkSession.emptyDataFrame, metricDf)
+  }
+
+}
+
+object ProfilingMeasure {
+
+  /**
+   * Options Keys
+   */
+  private final val RoundScaleStr: String = "round.scale"
+  private final val ApproxDistinctCountStr: String = "approx.distinct.count"
+
+  /**
+   * Structure Keys
+   */
+  private final val ColumnDetails: String = "column_details"
+  private final val ColName: String = "col_name"
+  private final val DataTypeStr: String = "data_type"
+  private final val TotalCount: String = "total_count"
+
+  /**
+   * Prefix Keys
+   */
+  private final val ApproxPrefix: String = "approx_"
+  private final val DetailsPrefix: String = "details_"
+  private final val ColumnLengthPrefix: String = "col_len"
+  private final val IsNullPrefix: String = "is_null"
+
+  /**
+   * Column Detail Keys
+   */
+  private final val NullCount: String = "null_count"
+  private final val DistinctCount: String = "distinct_count"
+  private final val Min: String = "min"
+  private final val Max: String = "max"
+  private final val Avg: String = "avg"
+  private final val StdDeviation: String = "std_dev"
+  private final val Variance: String = "variance"
+  private final val Kurtosis: String = "kurtosis"
+  private final val MinColLength: String = s"${Min}_$ColumnLengthPrefix"
+  private final val MaxColLength: String = s"${Max}_$ColumnLengthPrefix"
+  private final val AvgColLength: String = s"${Avg}_$ColumnLengthPrefix"
+
+  private final val AllColumns: String = "*"
+
+  private def lengthColFn(colName: String): String = s"${ColumnLengthPrefix}_$colName"
+
+  private def nullsInColFn(colName: String): String = s"${IsNullPrefix}_$colName"
+
+  private def forNumericFn(t: DataType, value: Column, alias: String): Column = {
+    (if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
+  }
+
+  private def getProfilingExprs(
+      field: StructField,
+      roundScale: Int,
+      approxDistinctCount: Boolean): Seq[Column] = {
+    val colName = field.name
+    val colType = field.dataType
+
+    val column = col(colName)
+    val lengthColExpr = col(lengthColFn(colName))
+    val nullColExpr = col(nullsInColFn(colName))
+    val distinctCountExpr =
+      if (approxDistinctCount) approx_count_distinct(column).as(s"$ApproxPrefix$DistinctCount")
+      else countDistinct(column).as(DistinctCount)
+
+    Seq(
+      lit(colType.catalogString).as(DataTypeStr),
+      min(lengthColExpr).as(MinColLength),
+      max(lengthColExpr).as(MaxColLength),
+      forNumericFn(colType, avg(lengthColExpr), AvgColLength),
+      forNumericFn(colType, min(column), Min),
+      forNumericFn(colType, max(column), Max),
+      forNumericFn(colType, bround(avg(column), roundScale), Avg),
+      forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation),
+      forNumericFn(colType, bround(variance(column), roundScale), Variance),
+      forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis),
+      distinctCountExpr,
+      sum(nullColExpr).as(NullCount))
+  }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 8459e88..936d329 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -45,13 +45,13 @@
   def validate(): Boolean = true
 
   override def open(applicationId: String): Unit = {
-    griffinLogger.info(
+    info(
       s"Opened ConsoleSink for job with name '$jobName', " +
         s"timestamp '$timeStamp' and applicationId '$applicationId'")
   }
 
   override def close(): Unit = {
-    griffinLogger.info(
+    info(
       s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
   }
 
@@ -60,7 +60,7 @@
   override def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
   override def sinkMetrics(metrics: Map[String, Any]): Unit = {
-    griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
+    info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
   }
 
   override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 980bded..f6802ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -31,7 +31,7 @@
   val writeTimestampOpt: Option[Long] = None
 
   def execute(context: DQContext): Try[Boolean] = Try {
-    context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
+    val res = context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
       val pr =
         try {
@@ -50,6 +50,9 @@
         }
       ret && pr
     }
+
+    context.metricWrapper.clear()
+    res
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
index db6bb63..43cfcee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
@@ -21,20 +21,22 @@
 
 import scala.reflect._
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.commons.lang3.StringEscapeUtils
 
 object JsonUtil {
   val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
   mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  mapper.configure(SerializationFeature.INDENT_OUTPUT, true)
 
   def toJson(value: Map[Symbol, Any]): String = {
     toJson(value map { case (k, v) => k.name -> v })
   }
 
   def toJson(value: Any): String = {
-    mapper.writeValueAsString(value)
+    StringEscapeUtils.unescapeJava(mapper.writeValueAsString(value))
   }
 
   def fromJson[T: ClassTag](json: String): T = {