[GRIFFIN-289] New feature for griffin COMPLETENESS dq type

As describing in GRIFFIN-289, add two new ways to check 'incompleteness' record: regular expression and  enumeration.

Add 'error.confs' in dq json file. Each json object in 'error.confs' list means one column configuration.

If do not have 'error.confs', using old 'incompleteness' process, which is compatible for existing json file.

Add ut for the new json format.

Author: ‘Zhao <mrlzbebop@gmail.com>
Author: Zhao Li <mrlzbebop@gmail.com>

Closes #538 from LittleZhao/griffin-289.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index d41abf3..ffbf8d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -141,6 +141,7 @@
   * @param details    detail config of rule (optional)
   * @param cache      cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
   * @param outputs    output ways configuration (optional)
+  * @param errorConfs error configuration (valid for 'COMPLETENESS' mode)
   */
 @JsonInclude(Include.NON_NULL)
 case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
@@ -150,7 +151,8 @@
                      @JsonProperty("rule") private val rule: String = null,
                      @JsonProperty("details") private val details: Map[String, Any] = null,
                      @JsonProperty("cache") private val cache: Boolean = false,
-                     @JsonProperty("out") private val outputs: List[RuleOutputParam] = null
+                     @JsonProperty("out") private val outputs: List[RuleOutputParam] = null,
+                     @JsonProperty("error.confs") private val errorConfs: List[RuleErrorConfParam] = null
                     ) extends Param {
   def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
   def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
@@ -164,6 +166,8 @@
   def getOutputs: Seq[RuleOutputParam] = if (outputs != null) outputs else Nil
   def getOutputOpt(tp: OutputType): Option[RuleOutputParam] = getOutputs.filter(_.getOutputType == tp).headOption
 
+  def getErrorConfs: Seq[RuleErrorConfParam] = if (errorConfs != null) errorConfs else Nil
+
   def replaceInDfName(newName: String): RuleParam = {
     if (StringUtils.equals(newName, inDfName)) this
     else RuleParam(dslType, dqType, newName, outDfName, rule, details, cache, outputs)
@@ -186,6 +190,7 @@
       "unknown dq type for griffin dsl")
 
     getOutputs.foreach(_.validate)
+    getErrorConfs.foreach(_.validate)
   }
 }
 
@@ -206,3 +211,24 @@
 
   def validate(): Unit = {}
 }
+
+/**
+  * error configuration parameter
+  * @param columnName the name of the column
+  * @param errorType  the way to match error, regex or enumeration
+  * @param values     error value list
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleErrorConfParam( @JsonProperty("column.name") private val columnName: String,
+                               @JsonProperty("type") private val errorType: String,
+                               @JsonProperty("values") private val values: List[String]
+                             ) extends Param {
+  def getColumnName: Option[String] = if (StringUtils.isNotBlank(columnName)) Some(columnName) else None
+  def getErrorType: Option[String] = if (StringUtils.isNotBlank(errorType)) Some(errorType) else None
+  def getValues: Seq[String] = if (values != null) values else Nil
+
+  def validate(): Unit = {
+    assert("regex".equalsIgnoreCase(getErrorType.get) ||
+      "enumeration".equalsIgnoreCase(getErrorType.get), "error error.conf type")
+  }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 7312f29..4d3344e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -18,7 +18,7 @@
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -85,8 +85,16 @@
 
       // 2. incomplete record
       val incompleteRecordsTableName = "__incompleteRecords"
-      val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
-      val incompleteWhereClause = s"NOT (${completeWhereClause})"
+      val errorConfs: Seq[RuleErrorConfParam] = ruleParam.getErrorConfs
+      var incompleteWhereClause: String = ""
+      if (errorConfs.size == 0) {
+        // without errorConfs
+        val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
+        incompleteWhereClause = s"NOT (${completeWhereClause})"
+      } else {
+        // with errorConfs
+        incompleteWhereClause = this.getErrorConfCompleteWhereClause(errorConfs)
+      }
 
       val incompleteRecordsSql =
         s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
@@ -167,4 +175,46 @@
     }
   }
 
+  /**
+    * get 'error' where clause
+    * @param errorConfs error configuraion list
+    * @return 'error' where clause
+    */
+  def getErrorConfCompleteWhereClause(errorConfs: Seq[RuleErrorConfParam]): String = {
+    errorConfs.map(errorConf => this.getEachErrorWhereClause(errorConf)).mkString(" OR ")
+  }
+
+  /**
+    * get error sql for each column
+    * @param errorConf  error configuration
+    * @return 'error' sql for each column
+    */
+  def getEachErrorWhereClause(errorConf: RuleErrorConfParam): String = {
+    val errorType: Option[String] = errorConf.getErrorType
+    val columnName: String = errorConf.getColumnName.get
+    if ("regex".equalsIgnoreCase(errorType.get)) {
+      // only have one regular expression
+      val regexValue: String = errorConf.getValues.apply(0)
+      val afterReplace: String = regexValue.replaceAll("""\\""", """\\\\""")
+      val result: String = s"`${columnName}` REGEXP '${afterReplace}'"
+      return result
+    } else if ("enumeration".equalsIgnoreCase(errorType.get)) {
+      val values: Seq[String] = errorConf.getValues
+      // hive_none means None
+      var hasNone: Boolean = false
+      if (values.contains("hive_none")) {
+        hasNone = true
+      }
+
+      val valueWithQuote: String = values.filter(value => !"hive_none".equals(value))
+        .map(value => s"'${value}'").mkString(", ")
+
+      var result = s"(`${columnName}` IN (${valueWithQuote}))"
+      if (hasNone) {
+        result = s"((${result}) OR (`${columnName}` IS NULL))"
+      }
+      return result
+    }
+    throw new IllegalArgumentException("type in error.confs only supports regex and enumeration way")
+  }
 }
diff --git a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
new file mode 100644
index 0000000..e3b1f1c
--- /dev/null
+++ b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
@@ -0,0 +1,51 @@
+{
+  "data.sources": [
+    {
+      "connectors": [
+        {
+          "dataframe.name": "prof_table",
+          "config": {
+            "table.name": "efg",
+            "database": "abc",
+            "where": "`date`=\"20190825\""
+          },
+          "type": "hive"
+        }
+      ],
+      "name": "source"
+    }
+  ],
+  "sinks": [
+    "CONSOLE"
+  ],
+  "name": "test_griffin_complete_lizhao.bd",
+  "evaluate.rule": {
+    "rules": [
+      {
+        "rule": "user",
+        "out.dataframe.name": "prof",
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "error.confs":[
+          {
+            "column.name": "user",
+            "type": "enumeration",
+            "values":["1", "2", "hive_none", ""]
+          },
+          {
+            "column.name": "name",
+            "type": "regex",
+            "values":["^zhanglei.natur\\w{1}$"]
+          }
+        ],
+        "out": [
+          {
+            "type": "metric",
+            "flatten": "map"
+          }
+        ]
+      }
+    ]
+  },
+  "process.type": "batch"
+}
diff --git a/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
new file mode 100644
index 0000000..be6435d
--- /dev/null
+++ b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
@@ -0,0 +1,46 @@
+{
+  "data.sources": [
+    {
+      "connectors": [
+        {
+          "dataframe.name": "prof_table",
+          "config": {
+            "table.name": "efg",
+            "database": "abc",
+            "where": "`date`=\"20190825\""
+          },
+          "type": "hive"
+        }
+      ],
+      "name": "source"
+    }
+  ],
+  "sinks": [
+    "CONSOLE"
+  ],
+  "name": "test_griffin_complete",
+  "evaluate.rule": {
+    "rules": [
+      {
+        "rule": "user",
+        "out.dataframe.name": "prof",
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "error.confs":[
+          {
+            "column.name": "user",
+            "type": "abc",
+            "values":["1", "2", "hive_none", ""]
+          }
+        ],
+        "out": [
+          {
+            "type": "metric",
+            "flatten": "map"
+          }
+        ]
+      }
+    ]
+  },
+  "process.type": "batch"
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index dfa2598..e2135ea 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -52,4 +52,27 @@
 
   }
 
+  it should "fail for an invalid completeness json file" in {
+    val reader: ParamFileReader = ParamFileReader(getClass.getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
+    val params = reader.readConfig[DQConfig]
+    params match {
+      case Success(_) =>
+        fail("it is an invalid config file")
+      case Failure(e) =>
+        e.getMessage contains ("error error.conf type")
+    }
+  }
+
+  it should "be parsed from a valid errorconf completeness json file" in {
+    val reader :ParamReader = ParamFileReader(getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+    val params = reader.readConfig[DQConfig]
+    params match {
+      case Success(v) =>
+        v.getEvaluateRule.getRules(0).getErrorConfs.length should === (2)
+        v.getEvaluateRule.getRules(0).getErrorConfs(0).getColumnName.get should === ("user")
+        v.getEvaluateRule.getRules(0).getErrorConfs(1).getColumnName.get should === ("name")
+      case Failure(_) =>
+        fail("it should not happen")
+    }
+  }
 }