| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.datasources |
| |
| import org.apache.spark.internal.Logging |
| import org.apache.spark.sql._ |
| import org.apache.spark.sql.catalyst.catalog.BucketSpec |
| import org.apache.spark.sql.catalyst.expressions |
| import org.apache.spark.sql.catalyst.expressions._ |
| import org.apache.spark.sql.catalyst.planning.PhysicalOperation |
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan |
| import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} |
| import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME |
| import org.apache.spark.sql.types.{DoubleType, FloatType, StructType} |
| import org.apache.spark.util.collection.BitSet |
| |
| /** |
| * A strategy for planning scans over collections of files that might be partitioned or bucketed |
| * by user-specified columns. |
| * |
| * At a high level, planning occurs in several phases: |
| * - Split filters by when they need to be evaluated. |
| * - Prune the schema of the data requested based on any projections present. Today this pruning |
| * is only done on top-level columns, but formats should support pruning of nested columns as |
| * well. |
| * - Construct a reader function by passing filters and the schema into the FileFormat. |
| * - Using the partition pruning predicates, enumerate the list of files that should be read. |
| * - Split the files into tasks and construct a FileScanRDD. |
| * - Add any projection or filters that must be evaluated after the scan. |
| * |
| * Files are assigned to tasks using the following algorithm: |
| * - If the table is bucketed, group files by bucket id into the correct number of partitions. |
| * - If the table is not bucketed or bucketing is turned off: |
| * - If any file is larger than the threshold, split it into pieces based on that threshold. |
| * - Sort the files by decreasing file size. |
| * - Assign the ordered files to partitions greedily, as sketched below: if the current |
| * partition stays under the threshold with the addition of the next file, add the file to |
| * it; otherwise open a new partition. Proceed to the next file. |
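| * |
| * For illustration, a minimal sketch of this greedy packing over hypothetical file sizes in |
| * megabytes (the actual logic lives in FilePartition): |
| * {{{ |
| *   val threshold = 128L                 // maxSplitBytes, here in MB for illustration |
| *   val sizes = Seq(200L, 90L, 60L, 40L) |
| *     .flatMap(s => if (s > threshold) Seq(threshold, s - threshold) else Seq(s)) // split |
| *     .sortBy(-_)                        // decreasing size |
| *   val partitions = sizes.foldLeft(List.empty[List[Long]]) { |
| *     case (current :: rest, s) if current.sum + s <= threshold => (s :: current) :: rest |
| *     case (acc, s) => List(s) :: acc    // open a new partition |
| *   } |
| *   // => List(List(40, 60), List(72), List(90), List(128)) |
| * }}} |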
| */ |
| object FileSourceStrategy extends Strategy with PredicateHelper with Logging { |
| |
| // Buckets can be pruned iff there is exactly one bucket column and the number of buckets is |
| // greater than 1. |
| private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = { |
| bucketSpec match { |
| case Some(spec) => spec.bucketColumnNames.length == 1 && spec.numBuckets > 1 |
| case None => false |
| } |
| } |
| |
| private def getExpressionBuckets( |
| expr: Expression, |
| bucketColumnName: String, |
| numBuckets: Int): BitSet = { |
| |
| def getBucketNumber(attr: Attribute, v: Any): Int = { |
| BucketingUtils.getBucketIdFromValue(attr, numBuckets, v) |
| } |
| |
| def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = { |
| val matchedBuckets = new BitSet(numBuckets) |
| iter |
| .map(v => getBucketNumber(attr, v)) |
| .foreach(bucketNum => matchedBuckets.set(bucketNum)) |
| matchedBuckets |
| } |
| |
| def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = { |
| val matchedBuckets = new BitSet(numBuckets) |
| matchedBuckets.set(getBucketNumber(attr, v)) |
| matchedBuckets |
| } |
| |
| expr match { |
| case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => |
| getBucketSetFromValue(a, v) |
| case expressions.In(a: Attribute, list) |
| if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => |
| getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow))) |
| case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName => |
| getBucketSetFromIterable(a, hset) |
| case expressions.IsNull(a: Attribute) if a.name == bucketColumnName => |
| getBucketSetFromValue(a, null) |
| case expressions.IsNaN(a: Attribute) |
| if a.name == bucketColumnName && a.dataType == FloatType => |
| getBucketSetFromValue(a, Float.NaN) |
| case expressions.IsNaN(a: Attribute) |
| if a.name == bucketColumnName && a.dataType == DoubleType => |
| getBucketSetFromValue(a, Double.NaN) |
| case expressions.And(left, right) => |
| getExpressionBuckets(left, bucketColumnName, numBuckets) & |
| getExpressionBuckets(right, bucketColumnName, numBuckets) |
| case expressions.Or(left, right) => |
| getExpressionBuckets(left, bucketColumnName, numBuckets) | |
| getExpressionBuckets(right, bucketColumnName, numBuckets) |
| case _ => |
| val matchedBuckets = new BitSet(numBuckets) |
| matchedBuckets.setUntil(numBuckets) |
| matchedBuckets |
| } |
| } |
| |
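| // For illustration, a hypothetical session (the matching bucket ids depend on Spark's hash |
| // function, so the values below are arbitrary): |
| //   spark.range(100).write.bucketBy(8, "id").saveAsTable("t") |
| //   spark.table("t").filter($"id" === 5 || $"id".isin(7, 9)) |
| // yields a bit set with at most three of the eight buckets set, so only files belonging to |
| // those buckets are scanned. |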
| private def genBucketSet( |
| normalizedFilters: Seq[Expression], |
| bucketSpec: BucketSpec): Option[BitSet] = { |
| if (normalizedFilters.isEmpty) { |
| return None |
| } |
| |
| val bucketColumnName = bucketSpec.bucketColumnNames.head |
| val numBuckets = bucketSpec.numBuckets |
| |
| val normalizedFiltersAndExpr = normalizedFilters |
| .reduce(expressions.And) |
| val matchedBuckets = getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, |
| numBuckets) |
| |
| val numBucketsSelected = matchedBuckets.cardinality() |
| |
| logInfo { |
| s"Pruned ${numBuckets - numBucketsSelected} out of $numBuckets buckets." |
| } |
| |
| // None means all the buckets need to be scanned |
| if (numBucketsSelected == numBuckets) { |
| None |
| } else { |
| Some(matchedBuckets) |
| } |
| } |
| |
| def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { |
| case PhysicalOperation(projects, filters, |
| l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) => |
| // Filters on this relation fall into four categories based on where we can use them to avoid |
| // reading unneeded data: |
| // - partition keys only - used to prune directories to read |
| // - bucket keys only - optionally used to prune files to read |
| // - keys stored in the data only - optionally used to skip groups of data in files |
| // - filters that need to be evaluated again after the scan |
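| // For example, given a hypothetical table partitioned by `p`, bucketed by `b`, with a data |
| // column `c`, the filter `p = 1 AND b = 2 AND c > 3 AND p + c < 10` splits as: `p = 1` |
| // prunes directories, `b = 2` prunes bucket files, `c > 3` may be pushed into the file |
| // format, and `p + c < 10` must be re-evaluated after the scan. |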
| val filterSet = ExpressionSet(filters) |
| |
| val normalizedFilters = DataSourceStrategy.normalizeExprs( |
| filters.filter(_.deterministic), l.output) |
| |
| val partitionColumns = |
| l.resolve( |
| fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) |
| val partitionSet = AttributeSet(partitionColumns) |
| |
| // These partitionKeyFilters should be the same as the ones executed in |
| // PruneFileSourcePartitions. |
| val partitionKeyFilters = DataSourceStrategy.getPushedDownFilters(partitionColumns, |
| normalizedFilters) |
| |
| // Subquery expressions are filtered out because they can't be used to prune buckets or be |
| // pushed down as data filters; they are still evaluated after the scan. |
| val normalizedFiltersWithoutSubqueries = |
| normalizedFilters.filterNot(SubqueryExpression.hasSubquery) |
| |
| val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec |
| val bucketSet = if (shouldPruneBuckets(bucketSpec)) { |
| genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get) |
| } else { |
| None |
| } |
| |
| val dataColumns = |
| l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver) |
| |
| // Partition keys are not available in the statistics of the files. |
| // `dataColumns` might contain partition columns, so we need to filter them out. |
| val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains) |
| val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f => |
| if (f.references.intersect(partitionSet).nonEmpty) { |
| extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols)) |
| } else { |
| Some(f) |
| } |
| } |
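| // The extraction above behaves as follows (hypothetical columns): `p = 1 AND c = 2` yields |
| // the data filter `c = 2`, whereas `p = 1 OR c = 2` yields nothing, because neither side of |
| // a disjunction can be dropped without changing the result. |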
| val supportNestedPredicatePushdown = |
| DataSourceUtils.supportNestedPredicatePushdown(fsRelation) |
| val pushedFilters = dataFilters |
| .flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) |
| logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}") |
| |
| // Predicates with both partition keys and attributes need to be evaluated after the scan. |
| val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty) |
| logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}") |
| |
| val filterAttributes = AttributeSet(afterScanFilters) |
| val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects |
| val requiredAttributes = AttributeSet(requiredExpressions) |
| |
| val readDataColumns = |
| dataColumns |
| .filter(requiredAttributes.contains) |
| .filterNot(partitionColumns.contains) |
| val outputSchema = readDataColumns.toStructType |
| logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}") |
| |
| val metadataStructOpt = l.output.collectFirst { |
| case FileSourceMetadataAttribute(attr) => attr |
| } |
| |
| val metadataColumns = metadataStructOpt.map { metadataStruct => |
| metadataStruct.dataType.asInstanceOf[StructType].fields.map { field => |
| FileSourceMetadataAttribute(field.name, field.dataType) |
| }.toSeq |
| }.getOrElse(Seq.empty) |
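| |
| // The metadata struct surfaces to users as, for example (a hypothetical query): |
| //   spark.read.parquet(dir).select("*", "_metadata") |
| // where the struct typically carries fields such as file_path, file_name, file_size and |
| // file_modification_time. |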
| |
| // outputAttributes should also include the metadata columns at the very end |
| val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns |
| |
| val scan = |
| FileSourceScanExec( |
| fsRelation, |
| outputAttributes, |
| outputSchema, |
| partitionKeyFilters.toSeq, |
| bucketSet, |
| None, |
| dataFilters, |
| table.map(_.identifier)) |
| |
| // Extra Project node: wrap the flat metadata columns into a single metadata struct. |
| val withMetadataProjections = metadataStructOpt.map { metadataStruct => |
| val metadataAlias = |
| Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId) |
| execution.ProjectExec( |
| scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan) |
| }.getOrElse(scan) |
| |
| val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) |
| val withFilter = afterScanFilter |
| .map(execution.FilterExec(_, withMetadataProjections)) |
| .getOrElse(withMetadataProjections) |
| val withProjections = if (projects == withFilter.output) { |
| withFilter |
| } else { |
| execution.ProjectExec(projects, withFilter) |
| } |
| |
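| // The resulting physical plan, from top to bottom (each layer present only when needed): |
| //   ProjectExec -> FilterExec -> ProjectExec (metadata struct) -> FileSourceScanExec |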
| withProjections :: Nil |
| |
| case _ => Nil |
| } |
| } |