blob: 2e227a39a72eb9e2961b7c588524164ba69554f5 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.configuration.enums._
/**
* environment param
* @param sparkParam config of spark environment (must)
* @param sinkParams config of sink ways (optional)
* @param checkpointParams config of checkpoint locations (required in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
case class EnvConfig(@JsonProperty("spark") private val sparkParam: SparkParam,
@JsonProperty("sinks") private val sinkParams: List[SinkParam],
@JsonProperty("griffin.checkpoint") private val checkpointParams: List[CheckpointParam]
) extends Param {
def getSparkParam: SparkParam = sparkParam
def getSinkParams: Seq[SinkParam] = if (sinkParams != null) sinkParams else Nil
def getCheckpointParams: Seq[CheckpointParam] = if (checkpointParams != null) checkpointParams else Nil
def validate(): Unit = {
assert((sparkParam != null), "spark param should not be null")
sparkParam.validate
getSinkParams.foreach(_.validate)
getCheckpointParams.foreach(_.validate)
}
}
/**
* spark param
* @param logLevel log level of spark application (optional)
* @param cpDir checkpoint directory for spark streaming (required in streaming mode)
* @param batchInterval batch interval for spark streaming (required in streaming mode)
* @param processInterval process interval for streaming dq calculation (required in streaming mode)
* @param config extra config for spark environment (optional)
* @param initClear clear checkpoint directory or not when initial (optional)
*/
@JsonInclude(Include.NON_NULL)
case class SparkParam( @JsonProperty("log.level") private val logLevel: String,
@JsonProperty("checkpoint.dir") private val cpDir: String,
@JsonProperty("batch.interval") private val batchInterval: String,
@JsonProperty("process.interval") private val processInterval: String,
@JsonProperty("config") private val config: Map[String, String],
@JsonProperty("init.clear") private val initClear: Boolean
) extends Param {
def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
def getCpDir: String = if (cpDir != null) cpDir else ""
def getBatchInterval: String = if (batchInterval != null) batchInterval else ""
def getProcessInterval: String = if (processInterval != null) processInterval else ""
def getConfig: Map[String, String] = if (config != null) config else Map[String, String]()
def needInitClear: Boolean = if (initClear) initClear else false
def validate(): Unit = {
// assert(StringUtils.isNotBlank(cpDir), "checkpoint.dir should not be empty")
// assert(TimeUtil.milliseconds(getBatchInterval).nonEmpty, "batch.interval should be valid time string")
// assert(TimeUtil.milliseconds(getProcessInterval).nonEmpty, "process.interval should be valid time string")
}
}
/**
* sink param
* @param sinkType sink type, e.g.: log, hdfs, http, mongo (must)
* @param config config of sink way (must)
*/
@JsonInclude(Include.NON_NULL)
case class SinkParam(@JsonProperty("type") private val sinkType: String,
@JsonProperty("config") private val config: Map[String, Any]
) extends Param {
def getType: SinkType = SinkType(sinkType)
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def validate(): Unit = {
assert(StringUtils.isNotBlank(sinkType), "sink type should not be empty")
}
}
/**
* checkpoint param
* @param cpType checkpoint location type, e.g.: zookeeper (must)
* @param config config of checkpoint location
*/
@JsonInclude(Include.NON_NULL)
case class CheckpointParam(@JsonProperty("type") private val cpType: String,
@JsonProperty("config") private val config: Map[String, Any]
) extends Param {
def getType: String = cpType
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def validate(): Unit = {
assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
}
}