[GRIFFIN-304] Eliminate older contexts
**What changes were proposed in this pull request?**
As SparkSession subsumes SQLContext and HiveContext and wraps SparkContext, there is no need to instantiate or pass any of them around separately. Wherever one of the older contexts is still required, it can be derived from the SparkSession.
This issue aims to eliminate the dependency on the older contexts in favour of SparkSession.
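A minimal sketch of the idea (illustrative only, not part of this patch; the app name and master setting are placeholders):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}

object SessionSketch {
  def main(args: Array[String]): Unit = {
    // A single SparkSession is the only entry point that has to be built.
    val spark: SparkSession = SparkSession.builder()
      .appName("griffin-measure")
      .master("local[*]")
      .getOrCreate()

    // The older contexts remain reachable from it, should a legacy API still need them.
    val sc: SparkContext = spark.sparkContext
    val sqlCtx: SQLContext = spark.sqlContext

    // SQL, UDF registration and temp-view management all go through the session directly.
    spark.sql("SELECT 1 AS ok").show()
    spark.stop()
  }
}
```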
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Griffin test suite.
Author: chitralverma <chitralverma@gmail.com>
Closes #557 from chitralverma/eliminate-older-contexts.
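Most of the diff below is a mechanical substitution of SQLContext calls with their SparkSession equivalents. A sketch of the recurring pattern (the method names and signatures here are illustrative, not copied from the patch):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MigrationSketch {
  // was: sqlContext.dropTempTable(name)
  def unregisterTable(spark: SparkSession, name: String): Unit =
    spark.catalog.dropTempView(name)

  // was: sqlContext.udf.register("index_of", indexOf _)
  def registerUdfs(spark: SparkSession): Unit =
    spark.udf.register("index_of", (arr: Seq[String], v: String) => arr.indexOf(v))

  // was: sqlContext.table(s"`$name`")
  def lookupTable(spark: SparkSession, name: String): DataFrame =
    spark.table(s"`$name`")
}
```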
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 2fdf409..8069632 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,7 +18,7 @@
*/
package org.apache.griffin.measure.context
-import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
@@ -37,10 +37,8 @@
procType: ProcessType
)(@transient implicit val sparkSession: SparkSession) {
- val sqlContext: SQLContext = sparkSession.sqlContext
-
val compileTableRegister: CompileTableRegister = CompileTableRegister()
- val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sqlContext)
+ val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sparkSession)
val dataFrameCache: DataFrameCache = DataFrameCache()
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index c4dda3b..d1d9dbe 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -61,7 +61,7 @@
/**
* register table name and create temp view during calculation
*/
-case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends TableRegister {
+case class RunTimeTableRegister(@transient sparkSession: SparkSession) extends TableRegister {
def registerTable(name: String, df: DataFrame): Unit = {
registerTable(name)
@@ -70,13 +70,13 @@
override def unregisterTable(name: String): Unit = {
if (existsTable(name)) {
- sqlContext.dropTempTable(name)
+ sparkSession.catalog.dropTempView(name)
tables -= name
}
}
override def unregisterAllTables(): Unit = {
val uts = getTables
- uts.foreach(t => sqlContext.dropTempTable(t))
+ uts.foreach(t => sparkSession.catalog.dropTempView(t))
tables.clear
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 28e616b..1b1bcf0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -52,7 +52,7 @@
// for streaming data cache
val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
+ sparkSession, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index a03a468..5bfe41a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -43,7 +43,7 @@
trait StreamingCacheClient
extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
- val sqlContext: SQLContext
+ val sparkSession: SparkSession
val param: Map[String, Any]
val dsName: String
val index: Int
@@ -181,7 +181,7 @@
// new cache data
val newDfOpt = try {
- val dfr = sqlContext.read
+ val dfr = sparkSession.read
readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
} catch {
case e: Throwable =>
@@ -194,7 +194,7 @@
val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
val oldDfPath = s"${oldFilePath}/${idx}"
try {
- val dfr = sqlContext.read
+ val dfr = sparkSession.read
readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
} catch {
case e: Throwable =>
@@ -329,7 +329,7 @@
val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
val updateDf = df.filter(filterStr)
- val prlCount = sqlContext.sparkContext.defaultParallelism
+ val prlCount = sparkSession.sparkContext.defaultParallelism
// repartition
val repartitionedDf = updateDf.repartition(prlCount)
val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index eeda8ef..1948438 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,7 +18,7 @@
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.datasource.TimestampStorage
@@ -37,14 +37,14 @@
/**
* create streaming cache client
- * @param sqlContext sqlContext in spark environment
+ * @param sparkSession sparkSession in spark environment
* @param checkpointOpt data source checkpoint/cache config option
* @param name data source name
* @param index data source index
* @param tmstCache the same tmstCache instance inside a data source
* @return streaming cache client option
*/
- def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String, Any]],
+ def getClientOpt(sparkSession: SparkSession, checkpointOpt: Option[Map[String, Any]],
name: String, index: Int, tmstCache: TimestampStorage
): Option[StreamingCacheClient] = {
checkpointOpt.flatMap { param =>
@@ -52,13 +52,13 @@
val tp = param.getString(_type, "")
val dsCache = tp match {
case ParquetRegex() =>
- StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheParquetClient(sparkSession, param, name, index, tmstCache)
case JsonRegex() =>
- StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheJsonClient(sparkSession, param, name, index, tmstCache)
case OrcRegex() =>
- StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheOrcClient(sparkSession, param, name, index, tmstCache)
case _ =>
- StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheParquetClient(sparkSession, param, name, index, tmstCache)
}
Some(dsCache)
} catch {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index c81d4d1..8db918a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -25,7 +25,7 @@
/**
* data source cache in json format
*/
-case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+case class StreamingCacheJsonClient(sparkSession: SparkSession, param: Map[String, Any],
dsName: String, index: Int, timestampStorage: TimestampStorage
) extends StreamingCacheClient {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 0649b74..5707cfa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -25,7 +25,7 @@
/**
* data source cache in orc format
*/
-case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+case class StreamingCacheOrcClient(sparkSession: SparkSession, param: Map[String, Any],
dsName: String, index: Int, timestampStorage: TimestampStorage
) extends StreamingCacheClient {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index 9c369ee..699ccc2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -25,14 +25,14 @@
/**
* data source cache in parquet format
*/
-case class StreamingCacheParquetClient(sqlContext: SQLContext,
+case class StreamingCacheParquetClient(sparkSession: SparkSession,
param: Map[String, Any],
dsName: String,
index: Int,
timestampStorage: TimestampStorage
) extends StreamingCacheClient {
- sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+ sparkSession.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
info(s"write path: ${path}")
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 85b4774..d37f51b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -98,7 +98,7 @@
// def metaData(): Try[Iterable[(String, String)]] = {
// Try {
-// val st = sqlContext.read.format("com.databricks.spark.avro").
+// val st = sparkSession.read.format("com.databricks.spark.avro").
// load(concreteFileFullPath).schema
// st.fields.map(f => (f.name, f.dataType.typeName))
// }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index c05d043..a4a1d37 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -23,7 +23,7 @@
import scala.util.Try
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
@@ -43,7 +43,6 @@
val metricName = dqParam.getName
val sinkParams = getSinkParams
- var sqlContext: SQLContext = _
var dqContext: DQContext = _
def retryable: Boolean = false
@@ -57,10 +56,9 @@
val logLevel = getGriffinLogLevel()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- sqlContext = sparkSession.sqlContext
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(sparkSession)
}
def run: Try[Boolean] = Try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index be32eba..57e6f82 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -24,7 +24,7 @@
import scala.util.Try
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.griffin.measure.Loggable
@@ -49,8 +49,6 @@
val metricName = dqParam.getName
val sinkParams = getSinkParams
- var sqlContext: SQLContext = _
-
def retryable: Boolean = true
def init: Try[_] = Try {
@@ -62,7 +60,6 @@
val logLevel = getGriffinLogLevel()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- sqlContext = sparkSession.sqlContext
// clear checkpoint directory
clearCpDir
@@ -72,7 +69,7 @@
OffsetCheckpointClient.init
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(sparkSession)
}
def run: Try[Boolean] = Try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
index 61b93ab..baa7a8c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
@@ -18,12 +18,12 @@
*/
package org.apache.griffin.measure.step.builder.udf
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
object GriffinUDFAgent {
- def register(sqlContext: SQLContext): Unit = {
- GriffinUDFs.register(sqlContext)
- GriffinUDAggFs.register(sqlContext)
+ def register(sparkSession: SparkSession): Unit = {
+ GriffinUDFs.register(sparkSession)
+ GriffinUDAggFs.register(sparkSession)
}
}
@@ -32,10 +32,10 @@
*/
object GriffinUDFs {
- def register(sqlContext: SQLContext): Unit = {
- sqlContext.udf.register("index_of", indexOf _)
- sqlContext.udf.register("matches", matches _)
- sqlContext.udf.register("reg_replace", regReplace _)
+ def register(sparkSession: SparkSession): Unit = {
+ sparkSession.udf.register("index_of", indexOf _)
+ sparkSession.udf.register("matches", matches _)
+ sparkSession.udf.register("reg_replace", regReplace _)
}
private def indexOf(arr: Seq[String], v: String) = {
@@ -57,7 +57,7 @@
*/
object GriffinUDAggFs {
- def register(sqlContext: SQLContext): Unit = {
+ def register(sparkSession: SparkSession): Unit = {
}
}
\ No newline at end of file
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index ba64d33..8678db5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,8 +20,9 @@
import java.util.Date
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.{Encoders, Row, _}
import org.apache.spark.sql.types._
+
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.streaming.metric._
import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
@@ -45,7 +46,7 @@
val _matchedFraction = "matchedFraction"
}
- def fromJson(sqlContext: SQLContext,
+ def fromJson(sparkSession: SparkSession,
inputDfName: String,
details: Map[String, Any]): DataFrame = {
val _colName = "col.name"
@@ -53,15 +54,15 @@
implicit val encoder = Encoders.STRING
- val df: DataFrame = sqlContext.table(s"`${inputDfName}`")
+ val df: DataFrame = sparkSession.table(s"`${inputDfName}`")
val rdd = colNameOpt match {
case Some(colName: String) => df.map(r => r.getAs[String](colName))
case _ => df.map(_.getAs[String](0))
}
- sqlContext.read.json(rdd) // slow process
+ sparkSession.read.json(rdd) // slow process
}
- def accuracy(sqlContext: SQLContext,
+ def accuracy(sparkSession: SparkSession,
inputDfName: String,
contextId: ContextId,
details: Map[String, Any]): DataFrame = {
@@ -82,7 +83,7 @@
}
}
- val df = sqlContext.table(s"`${inputDfName}`")
+ val df = sparkSession.table(s"`${inputDfName}`")
val results = df.rdd.flatMap { row =>
try {
@@ -116,16 +117,16 @@
val ar = r.result.asInstanceOf[AccuracyMetric]
Row(r.timeStamp, ar.miss, ar.total, ar.getMatch, ar.matchFraction, !ar.initial, ar.eventual)
}.toArray
- val rowRdd = sqlContext.sparkContext.parallelize(rows)
- val retDf = sqlContext.createDataFrame(rowRdd, schema)
+ val rowRdd = sparkSession.sparkContext.parallelize(rows)
+ val retDf = sparkSession.createDataFrame(rowRdd, schema)
retDf
}
- def clear(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
- val df = sqlContext.table(s"`${inputDfName}`")
- val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
- sqlContext.createDataFrame(emptyRdd, df.schema)
+ def clear(sparkSession: SparkSession, inputDfName: String, details: Map[String, Any]): DataFrame = {
+ val df = sparkSession.table(s"`${inputDfName}`")
+ val emptyRdd = sparkSession.sparkContext.emptyRDD[Row]
+ sparkSession.createDataFrame(emptyRdd, df.schema)
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index c393706..c5bcb13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -33,14 +33,14 @@
) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
- val sqlContext = context.sqlContext
+ val sparkSession = context.sparkSession
try {
val df = rule match {
- case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
+ case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession, inputDfName, details)
case DataFrameOps._accuracy =>
- DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+ DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId, details)
- case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
+ case DataFrameOps._clear => DataFrameOps.clear(sparkSession, inputDfName, details)
case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
}
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 00edf07..e5c18e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -31,9 +31,9 @@
cache: Boolean = false
) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
- val sqlContext = context.sqlContext
+ val sparkSession = context.sparkSession
try {
- val df = sqlContext.sql(rule)
+ val df = sparkSession.sql(rule)
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
writeStepOpt match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index d2805cf..92f6c00 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -47,7 +47,7 @@
private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
try {
- val df = context.sqlContext.table(s"`${name}`")
+ val df = context.sparkSession.table(s"`${name}`")
Some(df)
} catch {
case e: Throwable =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index c7ebae7..3f3eaf5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -75,7 +75,7 @@
private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
try {
- val pdf = context.sqlContext.table(s"`${inputName}`")
+ val pdf = context.sparkSession.table(s"`${inputName}`")
val records = pdf.toJSON.collect()
if (records.size > 0) {
records.flatMap { rec =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 3dcbe90..44b05ff 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -76,7 +76,7 @@
private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
try {
- val df = context.sqlContext.table(s"`${name}`")
+ val df = context.sparkSession.table(s"`${name}`")
Some(df)
} catch {
case e: Throwable =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index ae0bfb8..0dec789 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -47,10 +47,9 @@
val logLevel = getGriffinLogLevel()
sc.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- val sqlContext = spark.sqlContext
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(spark)
}
}