| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.griffin.measure.sink |
| |
| import scala.collection.mutable |
| |
| import org.apache.spark.sql.functions._ |
| |
| import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam} |
| import org.apache.griffin.measure.configuration.enums.FlattenType.MapFlattenType |
| import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep} |
| |
| class CustomSinkTest extends SinkTestBase { |
| |
| val sinkParam: SinkParam = |
| SinkParam( |
| "customSink", |
| "custom", |
| Map("class" -> "org.apache.griffin.measure.sink.CustomSink")) |
| override var sinkParams = Seq(sinkParam) |
| |
| def withCustomSink[A](func: Iterable[Sink] => A): A = { |
| val sinkFactory = SinkFactory(sinkParams, "Test Sink Factory") |
| val timestamp = System.currentTimeMillis |
| val sinks = sinkFactory.getSinks(timestamp, block = true) |
| func(sinks) |
| } |
| |
| "custom sink" can "sink metrics" in { |
| val measureName = "test_measure" |
| withCustomSink(sinks => { |
| sinks.foreach { sink => |
| try { |
| sink.sinkMetrics(Map("value" -> Map(measureName -> Map("sum" -> 10)))) |
| } catch { |
| case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e) |
| } |
| } |
| |
| sinks.foreach { sink => |
| try { |
| sink.sinkMetrics(Map("value" -> Map(measureName -> Map("count" -> 5)))) |
| } catch { |
| case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e) |
| } |
| } |
| }) |
| |
| val actualMetricsOpt = CustomSinkResultRegister.getMetrics(measureName) |
| assert(actualMetricsOpt.isDefined) |
| |
| val expected = Map("sum" -> 10, "count" -> 5) |
| actualMetricsOpt.get should contain theSameElementsAs expected |
| } |
| |
| "custom sink" can "sink records" in { |
| val actualRecords = withCustomSink(sinks => { |
| val rdd1 = createDataFrame(1 to 2) |
| sinks.foreach { sink => |
| try { |
| sink.sinkRecords(rdd1.toJSON.rdd, "test records") |
| } catch { |
| case e: Throwable => error(s"sink records error: ${e.getMessage}", e) |
| } |
| } |
| val rdd2 = createDataFrame(2 to 4) |
| sinks.foreach { sink => |
| try { |
| sink.sinkRecords(rdd2.toJSON.rdd, "test records") |
| } catch { |
| case e: Throwable => error(s"sink records error: ${e.getMessage}", e) |
| } |
| } |
| sinks.headOption match { |
| case Some(sink: CustomSink) => sink.allRecords |
| case _ => |
| } |
| }) |
| |
| val expected = List( |
| "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}", |
| "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}", |
| "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}", |
| "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}", |
| "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}") |
| |
| actualRecords should be(expected) |
| } |
| |
| val metricsDefaultOutput: RuleOutputParam = |
| RuleOutputParam("metrics", "default_output", "default") |
| |
| "RecordWriteStep" should "work with custom sink" in { |
| val resultTable = "result_table" |
| val df = createDataFrame(1 to 5) |
| df.createOrReplaceTempView(resultTable) |
| |
| val rwName = Some(metricsDefaultOutput).flatMap(_.getNameOpt).getOrElse(resultTable) |
| val dQContext = getDqContext() |
| RecordWriteStep(rwName, resultTable).execute(dQContext) |
| |
| val actualRecords = dQContext.getSinks.headOption match { |
| case Some(sink: CustomSink) => sink.allRecords |
| case _ => mutable.ListBuffer[String]() |
| } |
| |
| val expected = List( |
| "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}", |
| "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}", |
| "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}", |
| "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}", |
| "{\"id\":5,\"name\":\"name_5\",\"sex\":\"women\",\"age\":20}") |
| |
| actualRecords should be(expected) |
| } |
| |
| val metricsEntriesOutput: RuleOutputParam = |
| RuleOutputParam("metrics", "entries_output", "entries") |
| val metricsArrayOutput: RuleOutputParam = RuleOutputParam("metrics", "array_output", "array") |
| val metricsMapOutput: RuleOutputParam = RuleOutputParam("metrics", "map_output", "map") |
| |
| "MetricWriteStep" should "output default metrics with custom sink" in { |
| val resultTable = "result_table" |
| val df = createDataFrame(1 to 5) |
| val metricCols = Seq("sex", "max_age", "avg_age").flatMap(c => Seq(lit(c), col(c))) |
| |
| val metricDf = df |
| .groupBy("sex") |
| .agg(max("age").as("max_age"), avg("age").as("avg_age")) |
| .select(map(metricCols: _*).as("metrics")) |
| .withColumn("mark", lit(1)) |
| .groupBy("mark") |
| .agg(collect_list("metrics") as "metrics") |
| .select("metrics") |
| |
| metricDf.createOrReplaceTempView(resultTable) |
| |
| val dQContext = getDqContext() |
| |
| val metricWriteStep = { |
| val metricOpt = Some(metricsMapOutput) |
| val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name") |
| val flattenType = metricOpt.map(_.getFlatten).getOrElse(MapFlattenType) |
| |
| MetricWriteStep(mwName, resultTable, flattenType) |
| } |
| |
| metricWriteStep.execute(dQContext) |
| MetricFlushStep().execute(dQContext) |
| |
| val expectedMetrics = Array( |
| Map("sex" -> "women", "max_age" -> "20", "avg_age" -> "18.0"), |
| Map("sex" -> "man", "max_age" -> "19", "avg_age" -> "18.0")) |
| |
| val actualMetricsOpt = CustomSinkResultRegister.getMetrics(metricWriteStep.name) |
| assert(actualMetricsOpt.isDefined) |
| |
| val actualMetricsMap: Map[String, Any] = actualMetricsOpt.get |
| assert(actualMetricsMap.contains("metrics")) |
| |
| val actualMetrics = actualMetricsMap("metrics").asInstanceOf[Seq[Map[String, String]]] |
| actualMetrics should contain theSameElementsAs expectedMetrics |
| } |
| |
| } |