/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.text.SimpleDateFormat
import java.util.Date

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{Job, JobID}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
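
/**
 * A [[HadoopFsRelation]] over a CarbonData table path. The table schema is read from the
 * store, filters and column projections are pushed down into [[CarbonTableInputFormat]],
 * and the scan is executed through [[CarbonHadoopFSRDD]]. Writing through this relation
 * is not supported yet.
 */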
private[sql] case class CarbonDatasourceHadoopRelation(
    sqlContext: SQLContext,
    paths: Array[String],
    parameters: Map[String, String],
    tableSchema: Option[StructType])
  extends HadoopFsRelation {

  // Fail fast if the first path does not point at a CarbonData table (no schema file).
  lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head))
  if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) {
    throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head)
  }

  lazy val job = new Job(new JobConf())
  lazy val options = new CarbonOption(parameters)
  lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)

  // Read the CarbonTable metadata from the store and wrap it in a CarbonRelation.
  lazy val relationRaw: CarbonRelation = {
    val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
    if (carbonTable == null) {
      sys.error(s"CarbonData file path ${paths.head} is not valid")
    }
    CarbonRelation(
      carbonTable.getDatabaseName,
      carbonTable.getFactTableName,
      CarbonSparkUtil.createSparkMeta(carbonTable),
      new TableMeta(carbonTable.getCarbonTableIdentifier,
        paths.head, absIdentifier.getTablePath, carbonTable),
      None
    )(sqlContext)
  }

  override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema)

  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
    // TODO
    throw new UnsupportedOperationException
  }
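
  /**
   * Builds the scan RDD: translates Spark [[Filter]]s into a Carbon filter expression,
   * sets the column projection and read support class on the Hadoop configuration, and
   * returns a [[CarbonHadoopFSRDD]] of [[Row]] over the table.
   */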
  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputFiles: Array[FileStatus]): RDD[Row] = {
    val conf = new Configuration(job.getConfiguration)

    // Convert the Spark filters that Carbon can handle, AND them together, and push the
    // combined expression down to the input format.
    filters.flatMap { filter =>
      CarbonFilters.createCarbonFilter(dataSchema, filter)
    }.reduceOption(new AndExpression(_, _))
      .foreach(CarbonTableInputFormat.setFilterPredicates(conf, _))

    // Push down the column projection and the Spark Row read support.
    val projection = new CarbonProjection
    requiredColumns.foreach(projection.addColumn)
    CarbonTableInputFormat.setColumnProjection(conf, projection)
    CarbonTableInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])

    new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
      new SerializableConfiguration(conf),
      absIdentifier,
      classOf[CarbonTableInputFormat[Row]],
      classOf[Row]
    )
  }
}
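
/**
 * A Spark [[Partition]] wrapping a single serialized [[CarbonInputSplit]].
 */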
class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
    val carbonSplit: SerializableWritable[CarbonInputSplit])
  extends Partition {

  override val index: Int = idx

  override def hashCode(): Int = 41 * (41 + rddId) + idx
}
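
/**
 * An RDD that reads CarbonData files through the Hadoop MapReduce API: one partition per
 * [[CarbonInputSplit]], each computed by a Carbon record reader.
 */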
class CarbonHadoopFSRDD[V: ClassTag](
    @transient sc: SparkContext,
    conf: SerializableConfiguration,
    identifier: AbsoluteTableIdentifier,
    inputFormatClass: Class[_ <: CarbonTableInputFormat[V]],
    valueClass: Class[V])
  extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {

  // Timestamp-based job tracker id, used to build the Hadoop job and task attempt ids.
  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

  @transient protected val jobId = new JobID(jobTrackerId, id)
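
  /**
   * Creates a Carbon record reader for the split backing this partition and exposes it
   * as an iterator, closing the reader once the split is exhausted.
   */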
  override def internalCompute(split: Partition,
      context: TaskContext): Iterator[V] = {
    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
    val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
    val job: Job = new Job(hadoopAttemptContext.getConfiguration)
    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
    CarbonInputFormat.setDataTypeConverter(hadoopAttemptContext.getConfiguration,
      new SparkDataTypeConverterImpl)
    hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
    val reader =
      format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
        hadoopAttemptContext
      )
    reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
      hadoopAttemptContext
    )
    // Wrap the record reader in an iterator that reads one key/value pair ahead and
    // closes the reader when the last record has been read.
    new Iterator[V] {
      private[this] var havePair = false
      private[this] var finished = false

      override def hasNext: Boolean = {
        if (context.isInterrupted) {
          throw new TaskKilledException
        }
        if (!finished && !havePair) {
          finished = !reader.nextKeyValue
          if (finished) {
            reader.close()
          }
          havePair = !finished
        }
        !finished
      }

      override def next(): V = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
        reader.getCurrentValue
      }
    }
  }
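
  /**
   * Asks the Carbon input format for the table's splits and wraps each one in a
   * [[CarbonHadoopFSPartition]].
   */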
  override protected def getPartitions: Array[Partition] = {
    val jobContext = newJobContext(conf.value, jobId)
    val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
    jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
    val splits = format.getSplits(jobContext).toArray
    val carbonInputSplits = splits
      .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
    carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
  }
}