[GRIFFIN-281] Add custom sink testcase
Add custom sink test.
I also have two questions:
1. In `ConsoleSink`, `ElasticSearchSink` and `MongoSink`, the `sinkRecords` method doesn't really sink records?
2. Why should we have four metrics output types?
```
* flatten: Aggregation method used before sending data frame result into the sink:
- default: use "array" if data frame returned multiple records, otherwise use "entries"
- entries: sends first row of data frame as metric results, like `{"agg_col": "value"}`
- array: wraps all metrics into a map, like `{"my_out_name": [{"agg_col": "value"}]}`
- map: wraps first row of data frame into a map, like `{"my_out_name": {"agg_col": "value"}}`
```
Author: wankunde <wankunde@163.com>
Closes #524 from wankunde/sink.
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
new file mode 100644
index 0000000..01ccaba
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * In-memory sink for tests: keeps every record and metric it is given so
+ * that a test can read them back afterwards.
+ * @param sinkContext supplies the config, metric name, timestamp and block flag
+ */
+case class CustomSink(sinkContext: SinkContext) extends Sink {
+  val metricName: String = sinkContext.metricName
+  val timeStamp: Long = sinkContext.timeStamp
+  val block: Boolean = sinkContext.block
+  val config: Map[String, Any] = sinkContext.config
+
+  // Accumulators filled by the sinkRecords / sinkMetrics calls below.
+  val allRecords: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]
+  val allMetrics: mutable.Map[String, Any] = mutable.Map.empty[String, Any]
+
+  // Always reports itself as available.
+  def available(): Boolean = true
+
+  // start, finish and log are intentionally no-ops.
+  def start(msg: String): Unit = ()
+
+  def finish(): Unit = ()
+
+  def log(rt: Long, msg: String): Unit = ()
+
+  // Collects the RDD and appends its elements; `name` is not used.
+  def sinkRecords(records: RDD[String], name: String): Unit =
+    allRecords ++= records.collect()
+
+  // Appends the given records; `name` is not used.
+  def sinkRecords(records: Iterable[String], name: String): Unit =
+    allRecords ++= records
+
+  def sinkMetrics(metrics: Map[String, Any]): Unit = allMetrics ++= metrics
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
new file mode 100644
index 0000000..3eeb430
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -0,0 +1,136 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import scala.collection.mutable
+
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
+import org.apache.griffin.measure.configuration.enums.FlattenType
+import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
+
+/** Checks that records and metrics sent through the sink API reach [[CustomSink]]. */
+class CustomSinkTest extends SinkTestBase {
+
+  var sinkParam = SinkParam("custom",
+    Map("class" -> "org.apache.griffin.measure.sink.CustomSink"))
+  // Kept as a `var` because SinkTestBase declares `sinkParams` as an abstract var.
+  var sinkParams = Seq(sinkParam)
+
+  // Builds the sinks described by `sinkParams` and hands them to `func`.
+  def withCustomSink[A](func: (MultiSinks) => A): A = {
+    val sinkFactory = SinkFactory(sinkParams, "Test Sink Factory")
+    val timestamp = System.currentTimeMillis
+    val sinks = sinkFactory.getSinks(timestamp, true)
+    func(sinks)
+  }
+
+  "custom sink" can "sink metrics" in {
+    val actualMetrics = withCustomSink((sinks) => {
+      sinks.sinkMetrics(Map("sum" -> 10))
+      sinks.sinkMetrics(Map("count" -> 5))
+      sinks.headSinkOpt match {
+        case Some(sink: CustomSink) => sink.allMetrics
+        case _ => mutable.Map[String, Any]() // same type as sink.allMetrics
+      }
+    })
+
+    val expected = Map("sum" -> 10, "count" -> 5)
+    actualMetrics should be(expected)
+  }
+
+  "custom sink" can "sink records" in {
+    val actualRecords = withCustomSink((sinks) => {
+      val rdd1 = createDataFrame(1 to 2)
+      sinks.sinkRecords(rdd1.toJSON.rdd, "test records")
+      val rdd2 = createDataFrame(2 to 4)
+      sinks.sinkRecords(rdd2.toJSON.rdd, "test records")
+      sinks.headSinkOpt match {
+        case Some(sink: CustomSink) => sink.allRecords
+        case _ => mutable.ListBuffer[String]() // same type as sink.allRecords
+      }
+    })
+
+    val expected = List(
+      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
+      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}")
+
+    actualRecords should be(expected)
+  }
+
+  "RecordWriteStep" should "work with custom sink" in {
+    val resultTable = "result_table"
+    val df = createDataFrame(1 to 5)
+    df.createOrReplaceTempView(resultTable)
+
+    val rwName = Some(metricsDefaultOutput).flatMap(_.getNameOpt).getOrElse(resultTable)
+    val dQContext = getDqContext()
+    RecordWriteStep(rwName, resultTable).execute(dQContext)
+
+    val actualRecords = dQContext.getSink().asInstanceOf[MultiSinks].headSinkOpt match {
+      case Some(sink: CustomSink) => sink.allRecords
+      case _ => mutable.ListBuffer[String]()
+    }
+
+    val expected = List(
+      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
+      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
+      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
+      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}",
+      "{\"id\":5,\"name\":\"name_5\",\"sex\":\"women\",\"age\":20}")
+
+    actualRecords should be(expected)
+  }
+
+  val metricsDefaultOutput = RuleOutputParam("metrics", "default_output", "default")
+  val metricsEntriesOutput = RuleOutputParam("metrics", "entries_output", "entries")
+  val metricsArrayOutput = RuleOutputParam("metrics", "array_output", "array")
+  val metricsMapOutput = RuleOutputParam("metrics", "map_output", "map")
+
+  "MetricWriteStep" should "output default metrics with custom sink" in {
+    val resultTable = "result_table"
+    val df = createDataFrame(1 to 5)
+    df.groupBy("sex")
+      .agg("age" -> "max", "age" -> "avg")
+      .createOrReplaceTempView(resultTable)
+
+    val dQContext = getDqContext()
+    val metricWriteStep = {
+      val metricOpt = Some(metricsDefaultOutput)
+      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
+      val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+      MetricWriteStep(mwName, resultTable, flattenType)
+    }
+
+    metricWriteStep.execute(dQContext)
+    MetricFlushStep().execute(dQContext)
+    val actualMetrics = dQContext.getSink().asInstanceOf[MultiSinks].headSinkOpt match {
+      case Some(sink: CustomSink) => sink.allMetrics
+      case _ => mutable.Map[String, Any]()
+    }
+    // From createDataFrame(1 to 5): "man" ages are 17 and 19, "women" ages are 16, 18 and 20.
+    val metricsValue = Seq(Map("sex" -> "man", "max(age)" -> 19, "avg(age)" -> 18.0),
+      Map("sex" -> "women", "max(age)" -> 20, "avg(age)" -> 18.0))
+    val expected = Map("default_output" -> metricsValue)
+
+    actualMetrics.getOrElse("value", fail("no \"value\" metric was sunk")) should be(expected)
+  }
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
new file mode 100644
index 0000000..a88f1ee
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+
+/** Shared fixtures for sink tests: a DQContext factory and a sample DataFrame builder. */
+trait SinkTestBase extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
+
+  // Sink definitions handed to the DQContext; supplied by the concrete suite.
+  var sinkParams: Seq[SinkParam]
+
+  // Creates a DQContext named `name` for batch processing, using `sinkParams`
+  // and the current time as its context id.
+  def getDqContext(name: String = "test-context"): DQContext = {
+    val contextId = ContextId(System.currentTimeMillis)
+    DQContext(contextId, name, Nil, sinkParams, BatchProcessType)(spark)
+  }
+
+  // Builds one (id, name, sex, age) row per input value i: name is "name_<i>",
+  // sex is "man" for even i and "women" otherwise, age is i + 15.
+  def createDataFrame(arr: Seq[Int]): DataFrame = {
+    val fields = Seq(
+      StructField("id", LongType),
+      StructField("name", StringType),
+      StructField("sex", StringType),
+      StructField("age", IntegerType))
+    val rows = for (i <- arr) yield {
+      val sex = if (i % 2 == 0) "man" else "women"
+      Row(i.toLong, s"name_$i", sex, i + 15)
+    }
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    sqlContext.createDataFrame(rowRdd, StructType(fields))
+  }
+}