Spark: Rewrite MERGE INTO and DELETE only for Iceberg tables (#2134)

Guard the analysis checks, assignment alignment, optimizer workarounds, and row-level rewrite rules with the new PlanUtils.isIcebergRelation helper, so that MERGE INTO and DELETE FROM statements against non-Iceberg tables fall through to Spark's built-in handling instead of being picked up by the extensions.

Co-authored-by: Dilip Biswal <dbiswal@adobe.com>
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignMergeIntoTable.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignMergeIntoTable.scala
index 4d38f06..d15fc93 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignMergeIntoTable.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlignMergeIntoTable.scala
@@ -22,12 +22,13 @@
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, LogicalPlan, MergeIntoTable, UpdateAction}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.internal.SQLConf
 
 case class AlignMergeIntoTable(conf: SQLConf) extends Rule[LogicalPlan] with AssignmentAlignmentSupport {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case m: MergeIntoTable if m.resolved =>
+    case m: MergeIntoTable if m.resolved && isIcebergRelation(m.targetTable) =>
       val alignedMatchedActions = m.matchedActions.map {
         case u @ UpdateAction(_, assignments) =>
           u.copy(assignments = alignAssignments(m.targetTable, assignments))
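For context, a minimal sketch of the kind of statement this rule aligns, assuming an active SparkSession named spark and hypothetical tables db.target and db.source with columns id and dep. After this change the alignment only runs when the MERGE target is an Iceberg relation.

    // Sketch only: the UPDATE touches a single column, and AlignMergeIntoTable aligns the
    // assignments against the full target schema before the MERGE is rewritten.
    spark.sql(
      """MERGE INTO db.target t USING db.source s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET t.dep = s.dep
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)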
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeleteFromTablePredicateCheck.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeleteFromTablePredicateCheck.scala
index 5f86817..9f69153 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeleteFromTablePredicateCheck.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeleteFromTablePredicateCheck.scala
@@ -22,12 +22,13 @@
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, InSubquery, Not}
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 
 object DeleteFromTablePredicateCheck extends (LogicalPlan => Unit) {
 
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
-      case DeleteFromTable(_, Some(condition)) if hasNullAwarePredicateWithinNot(condition) =>
+      case DeleteFromTable(r, Some(condition)) if hasNullAwarePredicateWithinNot(condition) && isIcebergRelation(r) =>
         // this limitation is present since SPARK-25154 fix is not yet available
         // we use Not(EqualsNullSafe(cond, true)) when deciding which records to keep
         // such conditions are rewritten by Spark as an existential join and currently Spark
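As a hedged illustration (hypothetical table names, an active SparkSession named spark assumed), this is the shape of condition the check rejects for Iceberg targets: a null-aware NOT IN subquery inside the DELETE predicate, which Spark would otherwise plan as the unsupported existential join described above.

    // Sketch only: the NOT IN subquery becomes Not(InSubquery(...)) in the condition
    // and trips DeleteFromTablePredicateCheck until the SPARK-25154 fix is available.
    spark.sql("DELETE FROM db.target WHERE id NOT IN (SELECT id FROM db.to_remove)")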
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/MergeIntoTablePredicateCheck.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/MergeIntoTablePredicateCheck.scala
index a44e8f0..179b4aa6 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/MergeIntoTablePredicateCheck.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/MergeIntoTablePredicateCheck.scala
@@ -27,12 +27,13 @@
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
 import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 
 object MergeIntoTablePredicateCheck extends (LogicalPlan => Unit) {
 
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
-      case merge: MergeIntoTable =>
+      case merge: MergeIntoTable if isIcebergRelation(merge.targetTable) =>
         validateMergeIntoConditions(merge)
       case _ => // OK
     }
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeConditionsInRowLevelOperations.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeConditionsInRowLevelOperations.scala
index 906c628..d573ac5 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeConditionsInRowLevelOperations.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeConditionsInRowLevelOperations.scala
@@ -23,6 +23,7 @@
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 // we have to optimize expressions used in delete/update before we can rewrite row-level operations
@@ -30,7 +31,8 @@
 // it is a temp solution since we cannot inject rewrite of row-level ops after operator optimizations
 object OptimizeConditionsInRowLevelOperations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeleteFromTable(table, cond) if !SubqueryExpression.hasSubquery(cond.getOrElse(Literal.TrueLiteral)) =>
+    case d @ DeleteFromTable(table, cond)
+        if !SubqueryExpression.hasSubquery(cond.getOrElse(Literal.TrueLiteral)) && isIcebergRelation(table) =>
       val optimizedCond = optimizeCondition(cond.getOrElse(Literal.TrueLiteral), table)
       d.copy(condition = Some(optimizedCond))
   }
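A small sketch of why the condition is optimized up front (hypothetical names, an active SparkSession named spark assumed): foldable expressions in the predicate are simplified here, so later rules such as the metadata-delete check in RewriteDelete see the reduced form.

    // Sketch only: the arithmetic below is expected to be constant-folded to id < 10 by this
    // rule, since the condition contains no subquery and the target is an Iceberg relation.
    spark.sql("DELETE FROM db.target WHERE id < 5 + 5")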
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesInRowLevelOperations.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesInRowLevelOperations.scala
index c173150..42431aa 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesInRowLevelOperations.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesInRowLevelOperations.scala
@@ -22,11 +22,12 @@
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 
 // a temp solution until PullupCorrelatedPredicates handles row-level operations in Spark
 object PullupCorrelatedPredicatesInRowLevelOperations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case d @ DeleteFromTable(table, Some(cond)) if SubqueryExpression.hasSubquery(cond) =>
+    case d @ DeleteFromTable(table, Some(cond)) if SubqueryExpression.hasSubquery(cond) && isIcebergRelation(table) =>
       // Spark pulls up correlated predicates only for UnaryNodes
       // DeleteFromTable does not extend UnaryNode so it is ignored in that rule
       // We have this workaround until it is fixed in Spark
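For illustration (hypothetical names, an active SparkSession named spark assumed), this is the shape of DELETE the workaround targets: a correlated subquery in the condition, which Spark's own PullupCorrelatedPredicates skips because DeleteFromTable is not a UnaryNode.

    // Sketch only: the correlated EXISTS predicate is pulled up by this rule so the
    // subsequent row-level rewrite can plan it as a join.
    spark.sql(
      """DELETE FROM db.target t
        |WHERE EXISTS (SELECT 1 FROM db.removed r WHERE r.id = t.id)""".stripMargin)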
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
index ad401ae..a1af716 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
@@ -37,6 +37,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
 import org.apache.spark.sql.catalyst.plans.logical.Sort
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
 import org.apache.spark.sql.connector.iceberg.catalog.ExtendedSupportsDelete
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -53,11 +54,12 @@
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
+    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond))
+        if isMetadataDelete(r, cond) && isIcebergRelation(r) =>
       d
 
     // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
+    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isIcebergRelation(r) =>
       // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
       val writeInfo = newWriteInfo(r.schema)
       val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
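A hedged sketch of the two branches above (hypothetical names, dt assumed to be a partition column, an active SparkSession named spark assumed):

    // Sketch only: a predicate that can be answered from metadata alone is left to deleteWhere
    // in SupportsDelete, while a row-level predicate takes the read-and-rewrite path.
    spark.sql("DELETE FROM db.target WHERE dt = DATE '2021-01-01'")  // candidate for a metadata delete
    spark.sql("DELETE FROM db.target WHERE id % 2 = 0")              // requires reading and rewriting files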
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
index 72a09d9..ef55a8d 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
@@ -57,6 +57,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.Sort
 import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
@@ -75,8 +76,8 @@
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
-      case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan, cond, matchedActions, notMatchedActions)
-          if matchedActions.isEmpty =>
+      case MergeIntoTable(target: DataSourceV2Relation, source, cond, matchedActions, notMatchedActions)
+          if matchedActions.isEmpty && isIcebergRelation(target) =>
 
         val targetTableScan = buildSimpleScanPlan(target, cond)
 
@@ -100,8 +101,8 @@
 
         AppendData.byPosition(target, writePlan, Map.empty)
 
-      case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan, cond, matchedActions, notMatchedActions)
-          if notMatchedActions.isEmpty =>
+      case MergeIntoTable(target: DataSourceV2Relation, source, cond, matchedActions, notMatchedActions)
+          if notMatchedActions.isEmpty && isIcebergRelation(target) =>
 
         val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge", newWriteInfo(target.schema))
 
@@ -131,7 +132,8 @@
 
         ReplaceData(target, batchWrite, writePlan)
 
-      case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan, cond, matchedActions, notMatchedActions) =>
+      case MergeIntoTable(target: DataSourceV2Relation, source, cond, matchedActions, notMatchedActions)
+          if isIcebergRelation(target) =>
 
         val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge", newWriteInfo(target.schema))
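A sketch of how the branches in this rule map to statements (hypothetical names, an active SparkSession named spark assumed): an insert-only MERGE with no matched actions takes the first branch and is planned as an AppendData of the non-matching source rows, a MERGE with only matched actions takes the second branch, and everything else falls through to the general case in the last hunk.

    // Sketch only: no matched actions, so this statement goes through the AppendData branch.
    spark.sql(
      """MERGE INTO db.target t USING db.updates s
        |ON t.id = s.id
        |WHEN NOT MATCHED THEN INSERT *""".stripMargin)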
 
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala
new file mode 100644
index 0000000..c9d96c0
--- /dev/null
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+object PlanUtils {
+  def isIcebergRelation(plan: LogicalPlan): Boolean = {
+    def isIcebergTable(relation: DataSourceV2Relation): Boolean = relation.table match {
+      case _: SparkTable => true
+      case _ => false
+    }
+
+    plan match {
+      case s: SubqueryAlias => isIcebergRelation(s.child)
+      case r: DataSourceV2Relation => isIcebergTable(r)
+      case _ => false
+    }
+  }
+}
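A hedged usage sketch of the new helper (everything except isIcebergRelation is hypothetical): callers pass the target plan, and the helper unwraps SubqueryAlias nodes before checking whether the underlying DataSourceV2Relation is backed by an Iceberg SparkTable.

    // Sketch only: how a rule can bail out early for non-Iceberg targets.
    import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
    import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation

    def appliesTo(plan: LogicalPlan): Boolean = plan match {
      case DeleteFromTable(table, _) => isIcebergRelation(table)
      case _ => false
    }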
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 363a06d..31986db 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -466,6 +466,15 @@
   }
 
   @Test
+  public void testDeleteOnNonIcebergTableNotSupported() throws NoSuchTableException {
+    createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Delete is not supported for non-Iceberg tables",
+        AnalysisException.class, "DELETE is only supported with v2 tables.",
+        () -> sql("DELETE FROM %s WHERE c1 = -100", "testtable"));
+  }
+
+  @Test
   public void testDeleteWithExistSubquery() throws NoSuchTableException {
     createAndInitUnpartitionedTable();
 
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index a08d17d..c5dc5aa 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -362,4 +362,19 @@
               "  INSERT (id, c) VALUES (1, null)", tableName);
         });
   }
+
+  @Test
+  public void testMergeWithNonIcebergTargetTableNotSupported() {
+    createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
+    createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
+
+    AssertHelpers.assertThrows("Should complain about a non-Iceberg target table",
+        UnsupportedOperationException.class, "MERGE INTO TABLE is not supported temporarily.",
+        () -> {
+          sql("MERGE INTO target t USING source s " +
+              "ON t.c1 == s.c1 " +
+              "WHEN MATCHED THEN " +
+              "  UPDATE SET *");
+        });
+  }
 }