HIVE-29327: Iceberg: [V3] Add support for initial column default (#6200)

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 362ea6a..f651406 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -502,8 +502,4 @@ public static Object getDefaultValue(String defaultValue, Type type) {
       default -> Conversions.fromPartitionString(type, stripQuotes(defaultValue));
     };
   }
-
-  public static Type getStructType(TypeInfo typeInfo, String defaultValue) {
-    return HiveSchemaConverter.convert(typeInfo, false, defaultValue);
-  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 0575167..58908b0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -428,14 +428,15 @@ protected void setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table h
   }
 
   protected boolean isOrcOnlyFiles(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
-    return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) &&
-        (hmsTable.getSd().getInputFormat() != null &&
-            hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name()) ||
-            org.apache.iceberg.FileFormat.ORC.name()
-                .equalsIgnoreCase(hmsTable.getSd().getSerdeInfo().getParameters()
-                    .get(TableProperties.DEFAULT_FILE_FORMAT)) ||
-            org.apache.iceberg.FileFormat.ORC.name()
-                .equalsIgnoreCase(hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT)));
+    return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) && isOrcFileFormat(hmsTable);
+  }
+
+  static boolean isOrcFileFormat(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    return hmsTable.getSd().getInputFormat() != null &&
+        hmsTable.getSd().getInputFormat().toUpperCase().contains(org.apache.iceberg.FileFormat.ORC.name()) ||
+        org.apache.iceberg.FileFormat.ORC.name().equalsIgnoreCase(
+            hmsTable.getSd().getSerdeInfo().getParameters().get(TableProperties.DEFAULT_FILE_FORMAT)) ||
+        org.apache.iceberg.FileFormat.ORC.name().equalsIgnoreCase(
+            hmsTable.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
   protected void setWriteModeDefaults(Table icebergTbl, Map<String, String> newProps, EnvironmentContext context) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 77d9915..242772a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -680,13 +680,18 @@ private void handleAddColumns(org.apache.hadoop.hive.metastore.api.Table hmsTabl
         (List<SQLDefaultConstraint>) SessionStateUtil.getResource(conf, SessionStateUtil.COLUMN_DEFAULTS).orElse(null);
     Map<String, String> defaultValues = Stream.ofNullable(sqlDefaultConstraints).flatMap(Collection::stream)
         .collect(Collectors.toMap(SQLDefaultConstraint::getColumn_name, SQLDefaultConstraint::getDefault_value));
+    boolean isOrc = isOrcFileFormat(hmsTable);
     for (FieldSchema addedCol : addedCols) {
       String defaultValue = defaultValues.get(addedCol.getName());
       Type type = HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType()), defaultValue);
       Literal<Object> defaultVal = Optional.ofNullable(defaultValue).filter(v -> !type.isStructType())
           .map(v -> Expressions.lit(HiveSchemaUtil.getDefaultValue(v, type))).orElse(null);
 
-      updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), defaultVal);
+      // ORC doesn't support initialDefault at the Iceberg layer, so for ORC tables only the write default is set.
+      updateSchema.addColumn(addedCol.getName(), type, addedCol.getComment(), isOrc ? null : defaultVal);
+      if (isOrc && defaultVal != null) {
+        updateSchema.updateColumnDefault(addedCol.getName(), defaultVal);
+      }
     }
     updateSchema.commit();
   }
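
Note: the ORC branch above comes down to which Iceberg UpdateSchema call carries the default. Below is a minimal sketch of the two paths, assuming an Iceberg Table handle and an illustrative INT column "age" with default 25 (the column name and value are hypothetical, not from the patch):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    class AddDefaultColumnSketch {
      // Parquet/Avro: attach the default at add time, so it becomes both the
      // initial default (applied to pre-existing rows) and the write default.
      static void addForParquetOrAvro(Table table) {
        table.updateSchema()
            .addColumn("age", Types.IntegerType.get(), "age column", Expressions.lit(25))
            .commit();
      }

      // ORC: add the column without an initial default, then set only the write
      // default; rows written before the ALTER read back as NULL.
      static void addForOrc(Table table) {
        table.updateSchema()
            .addColumn("age", Types.IntegerType.get(), "age column", null)
            .updateColumnDefault("age", Expressions.lit(25))
            .commit();
      }
    }
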
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index dc13857..20a73cb 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -62,6 +62,7 @@
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.impl.OrcTail;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -161,7 +162,8 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
           break;
 
         case PARQUET:
-          recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId);
+          recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId,
+              getInitialColumnDefaults(table.schema().columns()));
           break;
         default:
           throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format);
@@ -177,6 +179,17 @@ public static CloseableIterable<HiveBatchContext> reader(Table table, Path path,
     }
   }
 
+  static Map<String, Object> getInitialColumnDefaults(List<Types.NestedField> columns) {
+    Map<String, Object> columnDefaults = Maps.newHashMap();
+
+    for (Types.NestedField column : columns) {
+      if (column.initialDefault() != null) {
+        columnDefaults.put(column.name(), column.initialDefault());
+      }
+    }
+    return columnDefaults;
+  }
+
   private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
       FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
       SyntheticFileId fileId, Expression residual, String tableName) throws IOException {
@@ -218,7 +231,8 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
   }
 
   private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter,
-      FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException {
+      FileScanTask task, Path path, long start, long length, SyntheticFileId fileId,
+      Map<String, Object> initialColumnDefaults) throws IOException {
     InputSplit split = new FileSplit(path, start, length, job);
     VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
 
@@ -249,6 +263,7 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReade
     TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), typeWithIds, psv);
     job.set(IOConstants.COLUMNS, psv.retrieveColumnNameList());
 
+    inputFormat.setInitialColumnDefaults(initialColumnDefaults);
     return inputFormat.getRecordReader(split, job, reporter);
   }
 
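
getInitialColumnDefaults above simply collects top-level column name to initial default. For illustration only, an equivalent stream-based formulation (the patch keeps the explicit loop):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.iceberg.types.Types;

    class InitialDefaultsSketch {
      // Collects top-level column name -> Iceberg initial default, skipping
      // columns that have no initial default.
      static Map<String, Object> initialDefaultsByName(List<Types.NestedField> columns) {
        return columns.stream()
            .filter(column -> column.initialDefault() != null)
            .collect(Collectors.toMap(Types.NestedField::name, column -> (Object) column.initialDefault()));
      }
    }
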
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q
new file mode 100644
index 0000000..8d78e67
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_initial_default.q
@@ -0,0 +1,100 @@
+CREATE TABLE ice_parq (
+  id INT)
+STORED BY ICEBERG  stored as parquet
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_parq (id) VALUES (1);
+
+ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general');
+
+INSERT INTO ice_parq (id) VALUES (2);
+
+SELECT * FROM ice_parq ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_parq (id) VALUES (3);
+
+SELECT * FROM ice_parq ORDER BY id;
+
+-- AVRO table
+CREATE TABLE ice_avro (
+  id INT)
+STORED BY ICEBERG  stored as avro
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_avro (id) VALUES (1);
+
+ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general');
+
+INSERT INTO ice_avro (id) VALUES (2);
+
+SELECT * FROM ice_avro ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_avro (id) VALUES (3);
+
+SELECT * FROM ice_avro ORDER BY id;
+
+-- ORC table
+CREATE TABLE ice_orc (
+  id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3');
+
+INSERT INTO ice_orc (id) VALUES (1);
+
+ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general');
+
+INSERT INTO ice_orc (id) VALUES (2);
+
+SELECT * FROM ice_orc ORDER BY id;
+
+-- change default of a field of Struct column
+ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88';
+
+-- rename and change default value of age column
+ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21;
+
+INSERT INTO ice_orc (id) VALUES (3);
+
+SELECT * FROM ice_orc ORDER BY id;
+
+-- disable vectorization
+set hive.vectorized.execution.enabled=false;
+SELECT * FROM ice_parq ORDER BY id;
+SELECT * FROM ice_avro ORDER BY id;
+SELECT * FROM ice_orc ORDER BY id;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
index bf7cdcf..ec1aa32 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_alter_default_column.q.out
@@ -60,7 +60,7 @@
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@ice_t
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
 2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
 PREHOOK: query: ALTER TABLE ice_t REPLACE COLUMNS (id INT,
   point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
@@ -94,7 +94,7 @@
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@ice_t
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
 PREHOOK: type: ALTERTABLE_RENAMECOL
@@ -128,7 +128,7 @@
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@ice_t
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 PREHOOK: query: ALTER TABLE ice_t CHANGE COLUMN point point_new STRUCT<x:INT, y:INT> DEFAULT 'x:55,y:88'
@@ -155,7 +155,7 @@
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@ice_t
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
 4	{"x":55,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	general
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out
new file mode 100644
index 0000000..e6ff59c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_initial_default.q.out
@@ -0,0 +1,330 @@
+PREHOOK: query: CREATE TABLE ice_parq (
+  id INT)
+STORED BY ICEBERG  stored as parquet
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: CREATE TABLE ice_parq (
+  id INT)
+STORED BY ICEBERG  stored as parquet
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: ALTER TABLE ice_parq CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: INSERT INTO ice_parq (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parq
+POSTHOOK: query: INSERT INTO ice_parq (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parq
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: CREATE TABLE ice_avro (
+  id INT)
+STORED BY ICEBERG  stored as avro
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: CREATE TABLE ice_avro (
+  id INT)
+STORED BY ICEBERG  stored as avro
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: ALTER TABLE ice_avro CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: INSERT INTO ice_avro (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_avro
+POSTHOOK: query: INSERT INTO ice_avro (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_avro
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: CREATE TABLE ice_orc (
+  id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: CREATE TABLE ice_orc (
+  id INT)
+STORED BY ICEBERG stored as orc
+TBLPROPERTIES ('format-version'='3')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc ADD COLUMNS (point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:99',
+  name STRING DEFAULT 'unknown',
+  age INT DEFAULT 25,
+  salary DOUBLE DEFAULT 50000.0,
+  is_active BOOLEAN DEFAULT TRUE,
+  created_date DATE DEFAULT '2024-01-01',
+  created_ts TIMESTAMP DEFAULT '2024-01-01T10:00:00',
+  score DECIMAL(5,2) DEFAULT 100.00,
+  category STRING DEFAULT 'general')
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN point point STRUCT<x:INT, y:INT> DEFAULT 'x:100,y:88'
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: ALTER TABLE ice_orc CHANGE COLUMN age age_new int DEFAULT 21
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: INSERT INTO ice_orc (id) VALUES (3)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_orc
+POSTHOOK: query: INSERT INTO ice_orc (id) VALUES (3)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_orc
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: SELECT * FROM ice_parq ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parq
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_parq ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parq
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: SELECT * FROM ice_avro ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_avro ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+PREHOOK: query: SELECT * FROM ice_orc ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_orc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM ice_orc ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_orc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
+2	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
+3	{"x":100,"y":88}	unknown	21	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 4dd0dee..6035a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.DataCache;
@@ -40,6 +41,7 @@ public class VectorizedParquetInputFormat
   private DataCache dataCache = null;
   private Configuration cacheConf = null;
   private ParquetMetadata metadata;
+  private Map<String, Object> initialDefaults;
 
   public VectorizedParquetInputFormat() {
   }
@@ -50,13 +52,17 @@ public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
     JobConf jobConf,
     Reporter reporter) throws IOException {
     return new VectorizedParquetRecordReader(
-        inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata);
+        inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata, initialDefaults);
   }
 
   public void setMetadata(ParquetMetadata metadata) throws IOException {
     this.metadata = metadata;
   }
 
+  public void setInitialColumnDefaults(Map<String, Object> initialDefaults) {
+    this.initialDefaults = initialDefaults;
+  }
+
   @Override
   public void injectCaches(
       FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
index ee1d692..e8d95cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
@@ -18,25 +18,115 @@
 
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 /**
- * A dummy vectorized parquet reader to support schema evolution.
+ * A dummy vectorized parquet reader used for schema evolution.
+ * If a default value is provided, it returns that value for the entire batch.
+ * Otherwise, it returns nulls.
  */
 public class VectorizedDummyColumnReader extends BaseVectorizedColumnReader {
 
-  public VectorizedDummyColumnReader() {
+  private final Object defaultValue;
+
+  public VectorizedDummyColumnReader(Object defaultValue) {
     super();
+    this.defaultValue = defaultValue;
   }
 
   @Override
-  public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
-    Arrays.fill(column.isNull, true);
-    column.isRepeating = true;
-    column.noNulls = false;
+  public void readBatch(int total, ColumnVector col, TypeInfo typeInfo) throws IOException {
+    col.isRepeating = true;
+
+    // Case 1: no default → fill the entire batch with nulls
+    if (defaultValue == null) {
+      Arrays.fill(col.isNull, true);
+      col.noNulls = false;
+      return;
+    }
+
+    // Case 2: a default exists → fill the batch with the constant value
+    col.noNulls = true;
+    col.isNull[0] = false;
+
+    if (typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      fillPrimitive(col, (PrimitiveTypeInfo) typeInfo, defaultValue);
+    } else {
+      throw new IOException("Unsupported type category in DummyColumnReader: " + typeInfo.getCategory());
+    }
+  }
+
+  /**
+   * Fill the column with the given value.
+   * @param col   the column to fill
+   * @param ti    the type info of the column
+   * @param value value to fill the column with
+   * @throws IOException in case of error
+   */
+  private void fillPrimitive(ColumnVector col, PrimitiveTypeInfo ti, Object value) throws IOException {
+
+    switch (ti.getPrimitiveCategory()) {
+
+      case BOOLEAN:
+        ((LongColumnVector) col).vector[0] = ((Boolean) value) ? 1 : 0;
+        return;
+
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        ((LongColumnVector) col).vector[0] = ((Number) value).longValue();
+        return;
+
+      case FLOAT:
+      case DOUBLE:
+        ((DoubleColumnVector) col).vector[0] = ((Number) value).doubleValue();
+        return;
+
+      case STRING:
+      case VARCHAR:
+      case CHAR:
+        byte[] bytes = value.toString().getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) col).setRef(0, bytes, 0, bytes.length);
+        return;
+
+      case DECIMAL:
+        DecimalColumnVector dcv = (DecimalColumnVector) col;
+        dcv.set(0, HiveDecimal.create(value.toString()));
+        return;
+
+      case TIMESTAMP: {
+        TimestampColumnVector tcv = (TimestampColumnVector) col;
+
+        // The default arrives as microseconds since the epoch (Iceberg's timestamp representation).
+        long micros = (Long) value;
+        long seconds = micros / 1_000_000L;
+        long nanos = (micros % 1_000_000L) * 1000L;
+        tcv.time[0] = seconds * 1000L;
+        tcv.nanos[0] = (int) nanos;
+
+        return;
+      }
+
+      case DATE: {
+        LongColumnVector lcv = (LongColumnVector) col;
+        lcv.vector[0] = ((Number) value).intValue();
+        return;
+      }
+      default:
+        throw new IOException("Unsupported primitive type in DummyColumnReader: " + ti.getPrimitiveCategory());
+    }
   }
 }
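
A minimal sketch of how the extended dummy reader behaves for an INT column, assuming it is constructed directly with the column's initial default; the real call site is buildVectorizedParquetReader in VectorizedParquetRecordReader (shown further below):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedDummyColumnReader;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    class DummyReaderSketch {
      public static void main(String[] args) throws Exception {
        // Column missing from the Parquet file, with an Iceberg initial default of 25.
        VectorizedDummyColumnReader withDefault = new VectorizedDummyColumnReader(25);
        LongColumnVector col = new LongColumnVector(1024);
        withDefault.readBatch(1024, col, TypeInfoFactory.intTypeInfo);
        // The vector is marked repeating, so only slot 0 carries the constant.
        assert col.isRepeating && col.noNulls && col.vector[0] == 25L;

        // Without a default the reader keeps the old behaviour: an all-null column.
        VectorizedDummyColumnReader noDefault = new VectorizedDummyColumnReader(null);
        LongColumnVector nullCol = new LongColumnVector(1024);
        noDefault.readBatch(1024, nullCol, TypeInfoFactory.intTypeInfo);
        assert nullCol.isRepeating && !nullCol.noNulls && nullCol.isNull[0];
      }
    }
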
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 8ce85db..a35a453 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -84,6 +84,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -96,6 +97,7 @@
 public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   implements RecordReader<NullWritable, VectorizedRowBatch>, RowPositionAwareVectorizedRecordReader {
   public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
+  private final Map<String, Object> initialDefaults;
 
   private List<Integer> colsToInclude;
 
@@ -154,7 +156,8 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf) thr
   }
 
   public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
-      DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata) throws IOException {
+      DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata,
+      Map<String, Object> initialDefaults) throws IOException {
     super(conf, oldInputSplit);
     try {
       this.metadataCache = metadataCache;
@@ -188,6 +191,7 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, Fil
       }
       initPartitionValues(fileSplit, conf);
       bucketIdentifier = BucketIdentifier.from(conf, filePath);
+      this.initialDefaults = initialDefaults;
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
@@ -196,7 +200,7 @@ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, Fil
 
   public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
       DataCache dataCache, Configuration cacheConf) throws IOException {
-    this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null);
+    this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null, null);
   }
 
   private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
@@ -524,7 +528,8 @@ private VectorizedColumnReader buildVectorizedParquetReader(
     // reader that produces nulls. This allows queries to proceed even
     // when new columns have been added after the file was written.
     if (!fileSchema.getColumns().contains(descriptors.get(0))) {
-      return new VectorizedDummyColumnReader();
+      return new VectorizedDummyColumnReader(Optional.ofNullable(initialDefaults)
+          .map(defaults -> defaults.get(descriptors.get(0).getPath()[0])).orElse(null));
     }
     switch (typeInfo.getCategory()) {
     case PRIMITIVE:
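
The reader resolves defaults by the top-level column name, i.e. the first element of the Parquet column path. An illustrative sketch of that resolution with hypothetical column names; a null map or a missing key both fall back to the old all-null behaviour:

    import java.util.Map;
    import java.util.Optional;

    class DefaultLookupSketch {
      // Mirrors the lookup in buildVectorizedParquetReader: a null map and a
      // missing key both resolve to null, so the dummy reader produces nulls.
      static Object resolveDefault(Map<String, Object> initialDefaults, String topLevelColumnName) {
        return Optional.ofNullable(initialDefaults)
            .map(defaults -> defaults.get(topLevelColumnName))
            .orElse(null);
      }

      public static void main(String[] args) {
        Map<String, Object> defaults = Map.of("age", 25, "name", "unknown");
        assert resolveDefault(defaults, "age").equals(25);   // default found
        assert resolveDefault(defaults, "id") == null;       // no default for this column
        assert resolveDefault(null, "age") == null;          // no defaults passed at all
      }
    }
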