HIVE-29327: Iceberg: [V3] Add support for initial column default (#6200)
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 362ea6a..f651406 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -502,8 +502,4 @@ public static Object getDefaultValue(String defaultValue, Type type) {
default -> Conversions.fromPartitionString(type, stripQuotes(defaultValue));
};
}
-
- public static Type getStructType(TypeInfo typeInfo, String defaultValue) {
- return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
- }
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 0575167..58908b0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -428,14 +428,15 @@ protected void setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table h
}
protected boolean isOrcOnlyFiles(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
- return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) &&
- (hmsTable.getSd().getInputFormat() != null &&
- hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name()) ||
- org.apache.iceberg.FileFormat.ORC.name()
- .equalsIgnoreCase(hmsTable.getSd().getSerdeInfo().getParameters()
- .get(TableProperties.DEFAULT_FILE_FORMAT)) ||
- org.apache.iceberg.FileFormat.ORC.name()
- .equalsIgnoreCase(hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT)));
+ return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) && isOrcFileFormat(hmsTable);
+ }
+
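+  /**
+   * Returns true if the table is ORC-backed: the input format name contains ORC, or ORC is the
+   * default file format in either the serde or the table properties.
+   */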
+  static boolean isOrcFileFormat(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    return (hmsTable.getSd().getInputFormat() != null &&
+        hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name())) ||
+        org.apache.iceberg.FileFormat.ORC.name().equalsIgnoreCase(
+            hmsTable.getSd().getSerdeInfo().getParameters().get(TableProperties.DEFAULT_FILE_FORMAT)) ||
+        org.apache.iceberg.FileFormat.ORC.name().equalsIgnoreCase(
+            hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
}
protected void setWriteModeDefaults(Table icebergTbl, Map<String, String> newProps, EnvironmentContext context) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 77d9915..242772a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -680,13 +680,18 @@ private void handleAddColumns(org.apache.hadoop.hive.metastore.api.Table hmsTabl
(List<SQLDefaultConstraint>) SessionStateUtil.getResource(conf, SessionStateUtil.COLUMN_DEFAULTS).orElse(null);
Map<String, String> defaultValues = Stream.ofNullable(sqlDefaultConstraints).flatMap(Collection::stream)
.collect(Collectors.toMap(SQLDefaultConstraint::getColumn_name, SQLDefaultConstraint::getDefault_value));
+    boolean isOrc = isOrcFileFormat(hmsTable);
for (FieldSchema addedCol : addedCols) {
String defaultValue = defaultValues.get(addedCol.getName());
Type type = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()), defaultValue);
Literal<Object> defaultVal = Optional.ofNullable(defaultValue).filter(v -> !type.isStructType())
.map(v -> Expressions.lit(HiveSchemaUtil.getDefaultValue(v, type))).orElse(null);
- updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), defaultVal);
+      // ORC has no Iceberg-layer support for initialDefault, so for ORC only the write default is set.
+      updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), isOrc ? null : defaultVal);
+      if (isOrc && defaultVal != null) {
+        updateSchema.updateColumnDefault(addedCol.getName(), defaultVal);
+      }
}
updateSchema.commit();
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index dc13857..20a73cb 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -62,6 +62,7 @@
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.orc.impl.OrcTail;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -161,7 +162,8 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
break;
case PARQUET:
- recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId);
+ recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId,
+ getInitialColumnDefaults(table.schema().columns()));
break;
default:
throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format);
@@ -177,6 +179,17 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
}
}
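+  /**
+   * Collects the Iceberg initial default values keyed by top-level column name, so the Parquet
+   * reader can substitute them for columns that are missing from older data files.
+   */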
+ static Map<String, Object> getInitialColumnDefaults(List<Types.NestedField> columns) {
+ Map<String, Object> columnDefaults = Maps.newHashMap();
+
+ for (Types.NestedField column : columns) {
+ if (column.initialDefault() != null) {
+ columnDefaults.put(column.name(), column.initialDefault());
+ }
+ }
+ return columnDefaults;
+ }
+
private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
SyntheticFileId fileId, Expression residual, String tableName) throws IOException {
@@ -218,7 +231,8 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
}
private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter,
- FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException {
+ FileScanTask task, Path path, long start, long length, SyntheticFileId fileId,
+ Map<String, Object> initialColumnDefaults) throws IOException {
InputSplit split = new FileSplit(path, start, length, job);
VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
@@ -249,6 +263,7 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), typeWithIds, psv);
job.set(IOConstants.COLUMNS, psv.retrieveColumnNameList());
+    inputFormat.setInitialColumnDefaults(initialColumnDefaults);
return inputFormat.getRecordReader(split, job, reporter);
}
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q
new file mode 100644
index 0000000..8d78e67
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q
@@ -0,0 +1,100 @@
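+-- Iceberg format-version 3 is required for column default values.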
+CREATE TABLE ice_parq (
+ id INT)
+STORED BY ICEBERG stored as parquet
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_parq (id) VALUES (1);
+
+ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general');
+
+INSERT INTO ice_parq (id) VALUES (2);
+
+SELECT * FROM ice_parq ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_parq (id) VALUES (3);
+
+SELECT * FROM ice_parq ORDER BY id;
+
+-- AVRO table
+CREATE TABLE ice_avro (
+ id INT)
+STORED BY ICEBERG stored as avro
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_avro (id) VALUES (1);
+
+ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general');
+
+INSERT INTO ice_avro (id) VALUES (2);
+
+SELECT * FROM ice_avro ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_avro (id) VALUES (3);
+
+SELECT * FROM ice_avro ORDER BY id;
+
+-- ORC table
+CREATE TABLE ice_orc (
+ id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_orc (id) VALUES (1);
+
+ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general');
+
+INSERT INTO ice_orc (id) VALUES (2);
+
+SELECT * FROM ice_orc ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_orc (id) VALUES (3);
+
+SELECT * FROM ice_orc ORDER BY id;
+
+-- disable vectorization
+set hive.vectorized.execution.enabled=false;
+SELECT * FROM ice_parq ORDER BY id;
+SELECT * FROM ice_avro ORDER BY id;
+SELECT * FROM ice_orc ORDER BY id;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
index bf7cdcf..ec1aa32 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
@@ -60,7 +60,7 @@
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
-1 NULL NULL NULL NULL NULL NULL NULL NULL NULL
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
PREHOOK: query: ALTER TABLE ice_t REPLACE COLUMNS (id INT,
point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
@@ -94,7 +94,7 @@
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
-1 NULL NULL NULL NULL NULL NULL NULL NULL
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
PREHOOK: type: ALTERTABLE_RENAMECOL
@@ -128,7 +128,7 @@
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
-1 NULL NULL NULL NULL NULL NULL NULL NULL
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point_new STRUCT<x:INT, y:INT> DEFAULT 'x:55,y:88'
@@ -155,7 +155,7 @@
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_t
POSTHOOK: Output: hdfs://### HDFS PATH ###
-1 NULL NULL NULL NULL NULL NULL NULL NULL
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
4 {"x":55,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 general
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out
new file mode 100644
index 0000000..e6ff59c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out
@@ -0,0 +1,330 @@
+PREHOOK: query: CREATE TABLE ice_parq (
+ id INT)
+STORED BY ICEBERG stored as parquet
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: CREATE TABLE ice_parq (
+ id INT)
+STORED BY ICEBERG stored as parquet
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: CREATE TABLE ice_avro (
+ id INT)
+STORED BY ICEBERG stored as avro
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: CREATE TABLE ice_avro (
+ id INT)
+STORED BY ICEBERG stored as avro
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: CREATE TABLE ice_orc (
+ id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: CREATE TABLE ice_orc (
+ id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+ name STRING DEFAULT 'unknown',
+ age INT DEFAULT 25,
+ salary DOUBLE DEFAULT 50000.0,
+ is_active BOOLEAN DEFAULT TRUE,
+ created_date DATE DEFAULT '2024-01-01',
+ created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+ score DECIMAL(5,2) DEFAULT 100.00,
+ category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL NULL NULL NULL NULL NULL NULL NULL NULL
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL NULL NULL NULL NULL NULL NULL NULL NULL
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 NULL NULL NULL NULL NULL NULL NULL NULL NULL
+2 {"x":100,"y":99} unknown 25 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
+3 {"x":100,"y":88} unknown 21 50000.0 true 2024-01-01 2024-01-01 10:00:00 100.00 general
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 4dd0dee..6035a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.DataCache;
@@ -40,6 +41,7 @@ public class VectorizedParquetInputFormat
private DataCache dataCache = null;
private Configuration cacheConf = null;
private ParquetMetadata metadata;
+ private Map<String, Object> initialDefaults;
public VectorizedParquetInputFormat() {
}
@@ -50,13 +52,17 @@ public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
JobConf jobConf,
Reporter reporter) throws IOException {
return new VectorizedParquetRecordReader(
- inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata);
+ inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata, initialDefaults);
}
public void setMetadata(ParquetMetadata metadata) throws IOException {
this.metadata = metadata;
}
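+  /** Sets the per-column Iceberg initial defaults applied when a requested column is missing from the Parquet file. */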
+  public void setInitialColumnDefaults(Map<String, Object> initialDefaults) {
+ this.initialDefaults = initialDefaults;
+ }
+
@Override
public void injectCaches(
FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
index ee1d692..e8d95cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
@@ -18,25 +18,115 @@
package org.apache.hadoop.hive.ql.io.parquet.vector;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
/**
- * A dummy vectorized parquet reader to support schema evolution.
+ * A dummy vectorized parquet reader used for schema evolution.
+ * If a default value is provided, it returns that value for the entire batch.
+ * Otherwise, it returns nulls.
*/
public class VectorizedDummyColumnReader extends BaseVectorizedColumnReader {
- public VectorizedDummyColumnReader() {
+ private final Object defaultValue;
+
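+  /** @param defaultValue the Iceberg initial default for the missing column, or null to produce nulls */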
+ public VectorizedDummyColumnReader(Object defaultValue) {
super();
+ this.defaultValue = defaultValue;
}
@Override
- public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
- Arrays.fill(column.isNull, true);
- column.isRepeating = true;
- column.noNulls = false;
+ public void readBatch(int total, ColumnVector col, TypeInfo typeInfo) throws IOException {
+
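+    // Mark the vector as repeating: element 0 then applies to every row in the batch.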
+ col.isRepeating = true;
+    // Case 1: no default value → the whole batch is null.
+ if (defaultValue == null) {
+ Arrays.fill(col.isNull, true);
+ col.noNulls = false;
+ return;
+ }
+
+    // Case 2: a default exists → fill the batch with that constant value.
+ col.noNulls = true;
+ col.isNull[0] = false;
+
+ if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ fillPrimitive(col, (PrimitiveTypeInfo) typeInfo, defaultValue);
+ } else {
+ throw new IOException("Unsupported type category in DummyColumnReader: " + typeInfo.getCategory());
+ }
+ }
+
+ /**
+ * Fill the column with the given value.
+ * @param col the column to fill
+ * @param ti the type info of the column
+ * @param value value to fill the column with
+   * @throws IOException if the column type is not supported
+ */
+ private void fillPrimitive(ColumnVector col, PrimitiveTypeInfo ti, Object value) throws IOException {
+
+ switch (ti.getPrimitiveCategory()) {
+
+ case BOOLEAN:
+ ((LongColumnVector) col).vector[0] = ((Boolean) value) ? 1 : 0;
+ return;
+
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ ((LongColumnVector) col).vector[0] = ((Number) value).longValue();
+ return;
+
+ case FLOAT:
+ case DOUBLE:
+ ((DoubleColumnVector) col).vector[0] = ((Number) value).doubleValue();
+ return;
+
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ byte[] bytes = value.toString().getBytes(StandardCharsets.UTF_8);
+ ((BytesColumnVector) col).setRef(0, bytes, 0, bytes.length);
+ return;
+
+ case DECIMAL:
+ DecimalColumnVector dcv = (DecimalColumnVector) col;
+ dcv.set(0, HiveDecimal.create(value.toString()));
+ return;
+
+ case TIMESTAMP: {
+ TimestampColumnVector tcv = (TimestampColumnVector) col;
+
+        // Iceberg stores timestamp defaults as microseconds since the epoch. Keep the
+        // TimestampColumnVector invariant: time holds epoch millis (Timestamp.getTime())
+        // and nanos holds the full sub-second fraction (Timestamp.getNanos()).
+        long micros = (Long) value;
+        tcv.time[0] = Math.floorDiv(micros, 1_000L);
+        tcv.nanos[0] = (int) (Math.floorMod(micros, 1_000_000L) * 1_000L);
+
+ return;
+ }
+
+ case DATE: {
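+        // Iceberg date defaults are days since the epoch, matching Hive's vectorized date encoding.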
+ LongColumnVector lcv = (LongColumnVector) col;
+ lcv.vector[0] = ((Number) value).intValue();
+ return;
+ }
+ default:
+ throw new IOException("Unsupported primitive type in DummyColumnReader: " + ti.getPrimitiveCategory());
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 8ce85db..a35a453 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -84,6 +84,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -96,6 +97,7 @@
public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
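+  // Iceberg initial defaults keyed by column name, applied when a column is missing from the data file.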
+ private final Map<String, Object> initialDefaults;
private List<Integer> colsToInclude;
@@ -154,7 +156,8 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf) thr
}
public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
- DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata) throws IOException {
+ DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata,
+ Map<String, Object> initialDefaults) throws IOException {
super(conf, oldInputSplit);
try {
this.metadataCache = metadataCache;
@@ -188,6 +191,7 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, Fil
}
initPartitionValues(fileSplit, conf);
bucketIdentifier = BucketIdentifier.from(conf, filePath);
+ this.initialDefaults = initialDefaults;
} catch (Throwable e) {
LOG.error("Failed to create the vectorized reader due to exception " + e);
throw new RuntimeException(e);
@@ -196,7 +200,7 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, Fil
public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
DataCache dataCache, Configuration cacheConf) throws IOException {
- this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null);
+ this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null, null);
}
private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
@@ -524,7 +528,8 @@ private VectorizedColumnReader buildVectorizedParquetReader(
// reader that produces nulls. This allows queries to proceed even
// when new columns have been added after the file was written.
if (!fileSchema.getColumns().contains(descriptors.get(0))) {
- return new VectorizedDummyColumnReader();
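+        // Supply the column's Iceberg initial default (if any) so the dummy reader emits it instead of nulls.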
+        return new VectorizedDummyColumnReader(Optional.ofNullable(initialDefaults)
+            .map(defaults -> defaults.get(descriptors.get(0).getPath()[0])).orElse(null));
}
switch (typeInfo.getCategory()) {
case PRIMITIVE: