[CARBONDATA-1796] While submitting new job to HadoopRdd, token should be generated for accessing paths
In a Hadoop secure-mode (Kerberos) cluster,
when a job is submitted to NewHadoopRDD, a delegation token must be obtained for the input path set in the JobConf; otherwise a Delegation Token exception is thrown during data load.
This closes #1552
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 776973b..c14e0a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -21,8 +21,10 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.command.ExecutionErrors
@@ -58,12 +60,14 @@
CommonUtil.configureCSVInputFormat(hadoopConf, model)
hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
val columnCount = model.getCsvHeaderColumns.length
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
new NewHadoopRDD[NullWritable, StringArrayWritable](
sc,
classOf[CSVInputFormat],
classOf[NullWritable],
classOf[StringArrayWritable],
- hadoopConf)
+ jobConf)
.map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index c931c44..1b3b3f9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -30,8 +30,11 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.security.TokenCache
import org.apache.spark.{Accumulator, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -362,6 +365,11 @@
})
val values = new Array[String](columnNames.length)
val row = new StringArrayRow(values)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+ Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+ jobConf)
val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
sqlContext.sparkContext,
classOf[CSVInputFormat],
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 95ccc92..0619851 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,9 +29,11 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -845,12 +847,14 @@
CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
val columnCount = columns.length
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
new NewHadoopRDD[NullWritable, StringArrayWritable](
sqlContext.sparkContext,
classOf[CSVInputFormat],
classOf[NullWritable],
classOf[StringArrayWritable],
- hadoopConf
+ jobConf
).map { currentRow =>
if (null == currentRow || null == currentRow._2) {
val row = new StringArrayRow(new Array[String](columnCount))