[HUDI-6658] Inject filters for incremental query  (#10225)

Add incremental filters to the query plan

Also fix some tests that use the partition path as the precombine field

Incremental queries now work with the new file group reader
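
For context, a minimal sketch of the kind of incremental read this change affects (not part of this PR). The query-type and instant-time option keys are the standard Hudi datasource read options; the file group reader flag name is an assumption and may differ by release:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("hudi-incremental-read").getOrCreate()
    val basePath = "/tmp/hudi_trips_cow" // hypothetical table path

    // Incremental query over commits in (begin, end]; with the file group reader
    // enabled, the begin/end filters are now injected into the query plan.
    val incremental = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20231201000000") // exclusive start instant
      .option("hoodie.datasource.read.end.instanttime", "20231202000000")   // optional inclusive end instant
      .option("hoodie.file.group.reader.enabled", "true")                   // assumed key for the new file group reader
      .load(basePath)
    incremental.show()
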
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 64ee645..b9110f1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -102,9 +102,11 @@
    * Spark requires file formats to append the partition path fields to the end of the schema.
    * For tables where the partition path fields are not at the end of the schema, we don't want
    * to return the schema in the wrong order when they do a query like "select *". To fix this
-   * behavior, we apply a projection onto FileScan when the file format is NewHudiParquetFileFormat
+   * behavior, we apply a projection onto FileScan when the file format has HoodieFormatTrait
+   *
+   * Additionally, incremental queries require filters to be added to the plan
    */
-  def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
+  def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan
 
   /**
    * Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
@@ -140,4 +142,9 @@
   def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
 
   def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan
+
+  /**
+   * true if both plans produce the same attributes in the same order
+   */
+  def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 168502b..ac8286b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -285,7 +285,12 @@
             resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
           }
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+          } else {
+            new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
+          }
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
           if (fileFormatUtils.isDefined) {
@@ -304,7 +309,12 @@
           }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+          } else {
+            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
+          }
 
         case (_, _, true) =>
           if (fileFormatUtils.isDefined) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 37e42bc..959f7d2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -28,6 +28,7 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import scala.jdk.CollectionConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
@@ -78,4 +79,8 @@
       new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId).toString
     }.toArray
   }
+
+  override def getRequiredFilters: Seq[Filter] = {
+    Seq.empty
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 88f7224..aee1518 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -281,7 +281,7 @@
     sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true, true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -318,7 +318,7 @@
     shouldEmbedFileSlices = true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -349,7 +349,7 @@
     sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index 7e765b4..d541e73 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -29,7 +29,7 @@
 import org.apache.spark.sql.types.StructType
 
 import java.util.stream.Collectors
-import scala.jdk.CollectionConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 
 class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                  override val metaClient: HoodieTableMetaClient,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
new file mode 100644
index 0000000..0587f13
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Contains, EndsWith, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, NamedExpression, Not, Or, StartsWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+object FileFormatUtilsForFileGroupReader extends SparkAdapterSupport {
+
+  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+    ff.isProjected = true
+    val tableSchema = fs.location match {
+      case index: HoodieCDCFileIndex => index.cdcRelation.schema
+      case index: SparkHoodieTableFileIndex => index.schema
+    }
+    val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
+    val unfilteredPlan = if (!fs.partitionSchema.fields.isEmpty && sparkAdapter.getCatalystPlanUtils.produceSameOutput(scanOperation, logicalRelation)) {
+      Project(resolvedSchema, scanOperation)
+    } else {
+      scanOperation
+    }
+    applyFiltersToPlan(unfilteredPlan, tableSchema, resolvedSchema, ff.getRequiredFilters)
+  }
+
+  /**
+   * adapted from https://github.com/apache/spark/blob/20df062d85e80422a55afae80ddbf2060f26516c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala
+   */
+  def applyFiltersToPlan(plan: LogicalPlan, tableSchema: StructType, resolvedSchema: Seq[Attribute], filters: Seq[org.apache.spark.sql.sources.Filter]): LogicalPlan = {
+
+    def filterToExpression(
+                            filter: sources.Filter,
+                            toRef: String => Option[NamedExpression]): Option[Expression] = {
+      def zipAttributeAndValue(name: String, value: Any): Option[(NamedExpression, Literal)] = {
+        zip(toRef(name), toLiteral(value))
+      }
+
+      def translate(filter: sources.Filter): Option[Expression] = filter match {
+        case sources.And(left, right) =>
+          zip(translate(left), translate(right)).map(And.tupled)
+        case sources.Or(left, right) =>
+          zip(translate(left), translate(right)).map(Or.tupled)
+        case sources.Not(child) =>
+          translate(child).map(Not)
+        case sources.EqualTo(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EqualTo.tupled)
+        case sources.EqualNullSafe(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled)
+        case sources.IsNull(attribute) =>
+          toRef(attribute).map(IsNull)
+        case sources.IsNotNull(attribute) =>
+          toRef(attribute).map(IsNotNull)
+        case sources.In(attribute, values) =>
+          val literals = values.toSeq.flatMap(toLiteral)
+          if (literals.length == values.length) {
+            toRef(attribute).map(In(_, literals))
+          } else {
+            None
+          }
+        case sources.GreaterThan(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(GreaterThan.tupled)
+        case sources.GreaterThanOrEqual(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled)
+        case sources.LessThan(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(LessThan.tupled)
+        case sources.LessThanOrEqual(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled)
+        case sources.StringContains(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(Contains.tupled)
+        case sources.StringStartsWith(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(StartsWith.tupled)
+        case sources.StringEndsWith(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EndsWith.tupled)
+        /* Not supported in spark2. If needed, we will need to create separate spark 2 and 3 implementations
+      case sources.AlwaysTrue() =>
+        Some(Literal(true, BooleanType))
+      case sources.AlwaysFalse() =>
+        Some(Literal(false, BooleanType))
+         */
+      }
+
+      translate(filter)
+    }
+
+    def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = {
+      a.zip(b).headOption
+    }
+
+    def toLiteral(value: Any): Option[Literal] = {
+      Try(Literal(value)).toOption
+    }
+
+    def toRef(attr: String): Option[NamedExpression] = {
+      tableSchema.getFieldIndex(attr).map { index =>
+        resolvedSchema(index)
+      }
+    }
+
+    val expressionFilters = filters.map(f => filterToExpression(f, n => toRef(n)).get)
+    if (expressionFilters.nonEmpty) {
+      Filter(expressionFilters.reduceLeft(And), plan)
+    } else {
+      plan
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 3565c70..57fdaf8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -47,6 +47,7 @@
 
   // Used so that the planner only projects once and does not stack overflow
   var isProjected: Boolean = false
+  def getRequiredFilters: Seq[Filter]
 }
 
 /**
@@ -65,6 +66,8 @@
                                                   requiredFilters: Seq[Filter]
                                            ) extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait {
 
+  def getRequiredFilters: Seq[Filter] = requiredFilters
+
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap merge can support
    * while the other side can't
@@ -104,8 +107,8 @@
     val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
     val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) = buildFileReaders(
-      spark, dataSchema, partitionSchema, if (isIncremental) requiredSchemaWithMandatory else requiredSchema,
-      filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+      spark, dataSchema, partitionSchema, requiredSchema, filters, options, augmentedHadoopConf,
+      requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
 
     val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, sanitizedTableName)
     val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
@@ -242,25 +245,20 @@
       metaFields.find(f => f.name == name)
     }
 
-    // If not MergeOnRead or if projection is compatible
-    if (isIncremental) {
-      StructType(dataSchema.toArray ++ partitionSchema.fields)
-    } else {
-      val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
-      for (field <- mandatoryFields) {
-        if (requiredSchema.getFieldIndex(field).isEmpty) {
-          // Support for nested fields
-          val fieldParts = field.split("\\.")
-          val fieldToAdd = findNestedField(dataSchema, fieldParts)
-            .orElse(findNestedField(partitionSchema, fieldParts))
-            .orElse(findMetaField(field))
-            .getOrElse(throw new IllegalArgumentException(s"Field $field does not exist in the table schema"))
-          added.append(fieldToAdd)
-        }
+    val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+    for (field <- mandatoryFields) {
+      if (requiredSchema.getFieldIndex(field).isEmpty) {
+        // Support for nested fields
+        val fieldParts = field.split("\\.")
+        val fieldToAdd = findNestedField(dataSchema, fieldParts)
+          .orElse(findNestedField(partitionSchema, fieldParts))
+          .orElse(findMetaField(field))
+          .getOrElse(throw new IllegalArgumentException(s"Field $field does not exist in the table schema"))
+        added.append(fieldToAdd)
       }
-      val addedFields = StructType(added.toArray)
-      StructType(requiredSchema.toArray ++ addedFields.fields)
     }
+    val addedFields = StructType(added.toArray)
+    StructType(requiredSchema.toArray ++ addedFields.fields)
   }
 
   protected def buildFileReaders(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 44381a5..0751d99 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -56,6 +56,8 @@
                                  requiredFilters: Seq[Filter]
                                 ) extends ParquetFileFormat with SparkAdapterSupport with HoodieFormatTrait {
 
+  def getRequiredFilters: Seq[Filter] = requiredFilters
+
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
                            path: Path): Boolean = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 9d40140..eb69f2b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -281,7 +281,7 @@
             ut.copy(table = relation)
 
           case logicalPlan: LogicalPlan if logicalPlan.resolved =>
-            sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan)
+            sparkAdapter.getCatalystPlanUtils.maybeApplyForNewFileFormat(logicalPlan)
         }
       }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index b0454f7..a7ef56d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -59,6 +60,11 @@
 
 public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctionalTestHarness {
 
+  @Override
+  public SparkConf conf() {
+    return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+  }
+
   private static Stream<Arguments> getTableTypeAndIndexType() {
     return Stream.of(
         Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 75a7026..efcc935 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -127,12 +127,12 @@
     runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 1, true)
 
     // Test start commit is archived, end commit is not archived
-    shouldThrowIfFallbackIsFalse(tableType,
+    shouldThrowIfFallbackIsFalse(
       () => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, false))
     runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, true)
 
     // Test both start commit and end commits are not archived but got cleaned
-    shouldThrowIfFallbackIsFalse(tableType,
+    shouldThrowIfFallbackIsFalse(
       () => runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, false))
     runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, true)
 
@@ -169,22 +169,13 @@
     assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
   }
 
-  private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () => Unit): Unit = {
+  private def shouldThrowIfFallbackIsFalse(fn: () => Unit): Unit = {
     val msg = "Should fail with Path does not exist"
-    tableType match {
-      case HoodieTableType.COPY_ON_WRITE =>
-        assertThrows(classOf[HoodieIncrementalPathNotFoundException], new Executable {
-          override def execute(): Unit = {
-            fn()
-          }
-        }, msg)
-      case HoodieTableType.MERGE_ON_READ =>
-        val exp = assertThrows(classOf[SparkException], new Executable {
-          override def execute(): Unit = {
-            fn()
-          }
-        }, msg)
-        assertTrue(exp.getMessage.contains("FileNotFoundException"))
-    }
+    val exp = assertThrows(classOf[SparkException], new Executable {
+      override def execute(): Unit = {
+        fn()
+      }
+    }, msg)
+    assertTrue(exp.getMessage.contains("FileNotFoundException"))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 3f64e24..682a118 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -27,7 +27,9 @@
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
@@ -37,6 +39,8 @@
 
 class TestSparkDataSource extends SparkClientFunctionalTestHarness {
 
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
   val parallelism: Integer = 4
 
   val commonOpts: Map[String, String] = Map(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index b2c7333..ac0afd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -50,14 +50,15 @@
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
         // disable automatic inline compaction so that HoodieDataSourceHelpers.allCompletedCommitsCompactions
@@ -65,16 +66,16 @@
         spark.sql("set hoodie.compact.inline=false")
         spark.sql("set hoodie.compact.schedule.inline=false")
 
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
         val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
 
         // Generate the second clustering plan
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
         val secondScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty())
         checkAnswer(s"call show_clustering('$tableName')")(
@@ -85,9 +86,9 @@
         // Do clustering for all clustering plan generated above, and no new clustering
         // instant will be generated because of there is no commit after the second
         // clustering plan generated
-        checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")(
-          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "ts=1003"),
-          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002")
+        checkAnswer(s"call run_clustering(table => '$tableName', order => 'partition', show_involved_partition => true)")(
+          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "partition=1003"),
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001,partition=1002")
         )
 
         // No new commits
@@ -102,11 +103,11 @@
           .toSeq
         assertResult(2)(finishedClustering.size)
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003)
         )
 
         // After clustering there should be no pending clustering and all clustering instants should be completed
@@ -116,9 +117,9 @@
         )
 
         // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
-        spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)").show()
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
+        spark.sql(s"call run_clustering(table => '$tableName', order => 'partition', show_involved_partition => true)").show()
 
         val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
           .findInstantsAfter(secondScheduleInstant)
@@ -129,13 +130,13 @@
         // Should have a new replace commit after the second clustering command.
         assertResult(1)(thirdClusteringInstant.size)
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003),
-          Seq(5, "a5", 10.0, 1004),
-          Seq(6, "a6", 10.0, 1005)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003),
+          Seq(5, "a5", 10.0, 1004, 1004),
+          Seq(6, "a6", 10.0, 1005, 1005)
         )
       }
     }
@@ -152,39 +153,40 @@
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
 
         spark.sql(s"call run_clustering(path => '$basePath')").show()
         checkAnswer(s"call show_clustering(path => '$basePath')")()
 
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
         val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
         checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")(
-          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "ts=1000,ts=1001,ts=1002")
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "partition=1000,partition=1001,partition=1002")
         )
         // Do clustering for all the clustering plan
-        checkAnswer(s"call run_clustering(path => '$basePath', order => 'ts')")(
+        checkAnswer(s"call run_clustering(path => '$basePath', order => 'partition')")(
           Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*")
         )
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002)
         )
 
         val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
@@ -199,20 +201,20 @@
         assertResult(1)(finishedClustering.size)
 
         // Do clustering without manual schedule(which will do the schedule if no pending clustering exists)
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-        val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L', show_involved_partition => true)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+        val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition >= 1003L', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(resultA.length)
-        assertResult("ts=1003,ts=1004")(resultA(0)(3))
+        assertResult("partition=1003,partition=1004")(resultA(0)(3))
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003),
-          Seq(5, "a5", 10.0, 1004)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003),
+          Seq(5, "a5", 10.0, 1004, 1004)
         )
 
         finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -236,14 +238,15 @@
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
 
@@ -253,20 +256,20 @@
         var resultA: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L and id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 'partition <= 1001L and id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)")
+          resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition <= 1001L', order => 'partition', show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
           assertResult(1)(resultA.length)
-          assertResult("ts=1000,ts=1001")(resultA(0)(3))
+          assertResult("partition=1000,partition=1001")(resultA(0)(3))
 
           // There is 1 completed clustering instant
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -285,13 +288,13 @@
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001")
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002)
           )
         }
 
@@ -299,20 +302,20 @@
         var resultB: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
-          spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-          spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
+          spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+          spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+          spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L and id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 'partition > 1001L and partition <= 1005L and id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', show_involved_partition => true)")
+          resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition > 1001L and partition <= 1005L', order => 'partition', show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
           assertResult(1)(resultB.length)
-          assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3))
+          assertResult("partition=1002,partition=1003,partition=1004,partition=1005")(resultB(0)(3))
 
           // There are 2 completed clustering instants
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -330,17 +333,17 @@
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
-            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005")
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1002,partition=1003,partition=1004,partition=1005")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002),
-            Seq(4, "a4", 10.0, 1003),
-            Seq(5, "a5", 10.0, 1004),
-            Seq(6, "a6", 10.0, 1005)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002),
+            Seq(4, "a4", 10.0, 1003, 1003),
+            Seq(5, "a5", 10.0, 1004, 1004),
+            Seq(6, "a6", 10.0, 1005, 1005)
           )
         }
 
@@ -348,21 +351,21 @@
         var resultC: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
-          spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
-          spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)")
-          spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)")
+          spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006, 1006)")
+          spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007, 1007)")
+          spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008, 1008)")
+          spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009, 1009)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts < 1007L or ts >= 1008L or id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 'partition < 1007L or partition >= 1008L or id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', show_involved_partition => true)")
+          resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(partition >= 1006L and partition < 1008L) or partition >= 1009L', order => 'partition', show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
           assertResult(1)(resultC.length)
-          assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3))
+          assertResult("partition=1006,partition=1007,partition=1009")(resultC(0)(3))
 
           // There are 3 completed clustering instants
           val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -380,28 +383,28 @@
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
-            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"),
-            Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009")
+            Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+            Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1002,partition=1003,partition=1004,partition=1005"),
+            Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "partition=1006,partition=1007,partition=1009")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002),
-            Seq(4, "a4", 10.0, 1003),
-            Seq(5, "a5", 10.0, 1004),
-            Seq(6, "a6", 10.0, 1005),
-            Seq(7, "a7", 10.0, 1006),
-            Seq(8, "a8", 10.0, 1007),
-            Seq(9, "a9", 10.0, 1008),
-            Seq(10, "a10", 10.0, 1009)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002),
+            Seq(4, "a4", 10.0, 1003, 1003),
+            Seq(5, "a5", 10.0, 1004, 1004),
+            Seq(6, "a6", 10.0, 1005, 1005),
+            Seq(7, "a7", 10.0, 1006, 1006),
+            Seq(8, "a8", 10.0, 1007, 1007),
+            Seq(9, "a9", 10.0, 1008, 1008),
+            Seq(10, "a10", 10.0, 1009, 1009)
           )
         }
 
         // Test partition pruning with invalid predicates
         {
-          val resultD = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1111L', order => 'ts', show_involved_partition => true)")
+          val resultD = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'partition > 1111L', order => 'partition', show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
           assertResult(0)(resultD.length)
@@ -460,7 +463,7 @@
 
       // test with operator schedule
       checkExceptionContain(
-      s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+        s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
       )("specific instants only can be used in 'execute' op or not specific op")
 
       // test with operator scheduleAndExecute
@@ -628,56 +631,57 @@
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
            | options (
            |  primaryKey ='id',
            |  type = 'cow',
            |  preCombineField = 'ts'
            | )
-           | partitioned by(ts)
+           | partitioned by(partition)
            | location '$basePath'
      """.stripMargin)
 
       // Test clustering with PARTITION_SELECTED config set, choose only a part of all partitions to schedule
       {
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011, 1011)")
         // Do
         val result = spark.sql(s"call run_clustering(table => '$tableName', " +
-          s"selected_partitions => 'ts=1010', show_involved_partition => true)")
+          s"selected_partitions => 'partition=1010', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)
-        assertResult("ts=1010")(result(0)(3))
+        assertResult("partition=1010")(result(0)(3))
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1010),
-          Seq(2, "a2", 10.0, 1010),
-          Seq(3, "a3", 10.0, 1011)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010, 1010),
+          Seq(2, "a2", 10.0, 1010, 1010),
+          Seq(3, "a3", 10.0, 1011, 1011)
         )
       }
 
       // Test clustering with PARTITION_SELECTED, choose all partitions to schedule
       {
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
-        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011, 1011)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012, 1012)")
         val result = spark.sql(s"call run_clustering(table => '$tableName', " +
-          s"selected_partitions => 'ts=1010,ts=1011,ts=1012', show_involved_partition => true)")
+          s"selected_partitions => 'partition=1010,partition=1011,partition=1012', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
         assertResult(1)(result.length)
-        assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+        assertResult("partition=1010,partition=1011,partition=1012")(result(0)(3))
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1010),
-          Seq(2, "a2", 10.0, 1010),
-          Seq(3, "a3", 10.0, 1011),
-          Seq(4, "a4", 10.0, 1010),
-          Seq(5, "a5", 10.0, 1011),
-          Seq(6, "a6", 10.0, 1012)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010, 1010),
+          Seq(2, "a2", 10.0, 1010, 1010),
+          Seq(3, "a3", 10.0, 1011, 1011),
+          Seq(4, "a4", 10.0, 1010, 1010),
+          Seq(5, "a5", 10.0, 1011, 1011),
+          Seq(6, "a6", 10.0, 1012, 1012)
         )
       }
     }
@@ -693,7 +697,8 @@
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
            | tblproperties (
            |  primaryKey ='id',
@@ -702,12 +707,12 @@
            |  hoodie.index.type = 'BUCKET',
            |  hoodie.bucket.index.hash.field = 'id'
            | )
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$basePath'
      """.stripMargin)
 
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
 
       checkExceptionContain(s"call run_clustering(table => '$tableName')")(
         "Executor SparkExecuteClusteringCommitActionExecutor is not compatible with table layout HoodieSimpleBucketLayout")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
index 10ee950..c9a8c8e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -31,9 +31,10 @@
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition int
            |) using hudi
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$tablePath'
            | tblproperties (
            |  type = 'mor',
@@ -42,8 +43,8 @@
            | )
        """.stripMargin)
       // insert data to table
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500, 1500")
       spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
 
       // Check required fields
@@ -52,7 +53,7 @@
 
       // collect result for table
       val result = spark.sql(
-        s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/ts=1000/*.log.*')""".stripMargin).collect()
+        s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/partition=1000/*.log.*')""".stripMargin).collect()
       assertResult(1) {
         result.length
       }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 7458d16..9f3a5ce0 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -98,22 +98,21 @@
     Join(left, right, joinType, condition)
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+    val thisOutput = a.output
+    val otherOutput = b.output
+    thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
+
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case physicalOperation@PhysicalOperation(_, _,
-      logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait] && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
-        ff.isProjected = true
-        val tableSchema = fs.location match {
-          case index: HoodieCDCFileIndex => index.cdcRelation.schema
-          case index: SparkHoodieTableFileIndex => index.schema
-        }
-        val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
-        if (!fs.partitionSchema.fields.isEmpty) {
-          Project(resolvedSchema, physicalOperation)
-        } else {
-          physicalOperation
-        }
+      logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(physicalOperation, logicalRelation, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index f5757e1..399e053 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, SparkHoodieTableFileIndex}
+import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LeafNode, LogicalPlan}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -82,25 +80,14 @@
   override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
     Join(left, right, joinType, condition, JoinHint.NONE)
   }
+
+  override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+    a.sameOutput(b)
+  }
 }
 
 object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
 
-  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
-    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait]
-    ff.isProjected = true
-    val tableSchema = fs.location match {
-      case index: HoodieCDCFileIndex => index.cdcRelation.schema
-      case index: SparkHoodieTableFileIndex => index.schema
-    }
-    val resolvedSchema = logicalRelation.resolve(tableSchema, fs.sparkSession.sessionState.analyzer.resolver)
-    if (!fs.partitionSchema.fields.isEmpty && scanOperation.sameOutput(logicalRelation)) {
-      Project(resolvedSchema, scanOperation)
-    } else {
-      scanOperation
-    }
-  }
-
   /**
    * This is an extractor to accommodate for [[ResolvedTable]] signature change in Spark 3.2
    */
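
The Spark 3 adapters can delegate produceSameOutput directly to Catalyst's LogicalPlan.sameOutput, which performs the same length check plus pairwise semanticEquals that the adapter at the top of this excerpt spells out manually. A tiny illustration of how the check behaves, e.g. in a spark-shell (the column name is made up, not from this PR):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

// Attributes compare by exprId under semanticEquals, so two plans "produce the same
// output" only when they expose the same attribute instances in the same order.
val id = AttributeReference("id", IntegerType)()
val planA = LocalRelation(id)
val planB = LocalRelation(id)               // same attribute, same order
val planC = LocalRelation(id.newInstance()) // fresh exprId

assert(planA.sameOutput(planB))   // true
assert(!planA.sameOutput(planC))  // false: outputs are not semantically equal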
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index 34a1e08..dbe68bb 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index dd3ac53..765a9b0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -46,13 +46,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 7a30aeb..8562ca1 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -48,13 +48,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 9bcc40a..54dbaa0 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -46,13 +46,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index e49adeb..2c50d21 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index eac3153..b95ee94 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -45,13 +45,13 @@
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
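
The six Spark 3.x overrides above are now identical apart from the ScanOperation extractor, which binds three fields through Spark 3.3 and four from Spark 3.4 onward; all of the shared work happens in FileFormatUtilsForFileGroupReader. How the hook is wired into analysis is outside this excerpt; the rule below is only a hypothetical illustration of a typical call site (the object name and the getCatalystPlanUtils accessor are assumptions):

import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical wiring, not taken from this PR: hand the plan to the version-specific
// adapter once; maybeApplyForNewFileFormat returns it unchanged unless the plan is a
// scan over a Hudi relation using the new file format that has not been projected yet.
object MaybeApplyForNewFileFormatRule extends Rule[LogicalPlan] with SparkAdapterSupport {
  override def apply(plan: LogicalPlan): LogicalPlan =
    sparkAdapter.getCatalystPlanUtils.maybeApplyForNewFileFormat(plan)
}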
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03208a0..b39c5f2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -60,7 +60,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
@@ -105,6 +104,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -2390,7 +2390,7 @@
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     // No change expected: this sync fails with a path-not-found error
-    assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+    assertThrows(SparkException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
 
     if (downstreamCfg.configs == null) {
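
The downstream HoodieDeltaStreamer in this test reads the upstream table incrementally; with the new file-group reader the incremental path is served by a HadoopFsRelation-based relation instead of IncrementalRelation, which is presumably why the missing-path failure now surfaces wrapped in a SparkException rather than Hudi's dedicated exception. For reference, a plain incremental read of the same shape, assuming a SparkSession named spark and a table path are in scope (the option keys are standard Hudi datasource options, not taken from this test):

// Standard Hudi incremental read; with the new file-group reader this is planned as a
// HadoopFsRelation scan with the incremental filters injected into the query plan.
val incrementalDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "0")
  .load(tableBasePath)
incrementalDf.count()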
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 1b534c22..b9e20fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -46,6 +46,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -72,6 +73,11 @@
 
 public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
+  @Override
+  public SparkConf conf() {
+    return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+  }
+
   private HoodieTestDataGenerator dataGen;
   private HoodieTableMetaClient metaClient;
   private HoodieTableType tableType = COPY_ON_WRITE;
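
TestHoodieIncrSource now builds its SparkConf through the harness's getSparkSqlConf(), presumably because the HadoopFsRelation-based incremental relations go through Spark SQL planning that needs the SQL-side settings. The helper's actual contents are not shown in this excerpt; the snippet below only illustrates the kind of settings Hudi's Spark SQL tests typically add:

import org.apache.spark.SparkConf

// Illustrative only, not the body of getSparkSqlConf(): typical SQL-side settings for
// Hudi on Spark (session extension and catalog implementation).
val sqlConf = new SparkConf()
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")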