/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.griffin.measure.configuration.dqdefinition
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.configuration.enums.SinkType
import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
/**
 * Model for Environment Config.
 *
 * @param sparkParam Job specific Spark Configs to override the Defaults set on the cluster
 * @param sinkParams A [[Seq]] of sink definitions where records and metrics can be persisted
 * @param checkpointParams Config of checkpoint locations (required in streaming mode)
 */
case class EnvConfig(
    @JsonProperty("spark") private val sparkParam: SparkParam,
    @JsonProperty("sinks") private val sinkParams: List[SinkParam],
    @JsonProperty("griffin.checkpoint") private val checkpointParams: List[CheckpointParam])
    extends Param {

  /** Returns the Spark settings exactly as supplied; this is null when none were given. */
  def getSparkParam: SparkParam = sparkParam

  /** Returns the sink definitions, or an empty sequence when `sinkParams` is null. */
  def getSinkParams: Seq[SinkParam] = Option(sinkParams).getOrElse(Nil)

  /** Returns the checkpoint definitions, or an empty sequence when `checkpointParams` is null. */
  def getCheckpointParams: Seq[CheckpointParam] = Option(checkpointParams).getOrElse(Nil)

  /**
   * Asserts that a Spark param is present and that no two sinks share a name.
   *
   * Fails with an [[AssertionError]] naming every duplicated sink name.
   */
  def validate(): Unit = {
    assert(sparkParam != null, "spark param should not be null")

    // NOTE(review): nested params are not validated from here; confirm whether callers
    // invoke validate() on the spark/sink/checkpoint params separately.

    // Read through getSinkParams (not the raw field) so a null sink list means
    // "no sinks" instead of a NullPointerException.
    val repeatedSinks = getSinkParams
      .map(_.getName)
      .groupBy(identity)
      .collect { case (sinkName, occurrences) if occurrences.size > 1 => sinkName }

    assert(
      repeatedSinks.isEmpty,
      s"sink names must be unique. duplicate sink names ['${repeatedSinks.mkString("', '")}'] were found.")
  }
}
/**
 * Spark-related settings of the environment config.
 *
 * @param logLevel log level of spark application (optional, falls back to "WARN")
 * @param cpDir checkpoint directory for spark streaming (required in streaming mode)
 * @param batchInterval batch interval for spark streaming (required in streaming mode)
 * @param processInterval process interval for streaming dq calculation (required in streaming mode)
 * @param config extra config for spark environment (optional)
 * @param initClear clear checkpoint directory or not when initial (optional)
 */
case class SparkParam(
    @JsonProperty("log.level") private val logLevel: String,
    @JsonProperty("checkpoint.dir") private val cpDir: String,
    @JsonProperty("batch.interval") private val batchInterval: String,
    @JsonProperty("process.interval") private val processInterval: String,
    @JsonProperty("config") private val config: Map[String, String],
    @JsonProperty("init.clear") private val initClear: Boolean)
    extends Param {

  /** Log level, or "WARN" when none was supplied. */
  def getLogLevel: String = Option(logLevel).getOrElse("WARN")

  /** Checkpoint directory, or the empty string when none was supplied. */
  def getCpDir: String = Option(cpDir).getOrElse("")

  /** Batch interval string, or the empty string when none was supplied. */
  def getBatchInterval: String = Option(batchInterval).getOrElse("")

  /** Process interval string, or the empty string when none was supplied. */
  def getProcessInterval: String = Option(processInterval).getOrElse("")

  /** Extra Spark settings, or an empty map when none were supplied. */
  def getConfig: Map[String, String] = Option(config).getOrElse(Map.empty[String, String])

  /** Whether the `init.clear` flag is set. */
  def needInitClear: Boolean = initClear

  /** Performs no checks at present; the streaming-mode assertions below are disabled. */
  def validate(): Unit = {
    // assert(StringUtils.isNotBlank(cpDir), "checkpoint.dir should not be empty")
    // assert(TimeUtil.milliseconds(getBatchInterval).nonEmpty, "batch.interval should be valid time string")
    // assert(TimeUtil.milliseconds(getProcessInterval).nonEmpty, "process.interval should be valid time string")
  }
}
/**
 * Definition of a single sink.
 *
 * @param name name identifying the sink (required)
 * @param sinkType sink type, e.g.: log, hdfs, http, mongo (required)
 * @param config sink-specific settings (optional)
 */
case class SinkParam(
    @JsonProperty("name") private val name: String,
    @JsonProperty("type") private val sinkType: String,
    @JsonProperty("config") private val config: Map[String, Any] = Map.empty)
    extends Param {

  /** Returns the sink name exactly as supplied. */
  def getName: String = name

  /** Resolves the textual type through `SinkType.withNameWithDefault`. */
  def getType: SinkType = SinkType.withNameWithDefault(sinkType)

  /**
   * Returns the sink settings, or an empty map when `config` is null.
   *
   * The null guard mirrors the `getConfig` accessors of the sibling param classes, so
   * callers never receive null here either.
   */
  def getConfig: Map[String, Any] = Option(config).getOrElse(Map.empty[String, Any])

  /** Asserts that the sink has a name and a non-blank type. */
  def validate(): Unit = {
    assert(name != null, "sink name should must be defined")
    assert(StringUtils.isNotBlank(sinkType), "sink type should not be empty")
  }
}
/**
 * Definition of a single checkpoint location.
 *
 * @param cpType checkpoint location type, e.g.: zookeeper (required)
 * @param config settings of the checkpoint location
 */
case class CheckpointParam(
    @JsonProperty("type") private val cpType: String,
    @JsonProperty("config") private val config: Map[String, Any])
    extends Param {

  /** Returns the checkpoint type exactly as supplied. */
  def getType: String = cpType

  /** Returns the checkpoint settings, or an empty map when `config` is null. */
  def getConfig: Map[String, Any] = Option(config).getOrElse(Map.empty[String, Any])

  /** Asserts that the checkpoint type is neither null, empty nor whitespace-only. */
  def validate(): Unit = {
    val typeMissing = StringUtils.isBlank(cpType)
    assert(!typeMissing, "griffin checkpoint type should not be empty")
  }
}