/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.Ascending
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Not
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
import org.apache.spark.sql.connector.iceberg.catalog.ExtendedSupportsDelete
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType

// TODO: should be part of early scan push down after the delete condition is optimized
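/**
 * Rewrites DELETE operations on Iceberg tables.
 *
 * Deletes that can be answered purely with metadata are left unchanged and later handled by
 * pushing filters to deleteWhere; all other deletes are rewritten into a scan of the affected
 * files followed by a write that keeps only the remaining rows (copy-on-write).
 *
 * Presumably registered through Spark's SparkSessionExtensions, e.g.
 * `extensions.injectOptimizerRule { spark => RewriteDelete(spark) }` (illustrative only).
 */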
case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {

  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._

  override def resolver: Resolver = spark.sessionState.conf.resolver

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond))
        if isMetadataDelete(r, cond) && isIcebergRelation(r) =>
      d

    // rewrite all operations that require reading the table to delete records
    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isIcebergRelation(r) =>
      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
      val writeInfo = newWriteInfo(r.schema)
      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
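
      // the merge builder coordinates the scan and the write so that only data files
      // containing rows that match the condition are rewritten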
      val matchingRowsPlanBuilder = (scanRelation: LogicalPlan) => Filter(cond, scanRelation)
      val scanPlan = buildDynamicFilterScanPlan(
        spark, r.table, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
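
      // keep rows for which the condition is not true; EqualNullSafe ensures rows where
      // cond evaluates to null are retained, since DELETE removes only rows where the
      // condition is exactly true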
      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
      val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)

      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
      ReplaceData(r, mergeWrite, writePlan)
  }
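
  // clusters the remaining rows by file name and sorts them by original row position so that
  // the rewritten data files roughly preserve the layout of the files they replace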
  private def buildWritePlan(
      remainingRowsPlan: LogicalPlan,
      output: Seq[AttributeReference]): LogicalPlan = {
    val fileNameCol = findOutputAttr(remainingRowsPlan.output, FILE_NAME_COL)
    val rowPosCol = findOutputAttr(remainingRowsPlan.output, ROW_POS_COL)
    val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
    val numShufflePartitions = SQLConf.get.numShufflePartitions
    val repartition = RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
    val sort = Sort(order, global = false, repartition)
    Project(output, sort)
  }
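
  // a delete can be handled as a metadata-only operation when the table supports deleteWhere,
  // the condition contains no subqueries, and every predicate can be translated into a data
  // source filter the table accepts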
  private def isMetadataDelete(relation: DataSourceV2Relation, cond: Expression): Boolean = {
    relation.table match {
      case t: ExtendedSupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
        val predicates = splitConjunctivePredicates(cond)
        val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, relation.output)
        val dataSourceFilters = toDataSourceFilters(normalizedPredicates)
        val allPredicatesTranslated = normalizedPredicates.size == dataSourceFilters.length
        allPredicatesTranslated && t.canDeleteWhere(dataSourceFilters)
      case _ => false
    }
  }
}