[HUDI-6804] Fix hive read schema evolution MOR table (#9573)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index f9f7faf..746066e 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -82,7 +82,7 @@
 

   private final InputSplit split;

   private final JobConf job;

-  private HoodieTableMetaClient metaClient;

+  private final HoodieTableMetaClient metaClient;

   public Option<InternalSchema> internalSchemaOption;

 

   public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {

@@ -149,6 +149,7 @@
       realtimeRecordReader.setWriterSchema(writerSchema);

       realtimeRecordReader.setReaderSchema(readerSchema);

       realtimeRecordReader.setHiveSchema(hiveSchema);

+      internalSchemaOption = Option.of(prunedInternalSchema);

       RealtimeSplit realtimeSplit = (RealtimeSplit) split;

       LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",

           realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));

@@ -171,7 +172,7 @@
       if (!disableSchemaEvolution) {

         prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);

         InternalSchema querySchema = prunedSchema;

-        Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));

+        long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));

         InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);

         InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,

             true).mergeSchema();

@@ -258,10 +259,10 @@
       case DECIMAL:

         return typeInfo;

       case TIME:

-        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));

+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));

       default:

-        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));

-        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));

+        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));

+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));

     }

   }

 

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 027224d..dff9d2e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,39 +19,46 @@
 package org.apache.hudi.functional;

 

 import org.apache.hudi.HoodieSparkUtils;

-import org.apache.hudi.common.fs.FSUtils;

 import org.apache.hudi.hadoop.HoodieParquetInputFormat;

-import org.apache.hudi.hadoop.SchemaEvolutionContext;

-import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;

-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;

-import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;

-import org.apache.hudi.hadoop.realtime.RealtimeSplit;

+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

 

-import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;

 import org.apache.hadoop.fs.Path;

-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;

 import org.apache.hadoop.hive.serde.serdeConstants;

 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

+import org.apache.hadoop.io.ArrayWritable;

+import org.apache.hadoop.io.DoubleWritable;

+import org.apache.hadoop.io.NullWritable;

+import org.apache.hadoop.io.Text;

+import org.apache.hadoop.io.Writable;

 import org.apache.hadoop.mapred.FileInputFormat;

 import org.apache.hadoop.mapred.InputSplit;

 import org.apache.hadoop.mapred.JobConf;

 import org.apache.hadoop.mapred.RecordReader;

 import org.apache.spark.SparkConf;

 import org.apache.spark.sql.SparkSession;

+import org.junit.jupiter.api.AfterEach;

 import org.junit.jupiter.api.BeforeEach;

 import org.junit.jupiter.api.Tag;

-import org.junit.jupiter.api.Test;

 import org.junit.jupiter.api.io.TempDir;

+import org.junit.jupiter.params.ParameterizedTest;

+import org.junit.jupiter.params.provider.ValueSource;

 

+import java.io.IOException;

+import java.util.ArrayList;

+import java.util.Arrays;

 import java.util.Date;

+import java.util.List;

+import java.util.Objects;

+import java.util.stream.Collectors;

 

 import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;

 import static org.junit.jupiter.api.Assertions.assertEquals;

+import static org.junit.jupiter.api.Assertions.assertTrue;

 

 @Tag("functional")

 public class TestHiveTableSchemaEvolution {

 

-  private SparkSession sparkSession = null;

+  private SparkSession spark = null;

 

   @TempDir

   java.nio.file.Path basePath;

@@ -61,90 +68,98 @@
     initSparkContexts("HiveSchemaEvolution");

   }

 

+  @AfterEach

+  public void clean() {

+    if (spark != null) {

+      spark.close();

+    }

+  }

+

   private void initSparkContexts(String appName) {

     SparkConf sparkConf = getSparkConfForTest(appName);

 

-    sparkSession = SparkSession.builder()

+    spark = SparkSession.builder()

         .config("hoodie.support.write.lock", "false")

         .config("spark.sql.session.timeZone", "CTT")

         .config("spark.sql.hive.convertMetastoreParquet", "false")

         .config(sparkConf)

         .getOrCreate();

 

-    sparkSession.sparkContext().setLogLevel("ERROR");

+    spark.sparkContext().setLogLevel("ERROR");

   }

 

-  @Test

-  public void testCopyOnWriteTableForHive() throws Exception {

-    String tableName = "huditest" + new Date().getTime();

+  @ParameterizedTest

+  @ValueSource(strings = {"cow", "mor"})

+  public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {

     if (HoodieSparkUtils.gteqSpark3_1()) {

-      sparkSession.sql("set hoodie.schema.on.read.enable=true");

+      String tableName = "hudi_test" + new Date().getTime();

       String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();

-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");

-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");

-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");

-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");

 

-      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();

+      spark.sql("set hoodie.schema.on.read.enable=true");

+      spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "

+              + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",

+          tableName, tableType, path));

+      spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));

+      spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));

+      spark.sql(String.format("alter table %s alter column col1 type double", tableName));

+      spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));

+

       JobConf jobConf = new JobConf();

-      inputFormat.setConf(jobConf);

+      jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");

+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");

+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");

+      jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"

+          + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");

+      jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");

       FileInputFormat.setInputPaths(jobConf, path);

-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);

-      assertEvolutionResult("cow", splits[0], jobConf);

-    }

-  }

 

-  @Test

-  public void testMergeOnReadTableForHive() throws Exception {

-    String tableName = "huditest" + new Date().getTime();

-    if (HoodieSparkUtils.gteqSpark3_1()) {

-      sparkSession.sql("set hoodie.schema.on.read.enable=true");

-      String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();

-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");

-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");

-      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");

-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");

-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");

-

-      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();

-      JobConf jobConf = new JobConf();

+      HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()

+          : new HoodieParquetRealtimeInputFormat();

       inputFormat.setConf(jobConf);

-      FileInputFormat.setInputPaths(jobConf, path);

+

       InputSplit[] splits = inputFormat.getSplits(jobConf, 1);

-      assertEvolutionResult("mor", splits[0], jobConf);

-    }

-  }

+      assertEquals(1, splits.length);

 

-  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {

-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");

-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");

-    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"

-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");

-    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");

+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);

+      List<List<Writable>> records = getWritableList(recordReader);

+      assertEquals(1, records.size());

+      List<Writable> record1 = records.get(0);

+      if ("cow".equals(tableType)) {

+        // col1, col2_new

+        assertEquals(2, record1.size());

 

-    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);

-    if ("cow".equals(tableType)) {

-      schemaEvolutionContext.doEvolutionForParquetFormat();

-    } else {

-      // mot table

-      RealtimeSplit realtimeSplit = (RealtimeSplit) split;

-      RecordReader recordReader;

-      // for log only split, set the parquet reader as empty.

-      if (FSUtils.isLogFile(realtimeSplit.getPath())) {

-        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));

+        Writable c1 = record1.get(0);

+        assertTrue(c1 instanceof DoubleWritable);

+        assertEquals("1.1", c1.toString().substring(0, 3));

+

+        Writable c2 = record1.get(1);

+        assertTrue(c2 instanceof Text);

+        assertEquals("text2", c2.toString());

       } else {

-        // create a RecordReader to be used by HoodieRealtimeRecordReader

-        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);

-      }

-      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);

-      // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat

-      schemaEvolutionContext.doEvolutionForParquetFormat();

-      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);

-    }

+        // _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new

+        assertEquals(5, record1.size());

 

-    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");

-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"

-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");

-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");

+        Writable c1 = record1.get(3);

+        assertTrue(c1 instanceof DoubleWritable);

+        assertEquals("1.1", c1.toString().substring(0, 3));

+

+        Writable c2 = record1.get(4);

+        assertTrue(c2 instanceof Text);

+        assertEquals("text2", c2.toString());

+      }

+      recordReader.close();

+    }

+  }

+

+  private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {

+    List<List<Writable>> records = new ArrayList<>();

+    NullWritable key = recordReader.createKey();

+    ArrayWritable writable = recordReader.createValue();

+    while (writable != null && recordReader.next(key, writable)) {

+      records.add(Arrays.stream(writable.get())

+          .filter(Objects::nonNull)

+          .collect(Collectors.toList()));

+    }

+    return records;

   }

 }