[CARBONDATA-2990] Queries slow down after some time due to broadcast issue

Problem
It is observed that during consecutive run of queries after some time queries are slowing down. This is causing the degrade in query performance.
No exception is thrown in driver and executor logs but as observed from the logs the time to broadcast hadoop conf is increasing after every query run.

Analysis

This is happening because in carbon SerializableConfiguration class is overriden from spark. Spark registers this class with Kryo serializer and hence the computation using the kryo is fast. The same benefit is not observed in carbondata becuase of overriding the class.
Internal Spark sizeEstimator calculates the size of object and there are few extra objects in carbondata overriden class because of which the computation time is increasing.
Solution
Use the spark class instead of overriding the class in carbondata

This closes #2803
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 8d6dd32..5511645 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -34,6 +34,7 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -41,7 +42,6 @@
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -110,7 +110,7 @@
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf)
     val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 2e74a94..923676c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -23,6 +23,7 @@
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,7 +36,6 @@
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * Use sortBy operator in spark to load the data
@@ -66,7 +66,7 @@
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
-    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
     val inputRDD = originRDD
       .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -121,7 +121,7 @@
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf))
+        writeStepRowCounter, conf.value.value))
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f17bd91..f1a12bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -42,7 +42,7 @@
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
@@ -230,8 +230,8 @@
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
       rowCounter: Accumulator[Int],
-      conf: Broadcast[SerializableConfiguration]) {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+      conf: Configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
     var model: CarbonLoadModel = null
     var tableName: String = null
     var rowConverter: RowConverterImpl = null
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 3a02f85..83cd59c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,6 @@
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -49,8 +48,7 @@
 
   @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
 
-  val config: Broadcast[SerializableConfiguration] = sparkContext
-    .broadcast(new SerializableConfiguration(hadoopConf))
+  val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 87c8e4c..6076e4a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -54,39 +54,6 @@
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
-class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
-
-  @transient
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private def writeObject(out: ObjectOutputStream): Unit =
-    try {
-      out.defaultWriteObject()
-      value.write(out)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-
-
-  private def readObject(in: ObjectInputStream): Unit =
-    try {
-      value = new Configuration(false)
-      value.readFields(in)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-}
-
 /**
  * This partition class use to split by Host
  *
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index b7d47a0..9ffe6e1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -19,11 +19,14 @@
 
 import java.lang.reflect.Method
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -99,4 +102,20 @@
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
+
+  /**
+   * Method to broadcast a variable using spark SerializableConfiguration class
+   *
+   * @param sparkContext
+   * @param hadoopConf
+   * @return
+   */
+  def broadCastHadoopConf(sparkContext: SparkContext,
+      hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
+    sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf))
+  }
+
+  def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
+    new SerializableConfiguration(hadoopConf)
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6350b50..0ec3bc6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -43,7 +43,7 @@
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -728,7 +728,7 @@
 
       // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
-      val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
       partitionByRdd.map(_._2).mapPartitions { partition =>
         ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d0ed56e..ff7ac60 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -30,7 +30,7 @@
 import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -44,7 +44,7 @@
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
 
 /**
  * It decodes the data.
@@ -76,8 +76,8 @@
         (carbonTable.getTableName, carbonTable)
       }.toMap
 
-      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f7a5f42..43c8b86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -42,6 +42,7 @@
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -78,7 +79,7 @@
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
@@ -986,8 +987,8 @@
           array
         }
       }
-    val conf = sparkSession.sparkContext
-      .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 4921b33..7e7f671 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -32,6 +32,7 @@
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,7 +50,6 @@
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -120,9 +120,8 @@
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
         keyRdd.partitions.length)
 
-    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-      .sessionState.newHadoopConf()))
-
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val rdd = rowContRdd.join(keyRdd)
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 66066ed..35fc3c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -26,16 +26,16 @@
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
-import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object HorizontalCompaction {
 
@@ -191,8 +191,8 @@
 
       val timestamp = factTimeStamp
       val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
-      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
       val result = rdd1.mapPartitions(iter =>
         new Iterator[Seq[CarbonDataMergerUtilResult]] {
           ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)