/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.UniquenessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * Generate uniqueness DQ steps.
 *
 * The source is first de-duplicated, the target is right-joined onto it, and
 * the joined rows are grouped on the selected columns: a distinct source
 * value counts as unique when it has no extra matches. Emits a total-count
 * metric, a unique-count metric and, optionally, a duplication array.
 */
case class UniquenessExpr2DQSteps(context: DQContext, expr: Expr, ruleParam: RuleParam)
  extends Expr2DQSteps {

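  // Keys looked up in the rule's detail map: "source"/"target" select the
  // data sources to compare; the remaining keys name the output columns and
  // metric fields, and can be overridden in the rule details.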
  private object UniquenessKeys {
    val _source = "source"
    val _target = "target"
    val _unique = "unique"
    val _total = "total"
    val _dup = "dup"
    val _num = "num"
    val _duplicationArray = "duplication.array"
  }
  import UniquenessKeys._

  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val uniquenessExpr = expr.asInstanceOf[UniquenessClause]

    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = UniquenessAnalyzer(uniquenessExpr, sourceName, targetName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} does not exist")
      Nil
    } else if (!context.runTimeTableRegister.existsTable(targetName)) {
      warn(s"[${timestamp}] data source ${targetName} does not exist")
      Nil
    } else {
      val selItemsClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)
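
      // In streaming mode every record carries the internal per-batch
      // timestamp column (tmst), so it is prepended to the select list and
      // to the grouping aliases.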
      val selClause = procType match {
        case BatchProcessType => selItemsClause
        case StreamingProcessType => s"`${ConstantColumns.tmst}`, ${selItemsClause}"
      }
      val selAliases = procType match {
        case BatchProcessType => aliases
        case StreamingProcessType => ConstantColumns.tmst +: aliases
      }

      // 1. source distinct mapping
      val sourceTableName = "__source"
      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
      val sourceTransStep = SparkSqlTransformStep(sourceTableName, sourceSql, emptyMap)

      // 2. target mapping
      val targetTableName = "__target"
      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
      val targetTransStep = SparkSqlTransformStep(targetTableName, targetSql, emptyMap)

      // 3. joined
      val joinedTableName = "__joined"
      val joinedSelClause = selAliases.map { alias =>
        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
      }.mkString(", ")
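      // null-safe equality: coalesce both sides to '' so NULL values can
      // match; note this also treats NULL and the empty string as equal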
      val onClause = aliases.map { alias =>
        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
      }.mkString(" AND ")
      val joinedSql = {
        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
      }
      val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
      joinedTransStep.parentSteps += sourceTransStep
      joinedTransStep.parentSteps += targetTransStep

      // 4. group
      val groupTableName = "__group"
      val groupSelClause = selAliases.map { alias =>
        s"`${alias}`"
      }.mkString(", ")
      val dupColName = details.getStringOrKey(_dup)
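      // dup = COUNT(*) - 1, i.e. the number of extra joined matches for each
      // distinct source value; dup = 0 marks the value as unique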
      val groupSql = {
        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
          s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
      }
      // the trailing `true` caches the group table, which is read again by
      // the unique-record step and, if enabled, the duplicate-record step
      val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, None, true)
      groupTransStep.parentSteps += joinedTransStep

      // 5. total metric
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
        case StreamingProcessType =>
          s"""
             |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}`
             |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
      }
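      // total counts raw source rows (no DISTINCT); the unique metric below
      // counts distinct values that have no extra matches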
      val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
      val totalTransStep =
        SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))

      // 6. unique record
      val uniqueRecordTableName = "__uniqueRecord"
      val uniqueRecordSql = {
        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
      }
      val uniqueRecordTransStep =
        SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
      uniqueRecordTransStep.parentSteps += groupTransStep

      // 7. unique metric
      val uniqueTableName = "__uniqueMetric"
      val uniqueColName = details.getStringOrKey(_unique)
      val uniqueSql = procType match {
        case BatchProcessType =>
          s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
        case StreamingProcessType =>
          s"""
             |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
             |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
      }
      val uniqueMetricWriteStep =
        MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
      val uniqueTransStep =
        SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap, Some(uniqueMetricWriteStep))
      uniqueTransStep.parentSteps += uniqueRecordTransStep

      val transSteps1 = totalTransStep :: uniqueTransStep :: Nil

      val duplicationArrayName = details.getString(_duplicationArray, "")
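      // duplicate-record and duplicate-metric steps are generated only when
      // the rule opts in by setting "duplication.array" in its details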
      val transSteps2 = if (duplicationArrayName.nonEmpty) {
        // 8. duplicate record
        val dupRecordTableName = "__dupRecords"
        val dupRecordSql = {
          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
        }
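        // duplicate records are written under the rule's configured record
        // output name, falling back to the table name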
        val dupRecordWriteStep = {
          val rwName = ruleParam
            .getOutputOpt(RecordOutputType)
            .flatMap(_.getNameOpt)
            .getOrElse(dupRecordTableName)
          RecordWriteStep(rwName, dupRecordTableName)
        }
        val dupRecordTransStep =
          SparkSqlTransformStep(
            dupRecordTableName,
            dupRecordSql,
            emptyMap,
            Some(dupRecordWriteStep),
            true)

        // 9. duplicate metric
        val dupMetricTableName = "__dupMetric"
        val numColName = details.getStringOrKey(_num)
        val dupMetricSelClause = procType match {
          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
          case StreamingProcessType =>
            s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
        }
        val dupMetricGroupbyClause = procType match {
          case BatchProcessType => s"`${dupColName}`"
          case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`"
        }
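        // grouping by the dup count yields a histogram: how many distinct
        // values occur with each duplication count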
        val dupMetricSql = {
          s"""
             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
             |GROUP BY ${dupMetricGroupbyClause}
           """.stripMargin
        }
        val dupMetricWriteStep =
          MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
        val dupMetricTransStep =
          SparkSqlTransformStep(
            dupMetricTableName,
            dupMetricSql,
            emptyMap,
            Some(dupMetricWriteStep))
        dupMetricTransStep.parentSteps += dupRecordTransStep

        dupMetricTransStep :: Nil
      } else Nil

      // full steps
      transSteps1 ++ transSteps2
    }
  }
}