blob: 3bb5737db03c2e5bb49706531f2baa5a82bcc671 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * Builds the DQ steps for an accuracy rule: it finds the records of the source
 * data source that have no counterpart in the target data source and derives
 * total / miss / matched figures from them.
 *
 * @param context   supplies the data source names, the process type, the context
 *                  timestamp and the run-time table register
 * @param expr      rule expression; it is cast to `LogicalExpr` when the steps are built
 * @param ruleParam rule configuration: details map, configured outputs and the
 *                  output table name
 */
case class AccuracyExpr2DQSteps(context: DQContext,
                                expr: Expr,
                                ruleParam: RuleParam
                               ) extends Expr2DQSteps {

  /** Keys read from the rule's details map. */
  private object AccuracyKeys {
    val _source = "source"
    val _target = "target"
    val _miss = "miss"
    val _total = "total"
    val _matched = "matched"
    val _matchedFraction = "matchedFraction"
  }
  import AccuracyKeys._

  /** Name set on the rule's record output, or `fallback` when no such name is configured. */
  private def resolveRecordOutputName(fallback: String) =
    ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(fallback)

  /**
   * Metric write step that reads from `inputName`. Its name comes from the rule's
   * metric output (falling back to the rule's output table name) and its flatten
   * type from the same output (falling back to `FlattenType.default`).
   */
  private def buildMetricWriteStep(inputName: String) = {
    val metricOutput = ruleParam.getOutputOpt(MetricOutputType)
    val metricName = metricOutput.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
    val flatten = metricOutput.map(_.getFlatten).getOrElse(FlattenType.default)
    MetricWriteStep(metricName, inputName, flatten)
  }

  /**
   * Assembles the accuracy steps for the current process type.
   *
   * @return `Nil` (after a warning) when the source table is not registered;
   *         otherwise a single-element list holding the last step of the chain,
   *         with the earlier steps attached to it through `parentSteps`:
   *         the accuracy metric step for batch, the accuracy record step for streaming
   */
  def getDQSteps(): Seq[DQStep] = {
    val ruleDetails = ruleParam.getDetails
    // NOTE(review): unchecked cast; presumably only logical expressions reach this
    // builder - confirm against the caller.
    val logicalExpr = expr.asInstanceOf[LogicalExpr]

    // source / target default to the first and second configured data sources
    val srcName = ruleDetails.getString(_source, context.getDataSourceName(0))
    val tgtName = ruleDetails.getString(_target, context.getDataSourceName(1))
    val exprAnalyzer = AccuracyAnalyzer(logicalExpr, srcName, tgtName)

    val processType = context.procType
    val runTimestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(srcName)) {
      warn(s"[${runTimestamp}] data source ${srcName} not exists")
      Nil
    } else {
      // ---- 1. miss records -------------------------------------------------
      val missRecordsTable = "__missRecords"
      val missRecordsQuery =
        if (context.runTimeTableRegister.existsTable(tgtName)) {
          // Left join source to target on the rule expression and keep the source
          // rows for which every target selection is NULL while the source
          // selections are not all NULL.
          val joinCondition = expr.coalesceDesc
          val sourceAllNull = exprAnalyzer.sourceSelectionExprs
            .map(sel => s"${sel.desc} IS NULL")
            .mkString(" AND ")
          val targetAllNull = exprAnalyzer.targetSelectionExprs
            .map(sel => s"${sel.desc} IS NULL")
            .mkString(" AND ")
          s"SELECT `${srcName}`.* FROM `${srcName}` LEFT JOIN `${tgtName}` ON ${joinCondition} " +
            s"WHERE (NOT (${sourceAllNull})) AND (${targetAllNull})"
        } else {
          // without a target table every source record is selected as a miss record
          warn(s"[${runTimestamp}] data source ${tgtName} not exists")
          s"SELECT `${srcName}`.* FROM `${srcName}`"
        }

      // batch writes the miss records out; streaming writes them back as a data source update
      val missRecordsWriter = processType match {
        case BatchProcessType =>
          val recordName = resolveRecordOutputName(missRecordsTable)
          RecordWriteStep(recordName, missRecordsTable)
        case StreamingProcessType =>
          val updatedDataSource = ruleParam.getOutputOpt(DscUpdateOutputType)
            .flatMap(_.getNameOpt)
            .getOrElse(srcName)
          DataSourceUpdateWriteStep(updatedDataSource, missRecordsTable)
      }
      // NOTE(review): the trailing `true` is presumably the step's cache flag - confirm
      // against SparkSqlTransformStep.
      val missRecordsStep = SparkSqlTransformStep(
        missRecordsTable, missRecordsQuery, emptyMap, Some(missRecordsWriter), true)

      // ---- 2. miss count ---------------------------------------------------
      val missCountTable = "__missCount"
      val missCol = ruleDetails.getStringOrKey(_miss)
      val missCountQuery = processType match {
        case BatchProcessType =>
          s"SELECT COUNT(*) AS `${missCol}` FROM `${missRecordsTable}`"
        case StreamingProcessType =>
          // streaming counts per timestamp column value
          s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missCol}` " +
            s"FROM `${missRecordsTable}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val missCountStep = SparkSqlTransformStep(missCountTable, missCountQuery, emptyMap)
      missCountStep.parentSteps += missRecordsStep

      // ---- 3. total count --------------------------------------------------
      val totalCountTable = "__totalCount"
      val totalCol = ruleDetails.getStringOrKey(_total)
      val totalCountQuery = processType match {
        case BatchProcessType =>
          s"SELECT COUNT(*) AS `${totalCol}` FROM `${srcName}`"
        case StreamingProcessType =>
          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalCol}` " +
            s"FROM `${srcName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val totalCountStep = SparkSqlTransformStep(totalCountTable, totalCountQuery, emptyMap)

      // ---- 4. accuracy metric ----------------------------------------------
      val accuracyTable = ruleParam.getOutDfName()
      val matchedCol = ruleDetails.getStringOrKey(_matched)
      val matchedFractionCol = ruleDetails.getStringOrKey(_matchedFraction)
      val accuracyQuery = processType match {
        case BatchProcessType =>
          // total, miss, matched = total - miss, and the matched fraction
          // (coalesced to 1.0 when the division yields NULL)
          s"""
             |SELECT A.total AS `${totalCol}`,
             |A.miss AS `${missCol}`,
             |(A.total - A.miss) AS `${matchedCol}`,
             |coalesce( (A.total - A.miss) / A.total, 1.0) AS `${matchedFractionCol}`
             |FROM (
             |SELECT `${totalCountTable}`.`${totalCol}` AS total,
             |coalesce(`${missCountTable}`.`${missCol}`, 0) AS miss
             |FROM `${totalCountTable}` LEFT JOIN `${missCountTable}`
             |) AS A
             |""".stripMargin
        case StreamingProcessType =>
          // per-timestamp total / miss / matched; no matched fraction is selected here
          s"""
             |SELECT `${totalCountTable}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
             |`${totalCountTable}`.`${totalCol}` AS `${totalCol}`,
             |coalesce(`${missCountTable}`.`${missCol}`, 0) AS `${missCol}`,
             |(`${totalCountTable}`.`${totalCol}` - coalesce(`${missCountTable}`.`${missCol}`, 0)) AS `${matchedCol}`
             |FROM `${totalCountTable}` LEFT JOIN `${missCountTable}`
             |ON `${totalCountTable}`.`${ConstantColumns.tmst}` = `${missCountTable}`.`${ConstantColumns.tmst}`
             |""".stripMargin
      }
      // only batch writes the metric from this step; streaming writes it after the merge step
      val accuracyWriterOpt = processType match {
        case BatchProcessType => Some(buildMetricWriteStep(accuracyTable))
        case StreamingProcessType => None
      }
      val accuracyStep =
        SparkSqlTransformStep(accuracyTable, accuracyQuery, emptyMap, accuracyWriterOpt)
      accuracyStep.parentSteps += missCountStep
      accuracyStep.parentSteps += totalCountStep

      processType match {
        case BatchProcessType =>
          accuracyStep :: Nil

        case StreamingProcessType =>
          // ---- 5. accuracy metric merge (streaming only) -------------------
          val mergedMetricTable = "__accuracy"
          val mergeDetails = Map[String, Any](
            AccuracyOprKeys._miss -> missCol,
            AccuracyOprKeys._total -> totalCol,
            AccuracyOprKeys._matched -> matchedCol
          )
          val mergedMetricStep = DataFrameOpsTransformStep(
            mergedMetricTable,
            accuracyTable,
            DataFrameOps._accuracy,
            mergeDetails,
            Some(buildMetricWriteStep(mergedMetricTable)))
          mergedMetricStep.parentSteps += accuracyStep

          // ---- 6. collect accuracy records (streaming only) ----------------
          val accuracyRecordsTable = "__accuracyRecords"
          val accuracyRecordsQuery =
            s"""
               |SELECT `${ConstantColumns.tmst}`, `${ConstantColumns.empty}`
               |FROM `${mergedMetricTable}` WHERE `${ConstantColumns.record}`
               |""".stripMargin
          val accuracyRecordsWriter = RecordWriteStep(
            resolveRecordOutputName(missRecordsTable),
            missRecordsTable,
            Some(accuracyRecordsTable))
          val accuracyRecordsStep = SparkSqlTransformStep(
            accuracyRecordsTable, accuracyRecordsQuery, emptyMap, Some(accuracyRecordsWriter))
          accuracyRecordsStep.parentSteps += mergedMetricStep

          accuracyRecordsStep :: Nil
      }
    }
  }
}