/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.griffin.measure.execution.impl

import java.util.Locale

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.execution.Measure._
import org.apache.griffin.measure.step.builder.ConstantColumns

/**
 * Profiling measure.
 *
 * Data processing and its analysis can't truly be complete without data profiling -
 * reviewing source data for content and quality. Data profiling helps to find data quality rules and
 * requirements that will support a more thorough data quality assessment in a later step.
 *
 * The process of Data profiling involves:
 *   - Collecting descriptive statistics like min, max, count and sum
 *   - Collecting data types, length and recurring patterns
 *   - Discovering metadata and assessing its accuracy, etc.
 *
 * A common problem in data management circles is the confusion around what is meant by
 * Data profiling as opposed to Data Quality Assessment due to the interchangeable use of these 2 terms.
 *
 * Data Profiling helps us create a huge amount of insight into the quality levels of our
 * data and helps to find data quality rules and requirements that will support a more thorough
 * data quality assessment in a later step. For example, data profiling can help us to discover value
 * frequencies, formats and patterns for each attribute in the data asset. Using data profiling alone
 * we can find some perceived defects and outliers in the data asset, and we end up with a whole
 * range of clues based on which correct Quality assessment measures can be defined like
 * completeness/ distinctness etc.
 *
 * @param sparkSession SparkSession for this Griffin Application.
 * @param measureParam Object representation of this measure and its configuration.
 */
case class ProfilingMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
    extends Measure {

  import ProfilingMeasure._

  /**
   * Profiling measure supports only metric write
   */
  override val supportsRecordWrite: Boolean = false

  override val supportsMetricWrite: Boolean = true

  /**
   * The value for `expr` is a comma separated string of columns in the data asset on which the
   * profiling measure is to be executed. `expr` is an optional key for Profiling measure,
   * i.e., if it is not defined, all columns in the data set will be profiled.
   */
  val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))

  /**
   * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
   * to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
   * to `false`, then the counts will be 100% accurate.
   */
  val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)

  /**
   * Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
   * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
   * floating-point metric values will be rounded to 2 decimal places.
   */
  val approxDistinctCount: Boolean =
    getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)

  /**
   * Various metrics are calculated for columns of the data set. If expr is correctly defined,
   * then metrics are generated for only the given subset of columns else, its generated for all columns.
   *
   * List of profiling metrics that are generated,
   *  - avg_col_len
   *  - max_col_len
   *  - min_col_len
   *  - avg
   *  - max
   *  - min
   *  - approx_distinct_count OR distinct_count
   *  - variance
   *  - kurtosis
   *  - std_dev
   *  - total
   *  - data_type
   *
   *  @return tuple of records dataframe and metric dataframe
   */
  override def impl(): (DataFrame, DataFrame) = {
    val input = sparkSession.read.table(measureParam.getDataSource)
    val profilingColNames = exprOpt
      .getOrElse(input.columns.mkString(","))
      .split(",")
      .map(_.trim.toLowerCase(Locale.ROOT))
      .toSet

    val profilingCols =
      input.schema.fields.filter(f =>
        profilingColNames.contains(f.name) && !f.name.equalsIgnoreCase(ConstantColumns.tmst))

    assert(
      profilingCols.nonEmpty,
      s"Invalid columns [${profilingCols.map(_.name).mkString(", ")}] were provided for profiling.")

    val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
      val colName = field.name
      val profilingExprs = getProfilingExprs(field, roundScale, approxDistinctCount)

      exprList.:+(map(profilingExprs: _*).as(s"$DetailsPrefix$colName"))
    })

    val aggregateDf = profilingCols
      .foldLeft(input)((df, field) => {
        val colName = field.name
        val column = col(colName)

        val lengthColName = lengthColFn(colName)
        val nullColName = nullsInColFn(colName)

        df.withColumn(lengthColName, length(column))
          .withColumn(nullColName, when(isnull(column), 1L).otherwise(0L))
      })
      .agg(count(lit(1L)).as(Total), profilingExprs: _*)

    val detailCols =
      aggregateDf.columns
        .filter(_.startsWith(DetailsPrefix))
        .flatMap(c => Seq(lit(c.stripPrefix(DetailsPrefix)), col(c)))

    val metricDf = aggregateDf
      .withColumn(ColumnDetails, map(detailCols: _*))
      .select(Total, ColumnDetails)
      .select(map(lit(ColumnDetails), col(ColumnDetails)).as(valueColumn))

    (sparkSession.emptyDataFrame, metricDf)
  }

}

/**
 * Profiling measure constants
 */
object ProfilingMeasure {

  /**
   * Options Keys
   */
  final val RoundScaleStr: String = "round.scale"
  final val ApproxDistinctCountStr: String = "approx.distinct.count"

  /**
   * Structure Keys
   */
  final val ColumnDetails: String = "column_details"
  private final val DataTypeStr: String = "data_type"

  /**
   * Prefix Keys
   */
  private final val ApproxPrefix: String = "approx_"
  private final val DetailsPrefix: String = "details_"
  private final val ColumnLengthPrefix: String = "col_len"
  private final val IsNullPrefix: String = "is_null"

  /**
   * Column Detail Keys
   */
  private final val NullCount: String = "null_count"
  private final val DistinctCount: String = "distinct_count"
  private final val Min: String = "min"
  private final val Max: String = "max"
  private final val Avg: String = "avg"
  private final val StdDeviation: String = "std_dev"
  private final val Variance: String = "variance"
  private final val Kurtosis: String = "kurtosis"
  private final val MinColLength: String = s"${Min}_$ColumnLengthPrefix"
  private final val MaxColLength: String = s"${Max}_$ColumnLengthPrefix"
  private final val AvgColLength: String = s"${Avg}_$ColumnLengthPrefix"

  private def lengthColFn(colName: String): String = s"${ColumnLengthPrefix}_$colName"

  private def nullsInColFn(colName: String): String = s"${IsNullPrefix}_$colName"

  private def forNumericFn(t: DataType, value: Column, alias: String): Column = {
    (if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
  }

  /**
   * Calculates profiling metrics for a column.
   *
   * @param field column
   * @param roundScale round off places.
   * @param approxDistinctCount to approximate distinct or not.
   * @return
   */
  private def getProfilingExprs(
      field: StructField,
      roundScale: Int,
      approxDistinctCount: Boolean): Seq[Column] = {
    val colName = field.name
    val colType = field.dataType

    val column = col(colName)
    val lengthColExpr = col(lengthColFn(colName))
    val nullColExpr = col(nullsInColFn(colName))
    val (distinctCountName, distinctCountExpr) =
      if (approxDistinctCount) {
        (
          lit(s"$ApproxPrefix$DistinctCount"),
          approx_count_distinct(column).as(s"$ApproxPrefix$DistinctCount"))
      } else {
        (lit(DistinctCount), countDistinct(column).as(DistinctCount))
      }

    Seq(
      Seq(lit(DataTypeStr), lit(colType.catalogString).as(DataTypeStr)),
      Seq(lit(Total), sum(lit(1)).as(Total)),
      Seq(lit(MinColLength), min(lengthColExpr).as(MinColLength)),
      Seq(lit(MaxColLength), max(lengthColExpr).as(MaxColLength)),
      Seq(lit(AvgColLength), forNumericFn(colType, avg(lengthColExpr), AvgColLength)),
      Seq(lit(Min), forNumericFn(colType, min(column), Min)),
      Seq(lit(Max), forNumericFn(colType, max(column), Max)),
      Seq(lit(Avg), forNumericFn(colType, bround(avg(column), roundScale), Avg)),
      Seq(
        lit(StdDeviation),
        forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation)),
      Seq(lit(Variance), forNumericFn(colType, bround(variance(column), roundScale), Variance)),
      Seq(lit(Kurtosis), forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis)),
      Seq(lit(distinctCountName), distinctCountExpr),
      Seq(lit(NullCount), sum(nullColExpr).as(NullCount))).flatten
  }
}
