blob: 21fd24cc472fc53a403d49d7f4a1e103e9b784fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
import org.apache.griffin.measure.configuration.enums.OutputType._
import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
import org.apache.griffin.measure.step.transform.{
DataFrameOps,
DataFrameOpsTransformStep,
SparkSqlTransformStep
}
import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.write.{
DataSourceUpdateWriteStep,
MetricWriteStep,
RecordWriteStep
}
import org.apache.griffin.measure.utils.ParamUtil._
/**
* generate accuracy dq steps
*/
case class AccuracyExpr2DQSteps(context: DQContext, expr: Expr, ruleParam: RuleParam)
extends Expr2DQSteps {
private object AccuracyKeys {
val _source = "source"
val _target = "target"
val _miss = "miss"
val _total = "total"
val _matched = "matched"
val _matchedFraction = "matchedFraction"
}
import AccuracyKeys._
def getDQSteps: Seq[DQStep] = {
val details = ruleParam.getDetails
val accuracyExpr = expr.asInstanceOf[LogicalExpr]
val sourceName = details.getString(_source, context.getDataSourceName(0))
val targetName = details.getString(_target, context.getDataSourceName(1))
val analyzer = AccuracyAnalyzer(accuracyExpr, sourceName, targetName)
val procType = context.procType
val timestamp = context.contextId.timestamp
if (!context.runTimeTableRegister.existsTable(sourceName)) {
warn(s"[$timestamp] data source $sourceName not exists")
Nil
} else {
// 1. miss record
val missRecordsTableName = "__missRecords"
val selClause = s"`$sourceName`.*"
val missRecordsSql =
if (!context.runTimeTableRegister.existsTable(targetName)) {
warn(s"[$timestamp] data source $targetName not exists")
s"SELECT $selClause FROM `$sourceName`"
} else {
val onClause = expr.coalesceDesc
val sourceIsNull = analyzer.sourceSelectionExprs
.map { sel =>
s"${sel.desc} IS NULL"
}
.mkString(" AND ")
val targetIsNull = analyzer.targetSelectionExprs
.map { sel =>
s"${sel.desc} IS NULL"
}
.mkString(" AND ")
val whereClause = s"(NOT ($sourceIsNull)) AND ($targetIsNull)"
s"SELECT $selClause FROM `$sourceName` " +
s"LEFT JOIN `$targetName` ON $onClause WHERE $whereClause"
}
val missRecordsWriteSteps = procType match {
case BatchProcessType =>
val rwName =
ruleParam
.getOutputOpt(RecordOutputType)
.flatMap(_.getNameOpt)
.getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName)
case StreamingProcessType =>
val dsName =
ruleParam
.getOutputOpt(DscUpdateOutputType)
.flatMap(_.getNameOpt)
.getOrElse(sourceName)
DataSourceUpdateWriteStep(dsName, missRecordsTableName)
}
val missRecordsTransStep =
SparkSqlTransformStep(
missRecordsTableName,
missRecordsSql,
emptyMap,
Some(missRecordsWriteSteps),
cache = true)
// 2. miss count
val missCountTableName = "__missCount"
val missColName = details.getStringOrKey(_miss)
val missCountSql = procType match {
case BatchProcessType =>
s"SELECT COUNT(*) AS `$missColName` FROM `$missRecordsTableName`"
case StreamingProcessType =>
s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `$missColName` " +
s"FROM `$missRecordsTableName` GROUP BY `${ConstantColumns.tmst}`"
}
val missCountTransStep =
SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
missCountTransStep.parentSteps += missRecordsTransStep
// 3. total count
val totalCountTableName = "__totalCount"
val totalColName = details.getStringOrKey(_total)
val totalCountSql = procType match {
case BatchProcessType =>
s"SELECT COUNT(*) AS `$totalColName` FROM `$sourceName`"
case StreamingProcessType =>
s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `$totalColName` " +
s"FROM `$sourceName` GROUP BY `${ConstantColumns.tmst}`"
}
val totalCountTransStep =
SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 4. accuracy metric
val accuracyTableName = ruleParam.getOutDfName()
val matchedColName = details.getStringOrKey(_matched)
val matchedFractionColName = details.getStringOrKey(_matchedFraction)
val accuracyMetricSql = procType match {
case BatchProcessType =>
s"""
SELECT A.total AS `$totalColName`,
A.miss AS `$missColName`,
(A.total - A.miss) AS `$matchedColName`,
coalesce( (A.total - A.miss) / A.total, 1.0) AS `$matchedFractionColName`
FROM (
SELECT `$totalCountTableName`.`$totalColName` AS total,
coalesce(`$missCountTableName`.`$missColName`, 0) AS miss
FROM `$totalCountTableName` LEFT JOIN `$missCountTableName`
) AS A
"""
case StreamingProcessType =>
// scalastyle:off
s"""
|SELECT `$totalCountTableName`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
|`$totalCountTableName`.`$totalColName` AS `$totalColName`,
|coalesce(`$missCountTableName`.`$missColName`, 0) AS `$missColName`,
|(`$totalCountTableName`.`$totalColName` - coalesce(`$missCountTableName`.`$missColName`, 0)) AS `$matchedColName`
|FROM `$totalCountTableName` LEFT JOIN `$missCountTableName`
|ON `$totalCountTableName`.`${ConstantColumns.tmst}` = `$missCountTableName`.`${ConstantColumns.tmst}`
""".stripMargin
// scalastyle:on
}
val accuracyMetricWriteStep = procType match {
case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt
.map(_.getFlatten)
.getOrElse(DefaultFlattenType)
Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
case StreamingProcessType => None
}
val accuracyTransStep =
SparkSqlTransformStep(
accuracyTableName,
accuracyMetricSql,
emptyMap,
accuracyMetricWriteStep)
accuracyTransStep.parentSteps += missCountTransStep
accuracyTransStep.parentSteps += totalCountTransStep
procType match {
case BatchProcessType => accuracyTransStep :: Nil
// streaming extra steps
case StreamingProcessType =>
// 5. accuracy metric merge
val accuracyMetricTableName = "__accuracy"
val accuracyMetricRule = DataFrameOps._accuracy
val accuracyMetricDetails: Map[String, Any] = Map(
(AccuracyOprKeys._miss, missColName),
(AccuracyOprKeys._total, totalColName),
(AccuracyOprKeys._matched, matchedColName))
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt
.flatMap(_.getNameOpt)
.getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt
.map(_.getFlatten)
.getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
val accuracyMetricTransStep = DataFrameOpsTransformStep(
accuracyMetricTableName,
accuracyTableName,
accuracyMetricRule,
accuracyMetricDetails,
Some(accuracyMetricWriteStep))
accuracyMetricTransStep.parentSteps += accuracyTransStep
// 6. collect accuracy records
val accuracyRecordTableName = "__accuracyRecords"
val accuracyRecordSql = {
s"""
|SELECT `${ConstantColumns.tmst}`, `${ConstantColumns.empty}`
|FROM `$accuracyMetricTableName` WHERE `${ConstantColumns.record}`
""".stripMargin
}
val accuracyRecordWriteStep = {
val rwName =
ruleParam
.getOutputOpt(RecordOutputType)
.flatMap(_.getNameOpt)
.getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
}
val accuracyRecordTransStep = SparkSqlTransformStep(
accuracyRecordTableName,
accuracyRecordSql,
emptyMap,
Some(accuracyRecordWriteStep))
accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
accuracyRecordTransStep :: Nil
}
}
}
}