[GRIFFIN-304] Eliminate older contexts

**What changes were proposed in this pull request?**

As SparkSession is a direct replacement for SparkContext, SQLContext and HiveContext, there is no need to pass/ instantiate them. If any of the oder contexts are needed, they can be derived from SparkSession.

This issue aims to eliminate dependency on older Contexts in favour of SparkSession.

**Does this PR introduce any user-facing change?**
No

**How was this patch tested?**
Griffin test suite.

Author: chitralverma <chitralverma@gmail.com>

Closes #557 from chitralverma/eliminate-older-contexts.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 2fdf409..8069632 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,7 +18,7 @@
 */
 package org.apache.griffin.measure.context
 
-import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+import org.apache.spark.sql.{Encoders, SparkSession}
 
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.configuration.enums._
@@ -37,10 +37,8 @@
                      procType: ProcessType
                     )(@transient implicit val sparkSession: SparkSession) {
 
-  val sqlContext: SQLContext = sparkSession.sqlContext
-
   val compileTableRegister: CompileTableRegister = CompileTableRegister()
-  val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sqlContext)
+  val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sparkSession)
 
   val dataFrameCache: DataFrameCache = DataFrameCache()
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index c4dda3b..d1d9dbe 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -61,7 +61,7 @@
 /**
   * register table name and create temp view during calculation
   */
-case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends TableRegister {
+case class RunTimeTableRegister(@transient sparkSession: SparkSession) extends TableRegister {
 
   def registerTable(name: String, df: DataFrame): Unit = {
     registerTable(name)
@@ -70,13 +70,13 @@
 
   override def unregisterTable(name: String): Unit = {
     if (existsTable(name)) {
-      sqlContext.dropTempTable(name)
+      sparkSession.catalog.dropTempView(name)
       tables -= name
     }
   }
   override def unregisterAllTables(): Unit = {
     val uts = getTables
-    uts.foreach(t => sqlContext.dropTempTable(t))
+    uts.foreach(t => sparkSession.catalog.dropTempView(t))
     tables.clear
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 28e616b..1b1bcf0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -52,7 +52,7 @@
 
     // for streaming data cache
     val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
-      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
+      sparkSession, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
 
     val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index a03a468..5bfe41a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -43,7 +43,7 @@
 trait StreamingCacheClient
   extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
 
-  val sqlContext: SQLContext
+  val sparkSession: SparkSession
   val param: Map[String, Any]
   val dsName: String
   val index: Int
@@ -181,7 +181,7 @@
 
     // new cache data
     val newDfOpt = try {
-      val dfr = sqlContext.read
+      val dfr = sparkSession.read
       readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
     } catch {
       case e: Throwable =>
@@ -194,7 +194,7 @@
     val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
       val oldDfPath = s"${oldFilePath}/${idx}"
       try {
-        val dfr = sqlContext.read
+        val dfr = sparkSession.read
         readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
       } catch {
         case e: Throwable =>
@@ -329,7 +329,7 @@
               val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
               val updateDf = df.filter(filterStr)
 
-              val prlCount = sqlContext.sparkContext.defaultParallelism
+              val prlCount = sparkSession.sparkContext.defaultParallelism
               // repartition
               val repartitionedDf = updateDf.repartition(prlCount)
               val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index eeda8ef..1948438 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,7 +18,7 @@
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.datasource.TimestampStorage
@@ -37,14 +37,14 @@
 
   /**
     * create streaming cache client
-    * @param sqlContext     sqlContext in spark environment
+    * @param sparkSession     sparkSession in spark environment
     * @param checkpointOpt  data source checkpoint/cache config option
     * @param name           data source name
     * @param index          data source index
     * @param tmstCache      the same tmstCache instance inside a data source
     * @return               streaming cache client option
     */
-  def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String, Any]],
+  def getClientOpt(sparkSession: SparkSession, checkpointOpt: Option[Map[String, Any]],
                    name: String, index: Int, tmstCache: TimestampStorage
                   ): Option[StreamingCacheClient] = {
     checkpointOpt.flatMap { param =>
@@ -52,13 +52,13 @@
         val tp = param.getString(_type, "")
         val dsCache = tp match {
           case ParquetRegex() =>
-            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+            StreamingCacheParquetClient(sparkSession, param, name, index, tmstCache)
           case JsonRegex() =>
-            StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+            StreamingCacheJsonClient(sparkSession, param, name, index, tmstCache)
           case OrcRegex() =>
-            StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+            StreamingCacheOrcClient(sparkSession, param, name, index, tmstCache)
           case _ =>
-            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+            StreamingCacheParquetClient(sparkSession, param, name, index, tmstCache)
         }
         Some(dsCache)
       } catch {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index c81d4d1..8db918a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -25,7 +25,7 @@
 /**
   * data source cache in json format
   */
-case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+case class StreamingCacheJsonClient(sparkSession: SparkSession, param: Map[String, Any],
                                     dsName: String, index: Int, timestampStorage: TimestampStorage
                               ) extends StreamingCacheClient {
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 0649b74..5707cfa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -25,7 +25,7 @@
 /**
   * data source cache in orc format
   */
-case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+case class StreamingCacheOrcClient(sparkSession: SparkSession, param: Map[String, Any],
                                    dsName: String, index: Int, timestampStorage: TimestampStorage
                              ) extends StreamingCacheClient {
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index 9c369ee..699ccc2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -25,14 +25,14 @@
 /**
   * data source cache in parquet format
   */
-case class StreamingCacheParquetClient(sqlContext: SQLContext,
+case class StreamingCacheParquetClient(sparkSession: SparkSession,
                                        param: Map[String, Any],
                                        dsName: String,
                                        index: Int,
                                        timestampStorage: TimestampStorage
                                  ) extends StreamingCacheClient {
 
-  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+  sparkSession.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
 
   protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     info(s"write path: ${path}")
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 85b4774..d37f51b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -98,7 +98,7 @@
 
 //  def metaData(): Try[Iterable[(String, String)]] = {
 //    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").
+//      val st = sparkSession.read.format("com.databricks.spark.avro").
   //       load(concreteFileFullPath).schema
 //      st.fields.map(f => (f.name, f.dataType.typeName))
 //    }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index c05d043..a4a1d37 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -23,7 +23,7 @@
 import scala.util.Try
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.configuration.enums._
@@ -43,7 +43,6 @@
   val metricName = dqParam.getName
   val sinkParams = getSinkParams
 
-  var sqlContext: SQLContext = _
   var dqContext: DQContext = _
 
   def retryable: Boolean = false
@@ -57,10 +56,9 @@
     val logLevel = getGriffinLogLevel()
     sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     griffinLogger.setLevel(logLevel)
-    sqlContext = sparkSession.sqlContext
 
     // register udf
-    GriffinUDFAgent.register(sqlContext)
+    GriffinUDFAgent.register(sparkSession)
   }
 
   def run: Try[Boolean] = Try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index be32eba..57e6f82 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -24,7 +24,7 @@
 import scala.util.Try
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 import org.apache.griffin.measure.Loggable
@@ -49,8 +49,6 @@
   val metricName = dqParam.getName
   val sinkParams = getSinkParams
 
-  var sqlContext: SQLContext = _
-
   def retryable: Boolean = true
 
   def init: Try[_] = Try {
@@ -62,7 +60,6 @@
     val logLevel = getGriffinLogLevel()
     sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     griffinLogger.setLevel(logLevel)
-    sqlContext = sparkSession.sqlContext
 
     // clear checkpoint directory
     clearCpDir
@@ -72,7 +69,7 @@
     OffsetCheckpointClient.init
 
     // register udf
-    GriffinUDFAgent.register(sqlContext)
+    GriffinUDFAgent.register(sparkSession)
   }
 
   def run: Try[Boolean] = Try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
index 61b93ab..baa7a8c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
@@ -18,12 +18,12 @@
 */
 package org.apache.griffin.measure.step.builder.udf
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 
 object GriffinUDFAgent {
-  def register(sqlContext: SQLContext): Unit = {
-    GriffinUDFs.register(sqlContext)
-    GriffinUDAggFs.register(sqlContext)
+  def register(sparkSession: SparkSession): Unit = {
+    GriffinUDFs.register(sparkSession)
+    GriffinUDAggFs.register(sparkSession)
   }
 }
 
@@ -32,10 +32,10 @@
   */
 object GriffinUDFs {
 
-  def register(sqlContext: SQLContext): Unit = {
-    sqlContext.udf.register("index_of", indexOf _)
-    sqlContext.udf.register("matches", matches _)
-    sqlContext.udf.register("reg_replace", regReplace _)
+  def register(sparkSession: SparkSession): Unit = {
+    sparkSession.udf.register("index_of", indexOf _)
+    sparkSession.udf.register("matches", matches _)
+    sparkSession.udf.register("reg_replace", regReplace _)
   }
 
   private def indexOf(arr: Seq[String], v: String) = {
@@ -57,7 +57,7 @@
   */
 object GriffinUDAggFs {
 
-  def register(sqlContext: SQLContext): Unit = {
+  def register(sparkSession: SparkSession): Unit = {
   }
 
 }
\ No newline at end of file
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index ba64d33..8678db5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,8 +20,9 @@
 
 import java.util.Date
 
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.{Encoders, Row, _}
 import org.apache.spark.sql.types._
+
 import org.apache.griffin.measure.context.ContextId
 import org.apache.griffin.measure.context.streaming.metric._
 import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
@@ -45,7 +46,7 @@
     val _matchedFraction = "matchedFraction"
   }
 
-  def fromJson(sqlContext: SQLContext,
+  def fromJson(sparkSession: SparkSession,
                inputDfName: String,
                details: Map[String, Any]): DataFrame = {
     val _colName = "col.name"
@@ -53,15 +54,15 @@
 
     implicit val encoder = Encoders.STRING
 
-    val df: DataFrame = sqlContext.table(s"`${inputDfName}`")
+    val df: DataFrame = sparkSession.table(s"`${inputDfName}`")
     val rdd = colNameOpt match {
       case Some(colName: String) => df.map(r => r.getAs[String](colName))
       case _ => df.map(_.getAs[String](0))
     }
-    sqlContext.read.json(rdd) // slow process
+    sparkSession.read.json(rdd) // slow process
   }
 
-  def accuracy(sqlContext: SQLContext,
+  def accuracy(sparkSession: SparkSession,
                inputDfName: String,
                contextId: ContextId,
                details: Map[String, Any]): DataFrame = {
@@ -82,7 +83,7 @@
       }
     }
 
-    val df = sqlContext.table(s"`${inputDfName}`")
+    val df = sparkSession.table(s"`${inputDfName}`")
 
     val results = df.rdd.flatMap { row =>
       try {
@@ -116,16 +117,16 @@
       val ar = r.result.asInstanceOf[AccuracyMetric]
       Row(r.timeStamp, ar.miss, ar.total, ar.getMatch, ar.matchFraction, !ar.initial, ar.eventual)
     }.toArray
-    val rowRdd = sqlContext.sparkContext.parallelize(rows)
-    val retDf = sqlContext.createDataFrame(rowRdd, schema)
+    val rowRdd = sparkSession.sparkContext.parallelize(rows)
+    val retDf = sparkSession.createDataFrame(rowRdd, schema)
 
     retDf
   }
 
-  def clear(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
-    val df = sqlContext.table(s"`${inputDfName}`")
-    val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
-    sqlContext.createDataFrame(emptyRdd, df.schema)
+  def clear(sparkSession: SparkSession, inputDfName: String, details: Map[String, Any]): DataFrame = {
+    val df = sparkSession.table(s"`${inputDfName}`")
+    val emptyRdd = sparkSession.sparkContext.emptyRDD[Row]
+    sparkSession.createDataFrame(emptyRdd, df.schema)
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index c393706..c5bcb13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -33,14 +33,14 @@
                                     ) extends TransformStep {
 
   def doExecute(context: DQContext): Boolean = {
-    val sqlContext = context.sqlContext
+    val sparkSession = context.sparkSession
     try {
       val df = rule match {
-        case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
+        case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession, inputDfName, details)
         case DataFrameOps._accuracy =>
-          DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+          DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId, details)
 
-        case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
+        case DataFrameOps._clear => DataFrameOps.clear(sparkSession, inputDfName, details)
         case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
       }
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 00edf07..e5c18e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -31,9 +31,9 @@
                                                  cache: Boolean = false
                                                 ) extends TransformStep {
   def doExecute(context: DQContext): Boolean = {
-    val sqlContext = context.sqlContext
+    val sparkSession = context.sparkSession
     try {
-      val df = sqlContext.sql(rule)
+      val df = sparkSession.sql(rule)
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
       context.runTimeTableRegister.registerTable(name, df)
       writeStepOpt match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index d2805cf..92f6c00 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -47,7 +47,7 @@
 
   private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
     try {
-      val df = context.sqlContext.table(s"`${name}`")
+      val df = context.sparkSession.table(s"`${name}`")
       Some(df)
     } catch {
       case e: Throwable =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index c7ebae7..3f3eaf5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -75,7 +75,7 @@
 
   private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
     try {
-      val pdf = context.sqlContext.table(s"`${inputName}`")
+      val pdf = context.sparkSession.table(s"`${inputName}`")
       val records = pdf.toJSON.collect()
       if (records.size > 0) {
         records.flatMap { rec =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 3dcbe90..44b05ff 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -76,7 +76,7 @@
 
   private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
     try {
-      val df = context.sqlContext.table(s"`${name}`")
+      val df = context.sparkSession.table(s"`${name}`")
       Some(df)
     } catch {
       case e: Throwable =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index ae0bfb8..0dec789 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -47,10 +47,9 @@
       val logLevel = getGriffinLogLevel()
       sc.setLogLevel(sparkParam.getLogLevel)
       griffinLogger.setLevel(logLevel)
-      val sqlContext = spark.sqlContext
 
       // register udf
-      GriffinUDFAgent.register(sqlContext)
+      GriffinUDFAgent.register(spark)
     }
   }