HDDS-2954. Support admin groups (#3693)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/recon/ReconConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/recon/ReconConfigKeys.java
index e5122f8..a1823b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/recon/ReconConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/recon/ReconConfigKeys.java
@@ -65,4 +65,7 @@
    */
   public static final String OZONE_RECON_ADMINISTRATORS =
       "ozone.recon.administrators";
+
+  public static final String OZONE_RECON_ADMINISTRATORS_GROUPS =
+      "ozone.recon.administrators.groups";
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9894298..446070b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -129,6 +129,9 @@
    * */
   public static final String OZONE_ADMINISTRATORS =
       "ozone.administrators";
+
+  public static final String OZONE_ADMINISTRATORS_GROUPS =
+      "ozone.administrators.groups";
   /**
    * Used only for testing purpose. Results in making every user an admin.
    * */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9fa3dd3..5ff818f 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -376,6 +376,17 @@
     </description>
   </property>
   <property>
+    <name>ozone.administrators.groups</name>
+    <value/>
+    <tag>OZONE, SECURITY</tag>
+    <description>Ozone administrator groups delimited by the comma.
+      This is the list of groups who can access admin only information
+      from ozone.
+      It is enough to either have the name defined in ozone.administrators
+      or be directly or indirectly in a group defined in this property.
+    </description>
+  </property>
+  <property>
     <name>ozone.block.deleting.container.limit.per.interval</name>
     <value>10</value>
     <tag>OZONE, PERFORMANCE, SCM</tag>
@@ -2546,6 +2557,18 @@
     </description>
   </property>
   <property>
+    <name>ozone.recon.administrators.groups</name>
+    <value/>
+    <tag>RECON, SECURITY</tag>
+    <description>
+      Recon administrator groups delimited by a comma.
+      This is the list of groups who can access admin only information
+      from recon.
+      It is enough to either have the name defined in ozone.recon.administrators
+      or be directly or indirectly in a group defined in this property.
+    </description>
+  </property>
+  <property>
     <name>hdds.datanode.http.auth.type</name>
     <value>simple</value>
     <tag>DATANODE, SECURITY, KERBEROS</tag>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneAdmins.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneAdmins.java
new file mode 100644
index 0000000..342e10a
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneAdmins.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+
+/**
+ * This class contains ozone admin user information, username and group,
+ * and is able to check whether the provided {@link UserGroupInformation}
+ * has admin permissions.
+ */
+public class OzoneAdmins {
+
+  /**
+   * Ozone super user / admin username list.
+   */
+  private final Set<String> adminUsernames;
+  /**
+   * Ozone super user / admin group list.
+   */
+  private final Set<String> adminGroups;
+
+  public OzoneAdmins(Collection<String> adminUsernames) {
+    this(adminUsernames, null);
+  }
+
+  public OzoneAdmins(Collection<String> adminUsernames,
+      Collection<String> adminGroups) {
+    this.adminUsernames = adminUsernames != null ?
+        Collections.unmodifiableSet(new LinkedHashSet<>(adminUsernames)) :
+        Collections.emptySet();
+    this.adminGroups = adminGroups != null ?
+        Collections.unmodifiableSet(new LinkedHashSet<>(adminGroups)) :
+        Collections.emptySet();
+  }
+
+  private boolean hasAdminGroup(Collection<String> userGroups) {
+    return !Sets.intersection(adminGroups,
+        new LinkedHashSet<>(userGroups)).isEmpty();
+  }
+
+  /**
+   * Check whether the provided {@link UserGroupInformation user}
+   * has admin permissions.
+   *
+   * @param user
+   * @return
+   */
+  public boolean isAdmin(UserGroupInformation user) {
+    return adminUsernames.contains(OZONE_ADMINISTRATORS_WILDCARD)
+        || adminUsernames.contains(user.getShortUserName())
+        || adminUsernames.contains(user.getUserName())
+        || hasAdminGroup(user.getGroups());
+  }
+
+  public Collection<String> getAdminGroups() {
+    return adminGroups;
+  }
+
+  public Set<String> getAdminUsernames() {
+    return adminUsernames;
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/BaseHttpServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/BaseHttpServer.java
index c3e7a5f..4315dcd 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/BaseHttpServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/BaseHttpServer.java
@@ -45,6 +45,7 @@
 import static org.apache.hadoop.hdds.HddsUtils.createDir;
 import static org.apache.hadoop.hdds.server.http.HttpConfig.getHttpPolicy;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_GROUPS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_HTTPS_NEED_AUTH_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HTTP_SECURITY_ENABLED_DEFAULT;
@@ -194,9 +195,12 @@
       final InetSocketAddress httpsAddr, String name) throws IOException {
     HttpConfig.Policy policy = getHttpPolicy(conf);
 
+    String userString = conf.get(OZONE_ADMINISTRATORS, "");
+    String groupString = conf.get(OZONE_ADMINISTRATORS_GROUPS, "");
+
     HttpServer2.Builder builder = new HttpServer2.Builder().setName(name)
-        .setConf(conf).setACL(new AccessControlList(conf.get(
-            OZONE_ADMINISTRATORS, " ")));
+        .setConf(conf)
+        .setACL(new AccessControlList(userString, groupString));
 
     // initialize the webserver for uploading/downloading files.
     if (policy.isHttpEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index 9fa071f..03f2fdc 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -33,6 +33,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 
@@ -45,7 +46,6 @@
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 
 import org.apache.hadoop.security.UserGroupInformation;
@@ -66,11 +66,12 @@
 
   private boolean aclEnabled;
   private boolean isSpnegoEnabled;
-  private Collection<String> allowedUsers;
+  private transient OzoneAdmins admins;
 
   public void initialize(DBStore store, DBCheckpointMetrics metrics,
                          boolean omAclEnabled,
                          Collection<String> allowedAdminUsers,
+                         Collection<String> allowedAdminGroups,
                          boolean isSpnegoAuthEnabled)
       throws ServletException {
 
@@ -82,7 +83,7 @@
     }
 
     this.aclEnabled = omAclEnabled;
-    this.allowedUsers = allowedAdminUsers;
+    this.admins = new OzoneAdmins(allowedAdminUsers, allowedAdminGroups);
     this.isSpnegoEnabled = isSpnegoAuthEnabled;
   }
 
@@ -90,9 +91,7 @@
     // Check ACL for dbCheckpoint only when global Ozone ACL and SPNEGO is
     // enabled
     if (aclEnabled && isSpnegoEnabled) {
-      return allowedUsers.contains(OZONE_ADMINISTRATORS_WILDCARD)
-          || allowedUsers.contains(user.getShortUserName())
-          || allowedUsers.contains(user.getUserName());
+      return admins.isAdmin(user);
     } else {
       return true;
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDBCheckpointServlet.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDBCheckpointServlet.java
index 7fa39ba..6bce670 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDBCheckpointServlet.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDBCheckpointServlet.java
@@ -60,6 +60,7 @@
         scm.getMetrics().getDBCheckpointMetrics(),
         false,
         Collections.emptyList(),
+        Collections.emptyList(),
         false);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 68aafda..e43c28f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -77,6 +77,7 @@
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventExecutor;
 import org.apache.hadoop.hdds.server.events.FixedThreadPoolWithAffinityExecutor;
@@ -182,7 +183,6 @@
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.OzoneConsts.CRL_SEQUENCE_ID_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_SUB_CA_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
@@ -248,7 +248,7 @@
   /**
    * SCM super user.
    */
-  private final Collection<String> scmAdminUsernames;
+  private final OzoneAdmins scmAdmins;
   /**
    * SCM mxbean.
    */
@@ -386,8 +386,8 @@
       securityProtocolServer = null;
     }
 
-    scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
-        .OZONE_ADMINISTRATORS);
+    Collection<String> scmAdminUsernames =
+        conf.getTrimmedStringCollection(OzoneConfigKeys.OZONE_ADMINISTRATORS);
     String scmShortUsername =
         UserGroupInformation.getCurrentUser().getShortUserName();
 
@@ -395,6 +395,12 @@
       scmAdminUsernames.add(scmShortUsername);
     }
 
+    Collection<String> scmAdminGroups =
+        conf.getTrimmedStringCollection(
+            OzoneConfigKeys.OZONE_ADMINISTRATORS_GROUPS);
+
+    scmAdmins = new OzoneAdmins(scmAdminUsernames, scmAdminGroups);
+
     datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
         eventQueue);
     blockProtocolServer = new SCMBlockProtocolServer(conf, this);
@@ -1814,10 +1820,7 @@
 
   public void checkAdminAccess(UserGroupInformation remoteUser)
       throws IOException {
-    if (remoteUser != null
-        && !scmAdminUsernames.contains(remoteUser.getUserName()) &&
-        !scmAdminUsernames.contains(remoteUser.getShortUserName()) &&
-        !scmAdminUsernames.contains(OZONE_ADMINISTRATORS_WILDCARD)) {
+    if (remoteUser != null && !scmAdmins.isAdmin(remoteUser)) {
       throw new AccessControlException(
           "Access denied for user " + remoteUser.getUserName() +
               ". Superuser privilege is required.");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index b2576fe..4434b40 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -118,6 +118,7 @@
           scmMetrics.getDBCheckpointMetrics(),
           false,
           Collections.emptyList(),
+          Collections.emptyList(),
           false);
 
       HttpServletRequest requestMock = mock(HttpServletRequest.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index ff7b321..90743f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -33,6 +33,8 @@
 import java.nio.file.Paths;
 import java.security.Principal;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -176,6 +178,7 @@
         om.getMetrics().getDBCheckpointMetrics(),
         om.getAclsEnabled(),
         om.getOmAdminUsernames(),
+        om.getOmAdminGroups(),
         om.isSpnegoEnabled());
 
     doNothing().when(responseMock).setContentType("application/x-tgz");
@@ -211,7 +214,8 @@
     setupCluster();
 
     final OzoneManager om = cluster.getOzoneManager();
-    Collection<String> allowedUsers = om.getOmAdminUsernames();
+    Collection<String> allowedUsers =
+            new LinkedHashSet<>(om.getOmAdminUsernames());
     allowedUsers.add("recon");
 
     doCallRealMethod().when(omDbCheckpointServletMock).initialize(
@@ -219,6 +223,7 @@
         om.getMetrics().getDBCheckpointMetrics(),
         om.getAclsEnabled(),
         allowedUsers,
+        Collections.emptySet(),
         om.isSpnegoEnabled());
 
     omDbCheckpointServletMock.init();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 6f66143..0f84139 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -30,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.LinkedHashSet;
 
 /**
  * Provides the current checkpoint Snapshot of the OM DB. (tar.gz)
@@ -63,7 +64,9 @@
 
     OzoneConfiguration conf = om.getConfiguration();
     // Only Ozone Admins and Recon are allowed
-    Collection<String> allowedUsers = om.getOmAdminUsernames();
+    Collection<String> allowedUsers =
+            new LinkedHashSet<>(om.getOmAdminUsernames());
+    Collection<String> allowedGroups = om.getOmAdminGroups();
     ReconConfig reconConfig = conf.getObject(ReconConfig.class);
     String reconPrincipal = reconConfig.getKerberosPrincipal();
     if (!reconPrincipal.isEmpty()) {
@@ -76,6 +79,7 @@
         om.getMetrics().getDBCheckpointMetrics(),
         om.getAclsEnabled(),
         allowedUsers,
+        allowedGroups,
         om.isSpnegoEnabled());
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e6ab0c3..b9d8b55 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -225,7 +226,7 @@
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_GROUPS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
@@ -333,7 +334,7 @@
   /**
    * OM super user / admin list.
    */
-  private final Collection<String> omAdminUsernames;
+  private final OzoneAdmins omAdmins;
 
   private final OMMetrics metrics;
   private final ProtocolMessageMetrics<ProtocolMessageEnum>
@@ -572,7 +573,11 @@
         OMMultiTenantManager.checkAndEnableMultiTenancy(this, conf);
 
     // Get admin list
-    omAdminUsernames = getOzoneAdminsFromConfig(configuration);
+    Collection<String> omAdminUsernames =
+        getOzoneAdminsFromConfig(configuration);
+    Collection<String> omAdminGroups =
+        getOzoneAdminsGroupsFromConfig(configuration);
+    omAdmins = new OzoneAdmins(omAdminUsernames, omAdminGroups);
     instantiateServices(false);
 
     // Create special volume s3v which is required for S3G.
@@ -748,7 +753,7 @@
         authorizer.setBucketManager(bucketManager);
         authorizer.setKeyManager(keyManager);
         authorizer.setPrefixManager(prefixManager);
-        authorizer.setOzoneAdmins(omAdminUsernames);
+        authorizer.setOzoneAdmins(omAdmins);
         authorizer.setAllowListAllVolumes(allowListAllVolumes);
       }
     } else {
@@ -4069,11 +4074,19 @@
     return ozAdmins;
   }
 
+  Collection<String> getOzoneAdminsGroupsFromConfig(OzoneConfiguration conf) {
+    return conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS_GROUPS);
+  }
+
   /**
    * Return the list of Ozone administrators in effect.
    */
   Collection<String> getOmAdminUsernames() {
-    return omAdminUsernames;
+    return omAdmins.getAdminUsernames();
+  }
+
+  public Collection<String> getOmAdminGroups() {
+    return omAdmins.getAdminGroups();
   }
 
   /**
@@ -4081,22 +4094,7 @@
    * @param callerUgi Caller UserGroupInformation
    */
   public boolean isAdmin(UserGroupInformation callerUgi) {
-    if (callerUgi == null) {
-      return false;
-    } else {
-      return isAdmin(callerUgi.getShortUserName())
-          || isAdmin(callerUgi.getUserName());
-    }
-  }
-
-  @VisibleForTesting
-  private boolean isAdmin(String username) {
-    if (omAdminUsernames == null) {
-      return false;
-    } else {
-      return omAdminUsernames.contains(OZONE_ADMINISTRATORS_WILDCARD) ||
-          omAdminUsernames.contains(username);
-    }
+    return callerUgi != null && omAdmins.isAdmin(callerUgi);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index ebcdb52..224aa1e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.BucketManager;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -29,11 +30,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Objects;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 
 /**
@@ -50,7 +48,7 @@
   private BucketManager bucketManager;
   private KeyManager keyManager;
   private PrefixManager prefixManager;
-  private Collection<String> ozAdmins;
+  private OzoneAdmins ozAdmins;
   private boolean allowListAllVolumes;
 
   public OzoneNativeAuthorizer() {
@@ -58,7 +56,7 @@
 
   public OzoneNativeAuthorizer(VolumeManager volumeManager,
       BucketManager bucketManager, KeyManager keyManager,
-      PrefixManager prefixManager, Collection<String> ozoneAdmins) {
+      PrefixManager prefixManager, OzoneAdmins ozoneAdmins) {
     this.volumeManager = volumeManager;
     this.bucketManager = bucketManager;
     this.keyManager = keyManager;
@@ -200,12 +198,12 @@
     this.prefixManager = prefixManager;
   }
 
-  public void setOzoneAdmins(Collection<String> ozoneAdmins) {
+  public void setOzoneAdmins(OzoneAdmins ozoneAdmins) {
     this.ozAdmins = ozoneAdmins;
   }
 
-  public Collection<String> getOzoneAdmins() {
-    return Collections.unmodifiableCollection(this.ozAdmins);
+  public OzoneAdmins getOzoneAdmins() {
+    return ozAdmins;
   }
 
   public void setAllowListAllVolumes(boolean allowListAllVolumes) {
@@ -234,12 +232,6 @@
       return false;
     }
 
-    if (ozAdmins.contains(callerUgi.getShortUserName()) ||
-        ozAdmins.contains(callerUgi.getUserName()) ||
-        ozAdmins.contains(OZONE_ADMINISTRATORS_WILDCARD)) {
-      return true;
-    }
-
-    return false;
+    return ozAdmins.isAdmin(callerUgi);
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneAdministrators.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneAdministrators.java
index d876ca1..896415b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneAdministrators.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneAdministrators.java
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.ozone.security.acl;
 
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 
+import static java.util.Arrays.asList;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 
 /**
@@ -43,47 +44,76 @@
 
   @Test
   public void testCreateVolume() throws Exception {
-    OzoneObj obj = getTestVolumeobj("testvolume");
-    RequestContext context = getUserRequestContext("testuser",
-        IAccessAuthorizer.ACLType.CREATE);
-    testAdminOperations(obj, context);
+    UserGroupInformation.createUserForTesting("testuser",
+        new String[]{"testgroup"});
+    try {
+      OzoneObj obj = getTestVolumeobj("testvolume");
+      RequestContext context = getUserRequestContext("testuser",
+          IAccessAuthorizer.ACLType.CREATE);
+      testAdminOperations(obj, context);
+      testGroupAdminOperations(obj, context);
+    } finally {
+      UserGroupInformation.reset();
+    }
   }
 
   @Test
   public void testListAllVolume() throws Exception {
-    OzoneObj obj = getTestVolumeobj("/");
-    RequestContext context = getUserRequestContext("testuser",
-        IAccessAuthorizer.ACLType.LIST);
-    testAdminOperations(obj, context);
+    UserGroupInformation.createUserForTesting("testuser",
+        new String[]{"testgroup"});
+    try {
+      OzoneObj obj = getTestVolumeobj("/");
+      RequestContext context = getUserRequestContext("testuser",
+          IAccessAuthorizer.ACLType.LIST);
+      testAdminOperations(obj, context);
+      testGroupAdminOperations(obj, context);
+    } finally {
+      UserGroupInformation.reset();
+    }
   }
 
   private void testAdminOperations(OzoneObj obj, RequestContext context)
       throws OMException {
-    nativeAuthorizer.setOzoneAdmins(Collections.emptyList());
+    nativeAuthorizer.setOzoneAdmins(new OzoneAdmins(Collections.emptyList()));
     Assert.assertFalse("empty admin list disallow anyone to perform " +
             "admin operations", nativeAuthorizer.checkAccess(obj, context));
 
-    nativeAuthorizer.setOzoneAdmins(
-        Collections.singletonList(OZONE_ADMINISTRATORS_WILDCARD));
+    nativeAuthorizer.setOzoneAdmins(new OzoneAdmins(
+        Collections.singletonList(OZONE_ADMINISTRATORS_WILDCARD)));
     Assert.assertTrue("wildcard admin allows everyone to perform admin" +
         " operations", nativeAuthorizer.checkAccess(obj, context));
 
-    nativeAuthorizer.setOzoneAdmins(
-        Collections.singletonList("testuser"));
+    nativeAuthorizer.setOzoneAdmins(new OzoneAdmins(
+        Collections.singletonList("testuser")));
     Assert.assertTrue("matching admins are allowed to perform admin " +
             "operations", nativeAuthorizer.checkAccess(obj, context));
 
-    nativeAuthorizer.setOzoneAdmins(
-        Arrays.asList(new String[]{"testuser2", "testuser"}));
+    nativeAuthorizer.setOzoneAdmins(new OzoneAdmins(
+        asList(new String[]{"testuser2", "testuser"})));
     Assert.assertTrue("matching admins are allowed to perform admin " +
             "operations", nativeAuthorizer.checkAccess(obj, context));
 
-    nativeAuthorizer.setOzoneAdmins(
-        Arrays.asList(new String[]{"testuser2", "testuser3"}));
+    nativeAuthorizer.setOzoneAdmins(new OzoneAdmins(
+        asList(new String[]{"testuser2", "testuser3"})));
     Assert.assertFalse("mismatching admins are not allowed perform " +
         "admin operations", nativeAuthorizer.checkAccess(obj, context));
   }
 
+  private void testGroupAdminOperations(OzoneObj obj, RequestContext context)
+      throws OMException {
+    nativeAuthorizer.setOzoneAdmins(
+        new OzoneAdmins(null, asList("testgroup", "anothergroup")));
+    Assert.assertTrue("Users from matching admin groups " +
+        "are allowed to perform admin operations",
+            nativeAuthorizer.checkAccess(obj, context));
+
+    nativeAuthorizer.setOzoneAdmins(
+            new OzoneAdmins(null, asList("wronggroup")));
+    Assert.assertFalse("Users from mismatching admin groups " +
+        "are allowed to perform admin operations",
+            nativeAuthorizer.checkAccess(obj, context));
+  }
+
   private RequestContext getUserRequestContext(String username,
                                                IAccessAuthorizer.ACLType type) {
     return RequestContext.newBuilder()
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
index cc4d775..2d34250 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -159,7 +160,7 @@
     writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
-        Collections.singletonList("om"));
+        new OzoneAdmins(Collections.singletonList("om")));
     adminUgi = UserGroupInformation.createUserForTesting("om",
         new String[]{"ozone"});
     testUgi = UserGroupInformation.createUserForTesting("testuser",
@@ -400,7 +401,8 @@
       String msg = "Acl to check:" + a1 + " accessType:" +
           accessType + " path:" + obj.getPath();
       if (a1.equals(CREATE) && obj.getResourceType().equals(VOLUME)) {
-        assertEquals(msg, nativeAuthorizer.getOzoneAdmins().contains(user),
+        assertEquals(msg, nativeAuthorizer.getOzoneAdmins()
+                         .getAdminUsernames().contains(user),
             nativeAuthorizer.checkAccess(obj,
                 builder.setAclRights(a1).build()));
       } else {
@@ -521,7 +523,7 @@
     allAcls.remove(NONE);
     RequestContext ctx = builder.build();
     boolean expectedResult = expectedAclResult;
-    if (nativeAuthorizer.getOzoneAdmins().contains(
+    if (nativeAuthorizer.getOzoneAdmins().getAdminUsernames().contains(
         ctx.getClientUgi().getUserName())) {
       expectedResult = true;
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
index 82dd430..8e1c1f6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -109,7 +110,7 @@
     writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
-        Collections.singletonList("om"));
+        new OzoneAdmins(Collections.singletonList("om")));
     adminUgi = UserGroupInformation.createUserForTesting("om",
         new String[]{"ozone"});
     testUgi = UserGroupInformation.createUserForTesting("testuser",
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
index 1eab30b..096d4ff 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.ozone.om.BucketManager;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -89,7 +90,7 @@
     writeClient = omTestManagers.getWriteClient();
     nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
         keyManager, prefixManager,
-        Collections.singletonList("om"));
+        new OzoneAdmins(Collections.singletonList("om")));
 
     testUgi = UserGroupInformation.createUserForTesting("testuser",
         new String[]{"test"});
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/filters/ReconAdminFilter.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/filters/ReconAdminFilter.java
index 5e13144..ee6ee54 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/filters/ReconAdminFilter.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/filters/ReconAdminFilter.java
@@ -21,6 +21,7 @@
 import com.google.inject.Singleton;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.recon.ReconConfigKeys;
+import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -38,8 +39,6 @@
 import java.security.Principal;
 import java.util.Collection;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
-
 /**
  * Filter that can be applied to paths to only allow access by configured
  * admins.
@@ -106,8 +105,11 @@
         conf.getStringCollection(OzoneConfigKeys.OZONE_ADMINISTRATORS);
     admins.addAll(
         conf.getStringCollection(ReconConfigKeys.OZONE_RECON_ADMINISTRATORS));
-    return admins.contains(OZONE_ADMINISTRATORS_WILDCARD)
-        || admins.contains(user.getShortUserName())
-        || admins.contains(user.getUserName());
+    Collection<String> adminGroups =
+        conf.getStringCollection(OzoneConfigKeys.OZONE_ADMINISTRATORS_GROUPS);
+    adminGroups.addAll(
+        conf.getStringCollection(
+            ReconConfigKeys.OZONE_RECON_ADMINISTRATORS_GROUPS));
+    return new OzoneAdmins(admins, adminGroups).isAdmin(user);
   }
 }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/filters/TestAdminFilter.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/filters/TestAdminFilter.java
index 21ea5e0..3be4293 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/filters/TestAdminFilter.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/filters/TestAdminFilter.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.ozone.recon.api.PipelineEndpoint;
 import org.apache.hadoop.ozone.recon.api.TaskStatusService;
 import org.apache.hadoop.ozone.recon.api.UtilizationEndpoint;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -102,6 +103,17 @@
     conf.setStrings(OzoneConfigKeys.OZONE_ADMINISTRATORS,
         OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD);
     testAdminFilterWithPrincipal(conf, "other", true);
+
+    UserGroupInformation.createUserForTesting("user1",
+        new String[]{"admingroup"});
+    try {
+      conf.setStrings(OzoneConfigKeys.OZONE_ADMINISTRATORS, "ozone");
+      conf.setStrings(OzoneConfigKeys.OZONE_ADMINISTRATORS_GROUPS,
+          "admingroup");
+      testAdminFilterWithPrincipal(conf, "user1", true);
+    } finally {
+      UserGroupInformation.reset();
+    }
   }
 
   @Test
@@ -114,6 +126,17 @@
     conf.setStrings(ReconConfigKeys.OZONE_RECON_ADMINISTRATORS,
         OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD);
     testAdminFilterWithPrincipal(conf, "other", true);
+
+    UserGroupInformation.createUserForTesting("user1",
+        new String[]{"reconadmingroup"});
+    try {
+      conf.setStrings(ReconConfigKeys.OZONE_RECON_ADMINISTRATORS, "recon");
+      conf.setStrings(ReconConfigKeys.OZONE_RECON_ADMINISTRATORS_GROUPS,
+          "reconadmingroup");
+      testAdminFilterWithPrincipal(conf, "user1", true);
+    } finally {
+      UserGroupInformation.reset();
+    }
   }
 
   @Test