/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.rdd

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util._

/**
 * This RDD maintains the session level ThreadLocal state, so that the Carbon session
 * properties and the Hadoop configuration prepared on the driver are also visible to
 * the tasks that run on the executors.
 *
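 * A minimal sketch of a concrete subclass, shown here only to illustrate the contract
 * (the class name, the String element type and the pass-through logic are assumptions,
 * not part of this file):
 * {{{
 * class PassThroughCarbonRDD(ss: SparkSession, parent: RDD[String])
 *   extends CarbonRDD[String](ss, parent) {
 *
 *   // getPartitions has already set the driver's Hadoop conf on the current thread
 *   override def internalGetPartitions: Array[Partition] = firstParent[String].partitions
 *
 *   // compute() has already installed the session/task ThreadLocals when this is called
 *   override def internalCompute(split: Partition, context: TaskContext): Iterator[String] =
 *     firstParent[String].iterator(split, context)
 * }
 * }}}
 */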
abstract class CarbonRDD[T: ClassTag](
    @transient private val ss: SparkSession,
    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {

  @transient val sparkAppName: String = ss.sparkContext.appName

  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName)

  val carbonSessionInfo: CarbonSessionInfo = {
    var info = ThreadLocalSessionInfo.getCarbonSessionInfo
    if (info == null || info.getSessionParams == null) {
      info = new CarbonSessionInfo
      info.setSessionParams(new SessionParams())
    }
    info.getSessionParams.addProps(CarbonProperties.getInstance().getAddedProperty)
    info
  }

  val inputMetricsInterval: Long = CarbonProperties.getInputMetricsInterval

  @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()

  val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
    this (sparkSession, List(new OneToOneDependency(oneParent)))
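
  /**
   * Subclasses compute their partitions here; [[getPartitions]] wraps this call and first
   * propagates the driver's Hadoop configuration to the current thread.
   */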
  protected def internalGetPartitions: Array[Partition]

  override def getPartitions: Array[Partition] = {
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
    internalGetPartitions
  }

  // Subclasses implement the actual RDD compute logic here; compute() sets up the
  // thread-local session and task state before calling it.
  def internalCompute(split: Partition, context: TaskContext): Iterator[T]
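
  /**
   * Installs the executor side thread-local state (Carbon session info, task metrics map and
   * a generated task id) before delegating to [[internalCompute]], and registers a completion
   * listener that clears the session thread-local once the task finishes.
   */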
  final def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // make sure the thread-local state is cleared once the task completes
    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
    TaskMetricsMap.initializeThreadLocal()
    val carbonTaskInfo = new CarbonTaskInfo
    carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
    // replay the session properties added on the driver into the executor's CarbonProperties
    carbonSessionInfo.getSessionParams.getAddedProps.asScala
      .foreach(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
    internalCompute(split, context)
  }
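
  /** Returns the Hadoop configuration that was broadcast from the driver. */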
  def getConf: Configuration = {
    config.value.value
  }
}

/**
 * This RDD carries the [[TableInfo]] in serialized form, so that it can be serialized on the
 * driver and deserialized again on the executors via [[getTableInfo]].
 */
abstract class CarbonRDDWithTableInfo[T: ClassTag](
    @transient private val ss: SparkSession,
    @transient private var deps: Seq[Dependency[_]],
    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {

  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
      serializedTableInfo: Array[Byte]) = {
    this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
  }
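
  /** Deserializes the [[TableInfo]] from the byte form carried by this RDD. */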
  def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
}