/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.net.URI

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.{AtomicType, StructField, StructType}

import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment}
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
import org.apache.carbondata.core.reader.CarbonHeaderReader
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob}
import org.apache.carbondata.spark.util.CarbonScalaUtil
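
/**
 * Spark [[FileFormat]] for reading CarbonData files directly at the file level, without
 * requiring a Carbon metastore table. Registered as the data source short name "carbonfile".
 */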
@InterfaceAudience.User
@InterfaceStability.Evolving
class SparkCarbonFileFormat extends FileFormat
with DataSourceRegister
with Logging
with Serializable {
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
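  /**
   * Infers the schema by reading the file header of the first CarbonData file found under the
   * "path" option and converting its Carbon column schemas to Spark [[StructField]]s.
   */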
override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
    val filePaths = CarbonUtil.getFilePathExternalFilePath(options("path"))
    if (filePaths.isEmpty) {
      throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
    }
val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0))
val fileHeader = carbonHeaderReader.readHeader
val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader
.getColumn_schema
    val colArray = ArrayBuffer[StructField]()
    for (i <- 0 until table_columns.size()) {
      val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))
      colArray += StructField(col.getColumnName,
        CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), nullable = false)
    }
    Some(StructType(colArray))
}
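  /**
   * Returns the [[OutputWriterFactory]] used for writes. The factory currently creates a
   * [[TextOutputWriter]] while using the .carbondata file extension.
   */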
override def prepareWrite(sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
new OutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new TextOutputWriter(path, dataSchema, context)
}
override def getFileExtension(context: TaskAttemptContext): String = {
CarbonTablePath.CARBON_DATA_EXT
}
}
}
override def shortName(): String = "carbonfile"
override def toString: String = "carbonfile"
override def hashCode(): Int = getClass.hashCode()
override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
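  /**
   * Resolves whether the vectorized reader should be used: the Spark session configuration is
   * checked first, then JVM system properties, then the Carbon property with its default value.
   */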
def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
val vectorizedReader = {
      if (sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
        sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
} else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
} else {
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
}
}
vectorizedReader.toBoolean
}
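  /**
   * Columnar batches are returned only when whole-stage codegen is enabled, the schema fits
   * within the codegen field limit and every field is of an atomic type.
   */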
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.wholeStageEnabled &&
schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
}
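  /**
   * Creates a VectorizedCarbonRecordReader through reflection. If the reader cannot be
   * instantiated, the error is logged and null is returned.
   */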
def createVectorizedCarbonRecordReader(queryModel: QueryModel,
inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
try {
val cons = Class.forName(name).getDeclaredConstructors
cons.head.setAccessible(true)
cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
.asInstanceOf[RecordReader[Void, Object]]
} catch {
case e: Exception =>
LOGGER.error(e)
null
}
}
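  /**
   * Builds the function that turns a [[PartitionedFile]] into an iterator of [[InternalRow]].
   * Spark filters are translated into Carbon expressions, the required columns become a
   * [[CarbonProjection]], and each .carbondata file is read through [[CarbonFileInputFormat]]
   * using either the vectorized or the row-based record reader.
   */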
override def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    val filter: Option[Expression] = filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(dataSchema, filter)
}.reduceOption(new AndExpression(_, _))
val projection = requiredSchema.map(_.name).toArray
val carbonProjection = new CarbonProjection
projection.foreach(carbonProjection.addColumn)
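    // Set up a Hadoop Job and propagate the current user's credentials (e.g. delegation
    // tokens) so that record readers created on the executors can access secured storage.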
val conf = new Configuration()
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
var supportBatchValue: Boolean = false
val readVector = supportVector(sparkSession, dataSchema)
if (readVector) {
supportBatchValue = supportBatch(sparkSession, dataSchema)
}
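    // File-level reads have no Carbon metastore table, so register a dummy table identity
    // ("default"."externaldummy") and drop any stale entry from the CarbonMetadata cache.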
CarbonInputFormat.setTableName(job.getConfiguration, "externaldummy")
CarbonInputFormat.setDatabaseName(job.getConfiguration, "default")
CarbonMetadata.getInstance.removeTable("default_externaldummy")
val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
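    // The function returned below runs once per PartitionedFile on the executors; files that
    // do not carry the .carbondata extension yield an empty iterator.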
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
if (file.filePath.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val path: String = options.get("path").get
val endindex: Int = path.indexOf("Fact") - 1
val tablePath = path.substring(0, endindex)
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath,
"default",
"externaldummy")
val split = CarbonInputSplit.from("null", "0", fileSplit, ColumnarFormatVersion.V3, null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val conf1 = new Configuration()
conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
CarbonInputFormat.setColumnProjection(conf1, carbonProjection)
        filter match {
          case Some(c) => CarbonInputFormat.setFilterPredicates(conf1, c)
          case None =>
        }
val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)
val model = format.createQueryModel(split, attemptContext)
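        // Treat the external files as one synthetic segment named "null".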
        val segments = new java.util.ArrayList[Segment]()
val seg = new Segment("null", null)
segments.add(seg)
        val partition: java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
if (indexFiles.size() == 0) {
throw new SparkException("Index file not present to read the carbondata file")
}
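        // Clear cached datamaps for this path, choose a datamap for the filter, prune the
        // blocklets and attach the blocklet detail info to the input split.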
val tab = model.getTable
DataMapStoreManager.getInstance().clearDataMaps(identifier)
val dataMapExprWrapper = DataMapChooser.get
.choose(tab, model.getFilterExpressionResolverTree)
// TODO : handle the partition for CarbonFileLevelFormat
val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
val detailInfo = prunedBlocklets.get(0).getDetailInfo
detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)
split.setDetailInfo(detailInfo)
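        // Use the vectorized reader when enabled, otherwise fall back to the row-based reader.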
val carbonReader = if (readVector) {
val vectorizedReader = createVectorizedCarbonRecordReader(model,
null,
supportBatchValue.toString)
vectorizedReader.initialize(split, attemptContext)
logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
vectorizedReader
} else {
val reader = new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration), null)
reader.initialize(split, attemptContext)
reader
}
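        // Close the reader when the task finishes and expose it as an InternalRow iterator.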
val iter = new RecordReaderIterator(carbonReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
iter.asInstanceOf[Iterator[InternalRow]]
      } else {
Iterator.empty
}
}
}
}