* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.sql.execution
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.kylin.common.KylinConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ArrayBuffer
class KylinFileSourceScanExec(
@transient override val relation: HadoopFsRelation,
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
val optionalShardSpec: Option[ShardSpec],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier]) extends FileSourceScanExec(
relation, output, requiredSchema, partitionFilters, None, dataFilters, tableIdentifier) {
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
metrics("numFiles") :: metrics("metadataTime") :: Nil)
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
optionalShardSpec match {
case Some(spec) if KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled =>
createShardingReadRDD(spec, readFile, selectedPartitions, relation)
case _ =>
createNonShardingReadRDD(readFile, selectedPartitions, relation)
override def inputRDDs(): Seq[RDD[InternalRow]] = {
inputRDD :: Nil
private val pushedDownFilters = dataFilters
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val shardSpec = if (KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled) {
} else {
shardSpec match {
case Some(spec) =>
def toAttribute(colName: String): Option[Attribute] =
output.find( == colName)
val shardCols = spec.shardColumnNames.flatMap(toAttribute)
val partitioning = if (shardCols.size == spec.shardColumnNames.size) {
HashPartitioning(shardCols, spec.numShards)
} else {
val sortColumns =
val sortOrder = if (sortColumns.nonEmpty) {, Ascending))
} else {
(partitioning, sortOrder)
case _ =>
(UnknownPartitioning(0), Nil)
* Copied from org.apache.spark.sql.execution.FileSourceScanExec#createBucketedReadRDD
* Create an RDD for sharding reads.
* The non-sharding variant of this function is [[createNonShardingReadRDD]].
* The algorithm is pretty simple: each RDD partition being returned should include all the files
* with the same shard id from all the given Hive partitions.
* @param shardSpec the sharding spec.
* @param readFile a function to read each (part of a) file.
* @param selectedPartitions Hive-style partition that are part of the read.
* @param fsRelation [[HadoopFsRelation]] associated with the read.
private def createShardingReadRDD(
shardSpec: ShardSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${shardSpec.numShards} shards")
val filesToPartitionId =
selectedPartitions.flatMap { p => { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}.groupBy {
f => FilePruner.getPartitionId(new Path(f.filePath))
val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
* Copied from org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD, no hacking.
* Create an RDD for non-sharding reads.
* The sharding variant of this function is [[createShardingReadRDD]].
* @param readFile a function to read each (part of a) file.
* @param selectedPartitions Hive-style partition that are part of the read.
* @param fsRelation [[HadoopFsRelation]] associated with the read.
private def createNonShardingReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
logInfo(s"Planning scan with bin packing, max size is: $defaultMaxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by defaultMaxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > defaultMaxSplitBytes) defaultMaxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
partition.values, file.getPath.toUri.toString, offset, size, hosts)
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
currentFiles.toArray) // Copy to a new Array.
partitions += newPartition
currentSize = 0
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
if (currentSize + file.length > defaultMaxSplitBytes) {
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
case f: LocatedFileStatus => f.getBlockLocations
case f => Array.empty[BlockLocation]
// Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
// pair that represents a segment of the same file, find out the block that contains the largest
// fraction the segment, and returns location hosts of that block. If no such block can be found,
// returns an empty array.
private def getBlockHosts(blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
val candidates = {
// The fragment starts from a position within this block
case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
// The fragment ends at a position within this block
case b if offset <= b.getOffset && offset + length < b.getLength =>
b.getHosts -> (offset + length - b.getOffset).min(length)
// The fragment fully contains this block
case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
b.getHosts -> b.getLength
// The fragment doesn't intersect with this block
case b =>
b.getHosts -> 0L
}.filter { case (hosts, size) =>
size > 0L
if (candidates.isEmpty) {
} else {
val (hosts, _) = candidates.maxBy { case (_, size) => size }