/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.carbondata.execution.datasources

import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{AtomicType, StructType}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
import org.apache.carbondata.core.index.IndexFilter
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
import org.apache.carbondata.spark.util.CarbonSparkUtil

/**
 * A custom [[FileIndex]] implementation that uses carbon's driver-side pruning feature to
 * prune carbondata files with the help of the carbonindex files.
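 *
 * A rough usage sketch (illustrative only: the `InMemoryFileIndex` wiring and the variable
 * names below are assumptions, the `InMemoryFileIndex` constructor differs across Spark
 * versions, and the real callers live in carbon's datasource resolution path):
 *
 * {{{
 *   val inner = new InMemoryFileIndex(spark, Seq(new Path(tablePath)), Map.empty, None)
 *   val index = CarbonFileIndex(spark, dataSchema, Map("path" -> tablePath), inner)
 *   val pruned = index.listFiles(partitionFilters, dataFilters)
 * }}}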
 */
case class CarbonFileIndex(
    sparkSession: SparkSession,
    dataSchema: StructType,
    parameters: Map[String, String],
    fileIndex: FileIndex)
  extends FileIndex with AbstractCarbonFileIndex {

  // When this flag is set, pruning returns an empty file list. It is needed for the carbon
  // session partition flow, where pruning is handled directly through indexSchema pruning.
  private var actAsDummy = false

  override def rootPaths: Seq[Path] = fileIndex.rootPaths

  override def inputFiles: Array[String] = fileIndex.inputFiles

  override def refresh(): Unit = fileIndex.refresh()

  override def sizeInBytes: Long = fileIndex.sizeInBytes

  override def partitionSchema: StructType = fileIndex.partitionSchema

  /**
   * Lists the pruned files after applying the partition and data filters.
   *
   * @param partitionFilters filters on the partition columns
   * @param dataFilters filters on the non-partition data columns
   * @return the pruned list of partition directories
   */
  override def listFiles(partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    if (actAsDummy) {
      return Seq.empty
    }
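    // The signature of FileIndex.listFiles differs between Spark versions, so the
    // underlying index's method is looked up and invoked reflectively rather than being
    // called directly (see AbstractCarbonFileIndex below).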
    val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
    val directories =
      method.invoke(
        fileIndex,
        partitionFilters,
        dataFilters).asInstanceOf[Seq[PartitionDirectory]]
    prune(dataFilters, directories)
  }

  private def prune(dataFilters: Seq[Expression],
      directories: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
    // Set the driver flag to true; it is used for unsafe memory initialization and for
    // initializing the carbon LRU cache instance according to the driver memory.
    CarbonProperties.getInstance
      .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
    val tablePath = parameters.get("path")
    if (tablePath.nonEmpty && dataFilters.nonEmpty) {
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
      // convert the catalyst expressions to Spark's data source filters
      val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
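      // translateFilter returns None for predicates that cannot be expressed as source
      // filters; flatMap drops those, and Spark still evaluates them itself after the scan.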
      val dataTypeMap = dataSchema.map(f => f.name -> f.dataType).toMap
      // convert to carbon filter expressions, skipping any filter that references a
      // complex-typed column (only atomic types can be pushed down) and AND-ing the rest
      val filter: Option[CarbonExpression] = filters.filterNot { ref =>
        ref.references.exists { p =>
          !dataTypeMap(p).isInstanceOf[AtomicType]
        }
      }.flatMap { filter =>
        CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
      }.reduceOption(new AndExpression(_, _))
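      // Build a load model from the datasource options and schema, and register the
      // inferred table info on the hadoop configuration for the carbon input format.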
      val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)
      CarbonInputFormat.setTableInfo(
        hadoopConf,
        model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
      CarbonInputFormat.setTransactionalTable(hadoopConf, false)
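      // Collect the carbonindex and merge index files from the listed directories; these
      // are what drive the driver-side pruning below.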
      var totalFiles = 0
      val indexFiles = directories.flatMap { dir =>
        totalFiles += dir.files.length
        dir.files.filter { f =>
          f.getPath.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
            f.getPath.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
        }.map(new HDFSCarbonFile(_))
      }.toArray.asInstanceOf[Array[CarbonFile]]
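      // Data files exist but no index files were found, so nothing can be pruned; fall
      // back to the unpruned listing.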
      if (indexFiles.length == 0 && totalFiles > 0) {
        return directories
      }
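      // Non-transactional flow: read straight from the discovered index files instead of
      // consulting a table status file.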
      CarbonInputFormat.setReadCommittedScope(
        hadoopConf,
        new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
      filter match {
        case Some(c) => CarbonInputFormat.setFilterPredicates(
          hadoopConf,
          new IndexFilter(model.getCarbonDataLoadSchema.getCarbonTable, c, true))
        case None =>
      }
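      // CarbonFileInputFormat prunes splits on the driver using the index files and the
      // pushed-down filter, so getSplits returns only the blocks that can match.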
      val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
      val splits = format.getSplits(CarbonSparkUtil.createHadoopJob(hadoopConf))
        .asInstanceOf[util.List[CarbonInputSplit]].asScala
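      // Keep only the data files that back one of the pruned splits.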
      val prunedDirs = directories.map { dir =>
        val files = dir.files
          .filter(d => splits.exists(_.getBlockPath.equalsIgnoreCase(d.getPath.getName)))
        PartitionDirectory(dir.values, files)
      }
      prunedDirs
    } else {
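      // No filters were pushed down: keep only the carbondata files, dropping index and
      // other non-data files from the listing.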
      directories.map { dir =>
        val files = dir.files
          .filter(_.getPath.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))
        PartitionDirectory(dir.values, files)
      }
    }
  }

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
    if (actAsDummy) {
      return Seq.empty
    }
    val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
    val directories =
      method.invoke(fileIndex, filters).asInstanceOf[Seq[PartitionDirectory]]
    prune(filters, directories)
  }

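  /**
   * When set, this index acts as a dummy and the listFiles calls return empty results.
   */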
  def setDummy(actDummy: Boolean): Unit = {
    actAsDummy = actDummy
  }
}

/**
 * A trait that exists only so this file compiles against both Spark 2.1 and 2.2, whose
 * FileIndex.listFiles signatures differ (a single filter list vs. separate partition and
 * data filters).
 */
trait AbstractCarbonFileIndex {

  def listFiles(partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory]

  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
}