/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.launch.batch

import java.util.concurrent.TimeUnit

import scala.util.{Failure, Success, Try}

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.execution.MeasureExecutor
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.CommonUtils
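
/**
 * Batch implementation of [[DQApp]]: evaluates the configured measures once
 * against static data sources, writes results to the configured sinks, and
 * exits.
 *
 * A minimal usage sketch, assuming the launcher builds the [[GriffinConfig]]
 * from its env and dq config files:
 * {{{
 *   val app = BatchDQApp(allParam)
 *   app.init.flatMap(_ => app.run) // finally app.close
 * }}}
 */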
case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig
  val sparkParam: SparkParam = envParam.getSparkParam

  val metricName: String = dqParam.getName
  val sinkParams: Seq[SinkParam] = getSinkParams

  var dqContext: DQContext = _

  // batch runs are one-shot; a failed run is not retried by the launcher
  def retryable: Boolean = false
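
  /**
   * Builds the shared [[SparkSession]] from the spark settings in the env
   * config and registers griffin udfs. Expected to succeed before [[run]]
   * is called.
   */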
  def init: Try[_] = Try {
    // build the spark session (spark 2.0+ unified entry point)
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    // allow implicit cross joins, which dq rule sql may produce
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // spark and griffin use separately configured log levels
    val logLevel = getGriffinLogLevel
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    griffinLogger.setLevel(logLevel)

    // register griffin udfs
    GriffinUDFAgent.register(sparkSession)
  }
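
  /**
   * Runs all configured measures once, timing the whole run via
   * [[CommonUtils.timeThis]]. Returns Success(true) when every measure
   * executed, or Success(false) when measure execution failed (the failure
   * is logged).
   */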
  def run: Try[Boolean] = {
    val result =
      CommonUtils.timeThis(
        {
          val measureTime = getMeasureTime
          val contextId = ContextId(measureTime)

          // create data sources (no streaming context in batch mode, hence null)
          val dataSources =
            DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
          dataSources.foreach(_.init())

          // create dq context
          dqContext = DQContext(contextId, metricName, dataSources, sinkParams, BatchProcessType)(
            sparkSession)

          // open sinks with the spark application id
          val applicationId = sparkSession.sparkContext.applicationId
          dqContext.getSinks.foreach(_.open(applicationId))

          // execute all measures; log any failure and report success as a boolean
          Try {
            val measuresResult = Try(MeasureExecutor(dqContext).execute(dqParam.getMeasures))
            measuresResult match {
              case Success(_) =>
              case Failure(exception) =>
                error("Exception", exception)
            }
            measuresResult.isSuccess
          }
        },
        TimeUnit.MILLISECONDS)

    // clean context and close sinks
    dqContext.clean()
    dqContext.getSinks.foreach(_.close())

    result
  }
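
  /** Stops the shared [[SparkSession]]; expected to be called once after [[run]]. */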
  def close: Try[_] = Try {
    sparkSession.stop()
  }
}