/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _}
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * generate distinctness dq steps
 */
case class DistinctnessExpr2DQSteps(context: DQContext,
                                    expr: Expr,
                                    ruleParam: RuleParam
                                   ) extends Expr2DQSteps {

  private object DistinctnessKeys {
    val _source = "source"
    val _target = "target"
    val _distinct = "distinct"
    val _total = "total"
    val _dup = "dup"
    val _accu_dup = "accu_dup"
    val _num = "num"
    val _duplicationArray = "duplication.array"
    val _withAccumulate = "with.accumulate"
    val _recordEnable = "record.enable"
  }
  import DistinctnessKeys._
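
  /**
   * Build the distinctness DQ step sequence: alias the source, count the total
   * records, group by the distinct columns, optionally join against accumulated
   * older data (streaming with accumulation), then emit the distinct/duplicate
   * metrics and, if enabled, the duplicate records.
   *
   * A sketch of the kind of rule this transformer consumes (values are
   * illustrative, not authoritative; the detail keys mirror DistinctnessKeys):
   * {{{
   *   {
   *     "dsl.type": "griffin-dsl",
   *     "dq.type": "distinct",
   *     "rule": "user_id",
   *     "details": { "source": "source", "total": "total", "distinct": "distinct" }
   *   }
   * }}}
   */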
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val distinctnessExpr = expr.asInstanceOf[DistinctnessClause]

    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = DistinctnessAnalyzer(distinctnessExpr, sourceName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp
    val dsTimeRanges = context.dataSourceTimeRanges

    val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match {
      case Some(t) => t
      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
    }
    val endTmst = dsTimeRanges.get(sourceName).map(_.end) match {
      case Some(t) => t
      case _ => throw new Exception(s"empty end tmst from ${sourceName}")
    }
    val writeTimestampOpt = Some(endTmst)

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} does not exist")
      Nil
    } else {
      val withOlderTable = {
        details.getBoolean(_withAccumulate, true) &&
          context.runTimeTableRegister.existsTable(targetName)
      }
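
      // split the selection pairs: aliases flagged as distinct columns drive the
      // distinctness check; the remaining aliases serve as group-by columns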
      val selClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias, _) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
      val distAliasesClause = distAliases.map(a => s"`${a}`").mkString(", ")
      val allAliases = analyzer.selectionPairs.map(_._2)
      val allAliasesClause = allAliases.map(a => s"`${a}`").mkString(", ")
      val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
      val groupAliasesClause = groupAliases.map(a => s"`${a}`").mkString(", ")

      // 1. source alias
      val sourceAliasTableName = "__sourceAlias"
      val sourceAliasSql = {
        s"SELECT ${selClause} FROM `${sourceName}`"
      }
      val sourceAliasTransStep =
        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, None, true)

      // 2. total metric
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = {
        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
      }
      val totalMetricWriteStep = {
        MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
      }
      val totalTransStep =
        SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))
      totalTransStep.parentSteps += sourceAliasTransStep
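
      // next, count duplicates within the new data itself: each distinct key
      // contributes (COUNT(*) - 1) duplicates and starts out flagged distinct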
      // 3. group by self
      val selfGroupTableName = "__selfGroup"
      val dupColName = details.getStringOrKey(_dup)
      val accuDupColName = details.getStringOrKey(_accu_dup)
      val selfGroupSql = {
        s"""
           |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
           |TRUE AS `${ConstantColumns.distinct}`
           |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
         """.stripMargin
      }
      val selfGroupTransStep =
        SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, None, true)
      selfGroupTransStep.parentSteps += sourceAliasTransStep

      val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
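
      // streaming with accumulation joins the self-grouped new data against the
      // older target table; any other mode uses the self-grouped result directly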
      val (transSteps2, dupCountTableName) = procType match {
        case StreamingProcessType if (withOlderTable) =>
          // 4.0 update old data
          val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)

          // 4. older alias
          val olderAliasTableName = "__older"
          val olderAliasSql = {
            s"SELECT ${selClause} FROM `${targetName}` WHERE `${ConstantColumns.tmst}` <= ${beginTmst}"
          }
          val olderAliasTransStep =
            SparkSqlTransformStep(olderAliasTableName, olderAliasSql, emptyMap)

          // 5. join with older data
          val joinedTableName = "__joined"
          val selfSelClause = (distAliases :+ dupColName).map { alias =>
            s"`${selfGroupTableName}`.`${alias}`"
          }.mkString(", ")
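
          // null-safe join keys: coalesce maps NULL to '' on both sides so that
          // NULL distinct-column values still match between new and older data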
          val onClause = distAliases.map { alias =>
            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')"
          }.mkString(" AND ")
          val olderIsNull = distAliases.map { alias =>
            s"`${olderAliasTableName}`.`${alias}` IS NULL"
          }.mkString(" AND ")
          val joinedSql = {
            s"""
               |SELECT ${selfSelClause}, (${olderIsNull}) AS `${ConstantColumns.distinct}`
               |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}`
               |ON ${onClause}
             """.stripMargin
          }
          val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
          joinedTransStep.parentSteps += selfGroupTransStep
          joinedTransStep.parentSteps += olderAliasTransStep

          // 6. group by joined data
          val groupTableName = "__group"
          val moreDupColName = "_more_dup"
          val groupSql = {
            s"""
               |SELECT ${distAliasesClause}, `${dupColName}`, `${ConstantColumns.distinct}`,
               |COUNT(*) AS `${moreDupColName}`
               |FROM `${joinedTableName}`
               |GROUP BY ${distAliasesClause}, `${dupColName}`, `${ConstantColumns.distinct}`
             """.stripMargin
          }
          val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap)
          groupTransStep.parentSteps += joinedTransStep

          // 7. final duplicate count
          val finalDupCountTableName = "__finalDupCount"
          /**
           * dupColName: the duplicate count with respect to the new data only;
           * an item that is distinct within the new data but already present in
           * the old data is counted as duplicated as well.
           * accuDupColName: the duplicate count accumulated over the new data
           * and the old data together.
           * e.g.: new data [A, A, B, B, C, D], old data [A, A, B, C]
           * selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T)
           * joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T)
           * groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1)
           * finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0)
           * The distinctness result for the new data alone is (A, 2), (B, 2), (C, 1), (D, 0):
           * in new data [A, A, B, B, C, D] the items [A, A, B, B, C] are all duplicated,
           * only [D] is distinct.
           */
          val finalDupCountSql = {
            s"""
               |SELECT ${distAliasesClause}, `${ConstantColumns.distinct}`,
               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}`
               |FROM `${groupTableName}`
             """.stripMargin
          }
          val finalDupCountTransStep =
            SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, None, true)
          finalDupCountTransStep.parentSteps += groupTransStep

          (finalDupCountTransStep :: targetDsUpdateWriteStep :: Nil, finalDupCountTableName)

        case _ =>
          (selfGroupTransStep :: Nil, selfGroupTableName)
      }
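
      // with duplicate flags settled, count the keys that remain distinct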
      // 8. distinct metric
      val distTableName = "__distMetric"
      val distColName = details.getStringOrKey(_distinct)
      val distSql = {
        s"""
           |SELECT COUNT(*) AS `${distColName}`
           |FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
         """.stripMargin
      }
      val distMetricWriteStep = {
        MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt)
      }
      val distTransStep =
        SparkSqlTransformStep(distTableName, distSql, emptyMap, Some(distMetricWriteStep))

      val transSteps3 = distTransStep :: Nil
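
      // optional duplicate-record outputs, enabled by the duplication.array setting;
      // record.enable additionally controls whether duplicate records are persisted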
      val duplicationArrayName = details.getString(_duplicationArray, "")
      val transSteps4 = if (duplicationArrayName.nonEmpty) {
        val recordEnable = details.getBoolean(_recordEnable, false)
        if (groupAliases.size > 0) {
          // with some group by requirement
          // 9. origin data join with distinct information
          val informedTableName = "__informed"
          val onClause = distAliases.map { alias =>
            s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = coalesce(`${dupCountTableName}`.`${alias}`, '')"
          }.mkString(" AND ")
          val informedSql = {
            s"""
               |SELECT `${sourceAliasTableName}`.*,
               |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
               |`${dupCountTableName}`.`${ConstantColumns.distinct}` AS `${ConstantColumns.distinct}`
               |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
               |ON ${onClause}
             """.stripMargin
          }
          val informedTransStep = SparkSqlTransformStep(informedTableName, informedSql, emptyMap)

          // 10. add row number
          val rnTableName = "__rowNumber"
          val rnDistClause = distAliasesClause
          val rnSortClause = s"SORT BY `${ConstantColumns.distinct}`"
          val rnSql = {
            s"""
               |SELECT *,
               |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} ${rnSortClause}) `${ConstantColumns.rowNumber}`
               |FROM `${informedTableName}`
             """.stripMargin
          }
          val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)
          rnTransStep.parentSteps += informedTransStep

          // 11. recognize duplicate items
          val dupItemsTableName = "__dupItems"
          val dupItemsSql = {
            s"""
               |SELECT ${allAliasesClause}, `${dupColName}` FROM `${rnTableName}`
               |WHERE NOT `${ConstantColumns.distinct}` OR `${ConstantColumns.rowNumber}` > 1
             """.stripMargin
          }
          val dupItemsWriteStep = {
            val rwName = ruleParam.getOutputOpt(RecordOutputType)
              .flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
            RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
          }
          val dupItemsTransStep = {
            if (recordEnable) {
              SparkSqlTransformStep(
                dupItemsTableName,
                dupItemsSql,
                emptyMap,
                Some(dupItemsWriteStep)
              )
            } else {
              SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
            }
          }
          dupItemsTransStep.parentSteps += rnTransStep

          // 12. group by duplicate record metric
          val groupDupMetricTableName = "__groupDupMetric"
          val numColName = details.getStringOrKey(_num)
          val groupSelClause = groupAliasesClause
          val groupDupMetricSql = {
            s"""
               |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS `${numColName}`
               |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
             """.stripMargin
          }
          val groupDupMetricWriteStep = {
            MetricWriteStep(duplicationArrayName,
              groupDupMetricTableName,
              ArrayFlattenType,
              writeTimestampOpt)
          }
          val groupDupMetricTransStep =
            SparkSqlTransformStep(
              groupDupMetricTableName,
              groupDupMetricSql,
              emptyMap,
              Some(groupDupMetricWriteStep)
            )
          groupDupMetricTransStep.parentSteps += dupItemsTransStep

          groupDupMetricTransStep :: Nil
        } else {
          // no group by requirement
          // 9. duplicate record
          val dupRecordTableName = "__dupRecords"
          val dupRecordSelClause = procType match {
            case StreamingProcessType if (withOlderTable) =>
              s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
            case _ => s"${distAliasesClause}, `${dupColName}`"
          }
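
          // keep only the keys that actually have duplicates (dup count > 0)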
          val dupRecordSql = {
            s"""
               |SELECT ${dupRecordSelClause}
               |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
             """.stripMargin
          }
          val dupRecordWriteStep = {
            val rwName = ruleParam.getOutputOpt(RecordOutputType)
              .flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
            RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
          }
          val dupRecordTransStep = {
            if (recordEnable) {
              SparkSqlTransformStep(
                dupRecordTableName,
                dupRecordSql,
                emptyMap,
                Some(dupRecordWriteStep),
                true
              )
            } else {
              SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, None, true)
            }
          }

          // 10. duplicate metric
          val dupMetricTableName = "__dupMetric"
          val numColName = details.getStringOrKey(_num)
          val dupMetricSql = {
            s"""
               |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
               |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
             """.stripMargin
          }
          val dupMetricWriteStep = {
            MetricWriteStep(
              duplicationArrayName,
              dupMetricTableName,
              ArrayFlattenType,
              writeTimestampOpt
            )
          }
          val dupMetricTransStep =
            SparkSqlTransformStep(
              dupMetricTableName,
              dupMetricSql,
              emptyMap,
              Some(dupMetricWriteStep)
            )
          dupMetricTransStep.parentSteps += dupRecordTransStep

          dupMetricTransStep :: Nil
        }
      } else Nil

      // full steps
      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
    }
  }
}