[HUDI-6804] Fix Hive read of schema-evolved MOR tables (#9573)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index f9f7faf..746066e 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -82,7 +82,7 @@
private final InputSplit split;
private final JobConf job;
- private HoodieTableMetaClient metaClient;
+ private final HoodieTableMetaClient metaClient;
public Option<InternalSchema> internalSchemaOption;
public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
@@ -149,6 +149,7 @@
realtimeRecordReader.setWriterSchema(writerSchema);
realtimeRecordReader.setReaderSchema(readerSchema);
realtimeRecordReader.setHiveSchema(hiveSchema);
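+ // cache the pruned internal schema on this context so the realtime (MOR) log read path sees the evolved schema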
+ internalSchemaOption = Option.of(prunedInternalSchema);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
@@ -171,7 +172,7 @@
if (!disableSchemaEvolution) {
prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
InternalSchema querySchema = prunedSchema;
- Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+ long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
true).mergeSchema();
@@ -258,10 +259,10 @@
case DECIMAL:
return typeInfo;
case TIME:
- throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
+ throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
default:
- LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
- throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
+ LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+ throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 027224d..dff9d2e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,39 +19,46 @@
package org.apache.hudi.functional;
import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.SchemaEvolutionContext;
-import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHiveTableSchemaEvolution {
- private SparkSession sparkSession = null;
+ private SparkSession spark = null;
@TempDir
java.nio.file.Path basePath;
@@ -61,90 +68,98 @@
initSparkContexts("HiveSchemaEvolution");
}
+ @AfterEach
+ public void clean() {
+ if (spark != null) {
+ spark.close();
+ }
+ }
+
private void initSparkContexts(String appName) {
SparkConf sparkConf = getSparkConfForTest(appName);
- sparkSession = SparkSession.builder()
+ spark = SparkSession.builder()
.config("hoodie.support.write.lock", "false")
.config("spark.sql.session.timeZone", "CTT")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config(sparkConf)
.getOrCreate();
- sparkSession.sparkContext().setLogLevel("ERROR");
+ spark.sparkContext().setLogLevel("ERROR");
}
- @Test
- public void testCopyOnWriteTableForHive() throws Exception {
- String tableName = "huditest" + new Date().getTime();
+ @ParameterizedTest
+ @ValueSource(strings = {"cow", "mor"})
+ public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
if (HoodieSparkUtils.gteqSpark3_1()) {
- sparkSession.sql("set hoodie.schema.on.read.enable=true");
+ String tableName = "hudi_test" + new Date().getTime();
String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
- sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
- sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
- sparkSession.sql("alter table " + tableName + " alter column col1 type double");
- sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
- HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+ spark.sql("set hoodie.schema.on.read.enable=true");
+ spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
+ + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
+ tableName, tableType, path));
+ spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
+ spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
+ spark.sql(String.format("alter table %s alter column col1 type double", tableName));
+ spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
+
JobConf jobConf = new JobConf();
- inputFormat.setConf(jobConf);
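+ // project only the evolved columns col1 (now double) and col2_new: ids 6 and 7, after the five Hudi meta columns and col0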
+ jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+ jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+ + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
+ jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
FileInputFormat.setInputPaths(jobConf, path);
- InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
- assertEvolutionResult("cow", splits[0], jobConf);
- }
- }
- @Test
- public void testMergeOnReadTableForHive() throws Exception {
- String tableName = "huditest" + new Date().getTime();
- if (HoodieSparkUtils.gteqSpark3_1()) {
- sparkSession.sql("set hoodie.schema.on.read.enable=true");
- String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
- sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
- sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
- sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
- sparkSession.sql("alter table " + tableName + " alter column col1 type double");
- sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
-
- HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
- JobConf jobConf = new JobConf();
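+ // MOR tables go through the realtime input format so base files are merged with log files on read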
+ HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()
+ : new HoodieParquetRealtimeInputFormat();
inputFormat.setConf(jobConf);
- FileInputFormat.setInputPaths(jobConf, path);
+
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
- assertEvolutionResult("mor", splits[0], jobConf);
- }
- }
+ assertEquals(1, splits.length);
- private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
- jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
- jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
- + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
- jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
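+ // read the records back through the Hive record reader and verify the evolved column values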
+ RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
+ List<List<Writable>> records = getWritableList(recordReader);
+ assertEquals(1, records.size());
+ List<Writable> record1 = records.get(0);
+ if ("cow".equals(tableType)) {
+ // col1, col2_new
+ assertEquals(2, record1.size());
- SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
- if ("cow".equals(tableType)) {
- schemaEvolutionContext.doEvolutionForParquetFormat();
- } else {
- // mot table
- RealtimeSplit realtimeSplit = (RealtimeSplit) split;
- RecordReader recordReader;
- // for log only split, set the parquet reader as empty.
- if (FSUtils.isLogFile(realtimeSplit.getPath())) {
- recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+ Writable c1 = record1.get(0);
+ assertTrue(c1 instanceof DoubleWritable);
+ assertEquals("1.1", c1.toString().substring(0, 3));
+
+ Writable c2 = record1.get(1);
+ assertTrue(c2 instanceof Text);
+ assertEquals("text2", c2.toString());
} else {
- // create a RecordReader to be used by HoodieRealtimeRecordReader
- recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
- }
- RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
- // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
- schemaEvolutionContext.doEvolutionForParquetFormat();
- schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
- }
+ // _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new
+ assertEquals(5, record1.size());
- assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
- assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
- + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
- assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+ Writable c1 = record1.get(3);
+ assertTrue(c1 instanceof DoubleWritable);
+ assertEquals("1.1", c1.toString().substring(0, 3));
+
+ Writable c2 = record1.get(4);
+ assertTrue(c2 instanceof Text);
+ assertEquals("text2", c2.toString());
+ }
+ recordReader.close();
+ }
+ }
+
+ private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
+ List<List<Writable>> records = new ArrayList<>();
+ NullWritable key = recordReader.createKey();
+ ArrayWritable writable = recordReader.createValue();
+ while (writable != null && recordReader.next(key, writable)) {
+ records.add(Arrays.stream(writable.get())
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList()));
+ }
+ return records;
}
}