[GRIFFIN-281] Add custom sink test case

Add custom sink test.

I also have two questions:
1. In `ConsoleSink`, `ElasticSearchSink`, and `MongoSink`, the `sinkRecords` method doesn't actually sink the records, does it?
2. Why do we need four metrics output types?

```
* flatten: Aggregation method used before sending the data frame result to the sink:
      - default: uses "array" if the data frame returns multiple records, otherwise "entries"
      - entries: sends the first row of the data frame as the metric result, like `{"agg_col": "value"}`
      - array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": "value"}]}`
      - map: wraps the first row of the data frame into a map, like `{"my_out_name": {"agg_col": "value"}}`

```
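
For question 2, the difference is easiest to see side by side. Below is a sketch of what each flatten type would emit for the same metric result; the output name `my_out_name` and the row values are illustrative, and the `RuleOutputParam` usage mirrors the new test:

```
import org.apache.griffin.measure.configuration.dqdefinition.RuleOutputParam

// One metrics output per flatten type; the output name is illustrative.
val entriesOut = RuleOutputParam("metrics", "my_out_name", "entries")
val arrayOut = RuleOutputParam("metrics", "my_out_name", "array")
val mapOut = RuleOutputParam("metrics", "my_out_name", "map")
val defaultOut = RuleOutputParam("metrics", "my_out_name", "default")

// Given a metric data frame with rows [{"agg_col": "v1"}, {"agg_col": "v2"}]:
//   entries -> {"agg_col": "v1"}                                       (first row as-is)
//   array   -> {"my_out_name": [{"agg_col": "v1"}, {"agg_col": "v2"}]}
//   map     -> {"my_out_name": {"agg_col": "v1"}}                      (first row, wrapped)
//   default -> like "array" here (multiple rows); like "entries" for a single row
```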

Author: wankunde <wankunde@163.com>

Closes #524 from wankunde/sink.
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
new file mode 100644
index 0000000..01ccaba
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+
+/**
+  * Sinks records and metrics in memory, for testing.
+  *
+  * @param sinkContext sink configuration (config, metric name, timestamp, block flag)
+  */
+case class CustomSink(sinkContext: SinkContext) extends Sink {
+  val config: Map[String, Any] = sinkContext.config
+  val metricName: String = sinkContext.metricName
+  val timeStamp: Long = sinkContext.timeStamp
+  val block: Boolean = sinkContext.block
+
+  def available(): Boolean = true
+
+  def start(msg: String): Unit = {}
+
+  def finish(): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+  val allRecords = mutable.ListBuffer[String]()
+
+  def sinkRecords(records: RDD[String], name: String): Unit = {
+    allRecords ++= records.collect() // collect to the driver; acceptable for small test data
+  }
+
+  def sinkRecords(records: Iterable[String], name: String): Unit = {
+    allRecords ++= records
+  }
+
+  val allMetrics = mutable.Map[String, Any]()
+
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+    allMetrics ++= metrics
+  }
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
new file mode 100644
index 0000000..3eeb430
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -0,0 +1,136 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import scala.collection.mutable
+
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
+import org.apache.griffin.measure.configuration.enums.FlattenType
+import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
+
+class CustomSinkTest extends SinkTestBase {
+
+  var sinkParam = SinkParam("custom",
+    Map("class" -> "org.apache.griffin.measure.sink.CustomSink"))
+  var sinkParams = Seq(sinkParam)
+
+  def withCustomSink[A](func: (MultiSinks) => A): A = {
+    val sinkFactory = SinkFactory(sinkParams, "Test Sink Factory")
+    val timestamp = System.currentTimeMillis
+    val sinks = sinkFactory.getSinks(timestamp, true)
+    func(sinks)
+  }
+
+  "custom sink" can "sink metrics" in {
+    val actualMetrics = withCustomSink((sinks) => {
+      sinks.sinkMetrics(Map("sum" -> 10))
+      sinks.sinkMetrics(Map("count" -> 5))
+      sinks.headSinkOpt match {
+        case Some(sink: CustomSink) => sink.allMetrics
+        case _ => mutable.Map[String, Any]()
+      }
+    })
+
+    val expected = Map("sum" -> 10, "count" -> 5)
+    actualMetrics should be(expected)
+  }
+
+  "custom sink" can "sink records" in {
+    val actualRecords = withCustomSink((sinks) => {
+      val rdd1 = createDataFrame(1 to 2)
+      sinks.sinkRecords(rdd1.toJSON.rdd, "test records")
+      val rdd2 = createDataFrame(2 to 4)
+      sinks.sinkRecords(rdd2.toJSON.rdd, "test records")
+      sinks.headSinkOpt match {
+        case Some(sink: CustomSink) => sink.allRecords
+        case _ => mutable.ListBuffer[String]()
+      }
+    })
+
+    val expected = List(
+      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
+      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}")
+
+    actualRecords should be(expected)
+  }
+
+  "RecordWriteStep" should "work with custom sink" in {
+    val resultTable = "result_table"
+    val df = createDataFrame(1 to 5)
+    df.createOrReplaceTempView(resultTable)
+
+    val rwName = Some(metricsDefaultOutput).flatMap(_.getNameOpt).getOrElse(resultTable)
+    val dQContext = getDqContext()
+    RecordWriteStep(rwName, resultTable).execute(dQContext)
+
+    val actualRecords = dQContext.getSink().asInstanceOf[MultiSinks].headSinkOpt match {
+      case Some(sink: CustomSink) => sink.allRecords
+      case _ => mutable.ListBuffer[String]()
+    }
+
+    val expected = List(
+      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
+      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}",
+      "{\"id\":5,\"name\":\"name_5\",\"sex\":\"women\",\"age\":20}")
+
+    actualRecords should be(expected)
+  }
+
+  val metricsDefaultOutput = RuleOutputParam("metrics", "default_output", "default")
+  val metricsEntriesOutput = RuleOutputParam("metrics", "entries_output", "entries")
+  val metricsArrayOutput = RuleOutputParam("metrics", "array_output", "array")
+  val metricsMapOutput = RuleOutputParam("metrics", "map_output", "map")
+
+  "MetricWriteStep" should "output default metrics with custom sink" in {
+    val resultTable = "result_table"
+    val df = createDataFrame(1 to 5)
+    df.groupBy("sex")
+      .agg("age" -> "max", "age" -> "avg")
+      .createOrReplaceTempView(resultTable)
+
+    val dQContext = getDqContext()
+
+    val metricWriteStep = {
+      val metricOpt = Some(metricsDefaultOutput)
+      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
+      val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+      MetricWriteStep(mwName, resultTable, flattenType)
+    }
+
+    metricWriteStep.execute(dQContext)
+    MetricFlushStep().execute(dQContext)
+    val actualMetrics = dQContext.getSink().asInstanceOf[MultiSinks].headSinkOpt match {
+      case Some(sink: CustomSink) => sink.allMetrics
+      case _ => mutable.Map[String, Any]()
+    }
+
+    val metricsValue = Seq(Map("sex" -> "man", "max(age)" -> 19, "avg(age)" -> 18.0),
+      Map("sex" -> "women", "max(age)" -> 20, "avg(age)" -> 18.0))
+
+    val expected = Map("default_output" -> metricsValue)
+
+    actualMetrics("value") should be(expected)
+  }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
new file mode 100644
index 0000000..a88f1ee
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+
+trait SinkTestBase extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
+
+  var sinkParams: Seq[SinkParam]
+
+  def getDqContext(name: String = "test-context"): DQContext = {
+    DQContext(
+      ContextId(System.currentTimeMillis),
+      name,
+      Nil,
+      sinkParams,
+      BatchProcessType
+    )(spark)
+  }
+
+
+  def createDataFrame(arr: Seq[Int]): DataFrame = {
+    val schema = StructType(Array(
+      StructField("id", LongType),
+      StructField("name", StringType),
+      StructField("sex", StringType),
+      StructField("age", IntegerType)
+    ))
+    val rows = arr.map { i =>
+      Row(i.toLong, s"name_$i", if (i % 2 == 0) "man" else "women", i + 15)
+    }
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    sqlContext.createDataFrame(rowRdd, schema)
+  }
+}