/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.enums.DqType._
import org.apache.griffin.measure.configuration.enums.DslType.{DslType, GriffinDsl}
import org.apache.griffin.measure.configuration.enums.FlattenType.{
DefaultFlattenType,
FlattenType
}
import org.apache.griffin.measure.configuration.enums.OutputType.{OutputType, UnknownOutputType}
import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
/**
* dq param
* @param name name of dq measurement (must)
* @param timestamp default timestamp of measure in batch mode (optional)
* @param procType batch mode or streaming mode (must)
* @param dataSources data sources (must)
* @param evaluateRule dq measurement (must)
 * @param sinks sink types (optional; defaults to elasticsearch)
*/
@JsonInclude(Include.NON_NULL)
case class DQConfig(
@JsonProperty("name") private val name: String,
@JsonProperty("timestamp") private val timestamp: Long,
@JsonProperty("process.type") private val procType: String,
@JsonProperty("data.sources") private val dataSources: List[DataSourceParam],
@JsonProperty("evaluate.rule") private val evaluateRule: EvaluateRuleParam,
@JsonProperty("sinks") private val sinks: List[String])
extends Param {
def getName: String = name
def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else None
def getProcType: String = procType
  def getDataSources: Seq[DataSourceParam] = {
    // de-duplicate data sources by name, keeping the first occurrence of each
    dataSources
      .foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
        val (seq, names) = ret
        if (!names.contains(ds.getName)) {
          (seq :+ ds, names + ds.getName)
        } else ret
      }
      ._1
  }
def getEvaluateRule: EvaluateRuleParam = evaluateRule
def getValidSinkTypes: Seq[SinkType] =
SinkType.validSinkTypes(if (sinks != null) sinks else Nil)
def validate(): Unit = {
assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
assert(StringUtils.isNotBlank(procType), "process.type should not be blank")
assert(dataSources != null, "data.sources should not be null")
assert(evaluateRule != null, "evaluate.rule should not be null")
getDataSources.foreach(_.validate())
evaluateRule.validate()
}
}
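
/*
 * Example (a hedged sketch; all values are illustrative assumptions, not taken
 * from a shipped Griffin job). Constructing a DQConfig programmatically mirrors
 * the JSON that Jackson binds through the @JsonProperty names above; here
 * `sourceConnector`, `targetConnector` and `accuracyRule` stand for values of
 * the parameter types defined later in this file:
 *
 *   val conf = DQConfig(
 *     name = "accuracy_measure",
 *     timestamp = 0L,                 // 0 means getTimestampOpt returns None
 *     procType = "batch",
 *     dataSources = List(
 *       DataSourceParam("source", sourceConnector),
 *       DataSourceParam("target", targetConnector, baseline = true)),
 *     evaluateRule = EvaluateRuleParam(List(accuracyRule)),
 *     sinks = List("CONSOLE", "ELASTICSEARCH"))
 *   conf.validate()   // asserts name/procType are non-blank, then validates children
 */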
/**
* data source param
 * @param name data source name (must)
 * @param connector data connector (optional)
 * @param baseline whether this data source is the baseline; false by default (optional)
 * @param checkpoint data source checkpoint configuration (must in streaming mode with streaming connectors)
*/
@JsonInclude(Include.NON_NULL)
case class DataSourceParam(
@JsonProperty("name") private val name: String,
@JsonProperty("connector") private val connector: DataConnectorParam,
@JsonProperty("baseline") private val baseline: Boolean = false,
@JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null)
extends Param {
def getName: String = name
  def isBaseline: Boolean = baseline
def getConnector: Option[DataConnectorParam] = Option(connector)
def getCheckpointOpt: Option[Map[String, Any]] = Option(checkpoint)
def validate(): Unit = {
assert(StringUtils.isNotBlank(name), "data source name should not be empty")
assert(getConnector.isDefined, "Connector is undefined or invalid")
getConnector.foreach(_.validate())
}
}
/**
* data connector param
* @param conType data connector type, e.g.: hive, avro, kafka (must)
 * @param dataFrameName data connector dataframe name, used as the input of pre-process rules (optional)
* @param config detail configuration of data connector (must)
* @param preProc pre-process rules after load data (optional)
*/
@JsonInclude(Include.NON_NULL)
case class DataConnectorParam(
@JsonProperty("type") private val conType: String,
@JsonProperty("dataframe.name") private val dataFrameName: String,
@JsonProperty("config") private val config: Map[String, Any],
@JsonProperty("pre.proc") private val preProc: List[RuleParam])
extends Param {
def getType: String = conType
def getDataFrameName(defName: String): String =
if (dataFrameName != null) dataFrameName else defName
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
def validate(): Unit = {
assert(StringUtils.isNotBlank(conType), "data connector type should not be empty")
getPreProcRules.foreach(_.validate())
}
}
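
/*
 * Example (a hedged sketch; the config keys "database" and "table.name" are
 * illustrative assumptions; each concrete connector defines the keys it
 * actually reads):
 *
 *   val hiveConnector = DataConnectorParam(
 *     conType = "hive",
 *     dataFrameName = "src",
 *     config = Map("database" -> "default", "table.name" -> "demo_src"),
 *     preProc = Nil)     // no pre-process rules
 *   val source = DataSourceParam(name = "source", connector = hiveConnector)
 *   source.validate()    // asserts a non-blank name and a defined connector
 */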
/**
* evaluate rule param
* @param rules rules to define dq measurement (optional)
*/
@JsonInclude(Include.NON_NULL)
case class EvaluateRuleParam(@JsonProperty("rules") private val rules: List[RuleParam])
extends Param {
def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
def validate(): Unit = {
getRules.foreach(_.validate())
}
}
/**
* rule param
* @param dslType dsl type of this rule (must)
* @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
 * @param inDfName name of input dataframe of this rule; defaults to the previous rule's
 *                 output dataframe name
 * @param outDfName name of output dataframe of this rule; defaults to the data connector
 *                  dataframe name with an index suffix
* @param rule rule to define dq step calculation (must)
* @param details detail config of rule (optional)
 * @param cache cache the result for reuse by later steps (optional; valid for "spark-sql" and "df-ops" modes)
* @param outputs output ways configuration (optional)
* @param errorConfs error configuration (valid for 'COMPLETENESS' mode)
*/
@JsonInclude(Include.NON_NULL)
case class RuleParam(
@JsonProperty("dsl.type") private val dslType: String,
@JsonProperty("dq.type") private val dqType: String,
@JsonProperty("in.dataframe.name") private val inDfName: String = null,
@JsonProperty("out.dataframe.name") private val outDfName: String = null,
@JsonProperty("rule") private val rule: String = null,
@JsonProperty("details") private val details: Map[String, Any] = null,
@JsonProperty("cache") private val cache: Boolean = false,
@JsonProperty("out") private val outputs: List[RuleOutputParam] = null,
@JsonProperty("error.confs") private val errorConfs: List[RuleErrorConfParam] = null)
extends Param {
def getDslType: DslType =
if (dslType != null) DslType.withNameWithDefault(dslType) else GriffinDsl
def getDqType: DqType = if (dqType != null) DqType.withNameWithDefault(dqType) else Unknown
  def getCache: Boolean = cache
def getInDfName(defName: String = ""): String = if (inDfName != null) inDfName else defName
def getOutDfName(defName: String = ""): String = if (outDfName != null) outDfName else defName
def getRule: String = if (rule != null) rule else ""
def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
def getOutputs: Seq[RuleOutputParam] = if (outputs != null) outputs else Nil
def getOutputOpt(tp: OutputType): Option[RuleOutputParam] =
getOutputs.find(_.getOutputType == tp)
def getErrorConfs: Seq[RuleErrorConfParam] = if (errorConfs != null) errorConfs else Nil
  // the replace* helpers return a copy with the given field(s) swapped; using
  // copy(...) preserves every other field, including outputs and errorConfs
  def replaceInDfName(newName: String): RuleParam = {
    if (StringUtils.equals(newName, inDfName)) this
    else copy(inDfName = newName)
  }
  def replaceOutDfName(newName: String): RuleParam = {
    if (StringUtils.equals(newName, outDfName)) this
    else copy(outDfName = newName)
  }
  def replaceInOutDfName(in: String, out: String): RuleParam = {
    if (StringUtils.equals(inDfName, in) && StringUtils.equals(outDfName, out)) this
    else copy(inDfName = in, outDfName = out)
  }
  def replaceRule(newRule: String): RuleParam = {
    if (StringUtils.equals(newRule, rule)) this
    else copy(rule = newRule)
  }
def validate(): Unit = {
assert(
!(getDslType.equals(GriffinDsl) && getDqType.equals(Unknown)),
"unknown dq type for griffin dsl")
getOutputs.foreach(_.validate())
getErrorConfs.foreach(_.validate())
}
}
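
/*
 * Example (a hedged sketch with illustrative values): a griffin-dsl accuracy
 * rule, and the copy-style helpers defined above.
 *
 *   val rule = RuleParam(
 *     dslType = "griffin-dsl",
 *     dqType = "accuracy",
 *     rule = "source.id = target.id")
 *   rule.getInDfName("source")                // "source": falls back to the given default
 *   val named = rule.replaceOutDfName("accu") // copy with a new output dataframe name
 *   rule.validate()                           // passes; only an unknown dq.type under griffin-dsl fails
 */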
/**
* out param of rule
* @param outputType output type (must)
* @param name output name (optional)
 * @param flatten flatten type of the output metric (optional; only applies to metric-type outputs)
*/
@JsonInclude(Include.NON_NULL)
case class RuleOutputParam(
@JsonProperty("type") private val outputType: String,
@JsonProperty("name") private val name: String,
@JsonProperty("flatten") private val flatten: String)
extends Param {
def getOutputType: OutputType = {
if (outputType != null) OutputType.withNameWithDefault(outputType)
else UnknownOutputType
}
  def getNameOpt: Option[String] = Option(name).filter(StringUtils.isNotBlank)
def getFlatten: FlattenType = {
if (StringUtils.isNotBlank(flatten)) FlattenType.withNameWithDefault(flatten)
else DefaultFlattenType
}
def validate(): Unit = {}
}
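
/*
 * Example (a hedged sketch; "metric" and "map" are assumed to be parsable
 * OutputType and FlattenType names):
 *
 *   val out = RuleOutputParam(outputType = "metric", name = "accu", flatten = "map")
 *   out.getNameOpt    // Some("accu")
 *   out.getFlatten    // parsed from "map"; a blank flatten falls back to DefaultFlattenType
 */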
/**
* error configuration parameter
* @param columnName the name of the column
 * @param errorType how error values are matched: "regex" or "enumeration"
* @param values error value list
*/
@JsonInclude(Include.NON_NULL)
case class RuleErrorConfParam(
@JsonProperty("column.name") private val columnName: String,
@JsonProperty("type") private val errorType: String,
@JsonProperty("values") private val values: List[String])
extends Param {
  def getColumnName: Option[String] = Option(columnName).filter(StringUtils.isNotBlank)
  def getErrorType: Option[String] = Option(errorType).filter(StringUtils.isNotBlank)
def getValues: Seq[String] = if (values != null) values else Nil
  def validate(): Unit = {
    // use exists instead of .get so a blank error type fails the assertion
    // cleanly rather than throwing NoSuchElementException
    assert(
      getErrorType.exists(t =>
        "regex".equalsIgnoreCase(t) || "enumeration".equalsIgnoreCase(t)),
      "error.conf type must be either 'regex' or 'enumeration'")
  }
}
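
/*
 * Example (a hedged sketch; column and values are illustrative): an error
 * configuration for a COMPLETENESS rule. Only "regex" and "enumeration"
 * (case-insensitive) pass validate().
 *
 *   val errConf = RuleErrorConfParam(
 *     columnName = "user_id",
 *     errorType = "enumeration",
 *     values = List("", "null", "-1"))
 *   errConf.validate()   // passes; a blank or unrecognized type fails the assertion
 */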