[HUDI-7042] Fix new filegroup reader (#10003)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 3d59ad2..5cb8800 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -40,6 +40,7 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
@@ -447,6 +448,7 @@
         || schema != null && (
         data instanceof HoodieInternalRow
             || data instanceof GenericInternalRow
+            || data instanceof SpecificInternalRow
             || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
 
     ValidationUtils.checkState(isValid);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 4a1bd08..90ebf71 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -80,7 +80,7 @@
   }
 
   @Override
-  public void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator) {
+  public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
     this.baseFileIterator = baseFileIterator;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index b655238..2850a77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -154,7 +154,7 @@
             baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
         : new EmptyIterator<>();
     scanLogFiles();
-    recordBuffer.setBaseFileIteraotr(baseFileIterator);
+    recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 680bbf9..0bf27cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -100,7 +100,7 @@
    *
    * @param baseFileIterator
    */
-  void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator);
+  void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
 
   /**
    * Check if next merged record exists.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index d8278ea..3fb8050 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -33,8 +33,10 @@
 import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping,
   HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
   SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
@@ -42,6 +44,7 @@
 import org.apache.spark.util.SerializableConfiguration
 import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
 
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
@@ -89,6 +92,7 @@
                                               filters: Seq[Filter],
                                               options: Map[String, String],
                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
     val requiredSchemaWithMandatory = generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
     val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
     val requiredMeta = StructType(requiredSchemaSplits._1)
@@ -117,7 +121,6 @@
                 val logFiles = getLogFilesFromSlice(fileSlice)
                 if (requiredSchemaWithMandatory.isEmpty) {
                   val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
-                  // TODO: Use FileGroupReader here: HUDI-6942.
                   baseFileReader(baseFile)
                 } else if (bootstrapFileOpt.isPresent) {
                   // TODO: Use FileGroupReader here: HUDI-6942.
@@ -134,9 +137,11 @@
                     hoodieBaseFile,
                     logFiles,
                     requiredSchemaWithMandatory,
+                    outputSchema,
+                    partitionSchema,
                     broadcastedHadoopConf.value.value,
-                    file.start,
-                    file.length,
+                    0,
+                    hoodieBaseFile.getFileLen,
                     shouldUseRecordPosition
                   )
                 }
@@ -180,6 +185,8 @@
                                        baseFile: HoodieBaseFile,
                                        logFiles: List[HoodieLogFile],
                                        requiredSchemaWithMandatory: StructType,
+                                       outputSchema: StructType,
+                                       partitionSchema: StructType,
                                        hadoopConf: Configuration,
                                        start: Long,
                                        length: Long,
@@ -201,19 +208,65 @@
       length,
       shouldUseRecordPosition)
     reader.initRecordIterators()
-    reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
+    // Append partition values to rows and project to output schema
+    appendPartitionAndProject(
+      reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
+      requiredSchemaWithMandatory,
+      partitionSchema,
+      outputSchema,
+      partitionValues)
   }
 
-  def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
-                                          dataSchema: StructType,
-                                          partitionSchema: StructType): StructType = {
+  private def appendPartitionAndProject(iter: Iterator[InternalRow],
+                                        inputSchema: StructType,
+                                        partitionSchema: StructType,
+                                        to: StructType,
+                                        partitionValues: InternalRow): Iterator[InternalRow] = {
+    if (partitionSchema.isEmpty) {
+      projectSchema(iter, inputSchema, to)
+    } else {
+      val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to)
+      val joinedRow = new JoinedRow()
+      iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+    }
+  }
+
+  private def projectSchema(iter: Iterator[InternalRow],
+                            from: StructType,
+                            to: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(from, to)
+    iter.map(d => unsafeProjection(d))
+  }
+
+  private def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
+                                                  dataSchema: StructType,
+                                                  partitionSchema: StructType): StructType = {
+    // Helper method to get the StructField for nested fields
+    @tailrec
+    def findNestedField(schema: StructType, fieldParts: Array[String]): Option[StructField] = {
+      fieldParts.toList match {
+        case head :: Nil => schema.fields.find(_.name == head) // If it's the last part, find and return the field
+        case head :: tail => // If there are more parts, find the field and its nested fields
+          schema.fields.find(_.name == head) match {
+            case Some(StructField(_, nested: StructType, _, _)) => findNestedField(nested, tail.toArray)
+            case _ => None // The path is not valid
+          }
+        case _ => None // Empty path, should not happen if the input is correct
+      }
+    }
+
+    // If not MergeOnRead or if projection is compatible
     if (isIncremental) {
       StructType(dataSchema.toArray ++ partitionSchema.fields)
     } else if (!isMOR || MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
         if (requiredSchema.getFieldIndex(field).isEmpty) {
-          val fieldToAdd = dataSchema.fields(dataSchema.getFieldIndex(field).get)
+          // Support for nested fields
+          val fieldParts = field.split("\\.")
+          val fieldToAdd = findNestedField(dataSchema, fieldParts).getOrElse(
+            throw new IllegalArgumentException(s"Field $field does not exist in the data schema")
+          )
           added.append(fieldToAdd)
         }
       }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 300c9ab..acf406c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -46,12 +46,12 @@
 
   @ParameterizedTest
   @CsvSource(Array(
-    "true,",
-    "true,fare.currency",
-    "false,",
-    "false,fare.currency"
+    "true,,false",
+    "true,fare.currency,false",
+    "false,,false",
+    "false,fare.currency,true"
   ))
-  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String): Unit = {
+  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String, useFileGroupReader: Boolean): Unit = {
     val commonOpts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -70,6 +70,10 @@
     if (!StringUtils.isNullOrEmpty(preCombineField)) {
       options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
     }
+    if (useFileGroupReader) {
+      options += (DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key() -> String.valueOf(useFileGroupReader))
+      options += (HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(useFileGroupReader))
+    }
     val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 172d0a7..1e605b0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -18,17 +18,16 @@
 
 package org.apache.hudi.functional
 
-import java.util.function.Consumer
-
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
+import org.apache.hudi.common.config.HoodieReaderConfig
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, QuickstartUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{lit, typedLit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -36,8 +35,9 @@
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.CsvSource
 
+import java.util.function.Consumer
 import scala.collection.JavaConversions._
 
 class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
@@ -66,8 +66,14 @@
   }
 
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  def testPartialUpdatesAvroPayloadPrecombine(hoodieTableType: HoodieTableType): Unit = {
+  @CsvSource(Array(
+    "COPY_ON_WRITE,false",
+    "MERGE_ON_READ,false",
+    "COPY_ON_WRITE,true",
+    "MERGE_ON_READ,true"
+  ))
+  def testPartialUpdatesAvroPayloadPrecombine(tableType: String, useFileGroupReader: Boolean): Unit = {
+    val hoodieTableType = HoodieTableType.valueOf(tableType)
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts(1))
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
@@ -116,7 +122,10 @@
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val finalDF = spark.read.format("hudi").load(basePath)
+    val finalDF = spark.read.format("hudi")
+      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), String.valueOf(useFileGroupReader))
+      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), String.valueOf(useFileGroupReader))
+      .load(basePath)
     assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0), upsert1DF.select("rider").collectAsList().get(0).getString(0))
     assertEquals(finalDF.select("driver").collectAsList().get(0).getString(0), upsert2DF.select("driver").collectAsList().get(0).getString(0))
     assertEquals(finalDF.select("fare").collectAsList().get(0).getDouble(0), upsert3DF.select("fare").collectAsList().get(0).getDouble(0))
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index bb4bcbe..463ceeb 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -62,7 +62,7 @@
  </check>
  <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
   <parameters>
-   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+   <parameter name="maxParameters"><![CDATA[12]]></parameter>
   </parameters>
  </check>
  <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">