/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums.FlattenType.{
ArrayFlattenType,
DefaultFlattenType
}
import org.apache.griffin.measure.configuration.enums.OutputType._
import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.TimelinessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.griffin.measure.utils.TimeUtil

/**
 * Generate timeliness DQ steps.
 */
case class TimelinessExpr2DQSteps(context: DQContext, expr: Expr, ruleParam: RuleParam)
extends Expr2DQSteps {
private object TimelinessKeys {
val _source = "source"
val _latency = "latency"
val _total = "total"
val _avg = "avg"
val _threshold = "threshold"
val _step = "step"
val _count = "count"
val _stepSize = "step.size"
val _percentileColPrefix = "percentile"
val _percentileValues = "percentile.values"
}
import TimelinessKeys._
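
  // Step pipeline built below, sketched with the internal table names:
  //   __inTime        filter rows whose timestamp selectors are non-null  (1)
  //   __lat           latency = end ts (or ingestion tmst) - begin ts     (2)
  //   <out df name>   total count + average latency metric                (3)
  //   __lateRecords   rows whose latency exceeds "threshold", optional    (4)
  //   __range(Metric) latency histogram bucketed by "step.size", optional (5)
  //   __percentile    approximate latency percentiles, optional           (6)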
def getDQSteps: Seq[DQStep] = {
val details = ruleParam.getDetails
val timelinessExpr = expr.asInstanceOf[TimelinessClause]
val sourceName = details.getString(_source, context.getDataSourceName(0))
val procType = context.procType
val timestamp = context.contextId.timestamp
if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[$timestamp] data source $sourceName does not exist")
Nil
} else {
val analyzer = TimelinessAnalyzer(timelinessExpr, sourceName)
val btsSel = analyzer.btsExpr
val etsSelOpt = analyzer.etsExprOpt
// 1. in time
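      // Project the begin (and, when present, end) timestamp selectors into
      // internal columns and drop rows where any selector is null. For an
      // illustrative rule whose begin selector is simply `ts` with no end
      // selector, the generated SQL reads roughly:
      //   SELECT *, (ts) AS `<beginTs>` FROM <source> WHERE (ts) IS NOT NULL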
val inTimeTableName = "__inTime"
val inTimeSql = etsSelOpt match {
case Some(etsSel) =>
s"""
|SELECT *, ($btsSel) AS `${ConstantColumns.beginTs}`,
|($etsSel) AS `${ConstantColumns.endTs}`
|FROM $sourceName WHERE ($btsSel) IS NOT NULL AND ($etsSel) IS NOT NULL
""".stripMargin
case _ =>
s"""
|SELECT *, ($btsSel) AS `${ConstantColumns.beginTs}`
|FROM $sourceName WHERE ($btsSel) IS NOT NULL
""".stripMargin
}
val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)
// 2. latency
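      // Per-row latency: end ts minus begin ts, in whatever unit the selectors
      // yield (typically millisecond epochs). Without an explicit end-ts
      // selector, the internal tmst column (the processing timestamp) stands
      // in as the arrival time. The result is cached because up to four
      // downstream steps (metric, record, range, percentile) read it.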
val latencyTableName = "__lat"
val latencyColName = details.getStringOrKey(_latency)
val etsColName = etsSelOpt match {
case Some(_) => ConstantColumns.endTs
case _ => ConstantColumns.tmst
}
val latencySql = {
s"SELECT *, (`$etsColName` - `${ConstantColumns.beginTs}`) AS `$latencyColName` " +
s"FROM `$inTimeTableName`"
}
val latencyTransStep =
SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, None, cache = true)
latencyTransStep.parentSteps += inTimeTransStep
// 3. timeliness metric
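      // Aggregate latency into the rule's output metric: row count plus
      // average latency. Streaming mode groups by tmst so every mini-batch
      // reports its own figures. With the default detail keys, the metric
      // comes out roughly as {"total": <count>, "avg": <latency>}.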
val metricTableName = ruleParam.getOutDfName()
val totalColName = details.getStringOrKey(_total)
val avgColName = details.getStringOrKey(_avg)
val metricSql = procType match {
case BatchProcessType =>
s"""
|SELECT COUNT(*) AS `$totalColName`,
|CAST(AVG(`$latencyColName`) AS BIGINT) AS `$avgColName`
|FROM `$latencyTableName`
""".stripMargin
case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`,
|COUNT(*) AS `$totalColName`,
|CAST(AVG(`$latencyColName`) AS BIGINT) AS `$avgColName`
|FROM `$latencyTableName`
|GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, metricTableName, flattenType)
}
val metricTransStep =
SparkSqlTransformStep(metricTableName, metricSql, emptyMap, Some(metricWriteStep))
metricTransStep.parentSteps += latencyTransStep
// current steps
val transSteps1 = metricTransStep :: Nil
// 4. timeliness record
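      // Optional late-record sink, built only when "threshold" parses as a
      // duration (e.g. "1h"): rows whose latency exceeds it are written
      // through the record output.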
val transSteps2 = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
case Some(tsh) =>
val recordTableName = "__lateRecords"
val recordSql = {
s"SELECT * FROM `$latencyTableName` WHERE `$latencyColName` > $tsh"
}
val recordWriteStep = {
val rwName =
ruleParam
.getOutputOpt(RecordOutputType)
.flatMap(_.getNameOpt)
.getOrElse(recordTableName)
RecordWriteStep(rwName, recordTableName, None)
}
val recordTransStep =
SparkSqlTransformStep(recordTableName, recordSql, emptyMap, Some(recordWriteStep))
recordTransStep.parentSteps += latencyTransStep
recordTransStep :: Nil
case _ => Nil
}
// 5. ranges
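      // Optional latency histogram, built only when "step.size" parses as a
      // duration: each row gets a bucket index of floor(latency / stepSize),
      // and per-bucket counts are emitted as an array-flattened metric.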
val transSteps3 = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
case Some(stepSize) =>
// 5.1 range
val rangeTableName = "__range"
val stepColName = details.getStringOrKey(_step)
val rangeSql = {
s"""
|SELECT *, CAST((`$latencyColName` / $stepSize) AS BIGINT) AS `$stepColName`
|FROM `$latencyTableName`
""".stripMargin
}
val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql, emptyMap)
rangeTransStep.parentSteps += latencyTransStep
// 5.2 range metric
val rangeMetricTableName = "__rangeMetric"
val countColName = details.getStringOrKey(_count)
val rangeMetricSql = procType match {
case BatchProcessType =>
s"""
|SELECT `$stepColName`, COUNT(*) AS `$countColName`
|FROM `$rangeTableName` GROUP BY `$stepColName`
""".stripMargin
case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`, `$stepColName`, COUNT(*) AS `$countColName`
|FROM `$rangeTableName` GROUP BY `${ConstantColumns.tmst}`, `$stepColName`
""".stripMargin
}
val rangeMetricWriteStep = {
MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
}
val rangeMetricTransStep =
SparkSqlTransformStep(
rangeMetricTableName,
rangeMetricSql,
emptyMap,
Some(rangeMetricWriteStep))
rangeMetricTransStep.parentSteps += rangeTransStep
rangeMetricTransStep :: Nil
case _ => Nil
}
// 6. percentiles
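      // Optional percentile metrics: for each configured value in [0, 1],
      // emit an approximate latency percentile via Spark's percentile_approx,
      // named by its integer percentage, e.g. `percentile_95` for 0.95.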
val percentiles = getPercentiles(details)
val transSteps4 = if (percentiles.nonEmpty) {
val percentileTableName = "__percentile"
val percentileColName = details.getStringOrKey(_percentileColPrefix)
val percentileCols = percentiles
.map { pct =>
val pctName = (pct * 100).toInt.toString
s"floor(percentile_approx($latencyColName, $pct)) " +
s"AS `${percentileColName}_$pctName`"
}
.mkString(", ")
val percentileSql = {
s"""
|SELECT $percentileCols
|FROM `$latencyTableName`
""".stripMargin
}
val percentileWriteStep = {
MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
}
val percentileTransStep =
SparkSqlTransformStep(
percentileTableName,
percentileSql,
emptyMap,
Some(percentileWriteStep))
percentileTransStep.parentSteps += latencyTransStep
percentileTransStep :: Nil
} else Nil
// full steps
transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
}
}
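
  /** Percentile values from "percentile.values", keeping only those in [0, 1]. */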
private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
details.getDoubleArr(_percentileValues).filter(d => d >= 0 && d <= 1)
}
}