[CARBONDATA-1796] While submitting a new job to NewHadoopRDD, tokens should be generated for accessing paths

In a Hadoop secure mode cluster,
delegation tokens must be generated for the input path and added to the JobConf when a job is submitted to NewHadoopRDD; otherwise a delegation token exception is thrown during data load.

This closes #1552
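
A minimal sketch of the pattern this patch applies: wrap the Configuration in a JobConf, copy the current user's credentials into it, obtain namenode delegation tokens for the input path, and hand that JobConf to NewHadoopRDD. The helper names, the use of TextInputFormat/LongWritable/Text, and the example path are illustrative only and are not part of the CarbonData code, which uses CSVInputFormat and StringArrayWritable instead.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
    import org.apache.hadoop.mapreduce.security.TokenCache
    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.rdd.NewHadoopRDD

    // Hypothetical helper (not from the patch): builds a credential-carrying
    // JobConf for the given input path.
    def secureJobConf(hadoopConf: Configuration, inputPath: String): JobConf = {
      val jobConf = new JobConf(hadoopConf)
      // Copy tokens and secrets already held by the current user into the JobConf.
      SparkHadoopUtil.get.addCredentials(jobConf)
      // Fetch HDFS delegation tokens for the path that will be read.
      TokenCache.obtainTokensForNamenodes(
        jobConf.getCredentials, Array[Path](new Path(inputPath)), jobConf)
      jobConf
    }

    // Illustrative usage with a plain text input format; the CarbonData code
    // follows the same shape with its own input format and value class.
    def textRdd(sc: SparkContext, hadoopConf: Configuration, inputPath: String) = {
      hadoopConf.set(FileInputFormat.INPUT_DIR, inputPath)
      val jobConf = secureJobConf(hadoopConf, inputPath)
      // Passing the JobConf (instead of the bare Configuration) is what avoids
      // the delegation token exception on a secure cluster.
      new NewHadoopRDD[LongWritable, Text](
        sc, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], jobConf)
    }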
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 776973b..c14e0a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -21,8 +21,10 @@
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.command.ExecutionErrors
@@ -58,12 +60,14 @@
       CommonUtil.configureCSVInputFormat(hadoopConf, model)
       hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
       val columnCount = model.getCsvHeaderColumns.length
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
       new NewHadoopRDD[NullWritable, StringArrayWritable](
         sc,
         classOf[CSVInputFormat],
         classOf[NullWritable],
         classOf[StringArrayWritable],
-        hadoopConf)
+        jobConf)
         .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
     }
 
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index c931c44..1b3b3f9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -30,8 +30,11 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.security.TokenCache
 import org.apache.spark.{Accumulator, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{NewHadoopRDD, RDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -362,6 +365,11 @@
     })
     val values = new Array[String](columnNames.length)
     val row = new StringArrayRow(values)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+      jobConf)
     val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
       sqlContext.sparkContext,
       classOf[CSVInputFormat],
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 95ccc92..0619851 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,9 +29,11 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -845,12 +847,14 @@
       CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
       hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
       val columnCount = columns.length
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
       new NewHadoopRDD[NullWritable, StringArrayWritable](
         sqlContext.sparkContext,
         classOf[CSVInputFormat],
         classOf[NullWritable],
         classOf[StringArrayWritable],
-        hadoopConf
+        jobConf
       ).map { currentRow =>
         if (null == currentRow || null == currentRow._2) {
           val row = new StringArrayRow(new Array[String](columnCount))