/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.griffin.measure.sink

import scala.collection.mutable

import org.apache.spark.sql.functions._

import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
import org.apache.griffin.measure.configuration.enums.FlattenType.MapFlattenType
import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}

class CustomSinkTest extends SinkTestBase {

  val sinkParam: SinkParam =
    SinkParam(
      "customSink",
      "custom",
      Map("class" -> "org.apache.griffin.measure.sink.CustomSink"))
  override var sinkParams = Seq(sinkParam)

  def withCustomSink[A](func: Iterable[Sink] => A): A = {
    val sinkFactory = SinkFactory(sinkParams, "Test Sink Factory")
    val timestamp = System.currentTimeMillis
    val sinks = sinkFactory.getSinks(timestamp, block = true)
    func(sinks)
  }

  "custom sink" can "sink metrics" in {
    val measureName = "test_measure"
    withCustomSink(sinks => {
      sinks.foreach { sink =>
        try {
          sink.sinkMetrics(Map("value" -> Map(measureName -> Map("sum" -> 10))))
        } catch {
          case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
        }
      }

      sinks.foreach { sink =>
        try {
          sink.sinkMetrics(Map("value" -> Map(measureName -> Map("count" -> 5))))
        } catch {
          case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
        }
      }
    })

    val actualMetricsOpt = CustomSinkResultRegister.getMetrics(measureName)
    assert(actualMetricsOpt.isDefined)

    val expected = Map("sum" -> 10, "count" -> 5)
    actualMetricsOpt.get should contain theSameElementsAs expected
  }

  "custom sink" can "sink records" in {
    val actualRecords = withCustomSink(sinks => {
      val rdd1 = createDataFrame(1 to 2)
      sinks.foreach { sink =>
        try {
          sink.sinkRecords(rdd1.toJSON.rdd, "test records")
        } catch {
          case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
        }
      }
      val rdd2 = createDataFrame(2 to 4)
      sinks.foreach { sink =>
        try {
          sink.sinkRecords(rdd2.toJSON.rdd, "test records")
        } catch {
          case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
        }
      }
      sinks.headOption match {
        case Some(sink: CustomSink) => sink.allRecords
        case _ =>
      }
    })

    val expected = List(
      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}")

    actualRecords should be(expected)
  }

  val metricsDefaultOutput: RuleOutputParam =
    RuleOutputParam("metrics", "default_output", "default")

  "RecordWriteStep" should "work with custom sink" in {
    val resultTable = "result_table"
    val df = createDataFrame(1 to 5)
    df.createOrReplaceTempView(resultTable)

    val rwName = Some(metricsDefaultOutput).flatMap(_.getNameOpt).getOrElse(resultTable)
    val dQContext = getDqContext()
    RecordWriteStep(rwName, resultTable).execute(dQContext)

    val actualRecords = dQContext.getSinks.headOption match {
      case Some(sink: CustomSink) => sink.allRecords
      case _ => mutable.ListBuffer[String]()
    }

    val expected = List(
      "{\"id\":1,\"name\":\"name_1\",\"sex\":\"women\",\"age\":16}",
      "{\"id\":2,\"name\":\"name_2\",\"sex\":\"man\",\"age\":17}",
      "{\"id\":3,\"name\":\"name_3\",\"sex\":\"women\",\"age\":18}",
      "{\"id\":4,\"name\":\"name_4\",\"sex\":\"man\",\"age\":19}",
      "{\"id\":5,\"name\":\"name_5\",\"sex\":\"women\",\"age\":20}")

    actualRecords should be(expected)
  }

  val metricsEntriesOutput: RuleOutputParam =
    RuleOutputParam("metrics", "entries_output", "entries")
  val metricsArrayOutput: RuleOutputParam = RuleOutputParam("metrics", "array_output", "array")
  val metricsMapOutput: RuleOutputParam = RuleOutputParam("metrics", "map_output", "map")

  "MetricWriteStep" should "output default metrics with custom sink" in {
    val resultTable = "result_table"
    val df = createDataFrame(1 to 5)
    val metricCols = Seq("sex", "max_age", "avg_age").flatMap(c => Seq(lit(c), col(c)))

    val metricDf = df
      .groupBy("sex")
      .agg(max("age").as("max_age"), avg("age").as("avg_age"))
      .select(map(metricCols: _*).as("metrics"))
      .withColumn("mark", lit(1))
      .groupBy("mark")
      .agg(collect_list("metrics") as "metrics")
      .select("metrics")

    metricDf.createOrReplaceTempView(resultTable)

    val dQContext = getDqContext()

    val metricWriteStep = {
      val metricOpt = Some(metricsMapOutput)
      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
      val flattenType = metricOpt.map(_.getFlatten).getOrElse(MapFlattenType)

      MetricWriteStep(mwName, resultTable, flattenType)
    }

    metricWriteStep.execute(dQContext)
    MetricFlushStep().execute(dQContext)

    val expectedMetrics = Array(
      Map("sex" -> "women", "max_age" -> "20", "avg_age" -> "18.0"),
      Map("sex" -> "man", "max_age" -> "19", "avg_age" -> "18.0"))

    val actualMetricsOpt = CustomSinkResultRegister.getMetrics(metricWriteStep.name)
    assert(actualMetricsOpt.isDefined)

    val actualMetricsMap: Map[String, Any] = actualMetricsOpt.get
    assert(actualMetricsMap.contains("metrics"))

    val actualMetrics = actualMetricsMap("metrics").asInstanceOf[Seq[Map[String, String]]]
    actualMetrics should contain theSameElementsAs expectedMetrics
  }

}
