HIVE-27302: Iceberg: Support write to iceberg branch (#4292). (Butao Zhang, reviewed by  Denys Kuzmenko, Ayush Saxena)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 5e4fbf8..4ba3273 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -478,7 +478,7 @@
   DATACONNECTOR_NOT_EXISTS(10428, "Dataconnector does not exist:"),
   TIME_TRAVEL_NOT_ALLOWED(10429, "Time travel is not allowed for {0}. Please choose a storage format which supports the feature.", true),
   INVALID_METADATA_TABLE_NAME(10430, "Invalid metadata table name {0}.", true),
-  METADATA_TABLE_NOT_SUPPORTED(10431, "Metadata tables are not supported for table {0}.", true),
+  TABLE_META_REF_NOT_SUPPORTED(10431, "Table Meta Ref extension is not supported for table {0}.", true),
   COMPACTION_REFUSED(10432, "Compaction request for {0}.{1}{2} is refused, details: {3}.", true),
   CBO_IS_REQUIRED(10433,
           "The following functionality requires CBO (" + HiveConf.ConfVars.HIVE_CBO_ENABLED.varname + "): {0}", true),
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index ab75665..d3bc13b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -69,6 +69,7 @@
 
   public static final String NAME = "name";
   public static final String LOCATION = "location";
+  public static final String BRANCH_NAME = "branch_name";
 
   private static final String NO_CATALOG_TYPE = "no catalog";
   private static final Set<String> PROPERTIES_TO_REMOVE =
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index ff1f6bb..d1bfde2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -85,6 +85,7 @@
   public static final boolean CONFIG_SERIALIZATION_DISABLED_DEFAULT = true;
   public static final String OPERATION_TYPE_PREFIX = "iceberg.mr.operation.type.";
   public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
+  public static final String OUTPUT_TABLE_BRANCH = "iceberg.mr.output.table.branch";
   public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
   public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
   public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 57c60f2..fa92b63 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -143,6 +143,7 @@
     job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
+    job.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, job.get(TableScanDesc.BRANCH_NAME, ""));
 
     String location = job.get(InputFormatConfig.TABLE_LOCATION);
     return Arrays.stream(super.getSplits(job, numSplits))
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 5a394ca..7a5a8e6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -36,11 +36,13 @@
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
@@ -431,16 +433,17 @@
 
     FilesForCommit writeResults = collectResults(
         numTasks, executor, outputTable.table.location(), jobContext, io, true);
+    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
     if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
       if (writeResults.isEmpty()) {
         LOG.info(
             "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
             table, jobContext.getJobID(), HiveCustomStorageHandlerUtils.getWriteOperation(conf, name));
       } else {
-        commitWrite(table, startTime, writeResults);
+        commitWrite(table, branchName, startTime, writeResults);
       }
     } else {
-      commitOverwrite(table, startTime, writeResults);
+      commitOverwrite(table, branchName, startTime, writeResults);
     }
   }
 
@@ -451,15 +454,21 @@
    * @param startTime The start time of the commit - used only for logging
    * @param results The object containing the new files we would like to add to the table
    */
-  private void commitWrite(Table table, long startTime, FilesForCommit results) {
+  private void commitWrite(Table table, String branchName, long startTime, FilesForCommit results) {
     if (results.deleteFiles().isEmpty()) {
       AppendFiles write = table.newAppend();
       results.dataFiles().forEach(write::appendFile);
+      if (StringUtils.isNotEmpty(branchName)) {
+        write.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       write.commit();
     } else {
       RowDelta write = table.newRowDelta();
       results.dataFiles().forEach(write::addRows);
       results.deleteFiles().forEach(write::addDeletes);
+      if (StringUtils.isNotEmpty(branchName)) {
+        write.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       write.commit();
     }
 
@@ -478,17 +487,23 @@
    * @param startTime The start time of the commit - used only for logging
    * @param results The object containing the new files
    */
-  private void commitOverwrite(Table table, long startTime, FilesForCommit results) {
+  private void commitOverwrite(Table table, String branchName, long startTime, FilesForCommit results) {
     Preconditions.checkArgument(results.deleteFiles().isEmpty(), "Can not handle deletes with overwrite");
     if (!results.dataFiles().isEmpty()) {
       ReplacePartitions overwrite = table.newReplacePartitions();
       results.dataFiles().forEach(overwrite::addFile);
+      if (StringUtils.isNotEmpty(branchName)) {
+        overwrite.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       overwrite.commit();
       LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
           table, results.dataFiles().size());
     } else if (table.spec().isUnpartitioned()) {
       DeleteFiles deleteFiles = table.newDelete();
       deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
+      if (StringUtils.isNotEmpty(branchName)) {
+        deleteFiles.toBranch(HiveUtils.getTableBranch(branchName));
+      }
       deleteFiles.commit();
       LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
           "Commit took {} ms for table: {}", System.currentTimeMillis() - startTime, table);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 66d336a..375a6d2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
@@ -75,6 +76,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
@@ -633,11 +635,12 @@
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     String location = commitProperties.getProperty(Catalogs.LOCATION);
+    String branchName = commitProperties.getProperty(Catalogs.BRANCH_NAME);
     Configuration configuration = SessionState.getSessionConf();
     if (location != null) {
       HiveTableUtil.cleanupTableObjectFile(location, configuration);
     }
-    List<JobContext> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    List<JobContext> jobContextList = generateJobContext(configuration, tableName, branchName, overwrite);
     if (jobContextList.isEmpty()) {
       return;
     }
@@ -678,7 +681,7 @@
   }
 
   @Override
-  public boolean isMetadataTableSupported() {
+  public boolean isTableMetaRefSupported() {
     return true;
   }
 
@@ -769,6 +772,25 @@
   }
 
   @Override
+  public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
+    String branch = HiveUtils.getTableBranch(tableMetaRef);
+    if (branch != null) {
+      Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+      if (tbl.snapshot(branch) != null) {
+        hmsTable.setBranchName(tableMetaRef);
+        return hmsTable;
+      }
+      throw new SemanticException(String.format("Cannot use branch (does not exist): %s", branch));
+    }
+    if (IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
+      hmsTable.setMetaTable(tableMetaRef);
+      return hmsTable;
+    }
+    throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, tableMetaRef);
+  }
+
+  @Override
   public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws URISyntaxException {
     String dbName = hmsTable.getDbName();
     String tableName = hmsTable.getTableName();
@@ -1252,7 +1274,7 @@
    * @return The generated Optional JobContext list or empty if not presents.
    */
   private List<JobContext> generateJobContext(Configuration configuration, String tableName,
-      boolean overwrite) {
+      String branchName, boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
     Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
         SessionStateUtil.getCommitInfo(jobConf, tableName);
@@ -1266,6 +1288,9 @@
         // we should only commit this current table because
         // for multi-table inserts, this hook method will be called sequentially for each target table
         jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+        if (branchName != null) {
+          jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, branchName);
+        }
 
         jobContextList.add(new JobContextImpl(jobConf, jobID, null));
       }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 3e32c11..46c1c23 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -30,10 +30,12 @@
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -126,6 +128,10 @@
       }
       snapshotId = ref.snapshotId();
     }
+    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
+    if (StringUtils.isNotEmpty(branchName)) {
+      scan = scan.useRef(HiveUtils.getTableBranch(branchName));
+    }
     if (snapshotId != -1) {
       scan = scan.useSnapshot(snapshotId);
     }
diff --git a/iceberg/iceberg-handler/src/test/queries/negative/write_iceberg_branch_negative.q b/iceberg/iceberg-handler/src/test/queries/negative/write_iceberg_branch_negative.q
new file mode 100644
index 0000000..1645efc
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/negative/write_iceberg_branch_negative.q
@@ -0,0 +1,4 @@
+create external table ice01(a int, b string, c int) stored by iceberg;
+
+-- insert into branch test1 which does not exist
+insert into default.ice01.branch_test1 values(11, 'one', 22);
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/write_iceberg_branch.q b/iceberg/iceberg-handler/src/test/queries/positive/write_iceberg_branch.q
new file mode 100644
index 0000000..88ea0a4
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/write_iceberg_branch.q
@@ -0,0 +1,60 @@
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=more;
+
+create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+create table source01(a int, b string, c int);
+
+insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55);
+insert into source01 values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55);
+
+-- create a branch named test1
+alter table ice01 create branch test1;
+
+-- query branch using table identifier: db.tbl.branch_branchName
+explain select * from default.ice01.branch_test1;
+select * from default.ice01.branch_test1;
+-- query branch using time travel syntax
+select * from ice01 for system_version as of 'test1';
+
+-- insert into branch test1
+explain insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
+insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66);
+select * from default.ice01.branch_test1;
+
+-- delete from branch test1
+explain delete from default.ice01.branch_test1 where a=22;
+delete from default.ice01.branch_test1 where a=22;
+select * from default.ice01.branch_test1;
+
+-- update branch test1
+explain update default.ice01.branch_test1 set a=33 where c=66;
+update default.ice01.branch_test1 set a=33 where c=66;
+select * from default.ice01.branch_test1;
+
+-- merge into branch test1
+explain
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
+
+select * from default.ice01.branch_test1;
+
+-- insert overwrite branch test1
+explain insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
+insert overwrite table default.ice01.branch_test1 values (77, 'one', 88);
+select * from default.ice01.branch_test1;
+
+-- query branch using non-fetch task
+set hive.fetch.task.conversion=none;
+explain select * from default.ice01.branch_test1;
+select * from default.ice01.branch_test1;
+
+drop table ice01;
+drop table source01;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out b/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out
new file mode 100644
index 0000000..0d76fc3
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out
@@ -0,0 +1,9 @@
+PREHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+FAILED: SemanticException Cannot use branch (does not exist): test1
diff --git a/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out b/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out
new file mode 100644
index 0000000..4d99ee2
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out
@@ -0,0 +1,902 @@
+PREHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+PREHOOK: query: create table source01(a int, b string, c int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source01
+POSTHOOK: query: create table source01(a int, b string, c int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source01
+PREHOOK: query: insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: insert into source01 values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source01
+POSTHOOK: query: insert into source01 values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source01
+POSTHOOK: Lineage: source01.a SCRIPT []
+POSTHOOK: Lineage: source01.b SCRIPT []
+POSTHOOK: Lineage: source01.c SCRIPT []
+PREHOOK: query: alter table ice01 create branch test1
+PREHOOK: type: ALTERTABLE_CREATEBRANCH
+PREHOOK: Input: default@ice01
+POSTHOOK: query: alter table ice01 create branch test1
+POSTHOOK: type: ALTERTABLE_CREATEBRANCH
+POSTHOOK: Input: default@ice01
+PREHOOK: query: explain select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: ice01
+          branch name: branch_test1
+          Select Operator
+            expressions: a (type: int), b (type: string), c (type: int)
+            outputColumnNames: _col0, _col1, _col2
+            ListSink
+
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+PREHOOK: query: select * from ice01 for system_version as of 'test1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice01 for system_version as of 'test1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+PREHOOK: query: explain insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: explain insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: _dummy_table
+                  Row Limit Per Split: 1
+                  Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: array(const struct(22,'three',44),const struct(33,'three',66)) (type: array<struct<col1:int,col2:string,col3:int>>)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE
+                    UDTF Operator
+                      Statistics: Num rows: 1 Data size: 56 Basic stats: COMPLETE Column stats: COMPLETE
+                      function name: inline
+                      Select Operator
+                        expressions: col1 (type: int), col2 (type: string), col3 (type: int)
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                              name: default.ice01
+                        Select Operator
+                          expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int)
+                          outputColumnNames: a, b, c
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(b)), avg(COALESCE(length(b),0)), count(b), compute_bit_vector_hll(b), min(c), max(c), count(c), compute_bit_vector_hll(c)
+                            minReductionHashAggr: 0.4
+                            mode: hash
+                            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+                            Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
+                            Reduce Output Operator
+                              null sort order: 
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
+                              value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct<count:bigint,sum:double,input:int>), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+                Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
+                  Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.ice01
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: a, b, c
+          Column Types: int, string, int
+          Table: default.ice01
+
+PREHOOK: query: insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into default.ice01.branch_test1 values(22, 'three', 44), (33, 'three', 66)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+22	three	44
+33	three	66
+PREHOOK: query: explain delete from default.ice01.branch_test1 where a=22
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: explain delete from default.ice01.branch_test1 where a=22
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: ice01
+                  branch name: branch_test1
+                  filterExpr: (a = 22) (type: boolean)
+                  Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (a = 22) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint), b (type: string), c (type: int)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col5, _col6
+                      Statistics: Num rows: 2 Data size: 594 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                        null sort order: zzzz
+                        sort order: ++++
+                        Statistics: Num rows: 2 Data size: 594 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col5 (type: string), _col6 (type: int)
+            Execution mode: vectorized
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), 22 (type: int), VALUE._col0 (type: string), VALUE._col1 (type: int)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                Statistics: Num rows: 2 Data size: 602 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 2 Data size: 602 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.ice01
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.ice01
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: delete from default.ice01.branch_test1 where a=22
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: delete from default.ice01.branch_test1 where a=22
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+33	three	66
+PREHOOK: query: explain update default.ice01.branch_test1 set a=33 where c=66
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: explain update default.ice01.branch_test1 set a=33 where c=66
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@ice01
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-3 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-3
+  Stage-4 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-2
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: ice01
+                  branch name: branch_test1
+                  filterExpr: (c = 66) (type: boolean)
+                  Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (c = 66) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint), a (type: int), b (type: string), b (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col8
+                      Statistics: Num rows: 2 Data size: 772 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint), _col4 (type: int), _col5 (type: string)
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                        Statistics: Num rows: 2 Data size: 594 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                          null sort order: zzzz
+                          sort order: ++++
+                          Statistics: Num rows: 2 Data size: 594 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col4 (type: int), _col5 (type: string)
+                      Select Operator
+                        expressions: 33 (type: int), _col8 (type: string), 66 (type: int)
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 2 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 2 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                              name: default.ice01
+            Execution mode: vectorized
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type: string), 66 (type: int)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                Statistics: Num rows: 2 Data size: 602 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 2 Data size: 602 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.ice01
+
+  Stage: Stage-3
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.ice01
+
+  Stage: Stage-4
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: update default.ice01.branch_test1 set a=33 where c=66
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: update default.ice01.branch_test1 set a=33 where c=66
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+33	three	66
+PREHOOK: query: explain
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Input: default@source01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: explain
+merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Input: default@source01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@merge_tmp_table
+STAGE DEPENDENCIES:
+  Stage-5 is a root stage
+  Stage-6 depends on stages: Stage-5
+  Stage-0 depends on stages: Stage-6
+  Stage-7 depends on stages: Stage-0
+  Stage-4 depends on stages: Stage-6
+  Stage-8 depends on stages: Stage-4
+
+STAGE PLANS:
+  Stage: Stage-5
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 6 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: a (type: int), b (type: string), c (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 6 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      null sort order: z
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 6 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col1 (type: string), _col2 (type: int)
+            Execution mode: vectorized
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: ice01
+                  branch name: branch_test1
+                  filterExpr: a is not null (type: boolean)
+                  Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: a is not null (type: boolean)
+                    Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint), a (type: int), b (type: string), c (type: int)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                      Statistics: Num rows: 3 Data size: 903 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col4 (type: int)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col4 (type: int)
+                        Statistics: Num rows: 3 Data size: 903 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint), _col5 (type: string), _col6 (type: int)
+            Execution mode: vectorized
+        Reducer 2 
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Left Outer Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col4 (type: int)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                Statistics: Num rows: 3 Data size: 1191 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col1 (type: string), _col0 (type: int), _col5 (type: string), _col2 (type: int), _col6 (type: bigint), _col4 (type: bigint), _col3 (type: int), _col9 (type: int), _col8 (type: string), _col7 (type: int)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  Statistics: Num rows: 3 Data size: 1191 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((_col9 = _col1) and (_col9 > 100)) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col6 (type: int), _col5 (type: bigint), _col2 (type: string), _col4 (type: bigint), _col9 (type: int), _col8 (type: string), _col7 (type: int)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                      Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                        null sort order: zzzz
+                        sort order: ++++
+                        Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col4 (type: int), _col5 (type: string), _col6 (type: int)
+                  Filter Operator
+                    predicate: ((_col9 = _col1) and (_col9 <= 100)) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col6 (type: int), _col5 (type: bigint), _col2 (type: string), _col4 (type: bigint), _col9 (type: int), _col8 (type: string), _col7 (type: int)
+                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                      Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                        null sort order: zzzz
+                        sort order: ++++
+                        Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col4 (type: int), _col5 (type: string), _col6 (type: int)
+                  Filter Operator
+                    predicate: ((_col9 = _col1) and (_col9 <= 100)) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col9 (type: int), 'Merged' (type: string), (_col7 + 10) (type: int)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 1 Data size: 98 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 98 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                            output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                            serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                            name: default.ice01
+                  Filter Operator
+                    predicate: _col9 is null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col1 (type: int), _col0 (type: string), _col3 (type: int)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                            output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                            serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                            name: default.ice01
+                  Filter Operator
+                    predicate: (_col9 = _col1) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: _col2 (type: string), _col4 (type: bigint), _col5 (type: bigint), _col6 (type: int)
+                      outputColumnNames: _col2, _col4, _col5, _col6
+                      Statistics: Num rows: 1 Data size: 397 Basic stats: COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        aggregations: count()
+                        keys: _col6 (type: int), _col5 (type: bigint), _col2 (type: string), _col4 (type: bigint)
+                        minReductionHashAggr: 0.4
+                        mode: hash
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                        Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                          null sort order: zzzz
+                          sort order: ++++
+                          Map-reduce partition columns: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                          Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col4 (type: bigint)
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type: string), VALUE._col2 (type: int)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.ice01
+        Reducer 4 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type: string), VALUE._col2 (type: int)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 301 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.ice01
+        Reducer 5 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: int), KEY._col1 (type: bigint), KEY._col2 (type: string), KEY._col3 (type: bigint)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                Filter Operator
+                  predicate: (_col4 > 1L) (type: boolean)
+                  Statistics: Num rows: 1 Data size: 212 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: cardinality_violation(_col0,_col1,_col2,_col3) (type: int)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: default.merge_tmp_table
+
+  Stage: Stage-6
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.ice01
+
+  Stage: Stage-7
+    Stats Work
+      Basic Stats Work:
+
+  Stage: Stage-4
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.merge_tmp_table
+
+  Stage: Stage-8
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Input: default@source01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@ice01
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: merge into default.ice01.branch_test1 as t using source01 src ON t.a = src.a
+when matched and t.a > 100 THEN DELETE
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Input: default@source01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@ice01
+POSTHOOK: Output: default@merge_tmp_table
+POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(ice01)ice01.null, ]
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	Merged	60
+2	Merged	61
+3	three	52
+33	three	66
+4	four	53
+5	five	54
+PREHOOK: query: explain insert overwrite table default.ice01.branch_test1 values (77, 'one', 88)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: explain insert overwrite table default.ice01.branch_test1 values (77, 'one', 88)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: _dummy_table
+                  Row Limit Per Split: 1
+                  Statistics: Num rows: 1 Data size: 10 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: array(const struct(77,'one',88)) (type: array<struct<col1:int,col2:string,col3:int>>)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
+                    UDTF Operator
+                      Statistics: Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
+                      function name: inline
+                      Select Operator
+                        expressions: col1 (type: int), col2 (type: string), col3 (type: int)
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                              name: default.ice01
+                        Select Operator
+                          expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int)
+                          outputColumnNames: a, b, c
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Group By Operator
+                            aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(b)), avg(COALESCE(length(b),0)), count(b), compute_bit_vector_hll(b), min(c), max(c), count(c), compute_bit_vector_hll(c)
+                            minReductionHashAggr: 0.4
+                            mode: hash
+                            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+                            Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
+                            Reduce Output Operator
+                              null sort order: 
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: COMPLETE
+                              value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct<count:bigint,sum:double,input:int>), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
+        Reducer 2 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
+                Statistics: Num rows: 1 Data size: 492 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
+                  Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.ice01
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+      Column Stats Desc:
+          Columns: a, b, c
+          Column Types: int, string, int
+          Table: default.ice01
+
+PREHOOK: query: insert overwrite table default.ice01.branch_test1 values (77, 'one', 88)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert overwrite table default.ice01.branch_test1 values (77, 'one', 88)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+77	one	88
+PREHOOK: query: explain select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: ice01
+                  branch name: branch_test1
+                  Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: a (type: int), b (type: string), c (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from default.ice01.branch_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.branch_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+77	one	88
+PREHOOK: query: drop table ice01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: drop table ice01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
+PREHOOK: query: drop table source01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@source01
+PREHOOK: Output: default@source01
+POSTHOOK: query: drop table source01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@source01
+POSTHOOK: Output: default@source01
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index 652e51c..2310f80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -936,9 +936,9 @@
             tbl.getTTable().getDictionary() : new ObjectDictionary();
         List<ByteBuffer> buffers = new ArrayList<>();
         String statsSetup = StatsSetupConst.ColumnStatsSetup.getStatsSetupAsString(true,
-            // Ignore all Iceberg leftover files when storageHandler.isMetadataTableSupported() is true,
+            // Ignore all Iceberg leftover files when storageHandler.isTableIdentifierSupported() is true,
             // as the method is only enabled in Iceberg currently.
-            storageHandler != null && storageHandler.isMetadataTableSupported(),
+            storageHandler != null && storageHandler.isTableMetaRefSupported(),
             MetaStoreUtils.getColumnNames(tbl.getCols()));
         buffers.add(ByteBuffer.wrap(statsSetup.getBytes(StandardCharsets.UTF_8)));
         dictionary.putToValues(StatsSetupConst.STATS_FOR_CREATE_TABLE, buffers);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java
index 53db08b..c39dbc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/DescTableOperation.java
@@ -117,7 +117,7 @@
 
   private Table getTable() throws HiveException {
     Table table = context.getDb().getTable(desc.getTableName().getDb(), desc.getTableName().getTable(),
-        desc.getTableName().getMetaTable(), false, false, false);
+        desc.getTableName().getTableMetaRef(), false, false, false);
     if (table == null) {
       throw new HiveException(ErrorMsg.INVALID_TABLE, desc.getDbTableName());
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 315e00f..d9c726d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -257,6 +257,7 @@
   public static final String MAPNAME = "Map ";
   public static final String REDUCENAME = "Reducer ";
   public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED";
+  public static final String BRANCH_NAME = "branch_name";
 
   @Deprecated
   protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
@@ -763,6 +764,9 @@
     if (tbl.getMetaTable() != null) {
       props.put("metaTable", tbl.getMetaTable());
     }
+    if (tbl.getBranchName() != null) {
+      props.put(BRANCH_NAME, tbl.getBranchName());
+    }
     return (new TableDesc(tbl.getInputFormatClass(), tbl
         .getOutputFormatClass(), props));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index fee00fa..3a95f47 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -997,6 +997,10 @@
     if (scanDesc.getVersionIntervalFrom() != null) {
       jobConf.set(TableScanDesc.FROM_VERSION, scanDesc.getVersionIntervalFrom());
     }
+
+    if (scanDesc.getBranchName() != null) {
+      jobConf.set(TableScanDesc.BRANCH_NAME, scanDesc.getBranchName());
+    }
   }
 
   protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Path splitPath) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8377dc4..53e95ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1589,7 +1589,7 @@
    */
   public Table getTable(TableName tableName) throws HiveException {
     return this.getTable(ObjectUtils.firstNonNull(tableName.getDb(), SessionState.get().getCurrentDatabase()),
-        tableName.getTable(), null, true);
+        tableName.getTable(), tableName.getTableMetaRef(), true);
   }
 
   /**
@@ -1599,16 +1599,16 @@
    *          the name of the database
    * @param tableName
    *          the name of the table
-   * @param metaTableName
-   *          the name of the metadata table
+   * @param tableMetaRef
+   *          the name of the table meta ref, e.g. iceberg metadata table or branch
    * @param throwException
    *          controls whether an exception is thrown or a returns a null
    * @return the table or if throwException is false a null value.
    * @throws HiveException
    */
   public Table getTable(final String dbName, final String tableName,
-                        final String metaTableName, boolean throwException) throws HiveException {
-    return this.getTable(dbName, tableName, metaTableName, throwException, false);
+                        final String tableMetaRef, boolean throwException) throws HiveException {
+    return this.getTable(dbName, tableName, tableMetaRef, throwException, false);
   }
 
   /**
@@ -1654,8 +1654,8 @@
    *          the name of the database
    * @param tableName
    *          the name of the table
-   * @param metaTableName
-   *          the name of the metadata table
+   * @param tableMetaRef
+   *          the name of the table meta ref, e.g. iceberg metadata table or branch
    * @param throwException
    *          controls whether an exception is thrown or a returns a null
    * @param checkTransactional
@@ -1664,9 +1664,9 @@
    * @return the table or if throwException is false a null value.
    * @throws HiveException
    */
-  public Table getTable(final String dbName, final String tableName, String metaTableName, boolean throwException,
+  public Table getTable(final String dbName, final String tableName, String tableMetaRef, boolean throwException,
                         boolean checkTransactional) throws HiveException {
-    return getTable(dbName, tableName, metaTableName, throwException, checkTransactional, false);
+    return getTable(dbName, tableName, tableMetaRef, throwException, checkTransactional, false);
   }
 
   /**
@@ -1676,8 +1676,8 @@
    *          the name of the database
    * @param tableName
    *          the name of the table
-   * @param metaTableName
-   *          the name of the metadata table
+   * @param tableMetaRef
+   *          the name of the table meta ref, e.g. iceberg metadata table or branch
    * @param throwException
    *          controls whether an exception is thrown or a returns a null
    * @param checkTransactional
@@ -1688,7 +1688,7 @@
    * @return the table or if throwException is false a null value.
    * @throws HiveException
    */
-  public Table getTable(final String dbName, final String tableName, String metaTableName, boolean throwException,
+  public Table getTable(final String dbName, final String tableName, String tableMetaRef, boolean throwException,
                         boolean checkTransactional, boolean getColumnStats) throws HiveException {
 
     if (tableName == null || tableName.equals("")) {
@@ -1751,15 +1751,12 @@
     }
 
     Table t = new Table(tTable);
-    if (metaTableName != null) {
-      if (t.getStorageHandler() == null || !t.getStorageHandler().isMetadataTableSupported()) {
-        throw new SemanticException(ErrorMsg.METADATA_TABLE_NOT_SUPPORTED, t.getTableName());
+    if (tableMetaRef != null) {
+      if (t.getStorageHandler() == null || !t.getStorageHandler().isTableMetaRefSupported()) {
+        throw new SemanticException(ErrorMsg.TABLE_META_REF_NOT_SUPPORTED, t.getTableName());
       }
-      if (!t.getStorageHandler().isValidMetadataTable(metaTableName)) {
-        throw new SemanticException(ErrorMsg.INVALID_METADATA_TABLE_NAME, metaTableName);
-      }
+      t = t.getStorageHandler().checkAndSetTableMetaRef(t, tableMetaRef);
     }
-    t.setMetaTable(metaTableName);
     return t;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 1ebbe50..d1efdc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -550,7 +550,21 @@
     return false;
   }
 
+  /**
+   * Introduced by HIVE-25457 for iceberg to query metadata table.
+   * @return true if the storage handler can support it
+   * @deprecated Use {@link #isTableMetaRefSupported()}
+   */
+  @Deprecated
   default boolean isMetadataTableSupported() {
+    return isTableMetaRefSupported();
+  }
+
+  /**
+   * Check whether the table supports metadata references which mainly include branch, tag and metadata tables.
+   * @return true if the storage handler can support it
+   */
+  default boolean isTableMetaRefSupported() {
     return false;
   }
 
@@ -558,6 +572,11 @@
     return false;
   }
 
+  default org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
+    return null;
+  }
+
   /**
    * Constructs a URI for authorization purposes using the HMS table object
    * @param table The HMS table object
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index cf756bc..c91ae9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.parse.Quotation;
 import org.slf4j.Logger;
@@ -107,6 +109,7 @@
   static final byte[] tabEscapeBytes = "\\t".getBytes();;
   static final byte[] tabUnescapeBytes = "\t".getBytes();
   static final byte[] ctrlABytes = "\u0001".getBytes();
+  static final Pattern BRANCH = Pattern.compile("branch_(.*)");
 
 
   public static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
@@ -439,4 +442,12 @@
     }
     return new Path(root, dbName);
   }
+
+  public static String getTableBranch(String branchName) {
+    Matcher branch = BRANCH.matcher(branchName);
+    if (branch.matches()) {
+      return branch.group(1);
+    }
+    return null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 88a7960..5f1cbad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -140,6 +140,8 @@
    */
   private String asOfTimestamp = null;
 
+  private String branchName;
+
   /**
    * Used only for serialization.
    */
@@ -182,6 +184,7 @@
     newTab.setVersionIntervalFrom(this.versionIntervalFrom);
 
     newTab.setMetaTable(this.getMetaTable());
+    newTab.setBranchName(this.getBranchName());
     return newTab;
   }
 
@@ -1357,6 +1360,14 @@
     this.metaTable = metaTable;
   }
 
+  public String getBranchName() {
+    return branchName;
+  }
+
+  public void setBranchName(String branchName) {
+    this.branchName = branchName;
+  }
+
   public SourceTable createSourceTable() {
     SourceTable sourceTable = new SourceTable();
     sourceTable.setTable(this.tTable);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index d8beb0c..7e40f0d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -83,6 +83,8 @@
         .add(HiveParser.Identifier, hTbl.getHiveTableMD().getTableName());
     if (hTbl.getHiveTableMD().getMetaTable() != null) {
       tableNameBuilder.add(HiveParser.Identifier, hTbl.getHiveTableMD().getMetaTable());
+    } else if (hTbl.getHiveTableMD().getBranchName() != null) {
+      tableNameBuilder.add(HiveParser.Identifier, hTbl.getHiveTableMD().getBranchName());
     }
 
     ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(tableNameBuilder);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
index 9b6312f..63a58ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
@@ -181,6 +181,8 @@
     String key = tab.getFullyQualifiedName() + ";";
     if (tab.getMetaTable() != null) {
       key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";";
+    } else if (tab.getBranchName() != null) {
+      key = tab.getFullyQualifiedName() + "." + tab.getBranchName() + ";";
     }
 
     if (!tab.isPartitioned()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 36fd7e4..a8d271f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -434,16 +434,16 @@
       // table node
       Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableOrColumnNode);
       String tableName = dbTablePair.getValue();
-      String metaTable = null;
+      String tableMetaRef = null;
       if (tableName.contains(".")) {
         String[] tmpNames = tableName.split("\\.");
         tableName = tmpNames[0];
-        metaTable = tmpNames[1];
+        tableMetaRef = tmpNames[1];
       }
       return TableName.fromString(tableName,
           null,
           dbTablePair.getKey() == null ? currentDatabase : dbTablePair.getKey(),
-          metaTable)
+          tableMetaRef)
           .getNotEmptyDbTable();
     } else if (tokenType == HiveParser.StringLiteral) {
       return unescapeSQLString(tableOrColumnNode.getText());
@@ -480,8 +480,8 @@
     if (tabNameNode.getChildCount() == 3) {
       final String dbName = unescapeIdentifier(tabNameNode.getChild(0).getText());
       final String tableName = unescapeIdentifier(tabNameNode.getChild(1).getText());
-      final String metaTableName = unescapeIdentifier(tabNameNode.getChild(2).getText());
-      return HiveTableName.fromString(tableName, catalogName, dbName, metaTableName);
+      final String tableMetaRef = unescapeIdentifier(tabNameNode.getChild(2).getText());
+      return HiveTableName.fromString(tableName, catalogName, dbName, tableMetaRef);
     }
 
     if (tabNameNode.getChildCount() == 2) {
@@ -1859,7 +1859,7 @@
   }
 
   protected Table getTable(TableName tn, boolean throwException) throws SemanticException {
-    return getTable(tn.getDb(), tn.getTable(), tn.getMetaTable(), throwException);
+    return getTable(tn.getDb(), tn.getTable(), tn.getTableMetaRef(), throwException);
   }
 
   protected Table getTable(String tblName) throws SemanticException {
@@ -1874,13 +1874,13 @@
     return getTable(database, tblName, null, throwException);
   }
 
-  protected Table getTable(String database, String tblName, String metaTableName, boolean throwException)
+  protected Table getTable(String database, String tblName, String tableMetaRef, boolean throwException)
       throws SemanticException {
     Table tab;
     try {
-      String tableName = metaTableName == null ? tblName : tblName + "." + metaTableName;
+      String tableName = tableMetaRef == null ? tblName : tblName + "." + tableMetaRef;
       tab = database == null ? db.getTable(tableName, false)
-          : db.getTable(database, tblName, metaTableName, false);
+          : db.getTable(database, tblName, tableMetaRef, false);
     }
     catch (InvalidTableException e) {
       throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(TableName.fromString(tblName, null, database).getNotEmptyDbTable()), e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 4da0ff0..9e3f02b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -5406,16 +5406,16 @@
     String[] names = Utilities.getDbTableName(tabName);
     final String  tableName = names[1];
     final String  dbName = names[0];
-    String metaTable = null;
+    String tableMetaRef = null;
     if (names.length == 3) {
-      metaTable = names[2];
+      tableMetaRef = names[2];
     }
     String fullyQualName = dbName + "." + tableName;
-    if (metaTable != null) {
-      fullyQualName = fullyQualName + "." + metaTable;
+    if (tableMetaRef != null) {
+      fullyQualName += "." + tableMetaRef;
     }
     if (!tabNameToTabObject.containsKey(fullyQualName)) {
-      Table table = db.getTable(dbName, tableName, metaTable, throwException, true, false);
+      Table table = db.getTable(dbName, tableName, tableMetaRef, throwException, true, false);
       if (table != null) {
         tabNameToTabObject.put(fullyQualName, table);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveTableName.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveTableName.java
index cd9f88c..166baf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveTableName.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveTableName.java
@@ -95,11 +95,15 @@
    */
   // to be @Deprecated
   public static TableName ofNullable(String dbTableName, String defaultDb) throws SemanticException {
+    return ofNullable(dbTableName, defaultDb, null);
+  }
+
+  public static TableName ofNullable(String dbTableName, String defaultDb, String tableMetaRef) throws SemanticException {
     if (dbTableName == null) {
       return new TableName(null, null, null);
     } else {
       try {
-        return fromString(dbTableName, SessionState.get().getCurrentCatalog(), defaultDb);
+        return fromString(dbTableName, SessionState.get().getCurrentCatalog(), defaultDb, tableMetaRef);
       } catch (IllegalArgumentException e) {
         throw new SemanticException(e);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 7f61777..aac61b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -211,8 +211,11 @@
   /**
    * See also {@link #getInsertOverwriteTables()}
    */
-  public boolean isInsertIntoTable(String dbName, String table) {
-    String fullName = dbName + "." + table;
+  public boolean isInsertIntoTable(String dbName, String table, String branchName) {
+    String  fullName = dbName + "." + table;
+    if (branchName != null) {
+      fullName += "." + branchName;
+    }
     return insertIntoTables.containsKey(fullName.toLowerCase());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index d555983..d5350fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -244,7 +244,7 @@
 
     Table mTable;
     try {
-      mTable = db.getTable(tableName.getDb(), tableName.getTable(), throwException);
+      mTable = db.getTable(tableName.getDb(), tableName.getTable(), tableName.getTableMetaRef(), throwException);
     } catch (InvalidTableException e) {
       LOG.error("Failed to find table " + tableName.getNotEmptyDbTable() + " got exception " + e.getMessage());
       throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName.getNotEmptyDbTable()), e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 3f15bc3..2f7f922 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -2398,7 +2398,7 @@
         }
 
         boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
-            ts.tableHandle.getTableName());
+            ts.tableHandle.getTableName(), ts.tableHandle.getBranchName());
         isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
             get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName()).toLowerCase()) != null);
         assert isTableWrittenTo :
@@ -7386,7 +7386,7 @@
           throw new SemanticException("Failed to allocate write Id", ex);
         }
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
-            destinationTable.getDbName(), destinationTable.getTableName());
+            destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getBranchName());
         ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, isReplace, writeId);
         if (writeId != null) {
           ltd.setStmtId(txnMgr.getCurrentStmtId());
@@ -7395,7 +7395,7 @@
         // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
         boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
-            destinationTable.getDbName(), destinationTable.getTableName());
+            destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getBranchName());
         LoadFileType loadType;
         if (isDirectInsert) {
           loadType = LoadFileType.IGNORE;
@@ -7414,8 +7414,8 @@
         // We need to set stats as inaccurate.
         setStatsForNonNativeTable(destinationTable.getDbName(), destinationTable.getTableName());
         // true if it is insert overwrite.
-        boolean overwrite = !qb.getParseInfo().isInsertIntoTable(
-            String.format("%s.%s", destinationTable.getDbName(), destinationTable.getTableName()));
+        boolean overwrite = !qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
+            destinationTable.getBranchName());
         createPreInsertDesc(destinationTable, overwrite);
 
         ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, partSpec == null ? ImmutableMap.of() : partSpec);
@@ -8006,12 +8006,12 @@
         && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
       if (destType == QBMetaData.DEST_TABLE) {
         genAutoColumnStatsGatheringPipeline(destinationTable, partSpec, input,
-            qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName()),
-            false);
+            qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
+                destinationTable.getBranchName()), false);
       } else if (destType == QBMetaData.DEST_PARTITION) {
         genAutoColumnStatsGatheringPipeline(destinationTable, destinationPartition.getSpec(), input,
-            qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName()),
-            false);
+            qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
+                destinationTable.getBranchName()), false);
       } else if (destType == QBMetaData.DEST_LOCAL_FILE || destType == QBMetaData.DEST_DFS_FILE) {
         // CTAS or CMV statement
         genAutoColumnStatsGatheringPipeline(destinationTable, null, input,
@@ -8492,7 +8492,7 @@
     // If the query here is an INSERT_INTO and the target is an immutable table,
     // verify that our destination is empty before proceeding
     if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
-        dest_tab.getDbName(), dest_tab.getTableName())) {
+        dest_tab.getDbName(), dest_tab.getTableName(), dest_tab.getBranchName())) {
       return;
     }
     try {
@@ -15670,7 +15670,7 @@
     case HiveParser.TOK_TABNAME:
       TableName tableName = getQualifiedTableName(n);
       return HiveTableName.ofNullable(HiveUtils.unparseIdentifier(tableName.getTable(), this.conf),
-          HiveUtils.unparseIdentifier(tableName.getDb(), this.conf)).getNotEmptyDbTable();
+          HiveUtils.unparseIdentifier(tableName.getDb(), this.conf), tableName.getTableMetaRef()).getNotEmptyDbTable();
     case HiveParser.TOK_TABREF:
       return getFullTableNameForSQL((ASTNode) n.getChild(0));
     default:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9c45457..4e66917 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -115,6 +115,9 @@
   public static final String FROM_VERSION =
       "hive.io.version.from";
 
+  public static final String BRANCH_NAME =
+      "hive.io.branch.name";
+
   // input file name (big) to bucket number
   private Map<String, Integer> bucketFileNameMapping;
 
@@ -144,6 +147,8 @@
 
   private String asOfTimestamp = null;
 
+  private String branchName = null;
+
   public TableScanDesc() {
     this(null, null);
   }
@@ -174,6 +179,7 @@
       asOfTimestamp = tblMetadata.getAsOfTimestamp();
       asOfVersion = tblMetadata.getAsOfVersion();
       versionIntervalFrom = tblMetadata.getVersionIntervalFrom();
+      branchName = tblMetadata.getBranchName();
     }
     isTranscationalTable = AcidUtils.isTransactionalTable(this.tableMetadata);
     if (isTranscationalTable) {
@@ -543,6 +549,11 @@
     return asOfTimestamp;
   }
 
+  @Explain(displayName = "branch name")
+  public String getBranchName() {
+    return branchName;
+  }
+
   public class TableScanOperatorExplainVectorization extends OperatorExplainVectorization {
 
     private final TableScanDesc tableScanDesc;
diff --git a/ql/src/test/results/clientnegative/desc_failure3.q.out b/ql/src/test/results/clientnegative/desc_failure3.q.out
index 34a1c58..c30e256 100644
--- a/ql/src/test/results/clientnegative/desc_failure3.q.out
+++ b/ql/src/test/results/clientnegative/desc_failure3.q.out
@@ -12,4 +12,4 @@
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:db1
 POSTHOOK: Output: db1@t1
-FAILED: SemanticException [Error 10431]: Metadata tables are not supported for table t1.
+FAILED: SemanticException [Error 10431]: Table Meta Ref extension is not supported for table t1.
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java b/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
index e3dd441..27ad59f 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
@@ -31,12 +31,13 @@
   /** Exception message thrown. */
   private static final String ILL_ARG_EXCEPTION_MSG =
       "Table name must be either <tablename>, <dbname>.<tablename> " + "or <catname>.<dbname>.<tablename>";
+  public static final String BRANCH_NAME_PREFIX = "branch_";
 
   /** Names of the related DB objects. */
   private final String cat;
   private final String db;
   private final String table;
-  private final String metaTable;
+  private final String tableMetaRef;
 
   /**
    *
@@ -47,14 +48,14 @@
    * @param dbName database name.  Cannot be null.  If you do not now it you can get it from
    *           SessionState.getCurrentDatabase() or use Warehouse.DEFAULT_DATABASE_NAME.
    * @param tableName  table name, cannot be null
-   * @param metaTable name
-   *           Use this to query Iceberg metadata tables.
+   * @param tableMetaRef name
+   *           Use this to query table meta ref, e.g. iceberg metadata table or branch
    */
-  public TableName(final String catName, final String dbName, final String tableName, String metaTable) {
+  public TableName(final String catName, final String dbName, final String tableName, String tableMetaRef) {
     this.cat = catName;
     this.db = dbName;
     this.table = tableName;
-    this.metaTable = metaTable;
+    this.tableMetaRef = tableMetaRef;
   }
 
   public TableName(final String catName, final String dbName, final String tableName) {
@@ -76,11 +77,11 @@
    * @param defaultDatabase default database to use if database is not in the name.  If you do
    *                        not now it you can get it from SessionState.getCurrentDatabase() or
    *                        use Warehouse.DEFAULT_DATABASE_NAME.
-   * @param metaTable When querying Iceberg metadata tables, set this parameter.
+   * @param tableMetaRef When querying Iceberg meta ref, e.g. metadata table or branch, set this parameter.
    * @return TableName
    * @throws IllegalArgumentException if a non-null name is given
    */
-  public static TableName fromString(final String name, final String defaultCatalog, final String defaultDatabase, String metaTable)
+  public static TableName fromString(final String name, final String defaultCatalog, final String defaultDatabase, String tableMetaRef)
       throws IllegalArgumentException {
     if (name == null) {
       throw new IllegalArgumentException(String.join("", "Table value was null. ", ILL_ARG_EXCEPTION_MSG));
@@ -90,13 +91,17 @@
       if (names.length == 2) {
         return new TableName(defaultCatalog, names[0], names[1], null);
       } else if (names.length == 3) {
-        return new TableName(names[0], names[1], names[2], null);
+        if (names[2].startsWith(BRANCH_NAME_PREFIX)) {
+          return new TableName(defaultCatalog, names[0], names[1], names[2]);
+        } else {
+          return new TableName(names[0], names[1], names[2], null);
+        }
       } else {
         throw new IllegalArgumentException(ILL_ARG_EXCEPTION_MSG);
       }
 
     } else {
-      return new TableName(defaultCatalog, defaultDatabase, name, metaTable);
+      return new TableName(defaultCatalog, defaultDatabase, name, tableMetaRef);
     }
   }
 
@@ -112,8 +117,8 @@
     return table;
   }
 
-  public String getMetaTable() {
-    return metaTable;
+  public String getTableMetaRef() {
+    return tableMetaRef;
   }
 
   /**
@@ -139,8 +144,8 @@
    * Get the name in db.table format, if db is not empty, otherwise pass only the table name.
    */
   public String getNotEmptyDbTable() {
-    String metaTableName = metaTable == null ? "" : "." + metaTable;
-    return db == null || db.trim().isEmpty() ? table : db + DatabaseName.CAT_DB_TABLE_SEPARATOR + table + metaTableName;
+    String metaRefName = tableMetaRef == null ? "" : "." + tableMetaRef;
+    return db == null || db.trim().isEmpty() ? table : db + DatabaseName.CAT_DB_TABLE_SEPARATOR + table + metaRefName;
   }
 
   /**
diff --git a/storage-api/src/test/org/apache/hadoop/hive/common/TestTableName.java b/storage-api/src/test/org/apache/hadoop/hive/common/TestTableName.java
index bcd28f6..e6013be 100644
--- a/storage-api/src/test/org/apache/hadoop/hive/common/TestTableName.java
+++ b/storage-api/src/test/org/apache/hadoop/hive/common/TestTableName.java
@@ -37,7 +37,7 @@
     Assert.assertEquals("cat", name.getCat());
     Assert.assertEquals("db", name.getDb());
     Assert.assertEquals("t", name.getTable());
-    Assert.assertEquals("meta", name.getMetaTable());
+    Assert.assertEquals("meta", name.getTableMetaRef());
     Assert.assertEquals("cat.db.t", name.toString());
     Assert.assertEquals("db.t", name.getDbTable());
   }
@@ -63,7 +63,7 @@
     Assert.assertEquals("cat", name.getCat());
     Assert.assertEquals("db", name.getDb());
     Assert.assertEquals("tab", name.getTable());
-    Assert.assertEquals("metatable", name.getMetaTable());
+    Assert.assertEquals("metatable", name.getTableMetaRef());
 
     try {
       TableName.fromString(null, null, null, null);