Merge branch 'master' into HDDS-1880-Decom
diff --git a/README.md b/README.md
index ddf4c6e..a23bed1 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
-Apache Hadoop Ozone
+Apache Ozone
 ===
 
-Ozone is a scalable, redundant, and distributed object store for Hadoop. Apart from scaling to billions of objects of varying sizes, Ozone can function effectively in containerized environments such as Kubernetes and YARN.
+Ozone is a scalable, redundant, and distributed object store for Hadoop and cloud-native environments. Apart from scaling to billions of objects of varying sizes, Ozone can function effectively in containerized environments such as Kubernetes and YARN.
 
 
  * MULTI-PROTOCOL SUPPORT: Ozone supports different protocols like S3 and Hadoop File System APIs.
@@ -15,24 +15,22 @@
 
 The latest documentation is generated together with the releases and hosted on the Apache site.
 
-Please check [the documentation page](https://hadoop.apache.org/ozone/docs/) for more information.
+Please check [the documentation page](https://ozone.apache.org/docs/) for more information.
 
 ## Contact
 
-Ozone is part of the [Apache Hadoop](https://hadoop.apache.org) project.
+Ozone is a top-level project under the [Apache Software Foundation](https://apache.org).
 
- * Ozone [web page](https://hadoop.apache.org/ozone/) is available from the Hadoop site
+ * Ozone [web page](https://ozone.apache.org)
  * Mailing lists
-     * For dev questions use: [ozone-dev@hadoop.apache.org](https://lists.apache.org/list.html?ozone-dev@hadoop.apache.org)
-     * For user questions use: [user@hadoop.apache.org](https://lists.apache.org/list.html?user@hadoop.apache.org)
+     * For any questions use: [dev@ozone.apache.org](https://lists.apache.org/list.html?dev@ozone.apache.org)
  * Chat: You can find the #ozone channel on the official ASF slack. Invite link is [here](http://s.apache.org/slack-invite).
  * There are open [Weekly calls](https://cwiki.apache.org/confluence/display/HADOOP/Ozone+Community+Calls) where you can ask anything about Ozone.
      * Past meeting notes are also available from the wiki.
 
-
 ## Download
 
-Latest release artifacts (source release and binary packages) are [available](https://hadoop.apache.org/ozone/downloads/) from the Ozone web page.
+Latest release artifacts (source release and binary packages) are [available](https://ozone.apache.org/downloads/) from the Ozone web page.
 
 ## Quick start
 
@@ -53,7 +51,7 @@
 
 ### Run Ozone from released artifact
 
-If you need a more realistic cluster, you can [download](https://hadoop.apache.org/ozone/downloads/) the last (binary) release package, and start a cluster with the help of docker-compose:
+If you need a more realistic cluster, you can [download](https://ozone.apache.org/downloads/) the latest (binary) release package, and start a cluster with the help of docker-compose:
 
 After you untar the binary:
 
@@ -95,4 +93,4 @@
 
 ## License
 
-The Apache Hadoop Ozone  project is licensed under the Apache 2.0 License. See the [LICENSE](./LICENSE.txt) file for details.
+The Apache Ozone project is licensed under the Apache 2.0 License. See the [LICENSE](./LICENSE.txt) file for details.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index c7d5a28..3c1c6ec 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -47,6 +47,7 @@
    * DataNode's unique identifier in the cluster.
    */
   private final UUID uuid;
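+  // Cached string form of the UUID, so that getUuidString() does not
+  // recompute uuid.toString() on every call.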
+  private final String uuidString;
 
   private String ipAddress;
   private String hostName;
@@ -84,6 +85,7 @@
       long persistedOpStateExpiryEpochSec) {
     super(hostName, networkLocation, NetConstants.NODE_COST_DEFAULT);
     this.uuid = uuid;
+    this.uuidString = uuid.toString();
     this.ipAddress = ipAddress;
     this.hostName = hostName;
     this.ports = ports;
@@ -100,6 +102,7 @@
     super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(),
         datanodeDetails.getCost());
     this.uuid = datanodeDetails.uuid;
+    this.uuidString = uuid.toString();
     this.ipAddress = datanodeDetails.ipAddress;
     this.hostName = datanodeDetails.hostName;
     this.ports = datanodeDetails.ports;
@@ -129,7 +132,7 @@
    * @return UUID of DataNode
    */
   public String getUuidString() {
-    return uuid.toString();
+    return uuidString;
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
index 3084bb4..55d1de1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
@@ -74,6 +74,18 @@
   )
   private String pipelineChoosePolicyName;
 
+  @Config(key = "block.deletion.per-interval.max",
+      type = ConfigType.INT,
+      defaultValue = "10000",
+      tags = { ConfigTag.SCM, ConfigTag.DELETION},
+      description =
+          "Maximum number of blocks which SCM processes during an interval. "
+              + "If SCM has 100000 blocks which need to be deleted and the "
+              + "configuration is 5000 then it would only send 5000 blocks "
+              + "for deletion to the datanodes."
+  )
+  private int blockDeletionLimit;
+
   public void setKerberosPrincipal(String kerberosPrincipal) {
     this.principal = kerberosPrincipal;
   }
@@ -91,6 +103,10 @@
     this.pipelineChoosePolicyName = pipelineChoosePolicyName;
   }
 
+  public void setBlockDeletionLimit(int blockDeletionLimit) {
+    this.blockDeletionLimit = blockDeletionLimit;
+  }
+
   public String getKerberosPrincipal() {
     return this.principal;
   }
@@ -107,6 +123,10 @@
     return pipelineChoosePolicyName;
   }
 
+  public int getBlockDeletionLimit() {
+    return blockDeletionLimit;
+  }
+
   /**
    * Configuration strings class.
    * required for SCMSecurityProtocol where the KerberosInfo references
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 385840a..3d1d689 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -42,5 +42,6 @@
   STANDALONE,
   S3GATEWAY,
   DATANODE,
-  RECON
+  RECON,
+  DELETION
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
index cd1a376..13f953f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocolPB
@@ -27,6 +28,8 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.ZonedDateTime;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -51,6 +54,7 @@
   private VersionResponse version;
   private ZonedDateTime lastSuccessfulHeartbeat;
   private boolean isPassive;
+  private final ExecutorService executorService;
 
   /**
    * Constructs RPC Endpoints.
@@ -66,6 +70,11 @@
     state = EndPointStates.getInitState();
     lock = new ReentrantLock();
     this.conf = conf;
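+    // A dedicated single-threaded executor per endpoint: tasks for this
+    // endpoint run serially, and a slow endpoint cannot stall the others.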
+    executorService = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat("EndpointStateMachine task thread for "
+                + this.address + " - %d ")
+            .build());
   }
 
   /**
@@ -130,6 +139,13 @@
   }
 
   /**
+   * Returns the endpoint specific ExecutorService.
+   */
+  public ExecutorService getExecutorService() {
+    return executorService;
+  }
+
+  /**
    * Closes the connection.
    *
    * @throws IOException
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index b1c6090..fb05cb5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -17,11 +17,9 @@
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -68,17 +66,16 @@
     invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails dn = context.getParent().getDatanodeDetails();
-    final ClosePipelineCommandProto closeCommand =
-        ((ClosePipelineCommand)command).getProto();
-    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+    ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand) command;
+    final PipelineID pipelineID = closePipelineCommand.getPipelineID();
 
     try {
       XceiverServerSpi server = ozoneContainer.getWriteChannel();
-      server.removeGroup(pipelineID);
-      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+      server.removeGroup(pipelineID.getProtobuf());
+      LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
           dn.getUuidString());
     } catch (IOException e) {
-      LOG.error("Can't close pipeline #{}", pipelineID, e);
+      LOG.error("Can't close pipeline {}", pipelineID, e);
     } finally {
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index 8f41fe9..4ad05de 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -19,12 +19,10 @@
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -79,23 +77,19 @@
     final long startTime = Time.monotonicNow();
     final DatanodeDetails dn = context.getParent()
         .getDatanodeDetails();
-    final CreatePipelineCommandProto createCommand =
-        ((CreatePipelineCommand)command).getProto();
-    final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
-    final List<DatanodeDetails> peers =
-        createCommand.getDatanodeList().stream()
-            .map(DatanodeDetails::getFromProtoBuf)
-            .collect(Collectors.toList());
+    final CreatePipelineCommand createCommand = (CreatePipelineCommand) command;
+    final PipelineID pipelineID = createCommand.getPipelineID();
+    final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+    final List<DatanodeDetails> peers = createCommand.getNodeList();
     final List<Integer> priorityList = createCommand.getPriorityList();
 
     try {
       XceiverServerSpi server = ozoneContainer.getWriteChannel();
-      if (!server.isExist(pipelineID)) {
-        final RaftGroupId groupId = RaftGroupId.valueOf(
-            PipelineID.getFromProtobuf(pipelineID).getId());
+      if (!server.isExist(pipelineIdProto)) {
+        final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
         final RaftGroup group =
             RatisHelper.newRaftGroup(groupId, peers, priorityList);
-        server.addGroup(pipelineID, peers, priorityList);
+        server.addGroup(pipelineIdProto, peers, priorityList);
         peers.stream().filter(
             d -> !d.getUuid().equals(dn.getUuid()))
             .forEach(d -> {
@@ -109,11 +103,13 @@
                 LOG.warn("Add group failed for {}", d, ioe);
               }
             });
-        LOG.info("Created Pipeline {} {} #{}.",
-            createCommand.getType(), createCommand.getFactor(), pipelineID);
+        LOG.info("Created Pipeline {} {} {}.",
+            createCommand.getReplicationType(), createCommand.getFactor(),
+            pipelineID);
       }
     } catch (IOException e) {
-      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+      LOG.error("Can't create pipeline {} {} {}",
+          createCommand.getReplicationType(),
           createCommand.getFactor(), pipelineID, e);
     } finally {
       long endTime = Time.monotonicNow();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index b0cfb4c..7366650 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -140,7 +140,13 @@
     for (EndpointStateMachine endpoint : connectionManager.getValues()) {
       Callable<EndPointStates> endpointTask = getEndPointTask(endpoint);
       if (endpointTask != null) {
-        ecs.submit(endpointTask);
+        // Wait with a timeout so that a slow EndpointStateMachine cannot
+        // hold the DatanodeStateMachine executor thread for long, which
+        // would delay communication with the other endpoints.
+        ecs.submit(() -> endpoint.getExecutorService()
+            .submit(endpointTask)
+            .get(context.getHeartbeatFrequency(), TimeUnit.MILLISECONDS));
       } else {
         // This can happen if a task is taking more time than the timeOut
         // specified for the task in await, and when it is completed the task
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 0b0756c..9bc2eb2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -143,6 +143,7 @@
   private Map<RaftGroupId, Boolean> groupLeaderMap = new ConcurrentHashMap<>();
   // Timeout used while calling submitRequest directly.
   private long requestTimeout;
+  private boolean shouldDeleteRatisLogDirectory;
 
   /**
    * Maintains a list of active volumes per StorageType.
@@ -163,6 +164,12 @@
     this.containerController = containerController;
     this.raftPeerId = RatisHelper.toRaftPeerId(dd);
     chunkExecutors = createChunkExecutors(conf);
+    nodeFailureTimeoutMs =
+            conf.getObject(DatanodeRatisServerConfig.class)
+                    .getFollowerSlownessTimeout();
+    shouldDeleteRatisLogDirectory =
+            conf.getObject(DatanodeRatisServerConfig.class)
+                    .shouldDeleteRatisLogDirectory();
 
     RaftServer.Builder builder =
         RaftServer.newBuilder().setServerId(raftPeerId)
@@ -213,6 +220,21 @@
         TimeDuration.valueOf(duration, timeUnit);
     RaftServerConfigKeys.Log.StateMachineData
         .setSyncTimeout(properties, dataSyncTimeout);
+    // Typically a pipeline close is initiated after a node failure
+    // timeout from Ratis, in case a follower does not respond.
+    // By that time, all writeStateMachine calls should be stopped
+    // and IOs should fail.
+    // Even if the leader cannot complete write calls within the timeout,
+    // it should just fail the operation and trigger a pipeline close.
+    // Failing the writeStateMachine call with limited retries ensures
+    // that even the leader initiates a pipeline close if it cannot
+    // complete a write within the configured timeout.
+
+    // NOTE: the default value for the retry count in Ratis is -1,
+    // which means retry indefinitely.
+    RaftServerConfigKeys.Log.StateMachineData
+            .setSyncTimeoutRetry(properties, (int) nodeFailureTimeoutMs /
+                    dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS));
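+    // Illustrative arithmetic (actual values are configuration-dependent):
+    // a 300s follower slowness timeout with a 10s sync timeout yields a
+    // retry count of 30.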
 
     // set timeout for a retry cache entry
     setTimeoutForRetryCache(properties);
@@ -223,9 +245,6 @@
     // Set the maximum cache segments
     RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
 
-    // set the node failure timeout
-    setNodeFailureTimeout(properties);
-
     // Set the ratis storage directory
     Collection<String> storageDirPaths =
             HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
@@ -301,13 +320,6 @@
     return properties;
   }
 
-  private void setNodeFailureTimeout(RaftProperties properties) {
-    nodeFailureTimeoutMs =
-        conf.getObject(DatanodeRatisServerConfig.class)
-            .getFollowerSlownessTimeout();
-
-  }
-
   private void setRatisLeaderElectionTimeout(RaftProperties properties) {
     long duration;
     TimeUnit leaderElectionMinTimeoutUnit =
@@ -747,10 +759,13 @@
   @Override
   public void removeGroup(HddsProtos.PipelineID pipelineId)
       throws IOException {
+    // If shouldDeleteRatisLogDirectory is false, the Raft log directory
+    // is renamed and kept aside for debugging; if it is true, the
+    // directory is removed.
     GroupManagementRequest request = GroupManagementRequest.newRemove(
         clientId, server.getId(), nextCallId(),
         RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()),
-        true, false);
+        shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory);
 
     RaftClientReply reply;
     try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 74733ac..cdab0fd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
@@ -43,7 +44,7 @@
  */
 public class DownloadAndImportReplicator implements ContainerReplicator {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DownloadAndImportReplicator.class);
 
   private final ContainerSet containerSet;
@@ -65,7 +66,8 @@
     this.packer = packer;
   }
 
-  public void importContainer(long containerID, Path tarFilePath) {
+  public void importContainer(long containerID, Path tarFilePath)
+      throws IOException {
     try {
       ContainerData originalContainerData;
       try (FileInputStream tempContainerTarStream = new FileInputStream(
@@ -85,10 +87,6 @@
         containerSet.addContainer(container);
       }
 
-    } catch (Exception e) {
-      LOG.error(
-          "Can't import the downloaded container data id=" + containerID,
-          e);
     } finally {
       try {
         Files.delete(tarFilePath);
@@ -122,7 +120,7 @@
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
     } catch (Exception e) {
-      LOG.error("Container replication was unsuccessful .", e);
+      LOG.error("Container {} replication was unsuccessful.", containerID, e);
       task.setStatus(Status.FAILED);
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
index 6fdb4ce..44a883b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -139,4 +139,20 @@
   public PipelineID getPipelineID() {
     return pipelineID;
   }
+
+  public List<DatanodeDetails> getNodeList() {
+    return nodelist;
+  }
+
+  public List<Integer> getPriorityList() {
+    return priorityList;
+  }
+
+  public ReplicationType getReplicationType() {
+    return type;
+  }
+
+  public ReplicationFactor getFactor() {
+    return factor;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b7415fa..df9ffbc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.ozone.container.replication;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -33,12 +36,14 @@
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
 
@@ -173,6 +178,36 @@
     }
   }
 
+  @Test
+  public void testDownloadAndImportReplicatorFailure() {
+    ReplicationSupervisor supervisor =
+        new ReplicationSupervisor(set, mutableReplicator,
+            newDirectExecutorService());
+
+    // Mock the downloader to return a path to a non-existent file so
+    // that importContainer throws an exception.
+    SimpleContainerDownloader moc =
+        Mockito.mock(SimpleContainerDownloader.class);
+    CompletableFuture<Path> res = new CompletableFuture<>();
+    res.complete(Paths.get("file:/tmp/no-such-file"));
+    Mockito.when(
+        moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList()))
+        .thenReturn(res);
+
+    ContainerReplicator replicatorFactory =
+        new DownloadAndImportReplicator(set, null, moc, null);
+
+    replicatorRef.set(replicatorFactory);
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(DownloadAndImportReplicator.LOG);
+
+    supervisor.addTask(new ReplicationTask(1L, emptyList()));
+    Assert.assertEquals(1, supervisor.getReplicationFailureCount());
+    Assert.assertEquals(0, supervisor.getReplicationSuccessCount());
+    Assert.assertTrue(logCapturer.getOutput()
+        .contains("Container 1 replication was unsuccessful."));
+  }
+
   private ReplicationSupervisor supervisorWithSuccessfulReplicator() {
     return supervisorWith(FakeReplicator::new, newDirectExecutorService());
   }
diff --git a/hadoop-hdds/docs/content/feature/Quota.md b/hadoop-hdds/docs/content/feature/Quota.md
new file mode 100644
index 0000000..5be9f4d
--- /dev/null
+++ b/hadoop-hdds/docs/content/feature/Quota.md
@@ -0,0 +1,74 @@
+---
+title: "Quota in Ozone"
+date: "2020-October-22"
+weight: 4
+icon: user
+menu:
+   main:
+      parent: Features
+summary: Introduction to Ozone Quota
+---
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+So far, we know that Ozone allows users to create volumes, buckets, and keys. A volume usually contains several buckets, and each bucket contains a number of keys. Naturally, Ozone should let users define quotas (for example, how many buckets can be created under a volume, or how much space a bucket can use), a common requirement for storage systems.
+
+## Currently supported
+1. Storage space level quota
+
+Administrators should be able to define how much storage space a volume or bucket can use.
+
+## Client usage
+### Storage space level quota
+Storage space level quotas can be specified with units such as KB (k), MB (m), GB (g), TB (t), and PB (p), and define how much storage space a volume or bucket may use.
+
+#### Volume Storage Space level quota
+```shell
+bin/ozone sh volume create --space-quota 5MB /volume1
+```
+This sets the storage space quota of volume1 to 5MB.
+
+```shell
+bin/ozone sh volume setquota --space-quota 10GB /volume1
+```
+This changes the quota of volume1 to 10GB.
+
+#### Bucket Storage Space level quota
+```shell
+bin/ozone sh bucket create --space-quota 5MB /volume1/bucket1
+```
+This means bucket1 may use up to 5MB of storage.
+
+```shell
+bin/ozone sh bucket setquota --space-quota 10GB /volume1/bucket1
+```
+This changes the quota of bucket1 to 10GB.
+
+A bucket quota should not be greater than its volume quota. That said, the sum of bucket quotas may exceed the volume quota: if we have a 10MB volume and create five buckets under it, each with a 5MB quota, the total bucket quota is 25MB. Bucket creation always succeeds in this case, because the bucket and volume quotas are only checked when data is actually written. Each write verifies that neither the current bucket usage nor the total volume usage exceeds its limit.
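+
+As a quick illustration of the behavior above (the volume and bucket names here are only examples):
+
+```shell
+bin/ozone sh volume create --space-quota 10MB /volume2
+bin/ozone sh bucket create --space-quota 5MB /volume2/bucket1
+bin/ozone sh bucket create --space-quota 5MB /volume2/bucket2
+# Both bucket creations succeed even though the bucket quotas sum to
+# 10MB; the limits are enforced only when keys are written.
+```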
+
+#### Clear the quota for volume1 (the bucket cleanup command is similar)
+```shell
+bin/ozone sh volume clrquota --space-quota /volume1
+```
+
+#### Check quota and usedBytes for volume and bucket
+```shell
+bin/ozone sh volume info /volume1
+bin/ozone sh bucket info /volume1/bucket1
+```
+The volume and bucket info output includes the quota value and usedBytes.
\ No newline at end of file
diff --git a/hadoop-hdds/docs/content/feature/Quota.zh.md b/hadoop-hdds/docs/content/feature/Quota.zh.md
new file mode 100644
index 0000000..9cdb221
--- /dev/null
+++ b/hadoop-hdds/docs/content/feature/Quota.zh.md
@@ -0,0 +1,67 @@
+---
+title: "Quota in Ozone"
+date: "2020-October-22"
+weight: 4
+summary: Quota in Ozone
+icon: user
+---
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+So far, we know that Ozone allows users to create volumes, buckets, and keys. A volume usually contains several buckets, and each bucket contains a number of keys. Naturally, Ozone should let users define quotas (for example, how many buckets can be created under a volume, or how much space a bucket can use), a common requirement for storage systems.
+## Currently supported
+1. Storage space level quota
+
+Administrators should be able to define how much storage space a volume or bucket can use.
+
+## Client usage
+### Storage space level quota
+Storage space level quotas can be specified with units such as KB (k), MB (m), GB (g), TB (t), and PB (p), and define how much storage space a volume or bucket may use.
+#### Volume space quota usage
+```shell
+bin/ozone sh volume create --space-quota 5MB /volume1
+```
+This sets the storage space quota of volume1 to 5MB.
+
+```shell
+bin/ozone sh volume setquota --space-quota 10GB /volume1
+```
+This changes the quota of volume1 to 10GB.
+
+#### Bucket space quota usage
+```shell
+bin/ozone sh bucket create --space-quota 5MB /volume1/bucket1
+```
+This means bucket1 may use up to 5MB of storage.
+
+```shell
+bin/ozone sh bucket setquota --space-quota 10GB /volume1/bucket1
+```
+This changes the quota of bucket1 to 10GB.
+
+A bucket quota should not be greater than its volume quota. That said, the sum of bucket quotas may exceed the volume quota: if we have a 10MB volume and create five buckets under it, each with a 5MB quota, the total bucket quota is 25MB. Bucket creation always succeeds in this case, because the bucket and volume quotas are only checked when data is actually written. Each write verifies that neither the current bucket usage nor the total volume usage exceeds its limit.
+
+#### Clear the quota for volume1 (the bucket cleanup command is similar)
+```shell
+bin/ozone sh volume clrquota --space-quota /volume1
+```
+#### Check quota and usedBytes for volume and bucket
+```shell
+bin/ozone sh volume info /volume1
+bin/ozone sh bucket info /volume1/bucket1
+```
+The volume and bucket info output includes the quota value and usedBytes.
diff --git a/hadoop-hdds/docs/content/tools/Admin.md b/hadoop-hdds/docs/content/tools/Admin.md
index d99423a..a05065e 100644
--- a/hadoop-hdds/docs/content/tools/Admin.md
+++ b/hadoop-hdds/docs/content/tools/Admin.md
@@ -25,10 +25,10 @@
 And a quick overview of the available functionalities (see the example invocations after this list):
 
  * `ozone admin safemode`: You can check the safe mode status and force the cluster to enter or leave safe mode; the `--verbose` option prints the validation status of all rules that evaluate the safe mode status.
- * `ozone admin container`: Containers are the unit of the replicaiton. The subcommands can help to debug the current state of the containers (list/get/create/...)
+ * `ozone admin container`: Containers are the unit of the replication. The subcommands can help to debug the current state of the containers (list/get/create/...)
  * `ozone admin pipeline`: Can help to check the available pipelines (datanode sets)
  * `ozone admin datanode`: Provides information about the datanode
- * `ozone admin printTopology`: display the rack-awarness related information
+ * `ozone admin printTopology`: Displays the rack-awareness related information
  * `ozone admin replicationmanager`: Can be used to check the status of replication (and to start/stop replication in case of emergency).
  * `ozone admin om`: Ozone Manager HA related tool to get information about the current cluster.
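+
+A few illustrative invocations (the output depends on the state of the cluster):
+
+```shell
+ozone admin safemode status --verbose
+ozone admin container list
+ozone admin pipeline list
+ozone admin printTopology
+```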
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index b0034ee..19084f1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -122,4 +122,21 @@
   public void setLeaderNumPendingRequests(int leaderNumPendingRequests) {
     this.leaderNumPendingRequests = leaderNumPendingRequests;
   }
+
+  @Config(key = "delete.ratis.log.directory",
+          defaultValue = "true",
+          type = ConfigType.BOOLEAN,
+          tags = {OZONE, DATANODE, RATIS},
+          description = "Flag to indicate whether ratis log directory will " +
+                  "be cleaned up during pipeline remove."
+  )
+  private boolean shouldDeleteRatisLogDirectory;
+
+  public boolean shouldDeleteRatisLogDirectory() {
+    return shouldDeleteRatisLogDirectory;
+  }
+
+  public void setShouldDeleteRatisLogDirectory(boolean delete) {
+    this.shouldDeleteRatisLogDirectory = delete;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index dca1529..25bf350 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -16,121 +16,65 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
-import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-
 /**
  * A wrapper class to hold info about datanode and all deleted block
  * transactions that will be sent to this datanode.
  */
-public class DatanodeDeletedBlockTransactions {
-  private int nodeNum;
-  // The throttle size for each datanode.
-  private int maximumAllowedTXNum;
-  // Current counter of inserted TX.
-  private int currentTXNum;
-  private ContainerManager containerManager;
+class DatanodeDeletedBlockTransactions {
   // A list of TXs mapped to a certain datanode ID.
-  private final ArrayListMultimap<UUID, DeletedBlocksTransaction>
-      transactions;
+  private final Map<UUID, List<DeletedBlocksTransaction>> transactions =
+      new HashMap<>();
+  // Counts the blocks to be deleted across datanodes. A block is counted
+  // once per replica, so the total is not unique per block.
+  private int blocksDeleted = 0;
+  private final Map<Long, Long> containerIdToTxnId = new HashMap<>();
 
-  DatanodeDeletedBlockTransactions(ContainerManager containerManager,
-      int maximumAllowedTXNum, int nodeNum) {
-    this.transactions = ArrayListMultimap.create();
-    this.containerManager = containerManager;
-    this.maximumAllowedTXNum = maximumAllowedTXNum;
-    this.nodeNum = nodeNum;
+  DatanodeDeletedBlockTransactions() {
   }
 
-  public boolean addTransaction(DeletedBlocksTransaction tx,
-      Set<UUID> dnsWithTransactionCommitted) {
-    try {
-      boolean success = false;
-      final ContainerID id = ContainerID.valueof(tx.getContainerID());
-      final ContainerInfo container = containerManager.getContainer(id);
-      final Set<ContainerReplica> replicas = containerManager
-          .getContainerReplicas(id);
-      if (!container.isOpen()) {
-        for (ContainerReplica replica : replicas) {
-          UUID dnID = replica.getDatanodeDetails().getUuid();
-          if (dnsWithTransactionCommitted == null ||
-              !dnsWithTransactionCommitted.contains(dnID)) {
-            // Transaction need not be sent to dns which have
-            // already committed it
-            success = addTransactionToDN(dnID, tx);
-          }
-        }
-      }
-      return success;
-    } catch (IOException e) {
-      SCMBlockDeletingService.LOG.warn("Got container info error.", e);
-      return false;
+  void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+    transactions.computeIfAbsent(dnID, k -> new LinkedList<>()).add(tx);
+    containerIdToTxnId.put(tx.getContainerID(), tx.getTxID());
+    blocksDeleted += tx.getLocalIDCount();
+    if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
+      SCMBlockDeletingService.LOG
+          .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
     }
   }
 
-  private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
-    List<DeletedBlocksTransaction> txs = transactions.get(dnID);
-    if (txs == null || txs.size() < maximumAllowedTXNum) {
-      currentTXNum++;
-      if (txs == null) {
-        transactions.put(dnID, tx);
-      } else {
-        txs.add(tx);
-      }
-      if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
-        SCMBlockDeletingService.LOG
-            .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
-      }
-      return true;
-    }
-    return false;
+  Map<UUID, List<DeletedBlocksTransaction>> getDatanodeTransactionMap() {
+    return transactions;
   }
 
-  Set<UUID> getDatanodeIDs() {
-    return transactions.keySet();
+  Map<Long, Long> getContainerIdToTxnIdMap() {
+    return containerIdToTxnId;
+  }
+
+  int getBlocksDeleted() {
+    return blocksDeleted;
+  }
+
+  List<String> getTransactionIDList(UUID dnId) {
+    return Optional.ofNullable(transactions.get(dnId))
+        .orElse(new LinkedList<>())
+        .stream()
+        .map(DeletedBlocksTransaction::getTxID)
+        .map(String::valueOf)
+        .collect(Collectors.toList());
   }
 
   boolean isEmpty() {
     return transactions.isEmpty();
   }
-
-  boolean hasTransactions(UUID dnId) {
-    return transactions.containsKey(dnId) &&
-        !transactions.get(dnId).isEmpty();
-  }
-
-  List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) {
-    return transactions.get(dnId);
-  }
-
-  List<String> getTransactionIDList(UUID dnId) {
-    if (hasTransactions(dnId)) {
-      return transactions.get(dnId).stream()
-          .map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
-          .collect(Collectors.toList());
-    } else {
-      return Collections.emptyList();
-    }
-  }
-
-  boolean isFull() {
-    return currentTXNum >= maximumAllowedTXNum * nodeNum;
-  }
-
-  int getTXNum() {
-    return currentTXNum;
-  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index db6c1c5..9a5d74f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -41,11 +41,11 @@
    * Scan the log and collect a batch of transactions to be sent to
    * datanodes, stopping once the block deletion limit is reached.
-   * @param transactions a list of TXs will be set into.
+   * @param blockDeletionLimit Maximum number of blocks to fetch.
    * @return Transactions to be sent to datanodes, grouped by datanode.
    * @throws IOException
    */
-  Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions)
+  DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit)
       throws IOException;
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index edd3d4a..22f4834 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -200,7 +199,7 @@
             LOG.debug("Transaction txId={} commit by dnId={} for containerID={}"
                     + " failed. Corresponding entry not found.", txID, dnID,
                 containerId);
-            return;
+            continue;
           }
 
           dnsWithCommittedTxn.add(dnID);
@@ -323,31 +322,54 @@
   public void close() throws IOException {
   }
 
-  @Override
-  public Map<Long, Long> getTransactions(
-      DatanodeDeletedBlockTransactions transactions) throws IOException {
-    lock.lock();
+  private void getTransaction(DeletedBlocksTransaction tx,
+      DatanodeDeletedBlockTransactions transactions) {
     try {
-      Map<Long, Long> deleteTransactionMap = new HashMap<>();
-      try (TableIterator<Long,
-          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
-               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
-        while (iter.hasNext()) {
-          Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
-              iter.next();
-          DeletedBlocksTransaction block = keyValue.getValue();
-          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            if (transactions.addTransaction(block,
-                transactionToDNsCommitMap.get(block.getTxID()))) {
-              deleteTransactionMap.put(block.getContainerID(),
-                  block.getTxID());
-              transactionToDNsCommitMap
-                  .putIfAbsent(block.getTxID(), new LinkedHashSet<>());
-            }
+      final ContainerID id = ContainerID.valueof(tx.getContainerID());
+      if (!containerManager.getContainer(id).isOpen()) {
+        Set<ContainerReplica> replicas = containerManager
+            .getContainerReplicas(id);
+        for (ContainerReplica replica : replicas) {
+          UUID dnID = replica.getDatanodeDetails().getUuid();
+          Set<UUID> dnsWithTransactionCommitted =
+              transactionToDNsCommitMap.get(tx.getTxID());
+          if (dnsWithTransactionCommitted == null
+              || !dnsWithTransactionCommitted.contains(dnID)) {
+            // Transaction need not be sent to dns which have
+            // already committed it
+            transactions.addTransactionToDN(dnID, tx);
           }
         }
       }
-      return deleteTransactionMap;
+    } catch (IOException e) {
+      LOG.warn("Got container info error.", e);
+    }
+  }
+
+  @Override
+  public DatanodeDeletedBlockTransactions getTransactions(
+      int blockDeletionLimit) throws IOException {
+    lock.lock();
+    try {
+      DatanodeDeletedBlockTransactions transactions =
+          new DatanodeDeletedBlockTransactions();
+      try (TableIterator<Long,
+          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+               scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+        int numBlocksAdded = 0;
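+        // Transactions are taken whole, so the limit may be exceeded by
+        // up to one transaction's worth of blocks.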
+        while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) {
+          Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
+              iter.next();
+          DeletedBlocksTransaction txn = keyValue.getValue();
+          if (txn.getCount() > -1 && txn.getCount() <= maxRetry) {
+            numBlocksAdded += txn.getLocalIDCount();
+            getTransaction(txn, transactions);
+            transactionToDNsCommitMap
+                .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+          }
+        }
+      }
+      return transactions;
     } finally {
       lock.unlock();
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 01b0715..859a732 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -40,8 +41,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,24 +56,12 @@
   public static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
 
-  // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
-  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
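+  // A single thread is sufficient for the block deletion scanner task.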
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 1;
   private final DeletedBlockLog deletedBlockLog;
   private final ContainerManager containerManager;
   private final NodeManager nodeManager;
   private final EventPublisher eventPublisher;
 
-  // Block delete limit size is dynamically calculated based on container
-  // delete limit size (ozone.block.deleting.container.limit.per.interval)
-  // that configured for datanode. To ensure DN not wait for
-  // delete commands, we use this value multiply by a factor 2 as the final
-  // limit TX size for each node.
-  // Currently we implement a throttle algorithm that throttling delete blocks
-  // for each datanode. Each node is limited by the calculation size. Firstly
-  // current node info is fetched from nodemanager, then scan entire delLog
-  // from the beginning to end. If one node reaches maximum value, its records
-  // will be skipped. If not, keep scanning until it reaches maximum value.
-  // Once all node are full, the scan behavior will stop.
   private int blockDeleteLimitSize;
 
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
@@ -88,14 +75,10 @@
     this.nodeManager = nodeManager;
     this.eventPublisher = eventPublisher;
 
-    int containerLimit = conf.getInt(
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
-    Preconditions.checkArgument(containerLimit > 0,
-        "Container limit size should be " + "positive.");
-    // Use container limit value multiply by a factor 2 to ensure DN
-    // not wait for orders.
-    this.blockDeleteLimitSize = containerLimit * 2;
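+    // The throttle is a direct block count, taken from ScmConfig's
+    // "block.deletion.per-interval.max" setting.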
+    blockDeleteLimitSize =
+        conf.getObject(ScmConfig.class).getBlockDeletionLimit();
+    Preconditions.checkArgument(blockDeleteLimitSize > 0,
+        "Block deletion limit should be " + "positive.");
   }
 
   @Override
@@ -105,16 +88,18 @@
     return queue;
   }
 
-  public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
+  void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
     DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
     for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
         deletionStatusList.getPendingDeleteStatuses()) {
-      LOG.debug(
-          "Block deletion txnID lagging in datanode {} for containerID {}."
-              + " Datanode delete txnID: {}, SCM txnID: {}",
-          dnDetails.getUuid(), deletionStatus.getContainerId(),
-          deletionStatus.getDnDeleteTransactionId(),
-          deletionStatus.getScmDeleteTransactionId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Block deletion txnID lagging in datanode {} for containerID {}."
+                + " Datanode delete txnID: {}, SCM txnID: {}",
+            dnDetails.getUuid(), deletionStatus.getContainerId(),
+            deletionStatus.getDnDeleteTransactionId(),
+            deletionStatus.getScmDeleteTransactionId());
+      }
     }
   }
 
@@ -127,65 +112,68 @@
 
     @Override
     public EmptyTaskResult call() throws Exception {
-      int dnTxCount = 0;
       long startTime = Time.monotonicNow();
       // Scan SCM DB in HB interval and collect a throttled list of
       // to delete blocks.
-      LOG.debug("Running DeletedBlockTransactionScanner");
-      DatanodeDeletedBlockTransactions transactions = null;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running DeletedBlockTransactionScanner");
+      }
       // TODO - DECOMM - should we be deleting blocks from decom nodes
       //        and what about entering maintenance.
       List<DatanodeDetails> datanodes =
           nodeManager.getNodes(NodeStatus.inServiceHealthy());
-      Map<Long, Long> transactionMap = null;
       if (datanodes != null) {
-        transactions = new DatanodeDeletedBlockTransactions(containerManager,
-            blockDeleteLimitSize, datanodes.size());
         try {
-          transactionMap = deletedBlockLog.getTransactions(transactions);
+          DatanodeDeletedBlockTransactions transactions =
+              deletedBlockLog.getTransactions(blockDeleteLimitSize);
+          Map<Long, Long> containerIdToMaxTxnId =
+              transactions.getContainerIdToTxnIdMap();
+
+          if (transactions.isEmpty()) {
+            return EmptyTaskResult.newResult();
+          }
+
+          for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+              transactions.getDatanodeTransactionMap().entrySet()) {
+            UUID dnId = entry.getKey();
+            List<DeletedBlocksTransaction> dnTXs = entry.getValue();
+            if (!dnTXs.isEmpty()) {
+              // TODO commandQueue needs a cap.
+              // We should stop caching new commands if the number of
+              // un-processed commands exceeds a limit, e.g. 50; if a
+              // datanode goes offline for some time, the cached commands
+              // could pile up.
+              eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
+                  new CommandForDatanode<>(dnId,
+                      new DeleteBlocksCommand(dnTXs)));
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Added delete block command for datanode {} in the queue,"
+                        + " number of delete block transactions: {}{}", dnId,
+                    dnTXs.size(), LOG.isTraceEnabled() ?
+                        ", TxID list: " + String.join(",",
+                            transactions.getTransactionIDList(dnId)) : "");
+              }
+            }
+          }
+
+          containerManager.updateDeleteTransactionId(containerIdToMaxTxnId);
+          LOG.info("Totally added {} blocks to be deleted for"
+                  + " {} datanodes, task elapsed time: {}ms",
+              transactions.getBlocksDeleted(),
+              transactions.getDatanodeTransactionMap().size(),
+              Time.monotonicNow() - startTime);
         } catch (IOException e) {
-          // We may tolerant a number of failures for sometime
+          // We may tolerate a number of failures for some time
           // but if it continues to fail, at some point we need to raise
           // an exception and probably fail the SCM ? At present, it simply
           // continues to retry the scanning.
           LOG.error("Failed to get block deletion transactions from delTX log",
               e);
+          return EmptyTaskResult.newResult();
         }
-        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
-            transactions.getTXNum());
       }
 
-      if (transactions != null && !transactions.isEmpty()) {
-        for (UUID dnId : transactions.getDatanodeIDs()) {
-          List<DeletedBlocksTransaction> dnTXs = transactions
-              .getDatanodeTransactions(dnId);
-          if (dnTXs != null && !dnTXs.isEmpty()) {
-            dnTxCount += dnTXs.size();
-            // TODO commandQueue needs a cap.
-            // We should stop caching new commands if num of un-processed
-            // command is bigger than a limit, e.g 50. In case datanode goes
-            // offline for sometime, the cached commands be flooded.
-            eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
-                new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Added delete block command for datanode {} in the queue," +
-                      " number of delete block transactions: {}, TxID list: {}",
-                  dnId, dnTXs.size(), String.join(",",
-                      transactions.getTransactionIDList(dnId)));
-            }
-          }
-        }
-        containerManager.updateDeleteTransactionId(transactionMap);
-      }
-
-      if (dnTxCount > 0) {
-        LOG.info(
-            "Totally added {} delete blocks command for"
-                + " {} datanodes, task elapsed time: {}ms",
-            dnTxCount, transactions.getDatanodeIDs().size(),
-            Time.monotonicNow() - startTime);
-      }
 
       return EmptyTaskResult.newResult();
     }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index d4e2553..8f2b3f5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -62,6 +61,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -79,6 +79,7 @@
 public class TestDeletedBlockLog {
 
   private static DeletedBlockLogImpl deletedBlockLog;
+  private static final int BLOCKS_PER_TXN = 5;
   private OzoneConfiguration conf;
   private File testDir;
   private ContainerManager containerManager;
@@ -146,8 +147,7 @@
     for (int i = 0; i < dataSize; i++) {
       long containerID = continerIDBase + i;
       List<Long> blocks = new ArrayList<>();
-      int blockSize = random.nextInt(30) + 1;
-      for (int j = 0; j < blockSize; j++)  {
+      for (int j = 0; j < BLOCKS_PER_TXN; j++)  {
         long localID = localIDBase + j;
         blocks.add(localID);
       }
@@ -194,12 +194,16 @@
   }
 
   private List<DeletedBlocksTransaction> getTransactions(
-      int maximumAllowedTXNum) throws IOException {
+      int maximumAllowedBlocksNum) throws IOException {
     DatanodeDeletedBlockTransactions transactions =
-        new DatanodeDeletedBlockTransactions(containerManager,
-            maximumAllowedTXNum, 3);
-    deletedBlockLog.getTransactions(transactions);
-    return transactions.getDatanodeTransactions(dnList.get(0).getUuid());
+        deletedBlockLog.getTransactions(maximumAllowedBlocksNum);
+    List<DeletedBlocksTransaction> txns = new LinkedList<>();
+    for (DatanodeDetails dn : dnList) {
+      txns.addAll(Optional.ofNullable(
+          transactions.getDatanodeTransactionMap().get(dn.getUuid()))
+          .orElseGet(LinkedList::new));
+    }
+    return txns.stream().distinct().collect(Collectors.toList());
   }
 
   @Test
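
A quick note on units, since every call site below changes accordingly: the limit passed to getTransactions is now a block count rather than a transaction count, and each generated transaction carries BLOCKS_PER_TXN = 5 blocks. A minimal illustration:

```java
// Each generated transaction holds BLOCKS_PER_TXN (= 5) blocks, so to fetch
// up to 40 transactions the tests now request a budget of 40 * 5 = 200 blocks.
List<DeletedBlocksTransaction> blocks = getTransactions(40 * BLOCKS_PER_TXN);
```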
@@ -213,7 +217,7 @@
 
     // This will return all TXs, total num 30.
     List<DeletedBlocksTransaction> blocks =
-        getTransactions(40);
+        getTransactions(40 * BLOCKS_PER_TXN);
     List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
         .collect(Collectors.toList());
 
@@ -224,13 +228,13 @@
     // Increment another time so it exceed the maxRetry.
     // On this call, count will be set to -1 which means TX eventually fails.
     deletedBlockLog.incrementCount(txIDs);
-    blocks = getTransactions(40);
+    blocks = getTransactions(40 * BLOCKS_PER_TXN);
     for (DeletedBlocksTransaction block : blocks) {
       Assert.assertEquals(-1, block.getCount());
     }
 
     // If all TXs are failed, getTransactions call will always return nothing.
-    blocks = getTransactions(40);
+    blocks = getTransactions(40 * BLOCKS_PER_TXN);
     Assert.assertEquals(blocks.size(), 0);
   }
 
@@ -240,7 +244,7 @@
       deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
     }
     List<DeletedBlocksTransaction> blocks =
-        getTransactions(20);
+        getTransactions(20 * BLOCKS_PER_TXN);
     // Add an invalid txn.
     blocks.add(
         DeletedBlocksTransaction.newBuilder().setContainerID(1).setTxID(70)
@@ -248,17 +252,17 @@
     commitTransactions(blocks);
     blocks.remove(blocks.size() - 1);
 
-    blocks = getTransactions(50);
+    blocks = getTransactions(50 * BLOCKS_PER_TXN);
     Assert.assertEquals(30, blocks.size());
     commitTransactions(blocks, dnList.get(1), dnList.get(2),
         DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
             .build());
 
-    blocks = getTransactions(50);
+    blocks = getTransactions(50 * BLOCKS_PER_TXN);
     Assert.assertEquals(30, blocks.size());
     commitTransactions(blocks, dnList.get(0));
 
-    blocks = getTransactions(50);
+    blocks = getTransactions(50 * BLOCKS_PER_TXN);
     Assert.assertEquals(0, blocks.size());
   }
 
@@ -318,9 +322,10 @@
     deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
         scm.getScmMetadataStore());
     List<DeletedBlocksTransaction> blocks =
-        getTransactions(10);
+        getTransactions(BLOCKS_PER_TXN * 10);
+    Assert.assertEquals(10, blocks.size());
     commitTransactions(blocks);
-    blocks = getTransactions(100);
+    blocks = getTransactions(BLOCKS_PER_TXN * 40);
     Assert.assertEquals(40, blocks.size());
     commitTransactions(blocks);
   }
@@ -328,74 +333,44 @@
   @Test
   public void testDeletedBlockTransactions() throws IOException {
     int txNum = 10;
-    int maximumAllowedTXNum = 5;
-    List<DeletedBlocksTransaction> blocks = null;
-    List<Long> containerIDs = new LinkedList<>();
+    List<DeletedBlocksTransaction> blocks;
     DatanodeDetails dnId1 = dnList.get(0), dnId2 = dnList.get(1);
 
     int count = 0;
-    long containerID = 0L;
+    long containerID;
 
     // Creates {TXNum} TX in the log.
-    for (Map.Entry<Long, List<Long>> entry : generateData(txNum)
-        .entrySet()) {
+    for (Map.Entry<Long, List<Long>> entry : generateData(txNum).entrySet()) {
       count++;
       containerID = entry.getKey();
-      containerIDs.add(containerID);
       deletedBlockLog.addTransaction(containerID, entry.getValue());
 
-      // make TX[1-6] for datanode1; TX[7-10] for datanode2
-      if (count <= (maximumAllowedTXNum + 1)) {
+      if (count % 2 == 0) {
         mockContainerInfo(containerID, dnId1);
       } else {
         mockContainerInfo(containerID, dnId2);
       }
     }
 
-    DatanodeDeletedBlockTransactions transactions =
-        new DatanodeDeletedBlockTransactions(containerManager,
-            maximumAllowedTXNum, 2);
-    deletedBlockLog.getTransactions(transactions);
+    // Fetch and commit (i.e. delete) all but one of the transactions.
+    commitTransactions(getTransactions((txNum - 1) * BLOCKS_PER_TXN));
 
-    for (UUID id : transactions.getDatanodeIDs()) {
-      List<DeletedBlocksTransaction> txs = transactions
-          .getDatanodeTransactions(id);
-      // delete TX ID
-      commitTransactions(txs);
-    }
-
-    blocks = getTransactions(txNum);
-    // There should be one block remained since dnID1 reaches
-    // the maximum value (5).
+    blocks = getTransactions(txNum * BLOCKS_PER_TXN);
+    // There should be one transaction remaining.
     Assert.assertEquals(1, blocks.size());
 
-    Assert.assertFalse(transactions.isFull());
-    // The number of TX in dnID1 won't more than maximum value.
-    Assert.assertEquals(maximumAllowedTXNum,
-        transactions.getDatanodeTransactions(dnId1.getUuid()).size());
-
-    int size = transactions.getDatanodeTransactions(dnId2.getUuid()).size();
-    // add two transactions for same container in dnID2
+    // Add another transaction for the same container, so it has two.
+    containerID = blocks.get(0).getContainerID();
     DeletedBlocksTransaction.Builder builder =
         DeletedBlocksTransaction.newBuilder();
     builder.setTxID(11);
     builder.setContainerID(containerID);
     builder.setCount(0);
-    Assert.assertTrue(transactions.addTransaction(builder.build(), null));
+    deletedBlockLog.addTransaction(containerID, new LinkedList<>());
 
-    // Total number of transactions reaches the maximum value
-    Assert.assertEquals(size + 1,
-        transactions.getDatanodeTransactions(dnId2.getUuid()).size());
-    Assert.assertTrue(transactions.isFull());
-
-    containerID = RandomUtils.nextLong();
-    builder = DeletedBlocksTransaction.newBuilder();
-    builder.setTxID(12);
-    builder.setContainerID(containerID);
-    builder.setCount(0);
-    mockContainerInfo(containerID, dnId2);
-    // No more txns can be added as maximum number of transactions are reached
-    Assert.assertFalse(transactions.addTransaction(builder.build(), null));
+    // getTransactions should return both transactions for the same container.
+    blocks = getTransactions(txNum);
+    Assert.assertEquals(2, blocks.size());
   }
 
   private void mockContainerInfo(long containerID, DatanodeDetails dd)
diff --git a/hadoop-ozone/common/pom.xml b/hadoop-ozone/common/pom.xml
index 78eb2e7..5220945 100644
--- a/hadoop-ozone/common/pom.xml
+++ b/hadoop-ozone/common/pom.xml
@@ -67,6 +67,11 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.github.spotbugs</groupId>
+      <artifactId>spotbugs-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index b757eb9..a95eabc 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -34,6 +34,8 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -51,6 +53,7 @@
 import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
@@ -602,4 +605,40 @@
     }
     return false;
   }
+
+  /**
+   * Normalize the key name. This method uses {@link Path} to
+   * normalize the key name.
+   * @param keyName key name to normalize.
+   * @param preserveTrailingSlash - if true, a trailing slash on the input
+   * is preserved in the result; otherwise it is dropped.
+   * @return normalized key name.
+   */
+  @SuppressFBWarnings("DMI_HARDCODED_ABSOLUTE_FILENAME")
+  public static String normalizeKey(String keyName,
+      boolean preserveTrailingSlash) {
+    // For blank strings do nothing and return the input unchanged.
+    // The check is needed because the Path constructor fails with an
+    // NPE on blank input.
+    if (!StringUtils.isBlank(keyName)) {
+      String normalizedKeyName;
+      if (keyName.startsWith(OM_KEY_PREFIX)) {
+        normalizedKeyName = new Path(keyName).toUri().getPath();
+      } else {
+        normalizedKeyName = new Path(OM_KEY_PREFIX + keyName)
+            .toUri().getPath();
+      }
+      if (!keyName.equals(normalizedKeyName)) {
+        LOG.debug("Normalized key {} to {} ", keyName,
+            normalizedKeyName.substring(1));
+      }
+      if (preserveTrailingSlash) {
+        if (keyName.endsWith("/")) {
+          return normalizedKeyName.substring(1) + "/";
+        }
+      }
+      return normalizedKeyName.substring(1);
+    }
+
+    return keyName;
+  }
 }
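
A brief illustration of the new method's behavior may help; the expected values below are inferred from the integration test changes later in this patch:

```java
// Duplicate and leading slashes are collapsed/stripped; the trailing slash
// survives only when preserveTrailingSlash is true.
OmUtils.normalizeKey("/dir1///dir2/file1/", false); // -> "dir1/dir2/file1"
OmUtils.normalizeKey("/dir1///dir2/file1/", true);  // -> "dir1/dir2/file1/"
OmUtils.normalizeKey("dir1/dir2/file1", false);     // -> "dir1/dir2/file1"
```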
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 65eb139..0d698fd 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -214,7 +214,7 @@
     try {
       OzoneManagerProtocolPB proxy = createOMProxy(address);
       // Create proxyInfo here, to make it work with all Hadoop versions.
-      proxyInfo = new ProxyInfo<>(proxy, omProxyInfos.toString());
+      proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
       omProxies.put(nodeId, proxyInfo);
     } catch (IOException ioe) {
       LOG.error("{} Failed to create RPC proxy to OM at {}",
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
index cbae18c..42ddbb9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
@@ -159,6 +159,16 @@
       return new Builder();
     }
 
+    public static Builder getBuilder(ResourceType resType,
+        StoreType storeType, String vol, String bucket, String key) {
+      return OzoneObjInfo.Builder.newBuilder()
+          .setResType(resType)
+          .setStoreType(storeType)
+          .setVolumeName(vol)
+          .setBucketName(bucket)
+          .setKeyName(key);
+    }
+
     public static Builder fromKeyArgs(OmKeyArgs args) {
       return new Builder()
           .setVolumeName(args.getVolumeName())
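
A hedged usage sketch of the new convenience factory; the resource/store values and names are illustrative only:

```java
// Builds an OzoneObjInfo for a key in one call instead of five chained setters.
OzoneObjInfo objInfo = OzoneObjInfo.Builder
    .getBuilder(ResourceType.KEY, StoreType.OZONE, "vol1", "bucket1", "key1")
    .build();
```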
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
index 3295827..043cd55 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.security.acl;
 
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -32,16 +33,20 @@
   private final String serviceId;
   private final ACLIdentityType aclType;
   private final ACLType aclRights;
+  private final String ownerName;
 
+  @SuppressWarnings("parameternumber")
   public RequestContext(String host, InetAddress ip,
       UserGroupInformation clientUgi, String serviceId,
-      ACLIdentityType aclType, ACLType aclRights) {
+      ACLIdentityType aclType, ACLType aclRights,
+      String ownerName) {
     this.host = host;
     this.ip = ip;
     this.clientUgi = clientUgi;
     this.serviceId = serviceId;
     this.aclType = aclType;
     this.aclRights = aclRights;
+    this.ownerName = ownerName;
   }
 
   /**
@@ -55,6 +60,12 @@
     private IAccessAuthorizer.ACLIdentityType aclType;
     private IAccessAuthorizer.ACLType aclRights;
 
+    /**
+     * ownerName is added so that the authorizer
+     * can honor owner privileges.
+     */
+    private String ownerName;
+
     public Builder setHost(String bHost) {
       this.host = bHost;
       return this;
@@ -80,14 +91,23 @@
       return this;
     }
 
+    public ACLType getAclRights() {
+      return this.aclRights;
+    }
+
     public Builder setAclRights(ACLType aclRight) {
       this.aclRights = aclRight;
       return this;
     }
 
+    public Builder setOwnerName(String owner) {
+      this.ownerName = owner;
+      return this;
+    }
+
     public RequestContext build() {
       return new RequestContext(host, ip, clientUgi, serviceId, aclType,
-          aclRights);
+          aclRights, ownerName);
     }
   }
 
@@ -95,6 +115,27 @@
     return new Builder();
   }
 
+  public static RequestContext.Builder getBuilder(
+      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+      ACLType aclType, String ownerName) {
+    RequestContext.Builder contextBuilder = RequestContext.newBuilder()
+        .setClientUgi(ugi)
+        .setIp(remoteAddress)
+        .setHost(hostName)
+        .setAclType(ACLIdentityType.USER)
+        .setAclRights(aclType)
+        .setOwnerName(ownerName);
+    return contextBuilder;
+  }
+
+  public static RequestContext.Builder getBuilder(UserGroupInformation ugi,
+      ACLType aclType, String ownerName) {
+    return getBuilder(ugi,
+        ProtobufRpcEngine.Server.getRemoteIp(),
+        ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+        aclType, ownerName);
+  }
+
   public String getHost() {
     return host;
   }
@@ -119,4 +160,7 @@
     return aclRights;
   }
 
+  public String getOwnerName() {
+    return ownerName;
+  }
 }
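
A hedged sketch of how an authorizer caller might use the new owner-aware factory. The two-argument overload resolves the remote address itself via ProtobufRpcEngine; `ugi` and `volumeOwnerName` below are assumed to be in scope:

```java
// Build a context that carries the owner name, so the authorizer can
// short-circuit checks when the requester is the owner.
RequestContext context = RequestContext
    .getBuilder(ugi, ACLType.READ, volumeOwnerName)
    .build();
```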
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index 8438cbf..7e21355 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -50,8 +50,8 @@
   private void testResourceLock(String[] resourceName,
       OzoneManagerLock.Resource resource) {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireLock(resource, resourceName);
-    lock.releaseLock(resource, resourceName);
+    lock.acquireWriteLock(resource, resourceName);
+    lock.releaseWriteLock(resource, resourceName);
     Assert.assertTrue(true);
   }
 
@@ -73,22 +73,22 @@
     if (resource == OzoneManagerLock.Resource.USER_LOCK ||
         resource == OzoneManagerLock.Resource.S3_SECRET_LOCK ||
         resource == OzoneManagerLock.Resource.PREFIX_LOCK){
-      lock.acquireLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
       try {
-        lock.acquireLock(resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         fail("reacquireResourceLock failed");
       } catch (RuntimeException ex) {
         String message = "cannot acquire " + resource.getName() + " lock " +
             "while holding [" + resource.getName() + "] lock(s).";
         Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
       }
-      lock.releaseLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       Assert.assertTrue(true);
     } else {
-      lock.acquireLock(resource, resourceName);
-      lock.acquireLock(resource, resourceName);
-      lock.releaseLock(resource, resourceName);
-      lock.releaseLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       Assert.assertTrue(true);
     }
   }
@@ -105,20 +105,20 @@
         OzoneManagerLock.Resource.values()) {
       Stack<ResourceInfo> stack = new Stack<>();
       resourceName = generateResourceName(resource);
-      lock.acquireLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
       stack.push(new ResourceInfo(resourceName, resource));
       for (OzoneManagerLock.Resource higherResource :
           OzoneManagerLock.Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           resourceName = generateResourceName(higherResource);
-          lock.acquireLock(higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           stack.push(new ResourceInfo(resourceName, higherResource));
         }
       }
       // Now release locks
       while (!stack.empty()) {
         ResourceInfo resourceInfo = stack.pop();
-        lock.releaseLock(resourceInfo.getResource(),
+        lock.releaseWriteLock(resourceInfo.getResource(),
             resourceInfo.getLockName());
       }
     }
@@ -134,9 +134,9 @@
           OzoneManagerLock.Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           String[] resourceName = generateResourceName(higherResource);
-          lock.acquireLock(higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           try {
-            lock.acquireLock(resource, generateResourceName(resource));
+            lock.acquireWriteLock(resource, generateResourceName(resource));
             fail("testLockViolationsWithOneHigherLevelLock failed");
           } catch (RuntimeException ex) {
             String message = "cannot acquire " + resource.getName() + " lock " +
@@ -144,7 +144,7 @@
             Assert.assertTrue(ex.getMessage(),
                 ex.getMessage().contains(message));
           }
-          lock.releaseLock(higherResource, resourceName);
+          lock.releaseWriteLock(higherResource, resourceName);
         }
       }
     }
@@ -167,14 +167,14 @@
           OzoneManagerLock.Resource.values()) {
         if (higherResource.getMask() > resource.getMask()) {
           resourceName = generateResourceName(higherResource);
-          lock.acquireLock(higherResource, resourceName);
+          lock.acquireWriteLock(higherResource, resourceName);
           stack.push(new ResourceInfo(resourceName, higherResource));
           currentLocks.add(higherResource.getName());
           queue.add(new ResourceInfo(resourceName, higherResource));
           // try to acquire lower level lock
           try {
             resourceName = generateResourceName(resource);
-            lock.acquireLock(resource, resourceName);
+            lock.acquireWriteLock(resource, resourceName);
           } catch (RuntimeException ex) {
             String message = "cannot acquire " + resource.getName() + " lock " +
                 "while holding " + currentLocks.toString() + " lock(s).";
@@ -187,7 +187,7 @@
       // Now release locks
       while (!stack.empty()) {
         ResourceInfo resourceInfo = stack.pop();
-        lock.releaseLock(resourceInfo.getResource(),
+        lock.releaseWriteLock(resourceInfo.getResource(),
             resourceInfo.getLockName());
       }
     }
@@ -198,7 +198,7 @@
     OzoneManagerLock lock =
         new OzoneManagerLock(new OzoneConfiguration());
     try {
-      lock.releaseLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+      lock.releaseWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
       fail("releaseLockWithOutAcquiringLock failed");
     } catch (IllegalMonitorStateException ex) {
       String message = "Releasing lock on resource $user3 without acquiring " +
@@ -265,7 +265,7 @@
   @Test
   public void acquireMultiUserLockAfterUserLock() {
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
-    lock.acquireLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+    lock.acquireWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
     try {
       lock.acquireMultiUserLock("user1", "user2");
       fail("acquireMultiUserLockAfterUserLock failed");
@@ -274,7 +274,7 @@
           "[USER_LOCK] lock(s).";
       Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(message));
     }
-    lock.releaseLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+    lock.releaseWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
   }
 
   @Test
@@ -282,7 +282,7 @@
     OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
     lock.acquireMultiUserLock("user1", "user2");
     try {
-      lock.acquireLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
+      lock.acquireWriteLock(OzoneManagerLock.Resource.USER_LOCK, "user3");
       fail("acquireUserLockAfterMultiUserLock failed");
     } catch (RuntimeException ex) {
       String message = "cannot acquire USER_LOCK lock while holding " +
@@ -299,20 +299,20 @@
     for (OzoneManagerLock.Resource resource :
         OzoneManagerLock.Resource.values()) {
       final String[] resourceName = generateResourceName(resource);
-      lock.acquireLock(resource, resourceName);
+      lock.acquireWriteLock(resource, resourceName);
 
       AtomicBoolean gotLock = new AtomicBoolean(false);
       new Thread(() -> {
-        lock.acquireLock(resource, resourceName);
+        lock.acquireWriteLock(resource, resourceName);
         gotLock.set(true);
-        lock.releaseLock(resource, resourceName);
+        lock.releaseWriteLock(resource, resourceName);
       }).start();
       // Let's give some time for the new thread to run
       Thread.sleep(100);
       // Since the new thread is trying to get lock on same resource,
       // it will wait.
       Assert.assertFalse(gotLock.get());
-      lock.releaseLock(resource, resourceName);
+      lock.releaseWriteLock(resource, resourceName);
       // Since we have released the lock, the new thread should have the lock
       // now.
       // Let's give some time for the new thread to run
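
The rename from acquireLock/releaseLock to explicit write-lock calls runs through the production code below as well; the canonical pattern, as used in BucketManagerImpl and KeyManagerImpl in this patch, is:

```java
metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName, bucketName);
try {
  // ... mutate bucket state under the write lock ...
} finally {
  metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
      bucketName);
}
```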
diff --git a/hadoop-ozone/dist/src/main/compose/common/docker-image/docker-krb5/Dockerfile-krb5 b/hadoop-ozone/dist/src/main/compose/common/docker-image/docker-krb5/Dockerfile-krb5
index cca5f5e..69caa54 100644
--- a/hadoop-ozone/dist/src/main/compose/common/docker-image/docker-krb5/Dockerfile-krb5
+++ b/hadoop-ozone/dist/src/main/compose/common/docker-image/docker-krb5/Dockerfile-krb5
@@ -20,7 +20,7 @@
 RUN apk add --no-cache bash ca-certificates openssl krb5-server krb5 wget && update-ca-certificates
 RUN wget -O /usr/local/bin/dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.0/dumb-init_1.2.0_amd64
 RUN chmod +x /usr/local/bin/dumb-init
-RUN wget -O /root/issuer https://github.com/ajayydv/docker/raw/kdc/issuer
+RUN wget -c https://github.com/flokkr/issuer/releases/download/1.0.3/issuer_1.0.3_linux_amd64.tar.gz -O - | tar -xz -C /root
 RUN chmod +x /root/issuer
 WORKDIR /opt
 COPY krb5.conf /etc/
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop27/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop27/docker-compose.yaml
index d7293be..4d19ac6 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop27/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop27/docker-compose.yaml
@@ -90,7 +90,7 @@
       - ../common-config
     environment:
       HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop2-@project.version@.jar
-      WAIT_FOR: rm:8088
+      WAITFOR: rm:8088
     command: ["yarn","nodemanager"]
 # Optional section: comment out this part to get DNS resolution for all the containers.
 #  dns:
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop31/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop31/docker-compose.yaml
index 307882a..3a3279a 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop31/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop31/docker-compose.yaml
@@ -96,5 +96,5 @@
       - ../common-config
     environment:
       HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop3-@project.version@.jar
-      WAIT_FOR: rm:8088
+      WAITFOR: rm:8088
     command: ["yarn","nodemanager"]
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/.env b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/.env
index 87e7cce..602a96f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/.env
+++ b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/.env
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 HDDS_VERSION=@hdds.version@
-HADOOP_IMAGE=apache/hadoop
-HADOOP_VERSION=3
+HADOOP_IMAGE=flokkr/hadoop
+HADOOP_VERSION=3.2.1
 OZONE_RUNNER_VERSION=@docker.ozone-runner.version@
-HADOOP_OPTS=
\ No newline at end of file
+HADOOP_OPTS=
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/docker-compose.yaml
index 9cea61f..ff5023b 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozone-mr/hadoop32/docker-compose.yaml
@@ -94,7 +94,7 @@
       - ../common-config
     environment:
       HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop3-@project.version@.jar
-      WAIT_FOR: rm:8088
+      WAITFOR: rm:8088
     command: ["yarn","nodemanager"]
 # Optional section: comment out this part to get DNS resolution for all the containers.
 #    Add 127.0.0.1 (or the ip of your docker machine) to the resolv.conf to get local DNS resolution
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/.env b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/.env
index 5aa2777..b4cd2f2 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/.env
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/.env
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 HDDS_VERSION=${hdds.version}
-HADOOP_VERSION=3
+HADOOP_IMAGE=flokkr/hadoop
+HADOOP_VERSION=3.2.1
 OZONE_RUNNER_VERSION=${docker.ozone-runner.version}
 HADOOP_OPTS=
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-compose.yaml
index 7854f08..8684744 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-compose.yaml
@@ -27,7 +27,7 @@
     volumes:
       - ../..:/opt/hadoop
   kms:
-    image: apache/hadoop:${HADOOP_VERSION}
+    image: ${HADOOP_IMAGE}:${HADOOP_VERSION}
     networks:
       - ozone
     ports:
@@ -100,7 +100,7 @@
       HADOOP_OPTS: ${HADOOP_OPTS}
     command: ["/opt/hadoop/bin/ozone","scm"]
   rm:
-    image: apache/hadoop:${HADOOP_VERSION}
+    image: ${HADOOP_IMAGE}:${HADOOP_VERSION}
     hostname: rm
     networks:
       - ozone
@@ -115,7 +115,7 @@
       KERBEROS_KEYTABS: rm HTTP hadoop
     command: ["yarn", "resourcemanager"]
   nm:
-    image: apache/hadoop:${HADOOP_VERSION}
+    image: ${HADOOP_IMAGE}:${HADOOP_VERSION}
     hostname: nm
     networks:
       - ozone
@@ -125,11 +125,11 @@
       - ./docker-config
     environment:
       HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop3-@project.version@.jar
-      WAIT_FOR: rm:8088
+      WAITFOR: rm:8088
       KERBEROS_KEYTABS: nm HTTP
     command: ["yarn","nodemanager"]
   jhs:
-    image: apache/hadoop:${HADOOP_VERSION}
+    image: ${HADOOP_IMAGE}:${HADOOP_VERSION}
     container_name: jhs
     hostname: jhs
     networks:
@@ -143,7 +143,7 @@
     environment:
       KERBEROS_KEYTABS: jhs HTTP
       HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-hadoop3-@project.version@.jar
-      WAIT_FOR: rm:8088
+      WAITFOR: rm:8088
     command: ["yarn","timelineserver"]
 networks:
   ozone:
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/test.sh
index 3763397..11989fd 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/test.sh
@@ -35,6 +35,7 @@
 # shellcheck source=/dev/null
 source "$COMPOSE_DIR/../testlib.sh"
 
+execute_command_in_container rm sudo yum install -y krb5-workstation
 execute_robot_test rm kinit-hadoop.robot
 
 for scheme in o3fs ofs; do
diff --git a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot
index af9c9ae..f9cac67 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot
@@ -118,7 +118,7 @@
     Execute         kdestroy
     Run Keyword     Kinit test user     testuser2    testuser2.keytab
     ${result} =     Execute And Ignore Error         ozone sh bucket list /${volume3}/
-                    Should contain      ${result}    PERMISSION_DENIED org.apache.hadoop.ozone.om.exceptions.OMException: User testuser2/scm@EXAMPLE.COM doesn't have LIST permission to access volume
+                    Should contain      ${result}    PERMISSION_DENIED User testuser2/scm@EXAMPLE.COM doesn't have LIST permission to access volume
     Execute         ozone sh volume addacl ${volume3} -a user:testuser2/scm@EXAMPLE.COM:l
     Execute         ozone sh bucket list /${volume3}/
     Execute         ozone sh volume getacl /${volume3}/
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSWithObjectStoreCreate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSWithObjectStoreCreate.java
index e89d1c4..a72a257 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSWithObjectStoreCreate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSWithObjectStoreCreate.java
@@ -26,8 +26,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -46,6 +48,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -332,18 +336,78 @@
 
 
   @Test
-  public void testReadWithNotNormalizedPath() throws Exception {
+  public void testListKeysWithNotNormalizedPath() throws Exception {
+
     OzoneVolume ozoneVolume =
         cluster.getRpcClient().getObjectStore().getVolume(volumeName);
 
     OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
 
-    String key = "/dir1///dir2/file1/";
+    String key1 = "/dir1///dir2/file1/";
+    String key2 = "/dir1///dir2/file2/";
+    String key3 = "/dir1///dir2/file3/";
+
+    LinkedList<String> keys = new LinkedList<>();
+    keys.add("dir1/");
+    keys.add("dir1/dir2/");
+    keys.add(OmUtils.normalizeKey(key1, false));
+    keys.add(OmUtils.normalizeKey(key2, false));
+    keys.add(OmUtils.normalizeKey(key3, false));
 
     int length = 10;
     byte[] input = new byte[length];
     Arrays.fill(input, (byte)96);
-    String inputString = new String(input);
+
+    createKey(ozoneBucket, key1, 10, input);
+    createKey(ozoneBucket, key2, 10, input);
+    createKey(ozoneBucket, key3, 10, input);
+
+    // Iterator with key name as prefix.
+
+    Iterator<? extends OzoneKey> ozoneKeyIterator =
+        ozoneBucket.listKeys("/dir1//");
+
+    checkKeyList(ozoneKeyIterator, keys);
+
+    // Iterator with normalized key prefix.
+    ozoneKeyIterator =
+        ozoneBucket.listKeys("dir1/");
+
+    checkKeyList(ozoneKeyIterator, keys);
+
+    // Iterator with key name as previous key.
+    ozoneKeyIterator = ozoneBucket.listKeys(null,
+        "/dir1///dir2/file1/");
+
+    // Remove keys up to and including dir1/dir2/file1.
+    keys.remove("dir1/");
+    keys.remove("dir1/dir2/");
+    keys.remove("dir1/dir2/file1");
+
+    checkKeyList(ozoneKeyIterator, keys);
+
+    // Iterator with normalized key as previous key.
+    ozoneKeyIterator = ozoneBucket.listKeys(null,
+        OmUtils.normalizeKey(key1, false));
+
+    checkKeyList(ozoneKeyIterator, keys);
+  }
+
+  private void checkKeyList(Iterator<? extends OzoneKey> ozoneKeyIterator,
+      List<String> keys) {
+
+    LinkedList<String> outputKeys = new LinkedList<>();
+    while (ozoneKeyIterator.hasNext()) {
+      OzoneKey ozoneKey = ozoneKeyIterator.next();
+      outputKeys.add(ozoneKey.getName());
+    }
+
+    Assert.assertEquals(keys, outputKeys);
+  }
+
+  private void createKey(OzoneBucket ozoneBucket, String key, int length,
+      byte[] input)
+      throws Exception {
 
     OzoneOutputStream ozoneOutputStream =
         ozoneBucket.createKey(key, length);
@@ -358,6 +422,7 @@
     ozoneInputStream.read(read, 0, length);
     ozoneInputStream.close();
 
+    String inputString = new String(input);
     Assert.assertEquals(inputString, new String(read));
 
     // Read using filesystem.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 1a18681..e0b3d75 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -96,6 +97,7 @@
   private static OzoneManager om = null;
   private static Set<Long> containerIdsWithDeletedBlocks;
   private static long maxTransactionId = 0;
+  private static File baseDir;
 
   @BeforeClass
   public static void init() throws Exception {
@@ -106,7 +108,7 @@
 
     String path =
         GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName());
-    File baseDir = new File(path);
+    baseDir = new File(path);
     baseDir.mkdirs();
 
     conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
@@ -137,10 +139,11 @@
   }
 
   @AfterClass
-  public static void cleanup() {
+  public static void cleanup() throws IOException {
     if (cluster != null) {
       cluster.shutdown();
     }
+    FileUtils.deleteDirectory(baseDir);
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index ddd467b..ae8aae9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -17,27 +17,22 @@
 
 package org.apache.hadoop.ozone.container.metrics;
 
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
-import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -50,23 +45,23 @@
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.test.GenericTestUtils;
 
+import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.util.Map;
-import java.util.UUID;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 
 /**
  * Test for metrics published by storage containers.
@@ -182,9 +177,7 @@
       assertCounter("ReadOpCount", 1L, volumeIOMetrics);
       assertCounter("WriteBytes", 1024L, volumeIOMetrics);
       assertCounter("WriteOpCount", 1L, volumeIOMetrics);
-      // ReadTime and WriteTime vary from run to run, only checking non-zero
-      Assert.assertNotEquals(0L, getLongCounter("ReadTime", volumeIOMetrics));
-      Assert.assertNotEquals(0L, getLongCounter("WriteTime", volumeIOMetrics));
+
     } finally {
       if (client != null) {
         client.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index 34303ff..271109f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -116,7 +116,14 @@
 
     String volumeName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
     String bucketName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
-    cluster.getClient().getObjectStore().createVolume(volumeName);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner("user" + RandomStringUtils.randomNumeric(5))
+        .setAdmin("admin" + RandomStringUtils.randomNumeric(5))
+        .build();
+
+    cluster.getClient().getObjectStore().createVolume(volumeName,
+        createVolumeArgs);
     OzoneVolume volume =
         cluster.getClient().getObjectStore().getVolume(volumeName);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
index d7aaf37..a47aa08 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
@@ -201,16 +201,16 @@
 
     // Login as user1, list other users' volumes
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), true);
 
     UserGroupInformation.setLoginUser(user2);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), true);
 
@@ -229,18 +229,18 @@
 
     // Login as user1, list other users' volumes, expect failure
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        false);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), false);
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), false);
 
     // While admin should be able to list volumes just fine.
     UserGroupInformation.setLoginUser(adminUser);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     stopCluster(cluster);
   }
@@ -249,10 +249,10 @@
   public void testAclEnabledListAllAllowed() throws Exception {
     // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
     MiniOzoneCluster cluster = startCluster(true, true);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
@@ -267,11 +267,11 @@
     // The default user is adminUser as set in init(),
     // listall always succeeds if we use that UGI, we should use non-admin here
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        false);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), false);
     UserGroupInformation.setLoginUser(user2);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        false);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), false);
     UserGroupInformation.setLoginUser(adminUser);
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 3ec0146..b347dc1 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -635,6 +635,7 @@
 message AddAclRequest {
   required OzoneObj obj = 1;
   required OzoneAclInfo acl = 2;
+  optional uint64 modificationTime = 3;
 }
 
 message AddAclResponse {
@@ -644,6 +645,7 @@
 message RemoveAclRequest {
   required OzoneObj obj = 1;
   required OzoneAclInfo acl = 2;
+  optional uint64 modificationTime = 3;
 }
 
 message RemoveAclResponse {
@@ -653,6 +655,7 @@
 message SetAclRequest {
   required OzoneObj obj = 1;
   repeated OzoneAclInfo acl = 2;
+  optional uint64 modificationTime = 3;
 }
 
 message SetAclResponse {
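
For the new optional field, a hedged Java-side sketch assuming standard protobuf codegen for `optional uint64 modificationTime`; `obj` and `acl` are prebuilt messages, and Time is Hadoop's org.apache.hadoop.util.Time:

```java
// Stamp the ACL mutation with a modification time at request-build time.
AddAclRequest request = AddAclRequest.newBuilder()
    .setObj(obj)
    .setAcl(acl)
    .setModificationTime(Time.now()) // new optional field
    .build();
```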
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index c687a4b..738bc44 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -32,8 +32,8 @@
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -258,7 +258,7 @@
    *
    * @return UserTable.
    */
-  Table<String, UserVolumeInfo> getUserTable();
+  Table<String, PersistedUserVolumeInfo> getUserTable();
 
   /**
    * Returns the Volume Table.
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/UserVolumeInfoCodec.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/UserVolumeInfoCodec.java
index 756b639..b38421e 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/UserVolumeInfoCodec.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/codec/UserVolumeInfoCodec.java
@@ -18,32 +18,34 @@
 package org.apache.hadoop.ozone.om.codec;
 
 import java.io.IOException;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto
+    .OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.hdds.utils.db.Codec;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Codec to encode UserVolumeInfo as byte array.
+ * Codec to encode PersistedUserVolumeInfo as byte array.
  */
-public class UserVolumeInfoCodec implements Codec<UserVolumeInfo> {
+public class UserVolumeInfoCodec implements Codec<PersistedUserVolumeInfo> {
 
   @Override
-  public byte[] toPersistedFormat(UserVolumeInfo object) throws IOException {
+  public byte[] toPersistedFormat(
+      PersistedUserVolumeInfo object) throws IOException {
     Preconditions
         .checkNotNull(object, "Null object can't be converted to byte array.");
     return object.toByteArray();
   }
 
   @Override
-  public UserVolumeInfo fromPersistedFormat(byte[] rawData) throws IOException {
+  public PersistedUserVolumeInfo fromPersistedFormat(
+      byte[] rawData) throws IOException {
     Preconditions
         .checkNotNull(rawData,
             "Null byte array can't converted to real object.");
     try {
-      return UserVolumeInfo.parseFrom(rawData);
+      return PersistedUserVolumeInfo.parseFrom(rawData);
     } catch (InvalidProtocolBufferException e) {
       throw new IllegalArgumentException(
           "Can't encode the the raw data from the byte array", e);
@@ -51,7 +53,7 @@
   }
 
   @Override
-  public UserVolumeInfo copyObject(UserVolumeInfo object) {
+  public PersistedUserVolumeInfo copyObject(PersistedUserVolumeInfo object) {
     return object;
   }
 }
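
A hedged round-trip sketch of the renamed codec; the field values are illustrative, and builder methods follow standard protobuf codegen for the PersistedUserVolumeInfo message added below:

```java
// Serialize a PersistedUserVolumeInfo to bytes and parse it back.
// toPersistedFormat/fromPersistedFormat may throw IOException.
PersistedUserVolumeInfo info = PersistedUserVolumeInfo.newBuilder()
    .addVolumeNames("volume1")
    .setObjectID(1L)
    .setUpdateID(1L)
    .build();
UserVolumeInfoCodec codec = new UserVolumeInfoCodec();
byte[] bytes = codec.toPersistedFormat(info);
PersistedUserVolumeInfo decoded = codec.fromPersistedFormat(bytes);
```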
diff --git a/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto b/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
index 74b0109..1c0014c 100644
--- a/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
+++ b/hadoop-ozone/interface-storage/src/main/proto/OmStorageProtocol.proto
@@ -57,4 +57,10 @@
   repeated hadoop.hdds.KeyValue metadata = 3;
   optional uint64 objectID = 4;
   optional uint64 updateID = 5;
+}
+
+message PersistedUserVolumeInfo {
+  repeated string volumeNames = 1;
+  optional uint64 objectID = 2;
+  optional uint64 updateID = 3;
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index 68466f4..a93ebe6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -117,10 +117,10 @@
     String volumeName = bucketInfo.getVolumeName();
     String bucketName = bucketInfo.getBucketName();
     boolean acquiredBucketLock = false;
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volumeName);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volumeName);
     try {
-      acquiredBucketLock = metadataManager.getLock().acquireLock(BUCKET_LOCK,
-          volumeName, bucketName);
+      acquiredBucketLock = metadataManager.getLock().acquireWriteLock(
+          BUCKET_LOCK, volumeName, bucketName);
       String volumeKey = metadataManager.getVolumeKey(volumeName);
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       OmVolumeArgs volumeArgs = metadataManager.getVolumeTable().get(volumeKey);
@@ -188,10 +188,10 @@
       throw ex;
     } finally {
       if (acquiredBucketLock) {
-        metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+        metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
             bucketName);
       }
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volumeName);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
     }
   }
 
@@ -282,7 +282,8 @@
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
       OmBucketInfo oldBucketInfo =
@@ -336,7 +337,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -353,7 +354,8 @@
       throws IOException {
     Preconditions.checkNotNull(volumeName);
     Preconditions.checkNotNull(bucketName);
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       //Check if bucket exists
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -376,7 +378,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -418,7 +420,7 @@
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
     boolean changed = false;
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -440,7 +442,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
 
     return changed;
@@ -465,7 +467,7 @@
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
     boolean removed = false;
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -486,7 +488,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
     return removed;
   }
@@ -509,7 +511,7 @@
     }
     String volume = obj.getVolumeName();
     String bucket = obj.getBucketName();
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
       OmBucketInfo bucketInfo =
@@ -528,7 +530,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
     return true;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ced055c..83fa020 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -459,7 +459,8 @@
         args.getVolumeName(), args.getBucketName(), args.getKeyName());
 
     FileEncryptionInfo encInfo;
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     OmBucketInfo bucketInfo;
     try {
       bucketInfo = getBucketInfo(volumeName, bucketName);
@@ -472,7 +473,7 @@
           volumeName, bucketName, keyName, ex);
       throw new OMException(ex.getMessage(), ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
     if (keyInfo == null) {
@@ -613,7 +614,7 @@
         .getOpenKey(volumeName, bucketName, keyName, clientID);
     Preconditions.checkNotNull(locationInfoList);
     try {
-      metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
       validateBucket(volumeName, bucketName);
       OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
@@ -640,7 +641,7 @@
       throw new OMException(ex.getMessage(),
           ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -798,7 +799,8 @@
           ResultCodes.INVALID_KEY_NAME);
     }
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       // fromKeyName should exist
       String fromKey = metadataManager.getOzoneKey(
@@ -851,7 +853,7 @@
       throw new OMException(ex.getMessage(),
           ResultCodes.KEY_RENAME_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -862,7 +864,8 @@
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       String objectKey = metadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
@@ -893,7 +896,7 @@
       throw new OMException(ex.getMessage(), ex,
           ResultCodes.KEY_DELETION_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -919,6 +922,12 @@
     // underlying table using an iterator. That automatically creates a
     // snapshot of the data, so we don't need these locks at a higher level
     // when we iterate.
+
+    if (enableFileSystemPaths) {
+      startKey = OmUtils.normalizeKey(startKey, true);
+      keyPrefix = OmUtils.normalizeKey(keyPrefix, true);
+    }
+
     List<OmKeyInfo> keyList = metadataManager.listKeys(volumeName, bucketName,
         startKey, keyPrefix, maxKeys);
     refreshPipeline(keyList);
@@ -981,7 +990,8 @@
     String bucketName = keyArgs.getBucketName();
     String keyName = keyArgs.getKeyName();
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     OmBucketInfo bucketInfo = validateS3Bucket(volumeName, bucketName);
     try {
 
@@ -1044,7 +1054,7 @@
       throw new OMException(ex.getMessage(),
           ResultCodes.INITIATE_MULTIPART_UPLOAD_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -1098,7 +1108,8 @@
     String uploadID = omKeyArgs.getMultipartUploadID();
     int partNumber = omKeyArgs.getMultipartUploadPartNumber();
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     validateS3Bucket(volumeName, bucketName);
     String partName;
     try {
@@ -1185,7 +1196,7 @@
       throw new OMException(ex.getMessage(),
           ResultCodes.MULTIPART_UPLOAD_PARTFILE_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
 
@@ -1204,7 +1215,8 @@
     String bucketName = omKeyArgs.getBucketName();
     String keyName = omKeyArgs.getKeyName();
     String uploadID = omKeyArgs.getMultipartUploadID();
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     validateS3Bucket(volumeName, bucketName);
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
@@ -1232,7 +1244,7 @@
       throw new OMException(ex.getMessage(), ResultCodes
           .COMPLETE_MULTIPART_UPLOAD_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -1247,7 +1259,8 @@
     String uploadID = omKeyArgs.getMultipartUploadID();
     Preconditions.checkNotNull(uploadID, "uploadID cannot be null");
     validateS3Bucket(volumeName, bucketName);
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     OmBucketInfo bucketInfo;
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
@@ -1305,7 +1318,7 @@
       throw new OMException(ex.getMessage(), ResultCodes
           .ABORT_MULTIPART_UPLOAD_FAILED);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
 
@@ -1484,7 +1497,7 @@
     boolean changed = false;
 
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1507,7 +1520,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
     return changed;
   }
@@ -1528,7 +1541,7 @@
     String keyName = obj.getKeyName();
     boolean changed = false;
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1548,7 +1561,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
     return changed;
   }
@@ -1569,7 +1582,7 @@
     String keyName = obj.getKeyName();
     boolean changed = false;
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volume, bucket);
     try {
       validateBucket(volume, bucket);
       String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
@@ -1590,7 +1603,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volume, bucket);
     }
     return changed;
   }
@@ -1847,7 +1860,8 @@
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
 
       // Check if this is the root of the filesystem.
@@ -1869,7 +1883,7 @@
           .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName());
       metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
   }
@@ -1921,7 +1935,8 @@
     String keyName = args.getKeyName();
     OpenKeySession keySession;
 
-    metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    metadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+        bucketName);
     try {
       OzoneFileStatus fileStatus;
       try {
@@ -1947,7 +1962,7 @@
       // filestatus. We can avoid some operations in openKey call.
       keySession = openKey(args);
     } finally {
-      metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+      metadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
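
Note on the listKeys hunk above: when filesystem paths are enabled, startKey and keyPrefix are normalized before the table scan, so path-style spellings of the same prefix resolve to the same key range. The behavior sketched here is inferred from the Paths-based code removed from OMClientRequest further down in this diff; the exact edge cases are assumptions, and the values are illustrative:

    // Both calls scan the same range once normalization is applied.
    listKeys(vol, bucket, null, "dir1//dir2/./", 100);
    listKeys(vol, bucket, null, "dir1/dir2/", 100);

The second argument to OmUtils.normalizeKey is true here, which appears to keep a trailing slash so a directory-style prefix retains its directory meaning; treat that as an assumption rather than a spec.
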
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index da7e985..f7d3ba0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -70,8 +70,8 @@
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto
+    .OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -192,7 +192,7 @@
   }
 
   @Override
-  public Table<String, UserVolumeInfo> getUserTable() {
+  public Table<String, PersistedUserVolumeInfo> getUserTable() {
     return userTable;
   }
 
@@ -330,7 +330,7 @@
             new RepeatedOmKeyInfoCodec(true))
         .addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
         .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
-        .addCodec(UserVolumeInfo.class, new UserVolumeInfoCodec())
+        .addCodec(PersistedUserVolumeInfo.class, new UserVolumeInfoCodec())
         .addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec())
         .addCodec(S3SecretValue.class, new S3SecretValueCodec())
         .addCodec(OmPrefixInfo.class, new OmPrefixInfoCodec())
@@ -344,7 +344,8 @@
    */
   protected void initializeOmTables() throws IOException {
     userTable =
-        this.store.getTable(USER_TABLE, String.class, UserVolumeInfo.class);
+        this.store.getTable(USER_TABLE, String.class,
+            PersistedUserVolumeInfo.class);
     checkTableStatus(userTable, USER_TABLE);
 
     TableCacheImpl.CacheCleanupPolicy cleanupPolicy =
@@ -948,13 +949,13 @@
     return result;
   }
 
-  private UserVolumeInfo getVolumesByUser(String userNameKey)
+  private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
       throws OMException {
     try {
-      UserVolumeInfo userVolInfo = getUserTable().get(userNameKey);
+      PersistedUserVolumeInfo userVolInfo = getUserTable().get(userNameKey);
       if (userVolInfo == null) {
         // No volume found for this user, return an empty list
-        return UserVolumeInfo.newBuilder().build();
+        return PersistedUserVolumeInfo.newBuilder().build();
       } else {
         return userVolInfo;
       }
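
Note on the OmMetadataManagerImpl hunks above: UserVolumeInfo is replaced by PersistedUserVolumeInfo, a proto living in a storage-only namespace (OzoneManagerStorageProtos), presumably so the on-disk schema can evolve independently of the client-facing OzoneManagerProtocolProtos. A minimal sketch of reading the typed user table once the codec is registered (the user name is made up; error handling elided):

    Table<String, PersistedUserVolumeInfo> userTable =
        metadataManager.getUserTable();
    PersistedUserVolumeInfo volumes =
        userTable.get(metadataManager.getUserKey("alice"));
    if (volumes != null) {
      for (String volume : volumes.getVolumeNamesList()) {
        // one entry per volume owned by this user
      }
    }
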
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4e9fee6..9f57bec 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -146,7 +146,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
@@ -227,6 +227,7 @@
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -527,6 +528,7 @@
         authorizer.setKeyManager(keyManager);
         authorizer.setPrefixManager(prefixManager);
         authorizer.setOzoneAdmins(getOzoneAdmins(configuration));
+        authorizer.setAllowListAllVolumes(allowListAllVolumes);
       }
     } else {
       accessAuthorizer = null;
@@ -1626,7 +1628,7 @@
         ProtobufRpcEngine.Server.getRemoteUser(),
         ProtobufRpcEngine.Server.getRemoteIp(),
         ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
-        true);
+        true, getVolumeOwner(vol, acl));
   }
 
   /**
@@ -1641,25 +1643,46 @@
           UserGroupInformation.createRemoteUser(userName),
           ProtobufRpcEngine.Server.getRemoteIp(),
           ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
-          false);
+          false, getVolumeOwner(vol, acl));
     } catch (OMException ex) {
       // Should not trigger exception here at all
       return false;
     }
   }
 
-  /**
-   * CheckAcls for the ozone object.
-   *
-   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
-   */
-  @SuppressWarnings("parameternumber")
-  public void checkAcls(ResourceType resType, StoreType storeType,
-      ACLType aclType, String vol, String bucket, String key,
-      UserGroupInformation ugi, InetAddress remoteAddress, String hostName)
-      throws OMException {
-    checkAcls(resType, storeType, aclType, vol, bucket, key,
-        ugi, remoteAddress, hostName, true);
+  public String getVolumeOwner(String vol, ACLType type) throws OMException {
+    String volOwnerName = null;
+    if (!vol.equals(OzoneConsts.OZONE_ROOT) && (type != ACLType.CREATE)) {
+      volOwnerName = getVolumeOwner(vol);
+    }
+    return volOwnerName;
+  }
+
+  private String getVolumeOwner(String volume) throws OMException {
+    boolean lockAcquired = metadataManager.getLock().acquireReadLock(
+        VOLUME_LOCK, volume);
+    String dbVolumeKey = metadataManager.getVolumeKey(volume);
+    OmVolumeArgs volumeArgs = null;
+    try {
+      volumeArgs = metadataManager.getVolumeTable().get(dbVolumeKey);
+    } catch (IOException ioe) {
+      if (ioe instanceof OMException) {
+        throw (OMException)ioe;
+      } else {
+        throw new OMException("getVolumeOwner for Volume " + volume + " failed",
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } finally {
+      if (lockAcquired) {
+        metadataManager.getLock().releaseReadLock(VOLUME_LOCK, volume);
+      }
+    }
+    if (volumeArgs != null) {
+      return volumeArgs.getOwnerName();
+    } else {
+      throw new OMException("Volume " + volume + " is not found",
+          OMException.ResultCodes.VOLUME_NOT_FOUND);
+    }
   }
 
   /**
@@ -1670,10 +1693,10 @@
    *                     and throwOnPermissionDenied set to true.
    */
   @SuppressWarnings("parameternumber")
-  private boolean checkAcls(ResourceType resType, StoreType storeType,
+  public boolean checkAcls(ResourceType resType, StoreType storeType,
       ACLType aclType, String vol, String bucket, String key,
       UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
-      boolean throwIfPermissionDenied)
+      boolean throwIfPermissionDenied, String volumeOwner)
       throws OMException {
     OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(resType)
@@ -1687,13 +1710,17 @@
         .setHost(hostName)
         .setAclType(ACLIdentityType.USER)
         .setAclRights(aclType)
+        .setOwnerName(volumeOwner)
         .build();
     if (!accessAuthorizer.checkAccess(obj, context)) {
       if (throwIfPermissionDenied) {
         LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
-            ugi.getUserName(), aclType, resType, vol, bucket, key);
-        throw new OMException("User " + ugi.getUserName() + " doesn't have " +
-            aclType + " permission to access " + resType,
+            context.getClientUgi().getUserName(), context.getAclRights(),
+            obj.getResourceType(), obj.getVolumeName(), obj.getBucketName(),
+            obj.getKeyName());
+        throw new OMException("User " + context.getClientUgi().getUserName() +
+            " doesn't have " + context.getAclRights() +
+            " permission to access " + obj.getResourceType(),
             ResultCodes.PERMISSION_DENIED);
       }
       return false;
@@ -3496,7 +3523,6 @@
     return new ResolvedBucket(requested, resolved);
   }
 
-
   public ResolvedBucket resolveBucketLink(Pair<String, String> requested)
       throws IOException {
 
@@ -3547,9 +3573,11 @@
     }
 
     if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ,
+      final ACLType type = ACLType.READ;
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, type,
           volumeName, bucketName, null, userGroupInformation,
-          remoteAddress, hostName);
+          remoteAddress, hostName, true,
+          getVolumeOwner(volumeName, type));
     }
 
     return resolveBucketLink(
@@ -3593,7 +3621,8 @@
           transactionID, objectID);
 
       String dbUserKey = metadataManager.getUserKey(userName);
-      UserVolumeInfo userVolumeInfo = UserVolumeInfo.newBuilder()
+      PersistedUserVolumeInfo userVolumeInfo =
+          PersistedUserVolumeInfo.newBuilder()
           .setObjectID(objectID)
           .setUpdateID(transactionID)
           .addVolumeNames(s3VolumeName).build();
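
Note on the OzoneManager hunks above: the volume owner is now threaded into every ACL check. getVolumeOwner resolves the owner under a VOLUME_LOCK read lock, and checkAcls forwards it to the authorizer through the request context's setOwnerName, so an authorizer can grant owner-based rights. Two details visible in the diff: the lookup is skipped for the root pseudo-volume and for CREATE (where the volume does not exist yet), and a missing volume surfaces as VOLUME_NOT_FOUND rather than a null owner. A hedged sketch of the call shape (volume name illustrative):

    String owner = getVolumeOwner("vol1", ACLType.READ);   // volume-table lookup
    String none  = getVolumeOwner("vol1", ACLType.CREATE); // null, lookup skipped
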
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
index 6461a7e..7669233 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
@@ -101,7 +101,7 @@
     validateOzoneObj(obj);
 
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -117,7 +117,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
     }
   }
 
@@ -133,7 +133,7 @@
   public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -156,7 +156,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
     }
   }
 
@@ -172,7 +172,7 @@
   public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireWriteLock(PREFIX_LOCK, prefixPath);
     try {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
@@ -188,7 +188,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseWriteLock(PREFIX_LOCK, prefixPath);
     }
   }
 
@@ -202,7 +202,7 @@
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
     validateOzoneObj(obj);
     String prefixPath = obj.getPath();
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireReadLock(PREFIX_LOCK, prefixPath);
     try {
       String longestPrefix = prefixTree.getLongestPrefix(prefixPath);
       if (prefixPath.equals(longestPrefix)) {
@@ -213,7 +213,7 @@
         }
       }
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseReadLock(PREFIX_LOCK, prefixPath);
     }
     return EMPTY_ACL_LIST;
   }
@@ -232,7 +232,7 @@
     Objects.requireNonNull(context);
 
     String prefixPath = ozObject.getPath();
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireReadLock(PREFIX_LOCK, prefixPath);
     try {
       String longestPrefix = prefixTree.getLongestPrefix(prefixPath);
       if (prefixPath.equals(longestPrefix)) {
@@ -253,18 +253,18 @@
         return true;
       }
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseReadLock(PREFIX_LOCK, prefixPath);
     }
   }
 
   @Override
   public List<OmPrefixInfo> getLongestPrefixPath(String path) {
     String prefixPath = prefixTree.getLongestPrefix(path);
-    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    metadataManager.getLock().acquireReadLock(PREFIX_LOCK, prefixPath);
     try {
       return getLongestPrefixPathHelper(prefixPath);
     } finally {
-      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+      metadataManager.getLock().releaseReadLock(PREFIX_LOCK, prefixPath);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
index fb56658..59fa467 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
@@ -61,7 +61,7 @@
     Preconditions.checkArgument(Strings.isNotBlank(kerberosID),
         "kerberosID cannot be null or empty.");
     S3SecretValue result = null;
-    omMetadataManager.getLock().acquireLock(S3_SECRET_LOCK, kerberosID);
+    omMetadataManager.getLock().acquireWriteLock(S3_SECRET_LOCK, kerberosID);
     try {
       S3SecretValue s3Secret =
           omMetadataManager.getS3SecretTable().get(kerberosID);
@@ -73,7 +73,7 @@
         return s3Secret;
       }
     } finally {
-      omMetadataManager.getLock().releaseLock(S3_SECRET_LOCK, kerberosID);
+      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, kerberosID);
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Secret for accessKey:{}, proto:{}", kerberosID, result);
@@ -89,7 +89,7 @@
     LOG.trace("Get secret for awsAccessKey:{}", kerberosID);
 
     S3SecretValue s3Secret;
-    omMetadataManager.getLock().acquireLock(S3_SECRET_LOCK, kerberosID);
+    omMetadataManager.getLock().acquireReadLock(S3_SECRET_LOCK, kerberosID);
     try {
       s3Secret = omMetadataManager.getS3SecretTable().get(kerberosID);
       if (s3Secret == null) {
@@ -97,7 +97,7 @@
             "awsAccessKeyId " + kerberosID, S3_SECRET_NOT_FOUND);
       }
     } finally {
-      omMetadataManager.getLock().releaseLock(S3_SECRET_LOCK, kerberosID);
+      omMetadataManager.getLock().releaseReadLock(S3_SECRET_LOCK, kerberosID);
     }
 
     return s3Secret.getAwsSecret();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index b3e8186..e3367b2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -28,8 +28,6 @@
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.UserVolumeInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -41,6 +39,7 @@
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,11 +70,12 @@
   }
 
   // Helpers to add and delete volume from user list
-  private UserVolumeInfo addVolumeToOwnerList(String volume, String owner)
-      throws IOException {
+  private PersistedUserVolumeInfo addVolumeToOwnerList(
+      String volume, String owner) throws IOException {
     // Get the volume list
     String dbUserKey = metadataManager.getUserKey(owner);
-    UserVolumeInfo volumeList = metadataManager.getUserTable().get(dbUserKey);
+    PersistedUserVolumeInfo volumeList =
+        metadataManager.getUserTable().get(dbUserKey);
     List<String> prevVolList = new ArrayList<>();
     if (volumeList != null) {
       prevVolList.addAll(volumeList.getVolumeNamesList());
@@ -90,16 +90,17 @@
 
     // Add the new volume to the list
     prevVolList.add(volume);
-    UserVolumeInfo newVolList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo newVolList = PersistedUserVolumeInfo.newBuilder()
         .addAllVolumeNames(prevVolList).build();
 
     return newVolList;
   }
 
-  private UserVolumeInfo delVolumeFromOwnerList(String volume, String owner)
-      throws IOException {
+  private PersistedUserVolumeInfo delVolumeFromOwnerList(
+      String volume, String owner) throws IOException {
     // Get the volume list
-    UserVolumeInfo volumeList = metadataManager.getUserTable().get(owner);
+    PersistedUserVolumeInfo volumeList =
+        metadataManager.getUserTable().get(owner);
     List<String> prevVolList = new ArrayList<>();
     if (volumeList != null) {
       prevVolList.addAll(volumeList.getVolumeNamesList());
@@ -110,7 +111,7 @@
 
     // Remove the volume from the list
     prevVolList.remove(volume);
-    UserVolumeInfo newVolList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo newVolList = PersistedUserVolumeInfo.newBuilder()
         .addAllVolumeNames(prevVolList).build();
     return newVolList;
   }
@@ -124,10 +125,10 @@
     Preconditions.checkNotNull(omVolumeArgs);
 
     boolean acquiredUserLock = false;
-    metadataManager.getLock().acquireLock(VOLUME_LOCK,
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK,
         omVolumeArgs.getVolume());
     try {
-      acquiredUserLock = metadataManager.getLock().acquireLock(USER_LOCK,
+      acquiredUserLock = metadataManager.getLock().acquireWriteLock(USER_LOCK,
           omVolumeArgs.getOwnerName());
       String dbVolumeKey = metadataManager.getVolumeKey(
           omVolumeArgs.getVolume());
@@ -142,8 +143,8 @@
         throw new OMException(ResultCodes.VOLUME_ALREADY_EXISTS);
       }
 
-      UserVolumeInfo volumeList = addVolumeToOwnerList(omVolumeArgs.getVolume(),
-          omVolumeArgs.getOwnerName());
+      PersistedUserVolumeInfo volumeList = addVolumeToOwnerList(
+          omVolumeArgs.getVolume(), omVolumeArgs.getOwnerName());
 
       // Set creation time
       omVolumeArgs.setCreationTime(System.currentTimeMillis());
@@ -162,16 +163,16 @@
       throw ex;
     } finally {
       if (acquiredUserLock) {
-        metadataManager.getLock().releaseLock(USER_LOCK,
+        metadataManager.getLock().releaseWriteLock(USER_LOCK,
             omVolumeArgs.getOwnerName());
       }
-      metadataManager.getLock().releaseLock(VOLUME_LOCK,
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK,
           omVolumeArgs.getVolume());
     }
   }
 
   private void createVolumeCommitToDB(OmVolumeArgs omVolumeArgs,
-      UserVolumeInfo volumeList, String dbVolumeKey, String dbUserKey)
+      PersistedUserVolumeInfo volumeList, String dbVolumeKey, String dbUserKey)
       throws IOException {
     try (BatchOperation batch = metadataManager.getStore()
         .initBatchOperation()) {
@@ -201,7 +202,7 @@
     Preconditions.checkNotNull(owner);
     boolean acquiredUsersLock = false;
     String actualOwner = null;
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs = metadataManager
@@ -220,11 +221,11 @@
 
       acquiredUsersLock = metadataManager.getLock().acquireMultiUserLock(owner,
           originalOwner);
-      UserVolumeInfo oldOwnerVolumeList = delVolumeFromOwnerList(volume,
-          originalOwner);
+      PersistedUserVolumeInfo oldOwnerVolumeList =
+          delVolumeFromOwnerList(volume, originalOwner);
 
       String newOwner =  metadataManager.getUserKey(owner);
-      UserVolumeInfo newOwnerVolumeList = addVolumeToOwnerList(volume,
+      PersistedUserVolumeInfo newOwnerVolumeList = addVolumeToOwnerList(volume,
           newOwner);
 
       volumeArgs.setOwnerName(owner);
@@ -240,13 +241,13 @@
       if (acquiredUsersLock) {
         metadataManager.getLock().releaseMultiUserLock(owner, actualOwner);
       }
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
     }
   }
 
-  private void setOwnerCommitToDB(UserVolumeInfo oldOwnerVolumeList,
-      UserVolumeInfo newOwnerVolumeList, OmVolumeArgs newOwnerVolumeArgs,
-      String oldOwner) throws IOException {
+  private void setOwnerCommitToDB(PersistedUserVolumeInfo oldOwnerVolumeList,
+      PersistedUserVolumeInfo newOwnerVolumeList,
+      OmVolumeArgs newOwnerVolumeArgs, String oldOwner) throws IOException {
     try (BatchOperation batch = metadataManager.getStore()
         .initBatchOperation()) {
       if (oldOwnerVolumeList.getVolumeNamesList().size() == 0) {
@@ -309,10 +310,10 @@
     Preconditions.checkNotNull(volume);
     String owner = null;
     boolean acquiredUserLock = false;
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
     try {
       owner = getVolumeInfo(volume).getOwnerName();
-      acquiredUserLock = metadataManager.getLock().acquireLock(USER_LOCK,
+      acquiredUserLock = metadataManager.getLock().acquireWriteLock(USER_LOCK,
           owner);
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -330,7 +331,7 @@
       Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
-      UserVolumeInfo newVolumeList = delVolumeFromOwnerList(volume,
+      PersistedUserVolumeInfo newVolumeList = delVolumeFromOwnerList(volume,
           volumeArgs.getOwnerName());
 
 
@@ -342,15 +343,15 @@
       throw ex;
     } finally {
       if (acquiredUserLock) {
-        metadataManager.getLock().releaseLock(USER_LOCK, owner);
+        metadataManager.getLock().releaseWriteLock(USER_LOCK, owner);
       }
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
 
     }
   }
 
 
-  private void deleteVolumeCommitToDB(UserVolumeInfo newVolumeList,
+  private void deleteVolumeCommitToDB(PersistedUserVolumeInfo newVolumeList,
       String volume, String owner) throws IOException {
     try (BatchOperation batch = metadataManager.getStore()
         .initBatchOperation()) {
@@ -435,7 +436,7 @@
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -464,7 +465,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -487,7 +488,7 @@
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -516,7 +517,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -540,7 +541,7 @@
           "VolumeManager. OzoneObj type:" + obj.getResourceType());
     }
     String volume = obj.getVolumeName();
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireWriteLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -562,7 +563,7 @@
       }
       throw ex;
     } finally {
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volume);
     }
 
     return true;
@@ -620,7 +621,7 @@
     Objects.requireNonNull(context);
 
     String volume = ozObject.getVolumeName();
-    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    metadataManager.getLock().acquireReadLock(VOLUME_LOCK, volume);
     try {
       String dbVolumeKey = metadataManager.getVolumeKey(volume);
       OmVolumeArgs volumeArgs =
@@ -647,7 +648,7 @@
       throw new OMException("Check access operation failed for " +
           "volume:" + volume, ex, ResultCodes.INTERNAL_ERROR);
     } finally {
-      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+      metadataManager.getLock().releaseReadLock(VOLUME_LOCK, volume);
     }
   }
 }
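
Note on VolumeManagerImpl above: the owner-to-volumes list and the volume record are kept consistent by committing both in a single batch (createVolumeCommitToDB, setOwnerCommitToDB, deleteVolumeCommitToDB), and locks are always taken in VOLUME_LOCK-then-USER_LOCK order with the acquiredUserLock flag guarding the release. A sketch of the batch pattern, assuming putWithBatch/commitBatchOperation are the batch APIs used by these commit helpers (keys illustrative):

    // Both writes become visible atomically, or not at all.
    try (BatchOperation batch =
        metadataManager.getStore().initBatchOperation()) {
      metadataManager.getUserTable()
          .putWithBatch(batch, dbUserKey, newVolList);
      metadataManager.getVolumeTable()
          .putWithBatch(batch, dbVolumeKey, omVolumeArgs);
      metadataManager.getStore().commitBatchOperation(batch);
    }
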
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index f6d04a9..1161fd0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -33,8 +33,8 @@
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 
 import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 
 /**
  * Class defines the structure and types of the om.db.
@@ -51,13 +51,13 @@
                     new RepeatedOmKeyInfoCodec(true));
 
   public static final DBColumnFamilyDefinition<String,
-            OzoneManagerProtocolProtos.UserVolumeInfo>
+            OzoneManagerStorageProtos.PersistedUserVolumeInfo>
             USER_TABLE =
             new DBColumnFamilyDefinition<>(
                     "userTable",
                     String.class,
                     new StringCodec(),
-                    OzoneManagerProtocolProtos.UserVolumeInfo.class,
+                    OzoneManagerStorageProtos.PersistedUserVolumeInfo.class,
                     new UserVolumeInfoCodec());
 
   public static final DBColumnFamilyDefinition<String, OmVolumeArgs>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 728a624..9163801 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -20,9 +20,9 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -45,11 +45,9 @@
 import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.nio.file.Paths;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
 
 /**
@@ -148,12 +146,36 @@
    * @param key
    * @throws IOException
    */
+  @SuppressWarnings("parameternumber")
   public void checkAcls(OzoneManager ozoneManager,
       OzoneObj.ResourceType resType,
       OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
       String vol, String bucket, String key) throws IOException {
+    checkAcls(ozoneManager, resType, storeType, aclType, vol, bucket, key,
+        ozoneManager.getVolumeOwner(vol, aclType));
+  }
+
+  /**
+   * Check ACLs of an ozone object, using the supplied volume owner.
+   * @param ozoneManager ozone manager handling the request
+   * @param resType resource type (volume, bucket, key or prefix)
+   * @param storeType store type (OZONE or S3)
+   * @param aclType the ACL right being requested
+   * @param vol volume name
+   * @param bucket bucket name
+   * @param key key name
+   * @param volOwner owner of the volume the object belongs to
+   * @throws IOException if the check fails or permission is denied
+   */
+  @SuppressWarnings("parameternumber")
+  public void checkAcls(OzoneManager ozoneManager,
+      OzoneObj.ResourceType resType,
+      OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
+      String vol, String bucket, String key, String volOwner)
+      throws IOException {
     ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
-        createUGI(), getRemoteAddress(), getHostName());
+        createUGI(), getRemoteAddress(), getHostName(), true,
+        volOwner);
   }
 
   /**
@@ -298,21 +320,11 @@
     }
   }
 
-  @SuppressFBWarnings("DMI_HARDCODED_ABSOLUTE_FILENAME")
+
   public static String validateAndNormalizeKey(String keyName)
       throws OMException {
-    String normalizedKeyName;
-    if (keyName.startsWith(OM_KEY_PREFIX)) {
-      normalizedKeyName = Paths.get(keyName).toUri().normalize().getPath();
-    } else {
-      normalizedKeyName = Paths.get(OM_KEY_PREFIX, keyName).toUri()
-          .normalize().getPath();
-    }
-    if (!keyName.equals(normalizedKeyName)) {
-      LOG.debug("Normalized key {} to {} ", keyName,
-          normalizedKeyName.substring(1));
-    }
-    return isValidKeyPath(normalizedKeyName.substring(1));
+    String normalizedKeyName = OmUtils.normalizeKey(keyName, false);
+    return isValidKeyPath(normalizedKeyName);
   }
 
   /**
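
Note on the hunk above: validateAndNormalizeKey now delegates to the shared OmUtils.normalizeKey instead of its own Paths-based logic, so request validation and the listKeys path in KeyManagerImpl normalize identically. Based on the removed implementation, the effect is roughly the following; the exact outputs are inferences, not a spec:

    validateAndNormalizeKey("a/b/c");     // -> "a/b/c"
    validateAndNormalizeKey("/a//b/./c"); // -> "a/b/c"
    validateAndNormalizeKey("a/b/");      // -> "a/b" (trailing slash dropped;
                                          //    the normalizeKey flag is false here)
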
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java
index a493f9f..6bda132 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java
@@ -110,6 +110,21 @@
           ozoneManager.isRatisEnabled());
 
       if (operationResult) {
+        // Update the bucket's modification time when its ACLs change.
+        long modificationTime = omBucketInfo.getModificationTime();
+        if (getOmRequest().getAddAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getAddAclRequest()
+              .getModificationTime();
+        } else if (getOmRequest().getSetAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getSetAclRequest()
+              .getModificationTime();
+        } else if (getOmRequest().getRemoveAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getRemoveAclRequest()
+              .getModificationTime();
+        }
+        omBucketInfo = omBucketInfo.toBuilder()
+            .setModificationTime(modificationTime).build();
+
         // update cache.
         omMetadataManager.getBucketTable().addCacheEntry(
             new CacheKey<>(dbBucketKey),
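
Note on the hunk above: OMBucketAclRequest reads the modification time out of whichever ACL sub-request is present instead of consulting the clock during apply; the preExecute overrides added to the add/set/remove requests below stamp Time.now() into the request first. The likely reason, inferred from the pattern rather than stated in the diff: in the Ratis write path preExecute runs once on the leader while apply runs on every replica, so any wall-clock value must be fixed inside the replicated request to keep replica state identical. A stand-alone illustration of the pattern, not Ozone code:

    final class ReplicatedTimestamp {
      static final class AclRequest {
        final long modificationTime;
        AclRequest(long t) { modificationTime = t; }
      }
      // Leader only: pin the non-deterministic value into the request.
      static AclRequest preExecute() {
        return new AclRequest(System.currentTimeMillis());
      }
      // Every replica: read the pinned value, never the local clock.
      static long apply(AclRequest req) {
        return req.modificationTime;
      }
    }
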
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
index 78afeff..6b3d614 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
@@ -23,8 +23,10 @@
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.util.BooleanBiFunction;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,19 @@
     };
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.AddAclRequest.Builder addAclRequestBuilder =
+        getOmRequest().getAddAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setAddAclRequest(addAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   public OMBucketAddAclRequest(OMRequest omRequest) {
     super(omRequest, bucketAddAclOp);
     OzoneManagerProtocolProtos.AddAclRequest addAclRequest =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
index 8b6fdba..6552a18 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
@@ -21,7 +21,9 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,19 @@
     };
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.RemoveAclRequest.Builder removeAclRequestBuilder
+        = getOmRequest().getRemoveAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setRemoveAclRequest(removeAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   public OMBucketRemoveAclRequest(OMRequest omRequest) {
     super(omRequest, bucketAddAclOp);
     OzoneManagerProtocolProtos.RemoveAclRequest removeAclRequest =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
index cfc4eb4..09fa7da 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
@@ -22,7 +22,9 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,19 @@
     };
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.SetAclRequest.Builder setAclRequestBuilder =
+        getOmRequest().getSetAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setSetAclRequest(setAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   public OMBucketSetAclRequest(OMRequest omRequest) {
     super(omRequest, bucketAddAclOp);
     OzoneManagerProtocolProtos.SetAclRequest setAclRequest =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 29d0243..b1d47de 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -153,7 +153,7 @@
       }
 
       bucketLockAcquired =
-          omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
               volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
@@ -220,7 +220,7 @@
           omDoubleBufferHelper);
 
       if(bucketLockAcquired) {
-        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
             bucketName);
       }
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index a048533..ee48f9b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -198,6 +198,19 @@
     }
   }
 
+  // Used only by the batch key delete and key rename requests.
+  protected String getVolumeOwner(OMMetadataManager omMetadataManager,
+      String volumeName) throws IOException {
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+    OmVolumeArgs volumeArgs =
+        omMetadataManager.getVolumeTable().get(dbVolumeKey);
+    if (volumeArgs == null) {
+      throw new OMException("Volume not found: " + volumeName,
+          VOLUME_NOT_FOUND);
+    }
+    return volumeArgs.getOwnerName();
+  }
+
   protected static Optional<FileEncryptionInfo> getFileEncryptionInfo(
       OzoneManager ozoneManager, OmBucketInfo bucketInfo) throws IOException {
     Optional<FileEncryptionInfo> encInfo = Optional.absent();
@@ -437,6 +450,27 @@
   }
 
   /**
+   * Check ACLs for an ozone key, using the supplied volume owner.
+   * @param ozoneManager ozone manager handling the request
+   * @param volume volume name
+   * @param bucket bucket name
+   * @param key key name
+   * @param aclType the ACL right being requested
+   * @param resourceType resource type of the ozone object
+   * @param volumeOwner owner of the volume the key belongs to
+   * @throws IOException if the check fails or permission is denied
+   */
+  @SuppressWarnings("parameternumber")
+  protected void checkKeyAcls(OzoneManager ozoneManager, String volume,
+      String bucket, String key, IAccessAuthorizer.ACLType aclType,
+      OzoneObj.ResourceType resourceType, String volumeOwner)
+      throws IOException {
+    if (ozoneManager.getAclsEnabled()) {
+      checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType,
+          volume, bucket, key, volumeOwner);
+    }
+  }
+
+  /**
    * Check ACLs for Ozone Key in OpenKey table
    * if ozone native authorizer is enabled.
    * @param ozoneManager
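
Note on the getVolumeOwner helper above: it lets the batch requests resolve the volume owner once per request instead of once per key. OMKeysDeleteRequest and OMKeysRenameRequest below look it up a single time under the bucket lock and reuse it for every per-key ACL check. Usage shape taken from those hunks (the loop variable is illustrative):

    String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
    for (String keyName : keysInBatch) {
      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
          volumeOwner);
    }
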
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index c6e7b9b..71e15f5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -124,6 +124,7 @@
           volumeName, bucketName);
       // Validate bucket and volume exists or not.
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
 
       for (indexFailed = 0; indexFailed < length; indexFailed++) {
         String keyName = deleteKeyArgs.getKeys(indexFailed);
@@ -143,7 +144,8 @@
         try {
           // check Acl
           checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
           omKeyInfoList.add(omKeyInfo);
         } catch (Exception ex) {
           deleteStatus = false;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
index abaa4ae..556c6f5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
@@ -117,6 +117,9 @@
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
               volumeName, bucketName);
 
+      // Validate that the bucket and volume exist.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
       for (RenameKeysMap renameKey : renameKeysArgs.getRenameKeysMapList()) {
 
         fromKeyName = renameKey.getFromKeyName();
@@ -137,9 +140,11 @@
           // check Acls to see if user has access to perform delete operation
           // on old key and create operation on new key
           checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
-              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
           checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
-              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
         } catch (Exception ex) {
           renameStatus = false;
           unRenamedKeys.add(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java
index 9fae498..68d621d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java
@@ -95,6 +95,21 @@
       operationResult = apply(omKeyInfo, trxnLogIndex);
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
+      // Update the key's modification time when its ACLs change.
+      long modificationTime = omKeyInfo.getModificationTime();
+      if (getOmRequest().getAddAclRequest().hasObj() && operationResult) {
+        modificationTime = getOmRequest().getAddAclRequest()
+            .getModificationTime();
+      } else if (getOmRequest().getSetAclRequest().hasObj()) {
+        modificationTime = getOmRequest().getSetAclRequest()
+            .getModificationTime();
+      } else if (getOmRequest().getRemoveAclRequest().hasObj()
+          && operationResult) {
+        modificationTime = getOmRequest().getRemoveAclRequest()
+            .getModificationTime();
+      }
+      omKeyInfo.setModificationTime(modificationTime);
+
       // update cache.
       omMetadataManager.getKeyTable().addCacheEntry(
           new CacheKey<>(dbKey),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
index 3697cb8..c475d6e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
@@ -23,10 +23,12 @@
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +45,19 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(OMKeyAddAclRequest.class);
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.AddAclRequest.Builder addAclRequestBuilder =
+        getOmRequest().getAddAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setAddAclRequest(addAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private String path;
   private List<OzoneAcl> ozoneAcls;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
index f0d13be..d9fbc35 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
@@ -23,10 +23,12 @@
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +45,19 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(OMKeyRemoveAclRequest.class);
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.RemoveAclRequest.Builder removeAclRequestBuilder
+        = getOmRequest().getRemoveAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setRemoveAclRequest(removeAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private String path;
   private List<OzoneAcl> ozoneAcls;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
index 6d904e6..e07ac09 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
@@ -23,11 +23,13 @@
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +46,19 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(OMKeySetAclRequest.class);
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.SetAclRequest.Builder setAclRequestBuilder =
+        getOmRequest().getSetAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setSetAclRequest(setAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private String path;
   private List<OzoneAcl> ozoneAcls;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
index e928402..6fbd7d2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
@@ -29,7 +29,9 @@
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -71,6 +73,11 @@
         (PrefixManagerImpl) ozoneManager.getPrefixManager();
     try {
       String prefixPath = getOzoneObj().getPath();
+      ObjectParser objectParser = new ObjectParser(prefixPath,
+          OzoneManagerProtocolProtos.OzoneObj.ObjectType.PREFIX);
+      volume = objectParser.getVolume();
+      bucket = objectParser.getBucket();
+      key = objectParser.getKey();
 
       // check Acl
       if (ozoneManager.getAclsEnabled()) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
index c12cdac..9b82702 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
@@ -43,7 +43,6 @@
   public ObjectParser(String path, ObjectType objectType) throws OMException {
     Preconditions.checkNotNull(path);
     String[] tokens = StringUtils.split(path, OZONE_URI_DELIMITER, 3);
-
     if (objectType == ObjectType.VOLUME && tokens.length == 1) {
       volume = tokens[0];
     } else if (objectType == ObjectType.BUCKET && tokens.length == 2) {
@@ -53,6 +52,11 @@
       volume = tokens[0];
       bucket = tokens[1];
       key = tokens[2];
+    } else if (objectType == ObjectType.PREFIX && tokens.length >= 1) {
+      volume = tokens[0];
+      if (tokens.length >= 2) {
+        bucket = tokens[1];
+      }
     } else {
       throw new OMException("Illegal path " + path,
           OMException.ResultCodes.INVALID_PATH_IN_ACL_REQUEST);
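A rough usage sketch of the new PREFIX branch, assuming OZONE_URI_DELIMITER is "/" and using made-up path values; the expected results follow from the branch as written above:

```java
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.request.util.ObjectParser;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;

final class PrefixParsingSketch {
  static void demo() throws OMException {
    // One path component populates only the volume.
    ObjectParser p1 = new ObjectParser("vol1", ObjectType.PREFIX);
    assert "vol1".equals(p1.getVolume()) && p1.getBucket() == null;

    // Three components populate volume and bucket. Note the PREFIX branch
    // never assigns key, so getKey() still returns null here and the
    // caller in OMPrefixAclRequest has to tolerate that.
    ObjectParser p2 = new ObjectParser("vol1/bucket1/dir1/",
        ObjectType.PREFIX);
    assert "bucket1".equals(p2.getBucket()) && p2.getKey() == null;
  }
}
```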
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
index 9c81c36..cba8bff 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +54,6 @@
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .VolumeInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserVolumeInfo;
 import org.apache.hadoop.util.Time;
 
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
@@ -148,7 +149,7 @@
 
       String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
 
-      UserVolumeInfo volumeList = null;
+      PersistedUserVolumeInfo volumeList = null;
       if (omMetadataManager.getVolumeTable().isExist(dbVolumeKey)) {
         LOG.debug("volume:{} already exists", omVolumeArgs.getVolume());
         throw new OMException("Volume already exists",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
index ce93e26..544ca54 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
@@ -24,6 +24,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,6 @@
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.volume.OMVolumeDeleteResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -111,7 +111,7 @@
         throw new OMException(OMException.ResultCodes.VOLUME_NOT_EMPTY);
       }
 
-      OzoneManagerProtocolProtos.UserVolumeInfo newVolumeList =
+      OzoneManagerStorageProtos.PersistedUserVolumeInfo newVolumeList =
           omMetadataManager.getUserTable().get(owner);
 
       // delete the volume from the owner list
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java
index 4c481a1..a3df235 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java
@@ -25,8 +25,8 @@
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.slf4j.Logger;
@@ -60,8 +60,9 @@
-   * @return UserVolumeInfo - updated UserVolumeInfo.
+   * @return PersistedUserVolumeInfo - updated PersistedUserVolumeInfo.
    * @throws IOException
    */
-  protected UserVolumeInfo delVolumeFromOwnerList(UserVolumeInfo volumeList,
-      String volume, String owner, long txID) throws IOException {
+  protected PersistedUserVolumeInfo delVolumeFromOwnerList(
+      PersistedUserVolumeInfo volumeList, String volume,
+      String owner, long txID) throws IOException {
 
     List<String> prevVolList = new ArrayList<>();
 
@@ -75,7 +76,7 @@
 
     // Remove the volume from the list
     prevVolList.remove(volume);
-    UserVolumeInfo newVolList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo newVolList = PersistedUserVolumeInfo.newBuilder()
         .addAllVolumeNames(prevVolList)
             .setObjectID(volumeList.getObjectID())
             .setUpdateID(txID)
@@ -95,9 +96,9 @@
    * @throws OMException - if user has volumes greater than
    * maxUserVolumeCount, an exception is thrown.
    */
-  protected UserVolumeInfo addVolumeToOwnerList(UserVolumeInfo volumeList,
-      String volume, String owner, long maxUserVolumeCount, long txID)
-      throws IOException {
+  protected PersistedUserVolumeInfo addVolumeToOwnerList(
+      PersistedUserVolumeInfo volumeList, String volume, String owner,
+      long maxUserVolumeCount, long txID) throws IOException {
 
     // Check the volume count
     if (volumeList != null &&
@@ -114,7 +115,7 @@
     }
 
     volumeSet.add(volume);
-    return UserVolumeInfo.newBuilder()
+    return PersistedUserVolumeInfo.newBuilder()
         .setObjectID(objectID)
         .setUpdateID(txID)
         .addAllVolumeNames(volumeSet).build();
@@ -131,8 +132,9 @@
    * @param transactionLogIndex
    * @throws IOException
    */
-  protected void createVolume(final OMMetadataManager omMetadataManager,
-      OmVolumeArgs omVolumeArgs, UserVolumeInfo volumeList, String dbVolumeKey,
+  protected void createVolume(
+      final OMMetadataManager omMetadataManager, OmVolumeArgs omVolumeArgs,
+      PersistedUserVolumeInfo volumeList, String dbVolumeKey,
       String dbUserKey, long transactionLogIndex) {
     // Update cache: Update user and volume cache.
     omMetadataManager.getUserTable().addCacheEntry(new CacheKey<>(dbUserKey),
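Because the generated protobuf messages are immutable, both helpers rebuild the owner's volume list from scratch and stamp the new transaction index into it. A brief sketch of the message shape, assuming only the generated PersistedUserVolumeInfo builder API (IDs and names are placeholders):

```java
import java.util.Arrays;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;

final class OwnerListSketch {
  static PersistedUserVolumeInfo build(long txID) {
    // The same builder calls used by addVolumeToOwnerList above: the list
    // of volume names owned by one user, plus object and update IDs.
    return PersistedUserVolumeInfo.newBuilder()
        .setObjectID(1L)
        .setUpdateID(txID)
        .addAllVolumeNames(Arrays.asList("vol1", "vol2"))
        .build();
  }
}
```

The update ID carries the Ratis transaction index, mirroring the setUpdateID(trxnLogIndex, ...) calls made on other OM objects in this patch.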
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
index 8bc8842..9e3e99a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,8 +119,8 @@
       }
 
       long maxUserVolumeCount = ozoneManager.getMaxUserVolumeCount();
-      OzoneManagerProtocolProtos.UserVolumeInfo oldOwnerVolumeList = null;
-      OzoneManagerProtocolProtos.UserVolumeInfo newOwnerVolumeList = null;
+      OzoneManagerStorageProtos.PersistedUserVolumeInfo oldOwnerVolumeList;
+      OzoneManagerStorageProtos.PersistedUserVolumeInfo newOwnerVolumeList;
       OmVolumeArgs omVolumeArgs = null;
 
       acquiredVolumeLock = omMetadataManager.getLock().acquireWriteLock(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java
index de7f0c0..c3d2620 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java
@@ -94,6 +94,20 @@
 
       // Update only when
       if (applyAcl) {
+        // Update the modification time when updating ACLs of Volume.
+        long modificationTime = omVolumeArgs.getModificationTime();
+        if (getOmRequest().getAddAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getAddAclRequest()
+              .getModificationTime();
+        } else if (getOmRequest().getSetAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getSetAclRequest()
+              .getModificationTime();
+        } else if (getOmRequest().getRemoveAclRequest().hasObj()) {
+          modificationTime = getOmRequest().getRemoveAclRequest()
+              .getModificationTime();
+        }
+        omVolumeArgs.setModificationTime(modificationTime);
+
         omVolumeArgs.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
         // update cache.
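This chain works because an OMRequest carries at most one populated acl sub-message for a given command type. A self-contained restatement of the selection logic (a hypothetical helper, not in the patch; the accessors are the generated protobuf API):

```java
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;

final class ModTimeSelectorSketch {
  // Illustrative: reads the stamped time from whichever acl sub-request is
  // populated; exactly one should have its obj field set for acl commands.
  static long pick(OMRequest req, long current) {
    if (req.getAddAclRequest().hasObj()) {
      return req.getAddAclRequest().getModificationTime();
    } else if (req.getSetAclRequest().hasObj()) {
      return req.getSetAclRequest().getModificationTime();
    } else if (req.getRemoveAclRequest().hasObj()) {
      return req.getRemoveAclRequest().getModificationTime();
    }
    return current;
  }
}
```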
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
index 12008e2..15f0c67 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,19 @@
     volumeAddAclOp = (acls, volArgs) -> volArgs.addAcl(acls.get(0));
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.AddAclRequest.Builder addAclRequestBuilder =
+        getOmRequest().getAddAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setAddAclRequest(addAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private List<OzoneAcl> ozoneAcls;
   private String volumeName;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
index 461ad48..ea03503 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,19 @@
     volumeRemoveAclOp = (acls, volArgs) -> volArgs.removeAcl(acls.get(0));
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.RemoveAclRequest.Builder removeAclRequestBuilder
+        = getOmRequest().getRemoveAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setRemoveAclRequest(removeAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private List<OzoneAcl> ozoneAcls;
   private String volumeName;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
index c73e19e..787cf76 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -27,6 +28,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,19 @@
     volumeSetAclOp = (acls, volArgs) -> volArgs.setAcls(acls);
   }
 
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    long modificationTime = Time.now();
+    OzoneManagerProtocolProtos.SetAclRequest.Builder setAclRequestBuilder =
+        getOmRequest().getSetAclRequest().toBuilder()
+            .setModificationTime(modificationTime);
+
+    return getOmRequest().toBuilder()
+        .setSetAclRequest(setAclRequestBuilder)
+        .setUserInfo(getUserInfo())
+        .build();
+  }
+
   private List<OzoneAcl> ozoneAcls;
   private String volumeName;
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java
index 1b8e26e..b48d770 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java
@@ -27,7 +27,8 @@
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
@@ -41,12 +42,12 @@
 @CleanupTableInfo(cleanupTables = VOLUME_TABLE)
 public class OMVolumeCreateResponse extends OMClientResponse {
 
-  private UserVolumeInfo userVolumeInfo;
+  private PersistedUserVolumeInfo userVolumeInfo;
   private OmVolumeArgs omVolumeArgs;
 
   public OMVolumeCreateResponse(@Nonnull OMResponse omResponse,
       @Nonnull OmVolumeArgs omVolumeArgs,
-      @Nonnull UserVolumeInfo userVolumeInfo) {
+      @Nonnull PersistedUserVolumeInfo userVolumeInfo) {
     super(omResponse);
     this.omVolumeArgs = omVolumeArgs;
     this.userVolumeInfo = userVolumeInfo;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java
index db43fa6..5f14aa5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java
@@ -25,8 +25,8 @@
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
 import javax.annotation.Nonnull;
@@ -40,11 +40,11 @@
 public class OMVolumeDeleteResponse extends OMClientResponse {
   private String volume;
   private String owner;
-  private UserVolumeInfo updatedVolumeList;
+  private PersistedUserVolumeInfo updatedVolumeList;
 
   public OMVolumeDeleteResponse(@Nonnull OMResponse omResponse,
       @Nonnull String volume, @Nonnull String owner,
-      @Nonnull UserVolumeInfo updatedVolumeList) {
+      @Nonnull PersistedUserVolumeInfo updatedVolumeList) {
     super(omResponse);
     this.volume = volume;
     this.owner = owner;
@@ -65,7 +65,7 @@
       BatchOperation batchOperation) throws IOException {
 
     String dbUserKey = omMetadataManager.getUserKey(owner);
-    UserVolumeInfo volumeList = updatedVolumeList;
+    PersistedUserVolumeInfo volumeList = updatedVolumeList;
     if (updatedVolumeList.getVolumeNamesList().size() == 0) {
       omMetadataManager.getUserTable().deleteWithBatch(batchOperation,
           dbUserKey);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java
index a1efe70..4e663aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java
@@ -26,8 +26,8 @@
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -42,13 +42,14 @@
 @CleanupTableInfo(cleanupTables = {VOLUME_TABLE})
 public class OMVolumeSetOwnerResponse extends OMClientResponse {
   private String oldOwner;
-  private UserVolumeInfo oldOwnerVolumeList;
-  private UserVolumeInfo newOwnerVolumeList;
+  private PersistedUserVolumeInfo oldOwnerVolumeList;
+  private PersistedUserVolumeInfo newOwnerVolumeList;
   private OmVolumeArgs newOwnerVolumeArgs;
 
   public OMVolumeSetOwnerResponse(@Nonnull OMResponse omResponse,
-      @Nonnull String oldOwner, @Nonnull UserVolumeInfo oldOwnerVolumeList,
-      @Nonnull UserVolumeInfo newOwnerVolumeList,
+      @Nonnull String oldOwner,
+      @Nonnull PersistedUserVolumeInfo oldOwnerVolumeList,
+      @Nonnull PersistedUserVolumeInfo newOwnerVolumeList,
       @Nonnull OmVolumeArgs newOwnerVolumeArgs) {
     super(omResponse);
     this.oldOwner = oldOwner;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index df98e20..aebefdc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -50,6 +50,7 @@
   private KeyManager keyManager;
   private PrefixManager prefixManager;
   private Collection<String> ozAdmins;
+  private boolean allowListAllVolumes;
 
   public OzoneNativeAuthorizer() {
   }
@@ -87,14 +88,18 @@
           "configured to work with OzoneObjInfo type only.", INVALID_REQUEST);
     }
 
-    // by pass all checks for admin
+    // bypass all checks for admin
     boolean isAdmin = isAdmin(context.getClientUgi());
     if (isAdmin) {
       return true;
     }
 
+    boolean isOwner = isOwner(context.getClientUgi(), context.getOwnerName());
     boolean isListAllVolume = ((context.getAclRights() == ACLType.LIST) &&
         objInfo.getVolumeName().equals(OzoneConsts.OZONE_ROOT));
+    if (isListAllVolume) {
+      return getAllowListAllVolumes();
+    }
 
     // For CREATE and DELETE acl requests, the parents need to be checked
     // for WRITE acl. If Key create request is received, then we need to
@@ -114,13 +119,19 @@
     switch (objInfo.getResourceType()) {
     case VOLUME:
       LOG.trace("Checking access for volume: {}", objInfo);
-      if (isACLTypeCreate || isListAllVolume) {
-        // only admin is allowed to create volume and list all volumes
+      if (isACLTypeCreate) {
+        // only admin is allowed to create a volume
         return false;
       }
-      return volumeManager.checkAccess(objInfo, context);
+      boolean volumeAccess = isOwner ||
+          volumeManager.checkAccess(objInfo, context);
+      return volumeAccess;
     case BUCKET:
       LOG.trace("Checking access for bucket: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip bucket access check for CREATE acl since
       // bucket will not exist at the time of creation
       boolean bucketAccess = isACLTypeCreate
@@ -129,6 +140,10 @@
           && volumeManager.checkAccess(objInfo, parentContext));
     case KEY:
       LOG.trace("Checking access for Key: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip key access check for CREATE acl since
       // key will not exist at the time of creation
       boolean keyAccess = isACLTypeCreate
@@ -139,6 +154,10 @@
           && volumeManager.checkAccess(objInfo, parentContext));
     case PREFIX:
       LOG.trace("Checking access for Prefix: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip prefix access check for CREATE acl since
       // prefix will not exist at the time of creation
       boolean prefixAccess = isACLTypeCreate
@@ -176,6 +195,25 @@
     return Collections.unmodifiableCollection(this.ozAdmins);
   }
 
+  public void setAllowListAllVolumes(boolean allowListAllVolumes) {
+    this.allowListAllVolumes = allowListAllVolumes;
+  }
+
+  public boolean getAllowListAllVolumes() {
+    return allowListAllVolumes;
+  }
+
+  private boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
+    if (ownerName == null) {
+      return false;
+    }
+    if (callerUgi.getUserName().equals(ownerName) ||
+        callerUgi.getShortUserName().equals(ownerName)) {
+      return true;
+    }
+    return false;
+  }
+
   private boolean isAdmin(UserGroupInformation callerUgi) {
     if (ozAdmins == null) {
       return false;
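Taken together, the authorizer now short-circuits before any ACL table lookups. A condensed, self-contained restatement of the new decision order (the boolean parameters stand in for the real context and manager calls; this is a sketch of the control flow, not the actual method):

```java
final class AccessDecisionSketch {
  static boolean decide(boolean isAdmin, boolean isListAllVolume,
      boolean allowListAllVolumes, boolean isOwner, boolean aclCheckResult) {
    if (isAdmin) {
      return true;                    // admins bypass every check
    }
    if (isListAllVolume) {
      return allowListAllVolumes;     // now gated by the new config flag
    }
    if (isOwner) {
      return true;                    // volume owners skip ACL evaluation
    }
    return aclCheckResult;            // everyone else falls back to ACLs
  }
}
```

The sketch flattens one nuance: for VOLUME resources a CREATE request still returns false before the owner check, since only admins may create volumes.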
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
index 1be303c..ce1b2b6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
@@ -65,6 +65,7 @@
 import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -410,10 +411,11 @@
   public static void addUserToDB(String volumeName, String ownerName,
       OMMetadataManager omMetadataManager) throws Exception {
 
-    OzoneManagerProtocolProtos.UserVolumeInfo userVolumeInfo = omMetadataManager
-        .getUserTable().get(omMetadataManager.getUserKey(ownerName));
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo userVolumeInfo =
+        omMetadataManager.getUserTable().get(
+            omMetadataManager.getUserKey(ownerName));
     if (userVolumeInfo == null) {
-      userVolumeInfo = OzoneManagerProtocolProtos.UserVolumeInfo
+      userVolumeInfo = OzoneManagerStorageProtos.PersistedUserVolumeInfo
           .newBuilder()
           .addVolumeNames(volumeName)
           .setObjectID(1)
@@ -519,6 +521,66 @@
         .setSetAclRequest(setAclRequestBuilder.build()).build();
   }
 
+  // Create OMRequest for testing adding an acl to a bucket.
+  public static OMRequest createBucketAddAclRequest(String volumeName,
+      String bucketName, OzoneAcl acl) {
+    AddAclRequest.Builder addAclRequestBuilder = AddAclRequest.newBuilder();
+    addAclRequestBuilder.setObj(OzoneObj.toProtobuf(new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName).setBucketName(bucketName)
+        .setResType(ResourceType.BUCKET)
+        .setStoreType(StoreType.OZONE)
+        .build()));
+
+    if (acl != null) {
+      addAclRequestBuilder.setAcl(OzoneAcl.toProtobuf(acl));
+    }
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.AddAcl)
+        .setAddAclRequest(addAclRequestBuilder.build()).build();
+  }
+
+  // Create OMRequest for testing removing an acl from a bucket.
+  public static OMRequest createBucketRemoveAclRequest(String volumeName,
+      String bucketName, OzoneAcl acl) {
+    RemoveAclRequest.Builder removeAclRequestBuilder =
+        RemoveAclRequest.newBuilder();
+    removeAclRequestBuilder.setObj(OzoneObj.toProtobuf(
+        new OzoneObjInfo.Builder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setResType(ResourceType.BUCKET)
+            .setStoreType(StoreType.OZONE)
+            .build()));
+
+    if (acl != null) {
+      removeAclRequestBuilder.setAcl(OzoneAcl.toProtobuf(acl));
+    }
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.RemoveAcl)
+        .setRemoveAclRequest(removeAclRequestBuilder.build()).build();
+  }
+
+  // Create OMRequest for testing setting the acl list of a bucket.
+  public static OMRequest createBucketSetAclRequest(String volumeName,
+      String bucketName, List<OzoneAcl> acls) {
+    SetAclRequest.Builder setAclRequestBuilder = SetAclRequest.newBuilder();
+    setAclRequestBuilder.setObj(OzoneObj.toProtobuf(new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName).setBucketName(bucketName)
+        .setResType(ResourceType.BUCKET)
+        .setStoreType(StoreType.OZONE)
+        .build()));
+
+    if (acls != null) {
+      acls.forEach(
+          acl -> setAclRequestBuilder.addAcl(OzoneAcl.toProtobuf(acl)));
+    }
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetAcl)
+        .setSetAclRequest(setAclRequestBuilder.build()).build();
+  }
+
   /**
    * Deletes key from Key table and adds it to DeletedKeys table.
    * @return the deletedKey name
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketAddAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketAddAclRequest.java
new file mode 100644
index 0000000..c4d4cc2
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketAddAclRequest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.bucket.acl;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.bucket.TestBucketRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests bucket addAcl request.
+ */
+public class TestOMBucketAddAclRequest extends TestBucketRequest {
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:testUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketAddAclRequest(volumeName, bucketName, acl);
+    long originModTime = originalRequest.getAddAclRequest()
+        .getModificationTime();
+
+    OMBucketAddAclRequest omBucketAddAclRequest =
+        new OMBucketAddAclRequest(originalRequest);
+    OMRequest preExecuteRequest = omBucketAddAclRequest
+        .preExecute(ozoneManager);
+    Assert.assertNotEquals(originalRequest, preExecuteRequest);
+
+    long newModTime = preExecuteRequest.getAddAclRequest()
+        .getModificationTime();
+    // After preExecute() of the add acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String ownerName = "testUser";
+
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    OzoneAcl acl = OzoneAcl.parseAcl("user:newUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketAddAclRequest(volumeName, bucketName, acl);
+    OMBucketAddAclRequest omBucketAddAclRequest =
+        new OMBucketAddAclRequest(originalRequest);
+    omBucketAddAclRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse = omBucketAddAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getAddAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+
+    // Acl is added.
+    Assert.assertEquals(1, bucketAcls.size());
+    Assert.assertEquals(acl, bucketAcls.get(0));
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:newUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketAddAclRequest(volumeName, bucketName, acl);
+    OMBucketAddAclRequest omBucketAddAclRequest =
+        new OMBucketAddAclRequest(originalRequest);
+    omBucketAddAclRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse = omBucketAddAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getAddAclResponse());
+    // The bucket is not created.
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
+        omResponse.getStatus());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketRemoveAclRequest.java
new file mode 100644
index 0000000..eca281c
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketRemoveAclRequest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.bucket.acl;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.bucket.TestBucketRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests bucket removeAcl request.
+ */
+public class TestOMBucketRemoveAclRequest extends TestBucketRequest {
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:testUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketRemoveAclRequest(volumeName, bucketName, acl);
+    long originModTime = originalRequest.getRemoveAclRequest()
+        .getModificationTime();
+
+    OMBucketRemoveAclRequest omBucketRemoveAclRequest =
+        new OMBucketRemoveAclRequest(originalRequest);
+    OMRequest preExecuteRequest = omBucketRemoveAclRequest
+        .preExecute(ozoneManager);
+    Assert.assertNotEquals(originalRequest, preExecuteRequest);
+
+    long newModTime = preExecuteRequest.getRemoveAclRequest()
+        .getModificationTime();
+    // After preExecute() of the remove acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String ownerName = "testUser";
+
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    OzoneAcl acl = OzoneAcl.parseAcl("user:newUser:rw");
+
+    // Add acl
+    OMRequest addAclRequest = TestOMRequestUtils
+        .createBucketAddAclRequest(volumeName, bucketName, acl);
+    OMBucketAddAclRequest omBucketAddAclRequest =
+        new OMBucketAddAclRequest(addAclRequest);
+    omBucketAddAclRequest.preExecute(ozoneManager);
+    OMClientResponse omClientAddAclResponse = omBucketAddAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omAddAclResponse = omClientAddAclResponse.getOMResponse();
+    Assert.assertNotNull(omAddAclResponse.getAddAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omAddAclResponse.getStatus());
+
+    // Verify result of adding acl.
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(1, bucketAcls.size());
+    Assert.assertEquals(acl, bucketAcls.get(0));
+
+    // Remove acl.
+    OMRequest removeAclRequest = TestOMRequestUtils
+        .createBucketRemoveAclRequest(volumeName, bucketName, acl);
+    OMBucketRemoveAclRequest omBucketRemoveAclRequest =
+        new OMBucketRemoveAclRequest(removeAclRequest);
+    omBucketRemoveAclRequest.preExecute(ozoneManager);
+    OMClientResponse omClientRemoveAclResponse = omBucketRemoveAclRequest
+        .validateAndUpdateCache(ozoneManager, 2,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omRemoveAclResponse = omClientRemoveAclResponse.getOMResponse();
+    Assert.assertNotNull(omRemoveAclResponse.getRemoveAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omRemoveAclResponse.getStatus());
+
+    // Verify result of removing acl.
+    List<OzoneAcl> newAcls = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+    Assert.assertEquals(0, newAcls.size());
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:newUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketRemoveAclRequest(volumeName, bucketName, acl);
+    OMBucketRemoveAclRequest omBucketRemoveAclRequest =
+        new OMBucketRemoveAclRequest(originalRequest);
+    omBucketRemoveAclRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse = omBucketRemoveAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omResponse = omClientResponse.getOMResponse();
+
+    Assert.assertNotNull(omResponse.getRemoveAclResponse());
+    // The bucket is not created.
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
+        omResponse.getStatus());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketSetAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketSetAclRequest.java
new file mode 100644
index 0000000..519d1dd
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/TestOMBucketSetAclRequest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.bucket.acl;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.bucket.TestBucketRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests bucket setAcl request.
+ */
+public class TestOMBucketSetAclRequest extends TestBucketRequest {
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:testUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketSetAclRequest(volumeName, bucketName,
+            Lists.newArrayList(acl));
+    long originModTime = originalRequest.getSetAclRequest()
+        .getModificationTime();
+
+    OMBucketSetAclRequest omBucketSetAclRequest =
+        new OMBucketSetAclRequest(originalRequest);
+    OMRequest preExecuteRequest = omBucketSetAclRequest
+        .preExecute(ozoneManager);
+    Assert.assertNotEquals(originalRequest, preExecuteRequest);
+
+    long newModTime = preExecuteRequest.getSetAclRequest()
+        .getModificationTime();
+    // After preExecute() of the set acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String ownerName = "owner";
+
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    OzoneAcl userAcl = OzoneAcl.parseAcl("user:newUser:rw");
+    OzoneAcl groupAcl = OzoneAcl.parseAcl("group:newGroup:rw");
+    List<OzoneAcl> acls = Lists.newArrayList(userAcl, groupAcl);
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketSetAclRequest(volumeName, bucketName, acls);
+    OMBucketSetAclRequest omBucketSetAclRequest =
+        new OMBucketSetAclRequest(originalRequest);
+    omBucketSetAclRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse = omBucketSetAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getSetAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+    List<OzoneAcl> bucketAclList = omMetadataManager.getBucketTable()
+        .get(bucketKey).getAcls();
+
+    // Acls are added to acl list.
+    Assert.assertEquals(acls.size(), bucketAclList.size());
+    Assert.assertEquals(userAcl, bucketAclList.get(0));
+    Assert.assertEquals(groupAcl, bucketAclList.get(1));
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    OzoneAcl acl = OzoneAcl.parseAcl("user:newUser:rw");
+
+    OMRequest originalRequest = TestOMRequestUtils
+        .createBucketSetAclRequest(volumeName, bucketName,
+            Lists.newArrayList(acl));
+    OMBucketSetAclRequest omBucketSetAclRequest =
+        new OMBucketSetAclRequest(originalRequest);
+    omBucketSetAclRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse = omBucketSetAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getSetAclResponse());
+    // The bucket is not created.
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
+        omResponse.getStatus());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/package-info.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/package-info.java
new file mode 100644
index 0000000..a7cf8b3
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/acl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package contains test classes for bucket acl requests.
+ */
+package org.apache.hadoop.ozone.om.request.bucket.acl;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequest.java
index 5f704d3..470cf60 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyAclRequest.java
@@ -17,14 +17,20 @@
  */
 package org.apache.hadoop.ozone.om.request.key;
 
+import java.util.List;
 import java.util.UUID;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.junit.Assert;
@@ -36,7 +42,7 @@
 public class TestOMKeyAclRequest extends TestOMKeyRequest {
 
   @Test
-  public void testAclRequest() throws Exception {
+  public void testKeyAddAclRequest() throws Exception {
     // Manually add volume, bucket and key to DB
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
@@ -50,7 +56,15 @@
     OMRequest originalRequest = createAddAclkeyRequest(acl);
     OMKeyAddAclRequest omKeyAddAclRequest = new OMKeyAddAclRequest(
         originalRequest);
-    omKeyAddAclRequest.preExecute(ozoneManager);
+    OMRequest preExecuteRequest = omKeyAddAclRequest.preExecute(ozoneManager);
+
+    // After preExecute() of the add acl request, the new modification
+    // time should be greater than the original one.
+    long originModTime = originalRequest.getAddAclRequest()
+        .getModificationTime();
+    long newModTime = preExecuteRequest.getAddAclRequest()
+        .getModificationTime();
+    Assert.assertTrue(newModTime > originModTime);
 
     // Execute original request
     OMClientResponse omClientResponse = omKeyAddAclRequest
@@ -61,6 +75,105 @@
 
   }
 
+  @Test
+  public void testKeyRemoveAclRequest() throws Exception {
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+    TestOMRequestUtils.addKeyToTable(false, false, volumeName, bucketName,
+        keyName, clientID, replicationType, replicationFactor, 1L,
+        omMetadataManager);
+
+    OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rwdlncxy[ACCESS]");
+
+    // Add acl.
+    OMRequest addAclRequest = createAddAclkeyRequest(acl);
+    OMKeyAddAclRequest omKeyAddAclRequest =
+        new OMKeyAddAclRequest(addAclRequest);
+    omKeyAddAclRequest.preExecute(ozoneManager);
+    OMClientResponse omClientAddAclResponse = omKeyAddAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omAddAclResponse = omClientAddAclResponse.getOMResponse();
+    Assert.assertNotNull(omAddAclResponse.getAddAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omAddAclResponse.getStatus());
+
+    // Verify result of adding acl.
+    String ozoneKey = omMetadataManager
+        .getOzoneKey(volumeName, bucketName, keyName);
+    List<OzoneAcl> keyAcls = omMetadataManager.getKeyTable().get(ozoneKey)
+        .getAcls();
+    Assert.assertEquals(1, keyAcls.size());
+    Assert.assertEquals(acl, keyAcls.get(0));
+
+    // Remove acl.
+    OMRequest removeAclRequest = createRemoveAclKeyRequest(acl);
+    OMKeyRemoveAclRequest omKeyRemoveAclRequest =
+        new OMKeyRemoveAclRequest(removeAclRequest);
+    OMRequest preExecuteRequest = omKeyRemoveAclRequest
+        .preExecute(ozoneManager);
+
+    // After preExecute() of the remove acl request, the new modification
+    // time should be greater than the original one.
+    long originModTime = removeAclRequest.getRemoveAclRequest()
+        .getModificationTime();
+    long newModTime = preExecuteRequest.getRemoveAclRequest()
+        .getModificationTime();
+    Assert.assertTrue(newModTime > originModTime);
+
+    OMClientResponse omClientRemoveAclResponse = omKeyRemoveAclRequest
+        .validateAndUpdateCache(ozoneManager, 2,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omRemoveAclResponse = omClientRemoveAclResponse.getOMResponse();
+    Assert.assertNotNull(omRemoveAclResponse.getRemoveAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omRemoveAclResponse.getStatus());
+
+    // Verify result of removing acl.
+    List<OzoneAcl> newAcls = omMetadataManager.getKeyTable().get(ozoneKey)
+        .getAcls();
+    Assert.assertEquals(0, newAcls.size());
+  }
+
+  @Test
+  public void testKeySetAclRequest() throws Exception {
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+    TestOMRequestUtils.addKeyToTable(false, false, volumeName, bucketName,
+        keyName, clientID, replicationType, replicationFactor, 1L,
+        omMetadataManager);
+
+    OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rwdlncxy[ACCESS]");
+
+    OMRequest setAclRequest = createSetAclKeyRequest(acl);
+    OMKeySetAclRequest omKeySetAclRequest =
+        new OMKeySetAclRequest(setAclRequest);
+    OMRequest preExecuteRequest = omKeySetAclRequest.preExecute(ozoneManager);
+
+    // After preExecute() of the set acl request, the new modification
+    // time should be greater than the original one.
+    long originModTime = setAclRequest.getSetAclRequest()
+        .getModificationTime();
+    long newModTime = preExecuteRequest.getSetAclRequest()
+        .getModificationTime();
+    Assert.assertTrue(newModTime > originModTime);
+
+    OMClientResponse omClientResponse = omKeySetAclRequest
+        .validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    OMResponse omSetAclResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omSetAclResponse.getSetAclResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omSetAclResponse.getStatus());
+
+    // Verify result of setting acl.
+    String ozoneKey = omMetadataManager
+        .getOzoneKey(volumeName, bucketName, keyName);
+    List<OzoneAcl> newAcls = omMetadataManager.getKeyTable().get(ozoneKey)
+        .getAcls();
+    Assert.assertEquals(newAcls.get(0), acl);
+  }
+
   /**
    * Create OMRequest which encapsulates OMKeyAddAclRequest.
    */
@@ -82,4 +195,42 @@
         .setAddAclRequest(addAclRequest)
         .build();
   }
+
+  private OMRequest createRemoveAclKeyRequest(OzoneAcl acl) {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
+        .setBucketName(bucketName)
+        .setVolumeName(volumeName)
+        .setKeyName(keyName)
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+    RemoveAclRequest removeAclRequest = RemoveAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj))
+        .setAcl(OzoneAcl.toProtobuf(acl))
+        .build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.RemoveAcl)
+        .setRemoveAclRequest(removeAclRequest)
+        .build();
+  }
+
+  private OMRequest createSetAclKeyRequest(OzoneAcl acl) {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
+        .setBucketName(bucketName)
+        .setVolumeName(volumeName)
+        .setKeyName(keyName)
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+    SetAclRequest setAclRequest = SetAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj))
+        .addAcl(OzoneAcl.toProtobuf(acl))
+        .build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetAcl)
+        .setSetAclRequest(setAclRequest)
+        .build();
+  }
 }
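
The assertions added above all check one invariant: preExecute() must stamp a fresh modification time onto the ACL request before it is submitted. A minimal sketch of how a preExecute() implementation can satisfy that with the generated protobuf builders (shown in isolation as an assumption about the pattern; the real implementations live in OMKeyAddAclRequest and its siblings):

    // Sketch only: rebuild the inner AddAclRequest with a server-assigned
    // timestamp, so newModTime in the tests is always greater than the
    // client-supplied originModTime.
    public OMRequest preExecute(OzoneManager ozoneManager) {
      AddAclRequest updated = getOmRequest().getAddAclRequest().toBuilder()
          .setModificationTime(org.apache.hadoop.util.Time.now())
          .build();
      return getOmRequest().toBuilder().setAddAclRequest(updated).build();
    }

Assigning the timestamp once in preExecute(), rather than on each client, keeps the recorded value consistent when the same request is later replayed on other OM replicas.
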
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java
index 4ac1f49..a8cccad 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
@@ -151,8 +152,8 @@
     Assert.assertEquals(volumeInfo.getCreationTime(),
         omVolumeArgs.getCreationTime());
 
-    OzoneManagerProtocolProtos.UserVolumeInfo userVolumeInfo = omMetadataManager
-        .getUserTable().get(ownerKey);
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo userVolumeInfo =
+        omMetadataManager.getUserTable().get(ownerKey);
     Assert.assertNotNull(userVolumeInfo);
     Assert.assertEquals(volumeName, userVolumeInfo.getVolumeNames(0));
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java
index cedad4b..9cd04f6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java
@@ -23,6 +23,7 @@
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -99,15 +100,14 @@
         .get(volumeKey).getModificationTime();
     Assert.assertTrue(modificationTime > creationTime);
 
-
-    OzoneManagerProtocolProtos.UserVolumeInfo newOwnerVolumeList =
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo newOwnerVolumeList =
         omMetadataManager.getUserTable().get(newOwnerKey);
 
     Assert.assertNotNull(newOwnerVolumeList);
     Assert.assertEquals(volumeName,
         newOwnerVolumeList.getVolumeNamesList().get(0));
 
-    OzoneManagerProtocolProtos.UserVolumeInfo oldOwnerVolumeList =
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo oldOwnerVolumeList =
         omMetadataManager.getUserTable().get(
             omMetadataManager.getUserKey(ownerKey));
 
@@ -203,7 +203,7 @@
     Assert.assertFalse(omClientResponse.getOMResponse().getSuccess());
 
     // Check volume names list
-    OzoneManagerProtocolProtos.UserVolumeInfo userVolumeInfo =
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo userVolumeInfo =
         omMetadataManager.getUserTable().get(newOwner);
     Assert.assertNotNull(userVolumeInfo);
     List<String> volumeNamesList = userVolumeInfo.getVolumeNamesList();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeAddAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeAddAclRequest.java
index 66a122f..f324740 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeAddAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeAddAclRequest.java
@@ -44,6 +44,8 @@
     OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rw");
     OMRequest originalRequest =
         TestOMRequestUtils.createVolumeAddAclRequest(volumeName, acl);
+    long originModTime = originalRequest.getAddAclRequest()
+        .getModificationTime();
 
     OMVolumeAddAclRequest omVolumeAddAclRequest =
         new OMVolumeAddAclRequest(originalRequest);
@@ -51,6 +53,11 @@
     OMRequest modifiedRequest = omVolumeAddAclRequest.preExecute(
         ozoneManager);
     Assert.assertNotEquals(modifiedRequest, originalRequest);
+
+    long newModTime = modifiedRequest.getAddAclRequest().getModificationTime();
+    // After preExecute() of the add acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
   }
 
   @Test
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeRemoveAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeRemoveAclRequest.java
index b2eb0bf..390d547 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeRemoveAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeRemoveAclRequest.java
@@ -43,6 +43,8 @@
     OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rw");
     OMRequest originalRequest =
         TestOMRequestUtils.createVolumeRemoveAclRequest(volumeName, acl);
+    long originModTime = originalRequest.getRemoveAclRequest()
+        .getModificationTime();
 
     OMVolumeRemoveAclRequest omVolumeRemoveAclRequest =
         new OMVolumeRemoveAclRequest(originalRequest);
@@ -50,6 +52,12 @@
     OMRequest modifiedRequest = omVolumeRemoveAclRequest.preExecute(
         ozoneManager);
     Assert.assertNotEquals(modifiedRequest, originalRequest);
+
+    long newModTime = modifiedRequest.getRemoveAclRequest()
+        .getModificationTime();
+    // After preExecute() of the remove acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
   }
 
   @Test
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeSetAclRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeSetAclRequest.java
index 087ba71..d374e47 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeSetAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/acl/TestOMVolumeSetAclRequest.java
@@ -49,6 +49,8 @@
     OMRequest originalRequest =
         TestOMRequestUtils.createVolumeSetAclRequest(volumeName,
             Lists.newArrayList(acl));
+    long originModTime = originalRequest.getSetAclRequest()
+        .getModificationTime();
 
     OMVolumeSetAclRequest omVolumeSetAclRequest =
         new OMVolumeSetAclRequest(originalRequest);
@@ -56,6 +58,11 @@
     OMRequest modifiedRequest = omVolumeSetAclRequest.preExecute(
         ozoneManager);
     Assert.assertNotEquals(modifiedRequest, originalRequest);
+
+    long newModTime = modifiedRequest.getSetAclRequest().getModificationTime();
+    // After preExecute() of the set acl request, the new modification
+    // time should be greater than the original one.
+    Assert.assertTrue(newModTime > originModTime);
   }
 
   @Test
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java
index 6c93d37..9cfd21d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java
@@ -28,8 +28,8 @@
     .CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.
+    OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.junit.After;
@@ -76,7 +76,7 @@
 
     String volumeName = UUID.randomUUID().toString();
     String userName = "user1";
-    UserVolumeInfo volumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo volumeList = PersistedUserVolumeInfo.newBuilder()
         .setObjectID(1).setUpdateID(1)
         .addVolumeNames(volumeName).build();
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java
index ae8a8c8..f956b0d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java
@@ -28,8 +28,7 @@
     .CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.junit.After;
@@ -76,7 +75,7 @@
 
     String volumeName = UUID.randomUUID().toString();
     String userName = "user1";
-    UserVolumeInfo volumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo volumeList = PersistedUserVolumeInfo.newBuilder()
         .setObjectID(1)
         .setUpdateID(1)
         .addVolumeNames(volumeName).build();
@@ -95,7 +94,8 @@
         new OMVolumeCreateResponse(omResponse, omVolumeArgs, volumeList);
 
     // As we are deleting updated volume list should be empty.
-    UserVolumeInfo updatedVolumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo updatedVolumeList =
+        PersistedUserVolumeInfo.newBuilder()
         .setObjectID(1).setUpdateID(1).build();
     OMVolumeDeleteResponse omVolumeDeleteResponse =
         new OMVolumeDeleteResponse(omResponse, volumeName, userName,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java
index b6f6335..96129e4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java
@@ -28,8 +28,7 @@
     .CreateVolumeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .UserVolumeInfo;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -78,7 +77,7 @@
 
     String volumeName = UUID.randomUUID().toString();
     String oldOwner = "user1";
-    UserVolumeInfo volumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo volumeList = PersistedUserVolumeInfo.newBuilder()
         .setObjectID(1)
         .setUpdateID(1)
         .addVolumeNames(volumeName).build();
@@ -99,11 +98,13 @@
 
 
     String newOwner = "user2";
-    UserVolumeInfo newOwnerVolumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo newOwnerVolumeList =
+        PersistedUserVolumeInfo.newBuilder()
         .setObjectID(1)
         .setUpdateID(1)
         .addVolumeNames(volumeName).build();
-    UserVolumeInfo oldOwnerVolumeList = UserVolumeInfo.newBuilder()
+    PersistedUserVolumeInfo oldOwnerVolumeList =
+        PersistedUserVolumeInfo.newBuilder()
         .setObjectID(2)
         .setUpdateID(2)
         .build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
new file mode 100644
index 0000000..cb7471d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Test Ozone owner check from OzoneNativeAuthorizer.
+ */
+public class TestVolumeOwner {
+
+  private static OzoneConfiguration ozoneConfig;
+  private static OzoneNativeAuthorizer nativeAuthorizer;
+  private static KeyManagerImpl keyManager;
+  private static VolumeManagerImpl volumeManager;
+  private static BucketManagerImpl bucketManager;
+  private static PrefixManager prefixManager;
+  private static OMMetadataManager metadataManager;
+  private static UserGroupInformation testUgi;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
+        OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    File dir = GenericTestUtils.getRandomizedTestDir();
+    ozoneConfig.set(OZONE_METADATA_DIRS, dir.toString());
+
+    metadataManager = new OmMetadataManagerImpl(ozoneConfig);
+    volumeManager = new VolumeManagerImpl(metadataManager, ozoneConfig);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
+        metadataManager, ozoneConfig, "om1", null);
+    prefixManager = new PrefixManagerImpl(metadataManager, false);
+
+    nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
+        keyManager, prefixManager,
+        Collections.singletonList("om"));
+
+    testUgi = UserGroupInformation.createUserForTesting("testuser",
+        new String[]{"test"});
+
+    prepareTestVols();
+    prepareTestBuckets();
+    prepareTestKeys();
+  }
+
+  // create 2 volumes
+  private static void prepareTestVols() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+          .setVolume(getTestVolumeName(i))
+          .setAdminName("om")
+          .setOwnerName(getTestVolOwnerName(i))
+          .build();
+      TestOMRequestUtils.addVolumeToOM(metadataManager, volumeArgs);
+    }
+  }
+
+  // create 2 buckets under each volume
+  private static void prepareTestBuckets() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(getTestVolumeName(i))
+            .setBucketName(getTestBucketName(j))
+            .build();
+        TestOMRequestUtils.addBucketToOM(metadataManager, bucketInfo);
+      }
+    }
+  }
+
+  // create 2 keys under each test bucket
+  private static void prepareTestKeys() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        for (int k = 0; k < 2; k++) {
+          OmKeyArgs.Builder keyArgsBuilder = new OmKeyArgs.Builder()
+              .setVolumeName(getTestVolumeName(i))
+              .setBucketName(getTestBucketName(j))
+              .setKeyName(getTestKeyName(k))
+              .setFactor(HddsProtos.ReplicationFactor.ONE)
+              .setDataSize(0)
+              .setType(HddsProtos.ReplicationType.STAND_ALONE);
+          if (k == 0) {
+            keyArgsBuilder.setAcls(OzoneAclUtil.getAclList(
+                testUgi.getUserName(), testUgi.getGroupNames(), ALL, ALL));
+          } else {
+            keyArgsBuilder.setAcls(OzoneAclUtil.getAclList(
+                testUgi.getUserName(), testUgi.getGroupNames(), NONE, NONE));
+          }
+          OmKeyArgs keyArgs = keyArgsBuilder.build();
+          OpenKeySession keySession = keyManager.createFile(keyArgs, true,
+              false);
+          keyArgs.setLocationInfoList(
+              keySession.getKeyInfo().getLatestVersionLocations()
+                  .getLocationList());
+          keyManager.commitKey(keyArgs, keySession.getId());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testVolumeOps() throws Exception {
+    OzoneObj vol0 = getTestVolumeobj(0);
+
+    // admin = true, owner = false, ownerName = testvolumeOwner
+    RequestContext nonOwnerContext = getUserRequestContext("om",
+        IAccessAuthorizer.ACLType.CREATE, false, getTestVolOwnerName(0));
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0, nonOwnerContext));
+
+    // admin = true, owner = false, ownerName = null
+    RequestContext nullOwnerContext = getUserRequestContext("om",
+        IAccessAuthorizer.ACLType.CREATE, false, null);
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0, nullOwnerContext));
+
+    // admin = false, owner = false, ownerName = testvolumeOwner
+    RequestContext nonAdminNonOwnerContext = getUserRequestContext("testuser",
+        IAccessAuthorizer.ACLType.CREATE, false, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0,
+        nonAdminNonOwnerContext));
+
+    // admin = false, owner = true
+    RequestContext nonAdminOwnerContext = getUserRequestContext(
+        getTestVolOwnerName(0), IAccessAuthorizer.ACLType.CREATE,
+        true, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations even for owner", nativeAuthorizer.checkAccess(vol0,
+        nonAdminOwnerContext));
+
+    List<IAccessAuthorizer.ACLType> aclsToTest =
+        Arrays.stream(IAccessAuthorizer.ACLType.values()).filter(
+            (type)-> type != NONE && type != CREATE)
+            .collect(Collectors.toList());
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      nonAdminOwnerContext = getUserRequestContext(getTestVolOwnerName(0),
+          type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("Owner is allowed to perform all non-admin " +
+          "operations", nativeAuthorizer.checkAccess(vol0,
+          nonAdminOwnerContext));
+    }
+  }
+
+  @Test
+  public void testBucketOps() throws Exception {
+    OzoneObj obj = getTestBucketobj(1, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, true, getTestVolOwnerName(1));
+      Assert.assertTrue("non admin volume owner without acls are allowed" +
+          " to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, false, getTestVolOwnerName(0));
+      Assert.assertFalse("non admin non volume owner without acls" +
+          " are not allowed to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+  }
+
+  @Test
+  public void testKeyOps() throws Exception {
+    OzoneObj obj = getTestKeyobj(0, 0, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("non admin volume owner without acls are allowed to " +
+              "access key",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, false, getTestVolOwnerName(1));
+      Assert.assertFalse("non admin volume owner without acls are" +
+              " not allowed to access key",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+  }
+
+  private RequestContext getUserRequestContext(String username,
+      IAccessAuthorizer.ACLType type, boolean isOwner, String ownerName) {
+    return RequestContext.getBuilder(
+        UserGroupInformation.createRemoteUser(username), null, null,
+        type, ownerName).build();
+  }
+
+  private static String getTestVolumeName(int index) {
+    return "vol" + index;
+  }
+
+  private static String getTestVolOwnerName(int index) {
+    return "owner" + index;
+  }
+
+  private static String getTestBucketName(int index) {
+    return "bucket" + index;
+  }
+
+  private static String getTestKeyName(int index) {
+    return "key" + index;
+  }
+
+  private OzoneObj getTestVolumeobj(int index) {
+    return OzoneObjInfo.Builder.getBuilder(OzoneObj.ResourceType.VOLUME,
+        OzoneObj.StoreType.OZONE,
+        getTestVolumeName(index), null, null).build();
+  }
+
+  private OzoneObj getTestBucketobj(int volIndex, int bucketIndex) {
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(getTestVolumeName(volIndex))
+        .setBucketName(getTestBucketName(bucketIndex)).build();
+  }
+
+  private OzoneObj getTestKeyobj(int volIndex, int bucketIndex,
+      int keyIndex) {
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(getTestVolumeName(volIndex))
+        .setBucketName(getTestBucketName(bucketIndex))
+        .setKeyName(getTestKeyName(keyIndex))
+        .build();
+  }
+
+  List<IAccessAuthorizer.ACLType> getAclsToTest() {
+    return Arrays.stream(IAccessAuthorizer.ACLType.values()).filter(
+        (type)-> type != NONE).collect(Collectors.toList());
+  }
+}
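
Read together, the three tests pin down the rule the native authorizer is expected to enforce: CREATE on a volume is an admin operation, so ownership alone never grants it, while every other ACL type is granted to the volume owner even without explicit ACL entries. A condensed, hypothetical restatement of that decision (the authoritative logic lives in OzoneNativeAuthorizer.checkAccess()):

    // Hypothetical condensation of the rule exercised by the tests above;
    // not the real checkAccess() implementation.
    static boolean allowed(boolean adminOnlyOp, boolean callerIsAdmin,
        boolean callerIsOwner, boolean hasMatchingAcl) {
      if (adminOnlyOp) {
        return callerIsAdmin;          // e.g. CREATE on a volume
      }
      return callerIsOwner || hasMatchingAcl;
    }
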
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/AWSSignatureProcessor.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/AWSSignatureProcessor.java
index 0cb82fb..4d45101 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/AWSSignatureProcessor.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/AWSSignatureProcessor.java
@@ -75,6 +75,7 @@
   private AuthorizationHeaderV4 v4Header;
   private AuthorizationHeaderV2 v2Header;
   private String stringToSign;
+  private Exception exception;
 
   @PostConstruct
   public void init()
@@ -108,23 +109,38 @@
 
     this.method = context.getMethod();
     String authHeader = headers.get(AUTHORIZATION_HEADER);
-    String[] split = authHeader.split(" ");
-    if (split[0].equals(AuthorizationHeaderV2.IDENTIFIER)) {
-      if (v2Header == null) {
-        v2Header = new AuthorizationHeaderV2(authHeader);
+    try {
+      if (authHeader != null) {
+        String[] split = authHeader.split(" ");
+        if (split[0].equals(AuthorizationHeaderV2.IDENTIFIER)) {
+          if (v2Header == null) {
+            v2Header = new AuthorizationHeaderV2(authHeader);
+          }
+        } else {
+          if (v4Header == null) {
+            v4Header = new AuthorizationHeaderV4(authHeader);
+          }
+          parse();
+        }
+      } else { // no auth header
+        v4Header = null;
+        v2Header = null;
       }
-    } else {
-      if (v4Header == null) {
-        v4Header = new AuthorizationHeaderV4(authHeader);
+    } catch (Exception ex) {
+      // If parsing or validation of the auth header fails, remember the
+      // exception instead of failing here, so that OzoneClientProducer can
+      // report the failure when it tries to use this SignatureProcessor.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Error during Validation of Auth Header:{}", authHeader);
       }
-      parse();
+      this.exception = ex;
     }
   }
 
 
-  public void parse() throws Exception {
-    StringBuilder strToSign = new StringBuilder();
+  private void parse() throws Exception {
 
+    StringBuilder strToSign = new StringBuilder();
     // According to AWS sigv4 documentation, authorization header should be
     // in following format.
     // Authorization: algorithm Credential=access key ID/credential scope,
@@ -167,7 +183,8 @@
   }
 
   @VisibleForTesting
-  public String buildCanonicalRequest() throws OS3Exception {
+  protected String buildCanonicalRequest() throws OS3Exception {
+
     Iterable<String> parts = split("/", uri);
     List<String> encParts = new ArrayList<>();
     for (String p : parts) {
@@ -357,6 +374,10 @@
     this.v2Header = v2Header;
   }
 
+  public Exception getException() {
+    return this.exception;
+  }
+
   /**
    * A simple map which forces lower case key usage.
    */
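
The refactoring above changes error handling rather than parsing: init() runs as a @PostConstruct hook, so a malformed Authorization header can no longer simply propagate from it; instead the failure is parked in the new exception field and surfaced later by OzoneClientProducer when a client is actually requested. A small, self-contained sketch of the same defer-and-rethrow shape (class and method names are illustrative, not the real gateway types):

    // Standalone sketch of the defer-and-rethrow pattern.
    final class DeferredParser {
      private Exception failure;

      void init(String header) {
        try {
          parse(header);           // may throw on malformed input
        } catch (Exception ex) {
          this.failure = ex;       // park it; do not fail the lifecycle hook
        }
      }

      void ensureValid() throws Exception {
        if (failure != null) {
          throw failure;           // surface the original failure on first use
        }
      }

      private void parse(String header) {
        if (header != null && header.split(" ").length < 2) {
          throw new IllegalArgumentException("malformed header: " + header);
        }
      }
    }
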
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index a3042c1..364d263 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -23,7 +23,9 @@
 import javax.inject.Inject;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -36,6 +38,8 @@
 
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMTokenProto.Type.S3AUTHINFO;
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.UTF_8;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
+import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.MALFORMED_HEADER;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.S3_AUTHINFO_CREATION_ERROR;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +68,7 @@
 
 
   @Produces
-  public OzoneClient createClient() throws IOException {
+  public OzoneClient createClient() throws OS3Exception, IOException {
     client = getClient(ozoneConfiguration);
     return client;
   }
@@ -74,15 +78,22 @@
     client.close();
   }
 
-  private OzoneClient getClient(OzoneConfiguration config) throws IOException {
+  private OzoneClient getClient(OzoneConfiguration config)
+      throws OS3Exception {
+    OzoneClient ozoneClient = null;
     try {
+      // Check if any error occurred while the signature processor was
+      // parsing and validating the auth header.
+      if (signatureParser.getException() != null) {
+        throw signatureParser.getException();
+      }
       String awsAccessId = signatureParser.getAwsAccessId();
+      validateAccessId(awsAccessId);
+
       UserGroupInformation remoteUser =
           UserGroupInformation.createRemoteUser(awsAccessId);
       if (OzoneSecurityUtil.isSecurityEnabled(config)) {
         LOG.debug("Creating s3 auth info for client.");
         try {
-
           OzoneTokenIdentifier identifier = new OzoneTokenIdentifier();
           identifier.setTokenType(S3AUTHINFO);
           identifier.setStrToSign(signatureParser.getStringToSign());
@@ -98,25 +109,49 @@
               omService);
           remoteUser.addToken(token);
         } catch (OS3Exception | URISyntaxException ex) {
-          LOG.error("S3 auth info creation failed.");
           throw S3_AUTHINFO_CREATION_ERROR;
         }
-
       }
-      UserGroupInformation.setLoginUser(remoteUser);
-    } catch (Exception e) {
-      LOG.error("Error: ", e);
+      ozoneClient =
+          remoteUser.doAs((PrivilegedExceptionAction<OzoneClient>)() -> {
+            if (omServiceID == null) {
+              return OzoneClientFactory.getRpcClient(ozoneConfiguration);
+            } else {
+              // In the HA case, we need to pass the OM service ID.
+              return OzoneClientFactory.getRpcClient(omServiceID,
+                  ozoneConfiguration);
+            }
+          });
+    } catch (OS3Exception ex) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Error during Client Creation: ", ex);
+      }
+      throw ex;
+    } catch (Throwable t) {
+      // For any other critical error during client creation, surface a
+      // generic internal error to the caller.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Error during Client Creation: ", t);
+      }
+      throw INTERNAL_ERROR;
     }
+    return ozoneClient;
+  }
 
-    if (omServiceID == null) {
-      return OzoneClientFactory.getRpcClient(ozoneConfiguration);
-    } else {
-      // As in HA case, we need to pass om service ID.
-      return OzoneClientFactory.getRpcClient(omServiceID, ozoneConfiguration);
+  // Validate the aws access id: it must be present and non-empty. Only
+  // called when the access id is actually needed.
+  private void validateAccessId(String awsAccessId) throws Exception {
+    if (awsAccessId == null || awsAccessId.isEmpty()) {
+      LOG.error("Malformed s3 header. awsAccessID: {}", awsAccessId);
+      throw MALFORMED_HEADER;
     }
   }
 
   public void setOzoneConfiguration(OzoneConfiguration config) {
     this.ozoneConfiguration = config;
   }
+
+  @VisibleForTesting
+  public void setSignatureParser(SignatureProcessor signatureParser) {
+    this.signatureParser = signatureParser;
+  }
 }
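
A subtle but important part of this refactor: the removed code set the identity with UserGroupInformation.setLoginUser(remoteUser), which mutates process-global state shared by concurrent requests, while the new code scopes the identity to a single factory call via doAs(). The general shape of that pattern, as a sketch (fetchClient() is a hypothetical stand-in for the OzoneClientFactory call):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    final class ClientAs {
      // Sketch: run the factory call in the remote user's security context so
      // tokens attached to that UGI back the RPC connection, without touching
      // the process-wide login user.
      static Object clientAs(String accessId)
          throws IOException, InterruptedException {
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser(accessId);
        return ugi.doAs((PrivilegedExceptionAction<Object>) () -> fetchClient());
      }

      private static Object fetchClient() {
        return new Object();       // hypothetical stand-in
      }
    }
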
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignatureProcessor.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignatureProcessor.java
index b21bfc1..e3cb6af 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignatureProcessor.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignatureProcessor.java
@@ -61,4 +61,6 @@
   String getSignature();
 
   String getAwsAccessId();
+
+  Exception getException();
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
index fedc1ba..432b582 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
@@ -23,6 +23,7 @@
 import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_SERVER_ERROR;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_NOT_SATISFIABLE;
 
 /**
@@ -100,6 +101,10 @@
       "allowed object size. Each part must be at least 5 MB in size, except " +
       "the last part.", HTTP_BAD_REQUEST);
 
+  public static final OS3Exception INTERNAL_ERROR = new OS3Exception(
+      "InternalError", "We encountered an internal error. Please try again.",
+      HTTP_SERVER_ERROR);
+
 
   /**
    * Create a new instance of Error.
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestOzoneClientProducer.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestOzoneClientProducer.java
index ac346c7..d1b5e08 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestOzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestOzoneClientProducer.java
@@ -21,7 +21,6 @@
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.UriInfo;
-import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -29,7 +28,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.AUTHORIZATION_HEADER;
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.CONTENT_MD5;
@@ -37,6 +36,9 @@
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.HOST_HEADER;
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.X_AMAZ_DATE;
 import static org.apache.hadoop.ozone.s3.SignatureProcessor.X_AMZ_CONTENT_SHA256;
+import static org.junit.Assert.fail;
+
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -84,9 +86,13 @@
   }
 
   @Test
-  public void testGetClientFailure() throws Exception {
-    LambdaTestUtils.intercept(IOException.class, "Couldn't create",
-        () -> producer.createClient());
+  public void testGetClientFailure() {
+    try {
+      producer.createClient();
+      fail("testGetClientFailure");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof OS3Exception);
+    }
   }
 
   private void setupContext() throws Exception {
@@ -106,6 +112,12 @@
         .thenReturn(authHeader);
     Mockito.when(context.getUriInfo().getQueryParameters())
         .thenReturn(queryMap);
+
+    AWSSignatureProcessor awsSignatureProcessor = new AWSSignatureProcessor();
+    awsSignatureProcessor.setContext(context);
+    awsSignatureProcessor.init();
+
+    producer.setSignatureParser(awsSignatureProcessor);
   }
 
   @Parameterized.Parameters
@@ -137,6 +149,9 @@
             "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
             "20150830T123600Z",
             "application/x-www-form-urlencoded; charset=utf-8"
+        },
+        {
+            null, null, null, null, null, null
         }
     });
   }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketPut.java
index b4a21e3..6c41509 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestBucketPut.java
@@ -74,6 +74,11 @@
       public String getAwsAccessId() {
         return OzoneConsts.OZONE;
       }
+
+      @Override
+      public Exception getException() {
+        return null;
+      }
     });
   }
 
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestRootList.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestRootList.java
index 02c3b7c..7c5d2a5 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestRootList.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestRootList.java
@@ -69,6 +69,11 @@
       public String getAwsAccessId() {
         return OzoneConsts.OZONE;
       }
+
+      @Override
+      public Exception getException() {
+        return null;
+      }
     });
     // List operation should succeed even there is no bucket.
     ListBucketResponse response =
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/Shell.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/Shell.java
index 2016886..6fd0aa3 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/Shell.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/Shell.java
@@ -44,16 +44,24 @@
 
   @Override
   protected void printError(Throwable errorArg) {
+    OMException omException = null;
+
     if (errorArg instanceof OMException) {
-      if (isVerbose()) {
-        errorArg.printStackTrace(System.err);
-      } else {
-        OMException omException = (OMException) errorArg;
-        System.err.println(String
-            .format("%s %s", omException.getResult().name(),
-                omException.getMessage()));
-      }
+      omException = (OMException) errorArg;
+    } else if (errorArg.getCause() instanceof OMException) {
+      // If the OMException occurred in a method that could not throw a
+      // checked exception (like an Iterator implementation), it will be
+      // chained to an unchecked exception and thrown.
+      omException = (OMException) errorArg.getCause();
+    }
+
+    if (omException != null && !isVerbose()) {
+      // In non-verbose mode, reformat OMExceptions as error messages to the
+      // user.
+      System.err.println(String.format("%s %s", omException.getResult().name(),
+              omException.getMessage()));
     } else {
+      // In verbose mode, or for non-OM exceptions, fall back to the default
+      // handling, which prints the stack trace.
       super.printError(errorArg);
     }
   }
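
The unwrap step added above (checking errorArg.getCause()) complements a common Java workaround: an Iterator or other interface method that cannot declare a checked exception wraps the OMException in an unchecked one. The producing side of that contract looks roughly like this (OzoneKey and fetchNextKey() are stand-ins for illustration):

    // Sketch of the throwing side: next() cannot declare a checked
    // OMException, so it chains it; printError() above unwraps it again.
    @Override
    public OzoneKey next() {
      try {
        return fetchNextKey();          // may throw OMException
      } catch (OMException e) {
        throw new RuntimeException(e);  // chain the checked exception
      }
    }
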