HIVE-29248: Propagate HiveAccessControlException to HiveCatalog (#6171)

* Add test cases to reproduce HiveAccessControlException

* Classify HiveAccessControlException as ForbiddenException

* Add core classifier
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 08f16e6..656ef34 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.iceberg.ClientPoolImpl;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
@@ -76,6 +77,31 @@ protected IMetaStoreClient newClient()  {
   }
 
   @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
+    try {
+      return super.run(action);
+    } catch (MetaException e) {
+      if (isAccessControlException(e)) {
+        throw new ForbiddenException(e, "Access denied: %s", e.getMessage());
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+      throws TException, InterruptedException {
+    try {
+      return super.run(action, retry);
+    } catch (MetaException e) {
+      if (isAccessControlException(e)) {
+        throw new ForbiddenException(e, "Access denied: %s", e.getMessage());
+      }
+      throw e;
+    }
+  }
+
+  @Override
   protected IMetaStoreClient reconnect(IMetaStoreClient client) {
     try {
       client.close();
@@ -92,6 +118,11 @@ protected boolean isConnectionException(Exception e) {
         e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException");
   }
 
+  private boolean isAccessControlException(MetaException exception) {
+    return exception.getMessage() != null && exception.getMessage().startsWith(
+        "Got exception: org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException");
+  }
+
   @Override
   protected void close(IMetaStoreClient client) {
     client.close();
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java
index 4f4f920..c53fe58 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java
@@ -49,7 +49,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
       }
     }
 
-    metastore.start(hiveConfWithOverrides, 5, true);
+    metastore.start(hiveConfWithOverrides, 20, true);
     metastoreClient = new HiveMetaStoreClient(hiveConfWithOverrides);
     if (null != databaseName) {
       String dbPath = metastore.getDatabasePath(databaseName);
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizer.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizer.java
new file mode 100644
index 0000000..9935d11
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.List;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.AbstractHiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilege;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeInfo;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveRoleGrant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockHiveAuthorizer extends AbstractHiveAuthorizer {
+  public static final String PERMISSION_TEST_USER = "permission_test_user";
+  private static final Logger LOG = LoggerFactory.getLogger(MockHiveAuthorizer.class);
+
+  private final HiveAuthenticationProvider authenticator;
+
+  public MockHiveAuthorizer(HiveAuthenticationProvider authenticator) {
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public VERSION getVersion() {
+    return null;
+  }
+
+  @Override
+  public void grantPrivileges(List<HivePrincipal> hivePrincipals, List<HivePrivilege> hivePrivileges,
+      HivePrivilegeObject hivePrivObject, HivePrincipal grantorPrincipal, boolean grantOption) {
+    // NOP
+  }
+
+  @Override
+  public void revokePrivileges(List<HivePrincipal> hivePrincipals, List<HivePrivilege> hivePrivileges,
+      HivePrivilegeObject hivePrivObject, HivePrincipal grantorPrincipal, boolean grantOption) {
+    // NOP
+  }
+
+  @Override
+  public void createRole(String roleName, HivePrincipal adminGrantor) {
+    // NOP
+  }
+
+  @Override
+  public void dropRole(String roleName) {
+    // NOP
+  }
+
+  @Override
+  public List<HiveRoleGrant> getPrincipalGrantInfoForRole(String roleName) {
+    return List.of();
+  }
+
+  @Override
+  public List<HiveRoleGrant> getRoleGrantInfoForPrincipal(HivePrincipal principal) {
+    return List.of();
+  }
+
+  @Override
+  public void grantRole(List<HivePrincipal> hivePrincipals, List<String> roles, boolean grantOption,
+      HivePrincipal grantorPrinc) {
+    // NOP
+  }
+
+  @Override
+  public void revokeRole(List<HivePrincipal> hivePrincipals, List<String> roles, boolean grantOption,
+      HivePrincipal grantorPrinc) {
+    // NOP
+  }
+
+  @Override
+  public void checkPrivileges(HiveOperationType hiveOpType, List<HivePrivilegeObject> inputsHObjs,
+      List<HivePrivilegeObject> outputHObjs, HiveAuthzContext context) throws HiveAccessControlException {
+    LOG.info("Checking privileges. User={}, Operation={}, inputs={}, outputs={}", authenticator.getUserName(),
+        hiveOpType, inputsHObjs, outputHObjs);
+    if (PERMISSION_TEST_USER.equals(authenticator.getUserName())) {
+      throw new HiveAccessControlException(String.format("Unauthorized. Operation=%s, inputs=%s, outputs=%s",
+          hiveOpType, inputsHObjs, outputHObjs));
+    }
+  }
+
+  @Override
+  public List<HivePrivilegeObject> filterListCmdObjects(List<HivePrivilegeObject> listObjs, HiveAuthzContext context) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> getAllRoles() {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePrivilegeInfo> showPrivileges(HivePrincipal principal, HivePrivilegeObject privObj) {
+    return List.of();
+  }
+
+  @Override
+  public void setCurrentRole(String roleName) {
+    // NOP
+  }
+
+  @Override
+  public List<String> getCurrentRoleNames() {
+    return List.of();
+  }
+
+  @Override
+  public void applyAuthorizationConfigPolicy(HiveConf hiveConf) {
+    // NOP
+  }
+
+  @Override
+  public List<HivePrivilegeObject> applyRowFilterAndColumnMasking(HiveAuthzContext context,
+      List<HivePrivilegeObject> privObjs) {
+    return List.of();
+  }
+
+  @Override
+  public boolean needTransform() {
+    return false;
+  }
+}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizerFactory.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizerFactory.java
new file mode 100644
index 0000000..b8ab06d
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/MockHiveAuthorizerFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
+
+public class MockHiveAuthorizerFactory implements HiveAuthorizerFactory {
+  @Override
+  public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreClientFactory, HiveConf conf,
+      HiveAuthenticationProvider hiveAuthenticator, HiveAuthzSessionContext ctx) {
+    return new MockHiveAuthorizer(hiveAuthenticator);
+  }
+}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalogAccessControl.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalogAccessControl.java
new file mode 100644
index 0000000..8f704ad
--- /dev/null
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalogAccessControl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.metastore.HiveMetaStoreAuthorizer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class TestHiveCatalogAccessControl {
+  private static final Schema DUMMY_SCHEMA = new Schema();
+
+  @RegisterExtension
+  private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION = HiveMetastoreExtension.builder()
+      .withConfig(Map.of(
+          ConfVars.HIVE_AUTHORIZATION_MANAGER.getVarname(), MockHiveAuthorizerFactory.class.getName(),
+          ConfVars.PRE_EVENT_LISTENERS.getVarname(), HiveMetaStoreAuthorizer.class.getName()
+      )).build();
+
+  @AfterEach
+  void afterEach() throws Exception {
+    HIVE_METASTORE_EXTENSION.metastore().reset();
+  }
+
+  @Test
+  void testNamespace() throws Exception {
+    var namespace = Namespace.of("permission_test_db");
+    asAuthorized(catalog -> catalog.createNamespace(namespace, Collections.emptyMap()));
+    asUnauthorized(catalog -> {
+      // Should HMS omit namespaces?
+      Assertions.assertThat(catalog.listNamespaces()).isEqualTo(List.of(Namespace.of("default"), namespace));
+      Assertions.assertThatThrownBy(() -> catalog.namespaceExists(namespace)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
+          .isInstanceOf(ForbiddenException.class);
+      var newNamespace = Namespace.of("new_db");
+      Assertions.assertThatThrownBy(() -> catalog.createNamespace(newNamespace)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.dropNamespace(namespace)).isInstanceOf(ForbiddenException.class);
+      var properties = Collections.singletonMap("key", "value");
+      Assertions.assertThatThrownBy(() -> catalog.setProperties(namespace, properties))
+          .isInstanceOf(ForbiddenException.class);
+      var propertyKeys = properties.keySet();
+      Assertions.assertThatThrownBy(() -> catalog.removeProperties(namespace, propertyKeys))
+          .isInstanceOf(ForbiddenException.class);
+    });
+  }
+
+  @Test
+  void testTable() throws Exception {
+    Namespace namespace = Namespace.of("permission_test_db");
+    TableIdentifier table = TableIdentifier.of(namespace, "permission_test_table");
+    asAuthorized(catalog -> {
+      catalog.createNamespace(namespace, Collections.emptyMap());
+      catalog.createTable(table, new Schema());
+    });
+    asUnauthorized(catalog -> {
+      // Should HMS omit namespaces?
+      Assertions.assertThat(catalog.listTables(namespace)).isEqualTo(Collections.singletonList(table));
+      Assertions.assertThatThrownBy(() -> catalog.tableExists(table)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.loadTable(table)).isInstanceOf(ForbiddenException.class);
+      var newTable = TableIdentifier.of(namespace, "new_table");
+      Assertions.assertThatThrownBy(() -> catalog.createTable(newTable, DUMMY_SCHEMA))
+          .isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.renameTable(table, newTable)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.dropTable(table)).isInstanceOf(ForbiddenException.class);
+    });
+  }
+
+  @Test
+  void testView() throws Exception {
+    Namespace namespace = Namespace.of("permission_test_db");
+    TableIdentifier view = TableIdentifier.of(namespace, "permission_test_view");
+    asAuthorized(catalog -> {
+      catalog.createNamespace(namespace, Collections.emptyMap());
+      catalog.buildView(view).withQuery("hive", "SELECT 1 AS id").withSchema(new Schema())
+          .withDefaultNamespace(namespace).create();
+    });
+    asUnauthorized(catalog -> {
+      // Should HMS omit namespaces?
+      Assertions.assertThat(catalog.listViews(namespace)).isEqualTo(Collections.singletonList(view));
+      Assertions.assertThatThrownBy(() -> catalog.viewExists(view)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.loadView(view)).isInstanceOf(ForbiddenException.class);
+      var newView = TableIdentifier.of(namespace, "new_view");
+      var builder = catalog.buildView(newView).withQuery("hive", "SELECT 1 AS id").withSchema(DUMMY_SCHEMA)
+          .withDefaultNamespace(namespace);
+      Assertions.assertThatThrownBy(builder::create).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.renameView(view, newView)).isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.dropView(view)).isInstanceOf(ForbiddenException.class);
+    });
+  }
+
+  @Test
+  void testTransaction() throws Exception {
+    var namespace = Namespace.of("permission_test_db");
+    asAuthorized(catalog -> catalog.createNamespace(namespace, Collections.emptyMap()));
+    asUnauthorized(catalog -> {
+      var newTable = TableIdentifier.of(namespace, "new_table");
+      Assertions.assertThatThrownBy(() -> catalog.newCreateTableTransaction(newTable, DUMMY_SCHEMA))
+          .isInstanceOf(ForbiddenException.class);
+      Assertions.assertThatThrownBy(() -> catalog.newReplaceTableTransaction(newTable, DUMMY_SCHEMA, true))
+          .isInstanceOf(ForbiddenException.class);
+    });
+  }
+
+  private static void asAuthorized(Consumer<HiveCatalog> consumer) throws Exception {
+    withUser("authorized_user", consumer);
+  }
+
+  private static void asUnauthorized(Consumer<HiveCatalog> consumer) throws Exception {
+    withUser(MockHiveAuthorizer.PERMISSION_TEST_USER, consumer);
+  }
+
+  private static void withUser(String username, Consumer<HiveCatalog> consumer) throws Exception {
+    var ugi = UserGroupInformation.createRemoteUser(username);
+    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+      try (HiveCatalog catalog = createCatalog()) {
+        consumer.accept(catalog);
+        return null;
+      }
+    });
+  }
+
+  private static HiveCatalog createCatalog() {
+    return (HiveCatalog)
+        CatalogUtil.loadCatalog(
+            HiveCatalog.class.getName(),
+            UUID.randomUUID().toString(),
+            Map.of(
+                CatalogProperties.CLIENT_POOL_CACHE_KEYS, "ugi"
+            ),
+            HIVE_METASTORE_EXTENSION.hiveConf());
+  }
+}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
index 90f4f23..a53d694 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
@@ -33,9 +33,12 @@
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
 import org.apache.hadoop.hive.metastore.api.GetTableRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -43,6 +46,7 @@
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -244,9 +248,18 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
     baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf);
     IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);
 
+    TProcessor processor;
+    TTransportFactory transportFactory;
+    if (MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
+      processor = new TUGIBasedProcessor<>(handler);
+      transportFactory = new TUGIContainingTransport.Factory();
+    } else {
+      processor = new TSetIpAddressProcessor<>(handler);
+      transportFactory = new TTransportFactory();
+    }
     TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
-        .processor(new TSetIpAddressProcessor<>(handler))
-        .transportFactory(new TTransportFactory())
+        .processor(processor)
+        .transportFactory(transportFactory)
         .protocolFactory(new TBinaryProtocol.Factory())
         .minWorkerThreads(poolSize)
         .maxWorkerThreads(poolSize);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/HiveMetaStoreAuthorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/HiveMetaStoreAuthorizer.java
index 7c15edd..4825ef9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/HiveMetaStoreAuthorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/HiveMetaStoreAuthorizer.java
@@ -134,7 +134,7 @@ public final void onEvent(PreEventContext preEventContext)
       }
     } catch (Exception e) {
       LOG.error("HiveMetaStoreAuthorizer.onEvent(): failed", e);
-      throw MetaStoreUtils.newMetaException(e);
+      MetaStoreUtils.throwMetaException(e);
     }
 
     LOG.debug("<== HiveMetaStoreAuthorizer.onEvent(): EventType=" + preEventContext.getEventType());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/TestHiveMetaStoreAuthorizer.java b/ql/src/test/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/TestHiveMetaStoreAuthorizer.java
index 8975c60..ced07a1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/TestHiveMetaStoreAuthorizer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/security/authorization/plugin/metastore/TestHiveMetaStoreAuthorizer.java
@@ -204,7 +204,9 @@ public void testA_CreateDatabase_unAuthorizedUser() throws Exception {
       hmsHandler.create_database(db);
     } catch (Exception e) {
       String err = e.getMessage();
-      String expected = "Operation type " + HiveOperationType.CREATEDATABASE + " not allowed for user:" + unAuthorizedUser;
+      String expected = "Got exception: "
+          + "org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException Operation type "
+          + HiveOperationType.CREATEDATABASE + " not allowed for user:" + unAuthorizedUser;
       assertEquals(expected, err);
     }
   }
@@ -221,7 +223,9 @@ public void testB_CreateTable_unAuthorizedUser() throws Exception {
       hmsHandler.create_table(table);
     } catch (Exception e) {
       String err = e.getMessage();
-      String expected = "Operation type " + HiveOperationType.CREATETABLE + " not allowed for user:" + unAuthorizedUser;
+      String expected = "Got exception: "
+          + "org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException Operation type "
+          + HiveOperationType.CREATETABLE + " not allowed for user:" + unAuthorizedUser;
       assertEquals(expected, err);
     }
   }
@@ -297,7 +301,8 @@ public void testE_CreateRole__anyUser() throws Exception {
       hmsHandler.create_role(role);
     } catch (Exception e) {
       String err = e.getMessage();
-      String expected = "Operation type " + PreEventContext.PreEventType.AUTHORIZATION_API_CALL.name() + " not allowed for user:" + authorizedUser;
+      String expected = "Got exception: org.apache.hadoop.hive.metastore.api.MetaException Operation type "
+          + PreEventContext.PreEventType.AUTHORIZATION_API_CALL.name() + " not allowed for user:" + authorizedUser;
       assertEquals(expected, err);
     }
   }
@@ -313,7 +318,8 @@ public void testF_CreateCatalog_anyUser() throws Exception {
       hmsHandler.create_catalog(new CreateCatalogRequest(catalog));
     } catch (Exception e) {
       String err = e.getMessage();
-      String expected = "Operation type " + PreEventContext.PreEventType.CREATE_CATALOG.name() + " not allowed for user:" + authorizedUser;
+      String expected = "Got exception: org.apache.hadoop.hive.metastore.api.MetaException Operation type "
+          + PreEventContext.PreEventType.CREATE_CATALOG.name() + " not allowed for user:" + authorizedUser;
       assertEquals(expected, err);
     }
   }
@@ -658,7 +664,9 @@ public void testR_CreateDataConnector_unAuthorizedUser() {
       hmsHandler.create_dataconnector_req(connectorReq);
     } catch (Exception e) {
       String err = e.getMessage();
-      String expected = "Operation type " + HiveOperationType.CREATEDATACONNECTOR + " not allowed for user:" + unAuthorizedUser;
+      String expected = "Got exception: "
+          + "org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException Operation type "
+          + HiveOperationType.CREATEDATACONNECTOR + " not allowed for user:" + unAuthorizedUser;
       assertEquals(expected, err);
     }
   }
diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml
index 0f8410c..c1468e4 100644
--- a/standalone-metastore/metastore-rest-catalog/pom.xml
+++ b/standalone-metastore/metastore-rest-catalog/pom.xml
@@ -45,6 +45,13 @@
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <classifier>core</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-standalone-metastore-common</artifactId>
       <version>${hive.version}</version>
       <classifier>tests</classifier>
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/BaseRESTCatalogTests.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/BaseRESTCatalogTests.java
index 4500111..5d41d9c 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/BaseRESTCatalogTests.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/BaseRESTCatalogTests.java
@@ -19,16 +19,26 @@
 
 package org.apache.iceberg.rest;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableCommit;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.function.Executable;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 abstract class BaseRESTCatalogTests extends CatalogTests<RESTCatalog> {
@@ -36,6 +46,8 @@ abstract class BaseRESTCatalogTests extends CatalogTests<RESTCatalog> {
 
   protected abstract Map<String, String> getDefaultClientConfiguration() throws Exception;
 
+  protected abstract Optional<Map<String, String>> getPermissionTestClientConfiguration() throws Exception;
+
   @BeforeAll
   void setupAll() throws Exception {
     catalog = RCKUtils.initCatalogClient(getDefaultClientConfiguration());
@@ -82,4 +94,65 @@ protected boolean supportsNamesWithSlashes() {
   protected boolean supportsServerSideRetry() {
     return true;
   }
+
+  private void testUnauthorizedAccess(Executable executable) {
+    Assertions.assertThrows(ForbiddenException.class, executable);
+  }
+
+  @Test
+  void testPermissionsWithDeniedUser() throws Exception {
+    var properties = getPermissionTestClientConfiguration();
+    if (properties.isEmpty()) {
+      return;
+    }
+    var db = Namespace.of("permission_test_db");
+    var table = TableIdentifier.of(db, "test_table");
+    var view = TableIdentifier.of(db, "test_view");
+    try (var client = RCKUtils.initCatalogClient(getDefaultClientConfiguration())) {
+      client.createNamespace(db);
+      client.createTable(table, new Schema());
+      client.buildView(view).withQuery("hive", "SELECT count(*) FROM default.permission_test")
+          .withSchema(new Schema()).withDefaultNamespace(db).create();
+    } catch (IOException e) {
+      throw new AssertionError("Catalog operation failed", e);
+    }
+    try (var client = RCKUtils.initCatalogClient(properties.get())) {
+      // Should this fail?
+      Assertions.assertTrue(client.listNamespaces().contains(db));
+      testUnauthorizedAccess(() -> client.namespaceExists(db));
+      testUnauthorizedAccess(() -> client.loadNamespaceMetadata(db));
+      testUnauthorizedAccess(() -> client.createNamespace(Namespace.of("new-db")));
+      testUnauthorizedAccess(() -> client.dropNamespace(db));
+      testUnauthorizedAccess(() -> client.setProperties(db, Collections.singletonMap("key", "value")));
+      testUnauthorizedAccess(() -> client.removeProperties(db, Collections.singleton("key")));
+
+      // Should this fail?
+      Assertions.assertEquals(Collections.singletonList(table), client.listTables(db));
+      testUnauthorizedAccess(() -> client.tableExists(table));
+      testUnauthorizedAccess(() -> client.loadTable(table));
+      testUnauthorizedAccess(() -> client.createTable(TableIdentifier.of(db, "new-table"), new Schema()));
+      testUnauthorizedAccess(() -> client.renameTable(table, TableIdentifier.of(db, "new-table")));
+      testUnauthorizedAccess(() -> client.dropTable(table));
+
+      // Should this fail?
+      Assertions.assertEquals(Collections.singletonList(view), client.listViews(db));
+      testUnauthorizedAccess(() -> client.viewExists(view));
+      testUnauthorizedAccess(() -> client.loadView(view));
+      testUnauthorizedAccess(() -> client.buildView(TableIdentifier.of(db, "new-view"))
+          .withQuery("hive", "SELECT count(*) FROM default.permission_test").withSchema(new Schema())
+          .withDefaultNamespace(db).create());
+      testUnauthorizedAccess(() -> client.renameView(view, TableIdentifier.of(db, "new-view")));
+      testUnauthorizedAccess(() -> client.dropView(view));
+
+      testUnauthorizedAccess(() -> client.newCreateTableTransaction(TableIdentifier.of(db, "test"),
+          new Schema()));
+      testUnauthorizedAccess(() -> client.newReplaceTableTransaction(TableIdentifier.of(db, "test"),
+          new Schema(), true));
+      var dummyMetadata = TableMetadata.newTableMetadata(new Schema(), PartitionSpec.unpartitioned(),
+          "dummy-location", Collections.emptyMap());
+      testUnauthorizedAccess(() -> client.commitTransaction(TableCommit.create(table, dummyMetadata, dummyMetadata)));
+    } catch (IOException e) {
+      throw new AssertionError("Catalog operation failed", e);
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogJwtAuth.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogJwtAuth.java
index 2a3f607..ff06a53 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogJwtAuth.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogJwtAuth.java
@@ -20,9 +20,11 @@
 package org.apache.iceberg.rest;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
+import org.apache.iceberg.rest.extension.MockHiveAuthorizer;
 import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
 import org.apache.iceberg.rest.extension.JwksServer;
 import org.junit.experimental.categories.Category;
@@ -44,6 +46,14 @@ protected Map<String, String> getDefaultClientConfiguration() throws Exception {
     );
   }
 
+  @Override
+  protected Optional<Map<String, String>> getPermissionTestClientConfiguration() throws Exception {
+    return Optional.of(Map.of(
+        "uri", REST_CATALOG_EXTENSION.getRestEndpoint(),
+        "token", JwksServer.generateValidJWT(MockHiveAuthorizer.PERMISSION_TEST_USER)
+    ));
+  }
+
   @Test
   void testWithUnauthorizedKey() throws Exception {
     // "token" is a parameter for OAuth 2.0 Bearer token authentication. We use it to pass a JWT token
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogNoneAuth.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogNoneAuth.java
index b993215..3414bbb 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogNoneAuth.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogNoneAuth.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.rest;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
@@ -38,4 +39,9 @@ protected Map<String, String> getDefaultClientConfiguration() {
         "uri", REST_CATALOG_EXTENSION.getRestEndpoint()
     );
   }
+
+  @Override
+  protected Optional<Map<String, String>> getPermissionTestClientConfiguration() {
+    return Optional.empty();
+  }
 }
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2Jwt.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2Jwt.java
index c140bc0..df1787c 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2Jwt.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2Jwt.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.rest;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -44,6 +45,16 @@ protected Map<String, String> getDefaultClientConfiguration() {
     );
   }
 
+  @Override
+  protected Optional<Map<String, String>> getPermissionTestClientConfiguration() {
+    return Optional.of(Map.of(
+        "uri", REST_CATALOG_EXTENSION.getRestEndpoint(),
+        "rest.auth.type", "oauth2",
+        "oauth2-server-uri", REST_CATALOG_EXTENSION.getOAuth2TokenEndpoint(),
+        "credential", REST_CATALOG_EXTENSION.getOAuth2ClientCredentialForPermissionTest()
+    ));
+  }
+
   @Test
   void testWithAccessToken() {
     Map<String, String> properties = Map.of(
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2TokenIntrospection.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2TokenIntrospection.java
index 46a6e36..765ba00 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2TokenIntrospection.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogOAuth2TokenIntrospection.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CATALOG_SERVLET_AUTH_OAUTH2_VALIDATION_METHOD;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -47,6 +48,16 @@ protected Map<String, String> getDefaultClientConfiguration() {
     );
   }
 
+  @Override
+  protected Optional<Map<String, String>> getPermissionTestClientConfiguration() {
+    return Optional.of(Map.of(
+        "uri", REST_CATALOG_EXTENSION.getRestEndpoint(),
+        "rest.auth.type", "oauth2",
+        "oauth2-server-uri", REST_CATALOG_EXTENSION.getOAuth2TokenEndpoint(),
+        "credential", REST_CATALOG_EXTENSION.getOAuth2ClientCredentialForPermissionTest()
+    ));
+  }
+
   @Test
   void testWithAccessToken() {
     Map<String, String> properties = Map.of(
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogSimpleAuth.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogSimpleAuth.java
index 9699c6c..699233c 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogSimpleAuth.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestRESTCatalogSimpleAuth.java
@@ -20,9 +20,11 @@
 package org.apache.iceberg.rest;
 
 import java.util.Map;
+import java.util.Optional;
 import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
+import org.apache.iceberg.rest.extension.MockHiveAuthorizer;
 import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
 import org.junit.experimental.categories.Category;
 import org.junit.jupiter.api.Assertions;
@@ -43,6 +45,14 @@ protected Map<String, String> getDefaultClientConfiguration() {
     );
   }
 
+  @Override
+  protected Optional<Map<String, String>> getPermissionTestClientConfiguration() {
+    return Optional.of(Map.of(
+        "uri", REST_CATALOG_EXTENSION.getRestEndpoint(),
+        "header.x-actor-username", MockHiveAuthorizer.PERMISSION_TEST_USER
+    ));
+  }
+
   @Test
   void testWithoutUserName() {
     Map<String, String> properties = Map.of(
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
index 38ee151..671fcc0 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/HiveRESTCatalogServerExtension.java
@@ -138,6 +138,10 @@ public String getOAuth2ClientCredential() {
     return authorizationServer.getClientCredential();
   }
 
+  public String getOAuth2ClientCredentialForPermissionTest() {
+    return authorizationServer.getClientCredentialForPermissionTest();
+  }
+
   public String getOAuth2AccessToken() {
     return authorizationServer.getAccessToken();
   }
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
new file mode 100644
index 0000000..4dd2600
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest.extension;
+
+import java.util.List;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.AbstractHiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilege;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeInfo;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveRoleGrant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockHiveAuthorizer extends AbstractHiveAuthorizer {
+  public static final String PERMISSION_TEST_USER = "permission_test_user";
+  private static final Logger LOG = LoggerFactory.getLogger(MockHiveAuthorizer.class);
+
+  private final HiveAuthenticationProvider authenticator;
+
+  public MockHiveAuthorizer(HiveAuthenticationProvider authenticator) {
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public VERSION getVersion() {
+    return null;
+  }
+
+  @Override
+  public void grantPrivileges(List<HivePrincipal> hivePrincipals, List<HivePrivilege> hivePrivileges,
+      HivePrivilegeObject hivePrivObject, HivePrincipal grantorPrincipal, boolean grantOption) {
+    // NOP
+  }
+
+  @Override
+  public void revokePrivileges(List<HivePrincipal> hivePrincipals, List<HivePrivilege> hivePrivileges,
+      HivePrivilegeObject hivePrivObject, HivePrincipal grantorPrincipal, boolean grantOption) {
+    // NOP
+  }
+
+  @Override
+  public void createRole(String roleName, HivePrincipal adminGrantor) {
+    // NOP
+  }
+
+  @Override
+  public void dropRole(String roleName) {
+    // NOP
+  }
+
+  @Override
+  public List<HiveRoleGrant> getPrincipalGrantInfoForRole(String roleName) {
+    return List.of();
+  }
+
+  @Override
+  public List<HiveRoleGrant> getRoleGrantInfoForPrincipal(HivePrincipal principal) {
+    return List.of();
+  }
+
+  @Override
+  public void grantRole(List<HivePrincipal> hivePrincipals, List<String> roles, boolean grantOption,
+      HivePrincipal grantorPrinc) {
+    // NOP
+  }
+
+  @Override
+  public void revokeRole(List<HivePrincipal> hivePrincipals, List<String> roles, boolean grantOption,
+      HivePrincipal grantorPrinc) {
+    // NOP
+  }
+
+  @Override
+  public void checkPrivileges(HiveOperationType hiveOpType, List<HivePrivilegeObject> inputsHObjs,
+      List<HivePrivilegeObject> outputHObjs, HiveAuthzContext context) throws HiveAccessControlException {
+    LOG.info("Checking privileges. User={}, Operation={}, inputs={}, outputs={}", authenticator.getUserName(),
+        hiveOpType, inputsHObjs, outputHObjs);
+    if (PERMISSION_TEST_USER.equals(authenticator.getUserName())) {
+      throw new HiveAccessControlException(String.format("Unauthorized. Operation=%s, inputs=%s, outputs=%s",
+          hiveOpType, inputsHObjs, outputHObjs));
+    }
+  }
+
+  @Override
+  public List<HivePrivilegeObject> filterListCmdObjects(List<HivePrivilegeObject> listObjs, HiveAuthzContext context) {
+    return List.of();
+  }
+
+  @Override
+  public List<String> getAllRoles() {
+    return List.of();
+  }
+
+  @Override
+  public List<HivePrivilegeInfo> showPrivileges(HivePrincipal principal, HivePrivilegeObject privObj) {
+    return List.of();
+  }
+
+  @Override
+  public void setCurrentRole(String roleName) {
+    // NOP
+  }
+
+  @Override
+  public List<String> getCurrentRoleNames() {
+    return List.of();
+  }
+
+  @Override
+  public void applyAuthorizationConfigPolicy(HiveConf hiveConf) {
+    // NOP
+  }
+
+  @Override
+  public List<HivePrivilegeObject> applyRowFilterAndColumnMasking(HiveAuthzContext context,
+      List<HivePrivilegeObject> privObjs) {
+    return List.of();
+  }
+
+  @Override
+  public boolean needTransform() {
+    return false;
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizerFactory.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizerFactory.java
new file mode 100644
index 0000000..4cfbefd
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest.extension;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
+
+public class MockHiveAuthorizerFactory implements HiveAuthorizerFactory {
+  @Override
+  public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory metastoreClientFactory, HiveConf conf,
+      HiveAuthenticationProvider hiveAuthenticator, HiveAuthzSessionContext ctx) {
+    return new MockHiveAuthorizer(hiveAuthenticator);
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/OAuth2AuthorizationServer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/OAuth2AuthorizationServer.java
index 4f339e0..de66d11 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/OAuth2AuthorizationServer.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/OAuth2AuthorizationServer.java
@@ -41,6 +41,8 @@ public class OAuth2AuthorizationServer {
   static final String HMS_SECRET = "hive-metastore-secret";
   private static final String ICEBERG_CLIENT_ID = "iceberg-client";
   private static final String ICEBERG_CLIENT_SECRET = "iceberg-client-secret";
+  private static final String ICEBERG_CLIENT_ID_PERMISSION_TEST = "iceberg-client-permission-test";
+  private static final String ICEBERG_CLIENT_SECRET_PERMISSION_TEST = "iceberg-client-secret-permission-test";
 
   private GenericContainer<?> container;
   private Keycloak keycloak;
@@ -99,14 +101,14 @@ private static ProtocolMapperRepresentation createAudience() {
     return aud;
   }
 
-  private static ProtocolMapperRepresentation createEmailClaim() {
+  private static ProtocolMapperRepresentation createEmailClaim(String username) {
     var mapper = new ProtocolMapperRepresentation();
     mapper.setName("email");
     mapper.setProtocol("openid-connect");
     mapper.setProtocolMapper("oidc-hardcoded-claim-mapper");
     mapper.setConfig(Map.of(
         "claim.name", "email",
-        "claim.value", "iceberg-user@example.com",
+        "claim.value", username + "@example.com",
         "jsonType.label", "String",
         "access.token.claim", "true"
     ));
@@ -114,10 +116,10 @@ private static ProtocolMapperRepresentation createEmailClaim() {
   }
 
   private void createClient(RealmResource realm, List<String> scopes,
-      List<ProtocolMapperRepresentation> protocolMappers) {
+      List<ProtocolMapperRepresentation> protocolMappers, String clientId, String clientSecret) {
     var client = new ClientRepresentation();
-    client.setClientId(ICEBERG_CLIENT_ID);
-    client.setSecret(ICEBERG_CLIENT_SECRET);
+    client.setClientId(clientId);
+    client.setSecret(clientSecret);
     client.setEnabled(true);
     client.setProtocol("openid-connect");
     client.setPublicClient(false);
@@ -129,6 +131,13 @@ private void createClient(RealmResource realm, List<String> scopes,
     realm.clients().create(client).close();
   }
 
+  private void createClients(RealmResource realm, List<String> scopes, ProtocolMapperRepresentation audience) {
+    createClient(realm, scopes, List.of(audience, createEmailClaim("iceberg-user")), ICEBERG_CLIENT_ID,
+        ICEBERG_CLIENT_SECRET);
+    createClient(realm, scopes, List.of(audience, createEmailClaim(MockHiveAuthorizer.PERMISSION_TEST_USER)),
+        ICEBERG_CLIENT_ID_PERMISSION_TEST, ICEBERG_CLIENT_SECRET_PERMISSION_TEST);
+  }
+
   private static String getAccessToken(String url, List<String> scopes) {
     try (var keycloak = KeycloakBuilder.builder()
         .serverUrl(url)
@@ -162,8 +171,7 @@ public void start() {
 
     createScope(realm);
     var audience = createAudience();
-    var email = createEmailClaim();
-    createClient(realm, List.of("catalog"), List.of(audience, email));
+    createClients(realm, List.of("catalog"), audience);
     accessToken = getAccessToken(base, List.of("catalog"));
   }
 
@@ -186,10 +194,14 @@ public String getClientCredential() {
     return "%s:%s".formatted(ICEBERG_CLIENT_ID, ICEBERG_CLIENT_SECRET);
   }
 
+  public String getClientCredentialForPermissionTest() {
+    return "%s:%s".formatted(ICEBERG_CLIENT_ID_PERMISSION_TEST, ICEBERG_CLIENT_SECRET_PERMISSION_TEST);
+  }
+
   public String getAccessToken() {
     return accessToken;
   }
-  
+
   public String getKeycloackContainerDockerInternalHostName() {
     return container.getNetworkAliases().get(0);
   }
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java
index 49d5ca7..836e18c 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java
@@ -23,11 +23,14 @@
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
 import org.apache.hadoop.hive.metastore.MetaStoreSchemaInfo;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.metastore.HiveMetaStoreAuthorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +71,11 @@ public void start(Configuration conf) throws Exception {
 
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SCHEMA_INFO_CLASS, schemaInfoClass.getCanonicalName());
 
+    MetastoreConf.setClass(conf, ConfVars.HIVE_AUTHORIZATION_MANAGER, MockHiveAuthorizerFactory.class,
+        HiveAuthorizerFactory.class);
+    MetastoreConf.setClass(conf, ConfVars.PRE_EVENT_LISTENERS, HiveMetaStoreAuthorizer.class,
+        MetaStorePreEventListener.class);
+
     for (int i = 0; i < MetaStoreTestUtils.RETRY_COUNT; i++) {
       try {
         restPort = MetaStoreTestUtils.findFreePort();