Merge master to EC branch HDDS-3816-ec
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0ebf088..5f8f6f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -26,6 +26,10 @@
       "hdds.heartbeat.interval";
   public static final String HDDS_HEARTBEAT_INTERVAL_DEFAULT =
       "30s";
+  public static final String HDDS_RECON_HEARTBEAT_INTERVAL =
+      "hdds.recon.heartbeat.interval";
+  public static final String HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT =
+      "60s";
   public static final String HDDS_NODE_REPORT_INTERVAL =
       "hdds.node.report.interval";
   public static final String HDDS_NODE_REPORT_INTERVAL_DEFAULT =
@@ -269,4 +273,7 @@
           "hdds.container.checksum.verification.enabled";
   public static final boolean
           HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT = true;
+
+  public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_DNAUDIT =
+      "ozone.audit.log.debug.cmd.list.dnaudit";
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 66fb593..2a962d7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -552,6 +552,8 @@
   public static final boolean
       OZONE_SCM_HA_RATIS_SERVER_ELECTION_PRE_VOTE_DEFAULT = false;
 
+  public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_SCMAUDIT =
+      "ozone.audit.log.debug.cmd.list.scmaudit";
   /**
    * Never constructed.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 4b7fda9..699d732 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -467,6 +467,8 @@
       OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
       300 * 1000;
 
+  public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_OMAUDIT =
+      "ozone.audit.log.debug.cmd.list.omaudit";
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
index 9f1f5f0..43fb4e4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java
@@ -18,10 +18,20 @@
 package org.apache.hadoop.ozone.audit;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Marker;
 import org.apache.logging.log4j.spi.ExtendedLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 
 /**
@@ -29,10 +39,17 @@
  */
 public class AuditLogger {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AuditLogger.class);
+
   private ExtendedLogger logger;
   private static final String FQCN = AuditLogger.class.getName();
   private static final Marker WRITE_MARKER = AuditMarker.WRITE.getMarker();
   private static final Marker READ_MARKER = AuditMarker.READ.getMarker();
+  private final AtomicReference<Set<String>> debugCmdSetRef =
+      new AtomicReference<>(new HashSet<>());
+  public static final String AUDIT_LOG_DEBUG_CMD_LIST_PREFIX =
+      "ozone.audit.log.debug.cmd.list.";
+  private AuditLoggerType type;
 
   /**
    * Parametrized Constructor to initialize logger.
@@ -48,6 +65,8 @@
    */
   private void initializeLogger(AuditLoggerType loggerType) {
     this.logger = LogManager.getContext(false).getLogger(loggerType.getType());
+    this.type = loggerType;
+    refreshDebugCmdSet();
   }
 
   @VisibleForTesting
@@ -56,7 +75,11 @@
   }
 
   public void logWriteSuccess(AuditMessage msg) {
-    this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, msg, null);
+    if (shouldLogAtDebug(msg)) {
+      this.logger.logIfEnabled(FQCN, Level.DEBUG, WRITE_MARKER, msg, null);
+    } else {
+      this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, msg, null);
+    }
   }
 
   public void logWriteFailure(AuditMessage msg) {
@@ -65,7 +88,11 @@
   }
 
   public void logReadSuccess(AuditMessage msg) {
-    this.logger.logIfEnabled(FQCN, Level.INFO, READ_MARKER, msg, null);
+    if (shouldLogAtDebug(msg)) {
+      this.logger.logIfEnabled(FQCN, Level.DEBUG, READ_MARKER, msg, null);
+    } else {
+      this.logger.logIfEnabled(FQCN, Level.INFO, READ_MARKER, msg, null);
+    }
   }
 
   public void logReadFailure(AuditMessage msg) {
@@ -75,12 +102,28 @@
 
   public void logWrite(AuditMessage auditMessage) {
     if (auditMessage.getThrowable() == null) {
-      this.logger.logIfEnabled(FQCN, Level.INFO, WRITE_MARKER, auditMessage,
-          auditMessage.getThrowable());
+      logWriteSuccess(auditMessage);
     } else {
-      this.logger.logIfEnabled(FQCN, Level.ERROR, WRITE_MARKER, auditMessage,
-          auditMessage.getThrowable());
+      logWriteFailure(auditMessage);
     }
   }
 
+  public void refreshDebugCmdSet() {
+    // Delegate with a freshly constructed configuration object.
+    refreshDebugCmdSet(new OzoneConfiguration());
+  }
+
+  public void refreshDebugCmdSet(OzoneConfiguration conf) {
+    Collection<String> cmds = conf.getTrimmedStringCollection(
+        AUDIT_LOG_DEBUG_CMD_LIST_PREFIX +
+            type.getType().toLowerCase(Locale.ROOT));
+    LOG.info("Refresh DebugCmdSet for {} to {}.", type.getType(), cmds);
+    debugCmdSetRef.set(cmds.stream() // lower-case as shouldLogAtDebug() does
+        .map(cmd -> cmd.toLowerCase(Locale.ROOT)).collect(Collectors.toSet()));
+  }
+
+  private boolean shouldLogAtDebug(AuditMessage auditMessage) {
+    String op = auditMessage.getOp().toLowerCase(Locale.ROOT);
+    return debugCmdSetRef.get().contains(op);
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
index dbfde9f..d37d221 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditLoggerType.java
@@ -23,7 +23,8 @@
 public enum AuditLoggerType {
   DNLOGGER("DNAudit"),
   OMLOGGER("OMAudit"),
-  SCMLOGGER("SCMAudit");
+  SCMLOGGER("SCMAudit"),
+  S3GLOGGER("S3GAudit");
 
   private String type;
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
index 9d28c9f..85fa798 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/AuditMessage.java
@@ -26,10 +26,21 @@
 public final class AuditMessage implements Message {
 
   private final String message;
+  private final String user;
+  private final String ip;
+  private final String op;
+  private final Map<String, String> params;
+  private final String ret;
   private final Throwable throwable;
 
-  private AuditMessage(String message, Throwable throwable) {
-    this.message = message;
+  private AuditMessage(String user, String ip, String op,
+      Map<String, String> params, String ret, Throwable throwable) {
+    this.user = user;
+    this.ip = ip;
+    this.op = op;
+    this.params = params;
+    this.ret = ret;
+    this.message = formMessage(user, ip, op, params, ret);
     this.throwable = throwable;
   }
 
@@ -53,6 +64,10 @@
     return throwable;
   }
 
+  public String getOp() {
+    return op;
+  }
+
   /**
    * Builder class for AuditMessage.
    */
@@ -95,9 +110,14 @@
     }
 
     public AuditMessage build() {
-      String message = "user=" + this.user + " | ip=" + this.ip + " | " +
-          "op=" + this.op + " " + this.params + " | " + "ret=" + this.ret;
-      return new AuditMessage(message, throwable);
+      return new AuditMessage(user, ip, op, params, ret, throwable);
     }
   }
+
+  private String formMessage(String userStr, String ipStr, String opStr,
+      Map<String, String> paramsMap, String retStr) {
+    // Layout: user=<user> | ip=<ip> | op=<op> <params> | ret=<ret>
+    return "user=" + userStr + " | ip=" + ipStr + " | op=" + opStr + " "
+        + paramsMap + " | ret=" + retStr;
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
new file mode 100644
index 0000000..e0cbea9
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/S3GAction.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.audit;
+
+/**
+ * Enum to define Audit Action types for S3Gateway.
+ */
+public enum S3GAction implements AuditAction {
+
+  //BucketEndpoint
+  GET_BUCKET,
+  CREATE_BUCKET,
+  HEAD_BUCKET,
+  DELETE_BUCKET,
+  GET_ACL,
+  PUT_ACL,
+  LIST_MULTIPART_UPLOAD,
+  MULTI_DELETE,
+
+  //RootEndpoint
+  LIST_S3_BUCKETS,
+
+  //ObjectEndpoint
+  CREATE_MULTIPART_KEY,
+  COPY_OBJECT,
+  CREATE_KEY,
+  LIST_PARTS,
+  GET_KEY,
+  HEAD_KEY,
+  INIT_MULTIPART_UPLOAD,
+  COMPLETE_MULTIPART_UPLOAD,
+  ABORT_MULTIPART_UPLOAD,
+  DELETE_KEY;
+
+  @Override
+  public String getAction() {
+    return this.toString();
+  }
+
+}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 133528c..aaa7859 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -991,6 +991,14 @@
     </description>
   </property>
   <property>
+    <name>hdds.recon.heartbeat.interval</name>
+    <value>60s</value>
+    <tag>OZONE, MANAGEMENT, RECON</tag>
+    <description>
+      The heartbeat interval from a Datanode to Recon.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.heartbeat.log.warn.interval.count</name>
     <value>10</value>
     <tag>OZONE, MANAGEMENT</tag>
@@ -1651,6 +1659,24 @@
   </property>
 
   <property>
+    <name>ozone.om.unflushed.transaction.max.count</name>
+    <value>10000</value>
+    <tag>OZONE, OM</tag>
+    <description>Unflushed transactions are requests that have been applied to
+      the OM state machine but not yet flushed to the OM RocksDB. When OM is under
+      high concurrency pressure and flushing is not fast enough, too many pending
+      requests are held in memory, which leads to long GC pauses in OM and slows
+      flushing down even further. Flushing can be slow for several reasons, e.g.:
+      1. RocksDB is on an HDD, which has poorer IO performance than an SSD.
+      2. A big compaction is happening internally in RocksDB and a RocksDB write
+      stall occurs.
+      3. Long GC pauses caused by other factors.
+      This property limits the maximum count of unflushed transactions, so that
+      the maximum memory occupied by unflushed transactions is bounded.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.om.lock.fair</name>
     <value>false</value>
     <description>If this is true, the Ozone Manager lock will be used in Fair
@@ -3103,4 +3129,34 @@
       default the number of retries are 10.
     </description>
   </property>
+
+  <property>
+    <name>ozone.audit.log.debug.cmd.list.omaudit</name>
+    <value></value>
+    <tag>OM</tag>
+    <description>
+      A comma separated list of OzoneManager commands that are written to the OzoneManager audit logs only if the audit
+      log level is debug. Ex: "ALLOCATE_BLOCK,ALLOCATE_KEY,COMMIT_KEY".
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.audit.log.debug.cmd.list.scmaudit</name>
+    <value></value>
+    <tag>SCM</tag>
+    <description>
+      A comma separated list of SCM commands that are written to the SCM audit logs only if the audit
+      log level is debug. Ex: "GET_VERSION,REGISTER,SEND_HEARTBEAT".
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.audit.log.debug.cmd.list.dnaudit</name>
+    <value></value>
+    <tag>DN</tag>
+    <description>
+      A comma separated list of Datanode commands that are written to the DN audit logs only if the audit
+      log level is debug. Ex: "CREATE_CONTAINER,READ_CONTAINER,UPDATE_CONTAINER".
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
index 01fceae..1b8e744 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/audit/TestOzoneAuditLogger.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.ozone.audit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +29,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -110,11 +113,16 @@
     }
   }
 
+  @Before
+  public void init() {
+    AUDIT.refreshDebugCmdSet();
+  }
+
   /**
-   * Test to verify default log level is INFO when logging success events.
+   * Test to verify default log level is INFO when logging WRITE success events.
    */
   @Test
-  public void verifyDefaultLogLevelForSuccess() throws IOException {
+  public void verifyDefaultLogLevelForWriteSuccess() throws IOException {
     AUDIT.logWriteSuccess(WRITE_SUCCESS_MSG);
     String expected =
         "INFO  | OMAudit | ? | " + WRITE_SUCCESS_MSG.getFormattedMessage();
@@ -122,16 +130,39 @@
   }
 
   /**
-   * Test to verify default log level is ERROR when logging failure events.
+   * Test to verify default log level is ERROR when logging WRITE failure
+   * events.
    */
   @Test
-  public void verifyDefaultLogLevelForFailure() throws IOException {
+  public void verifyDefaultLogLevelForWriteFailure() throws IOException {
     AUDIT.logWriteFailure(WRITE_FAIL_MSG);
     String expected =
         "ERROR | OMAudit | ? | " + WRITE_FAIL_MSG.getFormattedMessage();
     verifyLog(expected);
   }
 
+  /**
+   * Test to verify default log level is INFO when logging READ success events.
+   */
+  @Test
+  public void verifyDefaultLogLevelForReadSuccess() throws IOException {
+    AUDIT.logReadSuccess(READ_SUCCESS_MSG);
+    String expected =
+        "INFO  | OMAudit | ? | " + READ_SUCCESS_MSG.getFormattedMessage();
+    verifyLog(expected);
+  }
+
+  /**
+   * Test to verify default log level is ERROR when logging READ failure events.
+   */
+  @Test
+  public void verifyDefaultLogLevelForFailure() throws IOException {
+    AUDIT.logReadFailure(READ_FAIL_MSG);
+    String expected =
+        "ERROR | OMAudit | ? | " + READ_FAIL_MSG.getFormattedMessage();
+    verifyLog(expected);
+  }
+
   @Test
   public void messageIncludesAllParts() {
     String message = WRITE_FAIL_MSG.getFormattedMessage();
@@ -143,19 +174,22 @@
   }
 
   /**
-   * Test to verify no READ event is logged.
+   * Test to verify no WRITE event is logged.
    */
   @Test
-  public void notLogReadEvents() throws IOException {
-    AUDIT.logReadSuccess(READ_SUCCESS_MSG);
-    AUDIT.logReadFailure(READ_FAIL_MSG);
+  public void excludedEventNotLogged() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(AuditLogger.AUDIT_LOG_DEBUG_CMD_LIST_PREFIX +
+            AuditLoggerType.OMLOGGER.getType().toLowerCase(Locale.ROOT),
+        "CREATE_VOLUME");
+    AUDIT.refreshDebugCmdSet(conf);
+    AUDIT.logWriteSuccess(WRITE_SUCCESS_MSG);
     verifyNoLog();
   }
-
+  
   /**
    * Test to verify if multiline entries can be checked.
    */
-
   @Test
   public void messageIncludesMultilineException() throws IOException {
     String exceptionMessage = "Dummy exception message";
diff --git a/hadoop-hdds/common/src/test/resources/auditlog.properties b/hadoop-hdds/common/src/test/resources/auditlog.properties
index 85c18b5..959da04 100644
--- a/hadoop-hdds/common/src/test/resources/auditlog.properties
+++ b/hadoop-hdds/common/src/test/resources/auditlog.properties
@@ -28,7 +28,7 @@
 # in the configuration
 filter.read.type = MarkerFilter
 filter.read.marker = READ
-filter.read.onMatch = DENY
+filter.read.onMatch = NEUTRAL
 filter.read.onMismatch = NEUTRAL
 
 # filter.write.onMatch = DENY avoids logging all WRITE events
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 7738f8d..087226c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -65,6 +65,7 @@
 import com.google.protobuf.GeneratedMessage;
 import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getReconHeartbeatInterval;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -149,6 +150,7 @@
    */
   private final AtomicLong heartbeatFrequency = new AtomicLong(2000);
 
+  private final AtomicLong reconHeartbeatFrequency = new AtomicLong(2000);
   /**
    * Constructs a StateContext.
    *
@@ -906,4 +908,15 @@
   public GeneratedMessage getCRLStatusReport() {
     return crlStatusReport.get();
   }
+
+  public void configureReconHeartbeatFrequency() {
+    reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf));
+  }
+
+  /**
+   * Return current Datanode to Recon heartbeat frequency in ms.
+   */
+  public long getReconHeartbeatFrequency() {
+    return reconHeartbeatFrequency.get();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 29d27f3..4e5b64c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -146,9 +146,15 @@
         // the thread in executor from DatanodeStateMachine for a long time,
         // so that it won't affect the communication between datanode and
         // other EndpointStateMachine.
+        long heartbeatFrequency;
+        if (endpoint.isPassive()) {
+          heartbeatFrequency = context.getReconHeartbeatFrequency();
+        } else {
+          heartbeatFrequency = context.getHeartbeatFrequency();
+        }
         ecs.submit(() -> endpoint.getExecutorService()
             .submit(endpointTask)
-            .get(context.getHeartbeatFrequency(), TimeUnit.MILLISECONDS));
+            .get(heartbeatFrequency, TimeUnit.MILLISECONDS));
       } else {
         // This can happen if a task is taking more time than the timeOut
         // specified for the task in await, and when it is completed the task
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
index 9951a38..47c9377 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
@@ -176,7 +176,11 @@
             rpcEndPoint.getState().getNextState();
         rpcEndPoint.setState(nextState);
         rpcEndPoint.zeroMissedCount();
-        this.stateContext.configureHeartbeatFrequency();
+        if (rpcEndPoint.isPassive()) {
+          this.stateContext.configureReconHeartbeatFrequency();
+        } else {
+          this.stateContext.configureHeartbeatFrequency();
+        }
       }
     } catch (IOException ex) {
       rpcEndPoint.logIfNeeded(ex);
diff --git a/hadoop-hdds/docs/content/feature/OM-HA.md b/hadoop-hdds/docs/content/feature/OM-HA.md
index 573ce77..cba48e1 100644
--- a/hadoop-hdds/docs/content/feature/OM-HA.md
+++ b/hadoop-hdds/docs/content/feature/OM-HA.md
@@ -110,6 +110,43 @@
 
 The details of this approach discussed in a separated [design doc]({{< ref "design/omha.md" >}}) but it's integral part of the OM HA design.
 
+## OM Bootstrap
+
+To convert a non-HA OM to be HA or to add new OM nodes to existing HA OM ring, new OM node(s) need to be bootstrapped.
+
+Before bootstrapping a new OM node, the on-disk configuration file (ozone-site.xml) of every existing OM must be updated with the configuration details
+of the new OM, such as its nodeId, address, port, etc. Note that the existing OMs need not be restarted. They will reload the configuration from disk when
+they receive a bootstrap request from the bootstrapping node.
+
+To bootstrap an OM, the following command needs to be run:
+
+```shell
+ozone om [global options (optional)] --bootstrap
+```
+
+The bootstrap command will first verify that all the OMs have the updated configuration file and will fail otherwise. This check can be skipped
+using the _force_ option. The _force_ option allows the bootstrap to continue when one of the existing OMs is down or not responding.
+
+```shell
+ozone om [global options (optional)] --bootstrap --force
+```
+
+Note that using the _force_ option during bootstrap could crash the OM process if it does not have updated configurations.
+
+## OM Decommission
+
+To decommission an OM and remove the node from the OM HA ring, the following steps need to be executed.
+1. Stop the OzoneManager process only on the node which needs to be decommissioned. <p> **Note -** Do not stop the decommissioning OM if there are
+   only two OMs in the ring as both the OMs would be needed to reach consensus to update the Ratis configuration.</p>
+2. Add the _OM NodeId_ of the OM node to be decommissioned to the `ozone.om.decommissioned.nodes.<omServiceId>` property in _ozone-site.xml_ of all
+   other OMs.
+3. Run the following command to decommission an OM node.
+```shell
+ozone admin om decommission -id=<om-service-id> -nodeid=<decommissioning-om-node-id> -hostname=<decommissioning-om-node-address> [optional --force]
+```
+The _force_ option skips checking whether the OM configurations in _ozone-site.xml_ have been updated with the decommissioned node added to the
+_ozone.om.decommissioned.nodes_ property. <p>**Note -** It is recommended to bootstrap another OM node before decommissioning one to maintain HA.</p>
+
 ## References
 
  * Check [this page]({{< ref "design/omha.md" >}}) for the links to the original design docs
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 98821d9..f64db4d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -70,6 +70,8 @@
 import static org.apache.hadoop.hdds.DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
@@ -253,6 +255,18 @@
   }
 
   /**
+   * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
+   * Recon.
+   *
+   * @param conf - Ozone Config
+   * @return - HB interval in milli seconds.
+   */
+  public static long getReconHeartbeatInterval(ConfigurationSource conf) {
+    return conf.getTimeDuration(HDDS_RECON_HEARTBEAT_INTERVAL,
+        HDDS_RECON_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
    * Get the Stale Node interval, which is used by SCM to flag a datanode as
    * stale, if the heartbeat from that node has been missing for this duration.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 541ceaa..bc4748e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -59,7 +59,7 @@
   /**
    * Stops the HA service.
    */
-  void shutdown() throws IOException;
+  void stop() throws IOException;
 
   /**
    * Adds the SC M instance to the SCM HA group.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index a69f2cb..0fcc1ec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -334,10 +334,9 @@
    * {@inheritDoc}
    */
   @Override
-  public void shutdown() throws IOException {
+  public void stop() throws IOException {
     if (ratisServer != null) {
       ratisServer.stop();
-      ratisServer.getSCMStateMachine().close();
       grpcServer.stop();
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
index 65f3da3..6dbbea3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerStub.java
@@ -78,6 +78,11 @@
     ratisServer.start();
   }
 
+  @Override
+  public void stop() throws IOException {
+    ratisServer.stop();
+  }
+
   /**
    * Informs RatisServerStub to behaviour as a leader SCM or a follower SCM.
    */
@@ -112,14 +117,6 @@
     return null;
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void shutdown() throws IOException {
-    ratisServer.stop();
-  }
-
   @Override
   public boolean addSCM(AddSCMRequest request) throws IOException {
     return false;
@@ -217,6 +214,11 @@
     }
 
     @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
     public RaftServer.Division getDivision() {
       return null;
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index e23c984..7a97697 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -43,6 +43,8 @@
 
   void stop() throws IOException;
 
+  boolean isStopped();
+
   RaftServer.Division getDivision();
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3a68ad1..f8193fa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -76,6 +76,7 @@
   private final AtomicLong callId = new AtomicLong();
   private final RaftServer.Division division;
   private final GrpcTlsConfig grpcTlsConfig;
+  private boolean isStopped;
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
@@ -108,6 +109,7 @@
         (SCMStateMachine) server.getDivision(groupId).getStateMachine();
 
     this.division = server.getDivision(groupId);
+    this.isStopped = false;
   }
 
   public static void initialize(String clusterId, String scmId,
@@ -235,9 +237,17 @@
   public void stop() throws IOException {
     LOG.info("stopping ratis server {}", server.getPeer().getAddress());
     server.close();
+    isStopped = true;
+    getSCMStateMachine().close();
   }
 
   @Override
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+
+  @Override
   public List<String> getRatisRoles() throws IOException {
     Collection<RaftPeer> peers = division.getGroup().getPeers();
     List<String> ratisRoles = new ArrayList<>();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 9aeda10..74074bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -390,10 +390,17 @@
     if (!isInitialized) {
       return;
     }
-    super.close();
-    transactionBuffer.close();
-    HadoopExecutors.
-        shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+    // If the ratis server is stopped, this `close` originates from
+    // `scm.stop()`; otherwise, this `close` originates from ratis itself,
+    // so SCM is shut down.
+    if (scm.getScmHAManager().getRatisServer().isStopped()) {
+      super.close();
+      transactionBuffer.close();
+      HadoopExecutors.
+          shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+    } else {
+      scm.shutDown("scm statemachine is closed by ratis, terminate SCM");
+    }
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index a2bfa42..6fed1e2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.util.Time;
-import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,12 +132,9 @@
         .setDaemon(false)
         .setNameFormat(THREAD_NAME + " - %d")
         .setUncaughtExceptionHandler((Thread t, Throwable ex) -> {
-          // gracefully shutdown SCM.
-          scmContext.getScm().stop();
-
           String message = "Terminate SCM, encounter uncaught exception"
               + " in RatisPipelineUtilsThread";
-          ExitUtils.terminate(1, message, ex, LOG);
+          scmContext.getScm().shutDown(message);
         })
         .build()
         .newThread(this::run);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index 3784f97..fca789d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -41,6 +41,8 @@
 
   void join();
 
+  void shutDown(String message);
+
   NodeManager getScmNodeManager();
 
   BlockManager getScmBlockManager();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index e869754..395b77c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -141,6 +141,7 @@
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1511,7 +1512,7 @@
 
     try {
       LOG.info("Stopping SCM HA services.");
-      scmHAManager.shutdown();
+      scmHAManager.stop();
     } catch (Exception ex) {
       LOG.error("SCM HA Manager stop failed", ex);
     }
@@ -1533,6 +1534,12 @@
     scmSafeModeManager.stop();
   }
 
+  @Override
+  public void shutDown(String message) {
+    stop();
+    ExitUtils.terminate(1, message, LOG);
+  }
+
   /**
    * Wait until service has completed shutdown.
    */
diff --git a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
index e7a67ed..deaf37a 100644
--- a/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
+++ b/hadoop-hdds/server-scm/src/main/resources/webapps/scm/scm-overview.html
@@ -37,6 +37,7 @@
         <th>HEALTHY</th>
         <th>STALE</th>
         <th>DEAD</th>
+        <th>HEALTHY_READONLY</th>
     </tr>
     </thead>
     <tbody>
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index c64615f..5326f15 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -67,6 +67,11 @@
       }
 
       @Override
+      public boolean isStopped() {
+        return false;
+      }
+
+      @Override
       public RaftServer.Division getDivision() {
         return null;
       }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
index 17c6dee..c92820e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestSCMHAUnfinalizedStateValidationAction.java
@@ -27,7 +27,9 @@
 import org.apache.hadoop.ozone.upgrade.UpgradeException;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import org.apache.ozone.test.LambdaTestUtils;
+import org.apache.ratis.util.ExitUtils;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -63,6 +65,12 @@
   private final String dataPath;
   private static final String CLUSTER_ID = UUID.randomUUID().toString();
 
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+
   @Parameterized.Parameters(name = "haEnabledBefore={0} " +
       "haEnabledPreFinalized={1}")
   public static Collection<Object[]> cases() {
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
index 25ed0e5..37bea18 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
@@ -63,6 +63,11 @@
       description = "Format output as JSON")
   private boolean json;
 
+  @CommandLine.Option(names = { "--replicas" },
+      defaultValue = "false",
+      description = "Adds replica related details")
+  private boolean addReplicaDetails;
+
   @Parameters(description = "Decimal id of the container.")
   private long containerID;
 
@@ -103,7 +108,7 @@
       LOG.info("Datanodes: [{}]", machinesStr);
 
       // Print the replica details if available
-      if (replicas != null) {
+      if (addReplicaDetails && replicas != null) {
         String replicaStr = replicas.stream().map(
             InfoSubcommand::buildReplicaDetails)
             .collect(Collectors.joining(",\n"));
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 10b5758..c604019 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@ -97,7 +97,7 @@
         .thenReturn(getReplicas(includeIndex));
     cmd = new InfoSubcommand();
     CommandLine c = new CommandLine(cmd);
-    c.parseArgs("1");
+    c.parseArgs("1", "--replicas");
     cmd.execute(scmClient);
 
     // Ensure we have a line for Replicas:
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 01d9e91..213868c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -582,8 +582,18 @@
       }
     }
 
-    String owner = bucketArgs.getOwner() == null ?
-            ugi.getShortUserName() : bucketArgs.getOwner();
+    final String owner;
+    // If S3 auth exists, set owner name to the short user name derived from the
+    // accessId. Similar to RpcClient#getDEK.
+    if (getThreadLocalS3Auth() != null) {
+      UserGroupInformation s3gUGI = UserGroupInformation.createRemoteUser(
+          getThreadLocalS3Auth().getAccessID());
+      owner = s3gUGI.getShortUserName();
+    } else {
+      owner = bucketArgs.getOwner() == null ?
+          ugi.getShortUserName() : bucketArgs.getOwner();
+    }
+
     Boolean isVersionEnabled = bucketArgs.getVersioning() == null ?
         Boolean.FALSE : bucketArgs.getVersioning();
     StorageType storageType = bucketArgs.getStorageType() == null ?
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c000ad5..5717ca6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -319,4 +319,9 @@
   public static final long OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
       = 1000;
 
+  public static final String OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT =
+      "ozone.om.unflushed.transaction.max.count";
+  public static final int OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT
+      = 10000;
+
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index d9bb975..001322e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -133,40 +133,36 @@
     this.omProxyInfos = new HashMap<>();
     this.omNodeIDList = new ArrayList<>();
 
-    Collection<String> omServiceIds = Collections.singletonList(omSvcId);
+    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+        omSvcId);
 
-    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
-      Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
-          serviceId);
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
 
-      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omSvcId, nodeId);
+      String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+      if (rpcAddrStr == null) {
+        continue;
+      }
 
-        String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
-            serviceId, nodeId);
-        String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
-        if (rpcAddrStr == null) {
-          continue;
+      OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
+          rpcAddrStr);
+
+      if (omProxyInfo.getAddress() != null) {
+
+
+        // For a non-HA OM setup, nodeId might be null. If so, we assign it
+        // the default value
+        if (nodeId == null) {
+          nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
         }
-
-        OMProxyInfo omProxyInfo = new OMProxyInfo(serviceId, nodeId,
-            rpcAddrStr);
-
-        if (omProxyInfo.getAddress() != null) {
-
-
-          // For a non-HA OM setup, nodeId might be null. If so, we assign it
-          // the default value
-          if (nodeId == null) {
-            nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
-          }
-          // ProxyInfo will be set during first time call to server.
-          omProxies.put(nodeId, null);
-          omProxyInfos.put(nodeId, omProxyInfo);
-          omNodeIDList.add(nodeId);
-        } else {
-          LOG.error("Failed to create OM proxy for {} at address {}",
-              nodeId, rpcAddrStr);
-        }
+        // ProxyInfo will be set during first time call to server.
+        omProxies.put(nodeId, null);
+        omProxyInfos.put(nodeId, omProxyInfo);
+        omNodeIDList.add(nodeId);
+      } else {
+        LOG.error("Failed to create OM proxy for {} at address {}",
+            nodeId, rpcAddrStr);
       }
     }
 
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 2491a1a..078824c 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -82,6 +82,7 @@
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/ozone-shell-log4j.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/ozone-site.xml" "etc/hadoop"
 run cp -f "${ROOT}/hadoop-ozone/dist/src/shell/conf/log4j.properties" "etc/hadoop"
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 498d02e..4baaca5 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -67,7 +67,7 @@
 HDFS-SITE.XML_dfs.datanode.http.address=0.0.0.0:1012
 CORE-SITE.XML_dfs.data.transfer.protection=authentication
 CORE-SITE.XML_hadoop.security.authentication=kerberos
-CORE-SITE.XML_hadoop.security.auth_to_local=RULE:[2:$1](testuser2.*) RULE:[2:$1@$0](.*)s/.*/root/
+CORE-SITE.XML_hadoop.security.auth_to_local="RULE:[2:$1](testuser2.*) RULE:[2:$1](testuser.*) RULE:[2:$1@$0](.*)s/.*/root/"
 CORE-SITE.XML_hadoop.security.key.provider.path=kms://http@kms:9600/kms
 
 
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot b/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
index b30d260..cdea52f 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/bucketlist.robot
@@ -32,6 +32,14 @@
     ${result} =         Execute AWSS3APICli     list-buckets | jq -r '.Buckets[].Name'
                         Should contain          ${result}    ${BUCKET}
 
+Get bucket info with Ozone Shell to check the owner field
+    Pass Execution If   '${SECURITY_ENABLED}' == 'false'    Skipping this check as security is not enabled
+    ${result} =         Execute             ozone sh bucket info /s3v/${BUCKET} | jq -r '.owner'
+                        Should Be Equal     ${result}       testuser
+                        # In ozonesecure(-ha) docker-config, hadoop.security.auth_to_local is set
+                        # in the way that getShortUserName() converts the accessId to "testuser".
+                        # Also see "Setup dummy credentials for S3" in commonawslib.robot
+
 List buckets with empty access id
                         Execute                    aws configure set aws_access_key_id ''
     ${result} =         Execute AWSS3APICli and checkrc         list-buckets    255
diff --git a/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
index 3c4d045..479b455 100644
--- a/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties
@@ -28,7 +28,7 @@
 # in the configuration
 filter.read.type=MarkerFilter
 filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
 filter.read.onMismatch=NEUTRAL
 
 # filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
index 57577e1..af707fd 100644
--- a/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties
@@ -28,7 +28,7 @@
 # in the configuration
 filter.read.type=MarkerFilter
 filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
 filter.read.onMismatch=NEUTRAL
 
 # filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties
new file mode 100644
index 0000000..8bc374e
--- /dev/null
+++ b/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership.  The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=30
+
+filter=read,write
+# filter.read.onMatch=DENY avoids logging all READ events
+# filter.read.onMatch=ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch=NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type=MarkerFilter
+filter.read.marker=READ
+filter.read.onMatch=NEUTRAL
+filter.read.onMismatch=NEUTRAL
+
+# filter.write.onMatch=DENY avoids logging all WRITE events
+# filter.write.onMatch=ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type=MarkerFilter
+filter.write.marker=WRITE
+filter.write.onMatch=NEUTRAL
+filter.write.onMismatch=NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+# Uncomment following section to enable logging to console appender also
+#appenders=console, rolling
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+
+# Comment this line when using both console and rolling appenders
+appenders=rolling
+
+#Rolling File Appender with size & time thresholds.
+#Rolling is triggered when either threshold is breached.
+#The rolled over file is compressed by default
+#Time interval is specified in seconds 86400s=1 day
+appender.rolling.type=RollingFile
+appender.rolling.name=RollingFile
+appender.rolling.fileName =${sys:hadoop.log.dir}/s3g-audit-${hostName}.log
+appender.rolling.filePattern=${sys:hadoop.log.dir}/s3g-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
+appender.rolling.policies.type=Policies
+appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval=86400
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=64MB
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=S3GAudit
+logger.audit.level=INFO
+logger.audit.appenderRefs=rolling
+logger.audit.appenderRef.file.ref=RollingFile
+
+rootLogger.level=INFO
+#rootLogger.appenderRefs=stdout
+#rootLogger.appenderRef.stdout.ref=STDOUT
diff --git a/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties b/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
index 3f81561..e8c46d6 100644
--- a/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
+++ b/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties
@@ -28,7 +28,7 @@
 # in the configuration
 filter.read.type=MarkerFilter
 filter.read.marker=READ
-filter.read.onMatch=DENY
+filter.read.onMatch=NEUTRAL
 filter.read.onMismatch=NEUTRAL
 
 # filter.write.onMatch=DENY avoids logging all WRITE events
diff --git a/hadoop-ozone/dist/src/shell/ozone/ozone b/hadoop-ozone/dist/src/shell/ozone/ozone
index 3b5ac09..72be8cf 100755
--- a/hadoop-ozone/dist/src/shell/ozone/ozone
+++ b/hadoop-ozone/dist/src/shell/ozone/ozone
@@ -167,6 +167,7 @@
     s3g)
       OZONE_SUBCMD_SUPPORTDAEMONIZATION="true"
       OZONE_CLASSNAME='org.apache.hadoop.ozone.s3.Gateway'
+      OZONE_S3G_OPTS="${OZONE_S3G_OPTS} -Dlog4j.configurationFile=${OZONE_CONF_DIR}/s3g-audit-log4j2.properties"
       OZONE_RUN_ARTIFACT_NAME="ozone-s3gateway"
     ;;
     csi)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index b1b6196..dc6e237 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -80,12 +80,15 @@
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+
+import org.apache.ratis.util.ExitUtils;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -129,6 +132,11 @@
   private String omId;
   private OzoneManagerProtocolClientSideTranslatorPB omClient;
 
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
   @Before
   public void init() {
     try {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 2a3d5cc..3eeb3e9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -113,6 +113,8 @@
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_GET_PIPELINE_EXCEPTION;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+
+import org.apache.ratis.util.ExitUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -168,6 +170,7 @@
 
   @BeforeClass
   public static void setUp() throws Exception {
+    ExitUtils.disableSystemExit();
     conf = new OzoneConfiguration();
     dir = GenericTestUtils.getRandomizedTestDir();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index 01c5f5a..fb5b437 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -226,7 +226,7 @@
     conf2.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
         scm1.getSCMNodeId());
     Assert.assertTrue(StorageContainerManager.scmBootstrap(conf1));
-    scm1.getScmHAManager().shutdown();
+    scm1.getScmHAManager().stop();
     Assert.assertTrue(
         StorageContainerManager.scmInit(conf1, scm1.getClusterId()));
     Assert.assertTrue(StorageContainerManager.scmBootstrap(conf2));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f5db636..7731b9e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -257,6 +257,7 @@
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
@@ -566,6 +567,10 @@
     }
   }
 
+  public boolean isStopped() {
+    return omState == State.STOPPED;
+  }
+
   /**
    * Set the {@link S3Authentication} for the current rpc handler thread.
    */
@@ -785,20 +790,6 @@
     stop();
   }
 
-  public void shutdown(Exception ex) throws IOException {
-    if (omState != State.STOPPED) {
-      stop();
-      exitManager.exitSystem(1, ex.getLocalizedMessage(), ex, LOG);
-    }
-  }
-
-  public void shutdown(String errorMsg) throws IOException {
-    if (omState != State.STOPPED) {
-      stop();
-      exitManager.exitSystem(1, errorMsg, LOG);
-    }
-  }
-
   /**
    * Class which schedule saving metrics to a file.
    */
@@ -1897,6 +1888,9 @@
    */
   public void stop() {
     LOG.info("{}: Stopping Ozone Manager", omNodeDetails.getOMPrintInfo());
+    if (isStopped()) {
+      return;
+    }
     try {
       omState = State.STOPPED;
       // Cancel the metrics timer and set to null.
@@ -1932,12 +1926,16 @@
       if (omSnapshotProvider != null) {
         omSnapshotProvider.stop();
       }
-      omState = State.STOPPED;
     } catch (Exception e) {
       LOG.error("OzoneManager stop failed.", e);
     }
   }
 
+  public void shutDown(String message) {
+    stop();
+    ExitUtils.terminate(1, message, LOG);
+  }
+
   /**
    * Wait until service has completed shutdown.
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index eaa38ef..b3c8262 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -28,6 +28,7 @@
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -103,6 +104,7 @@
 
   private final boolean isRatisEnabled;
   private final boolean isTracingEnabled;
+  private final Semaphore unFlushedTransactions;
 
   /**
    * function which will get term associated with the transaction index.
@@ -120,6 +122,7 @@
     private boolean isRatisEnabled = false;
     private boolean isTracingEnabled = false;
     private Function<Long, Long> indexToTerm = null;
+    private int maxUnFlushedTransactionCount = 0;
 
     public Builder setOmMetadataManager(OMMetadataManager omm) {
       this.mm = omm;
@@ -147,22 +150,30 @@
       return this;
     }
 
+    public Builder setmaxUnFlushedTransactionCount(int size) {
+      this.maxUnFlushedTransactionCount = size;
+      return this;
+    }
+
     public OzoneManagerDoubleBuffer build() {
       if (isRatisEnabled) {
         Preconditions.checkNotNull(rs, "When ratis is enabled, " +
                 "OzoneManagerRatisSnapshot should not be null");
         Preconditions.checkNotNull(indexToTerm, "When ratis is enabled " +
             "indexToTerm should not be null");
+        Preconditions.checkState(maxUnFlushedTransactionCount > 0L,
+            "when ratis is enable, maxUnFlushedTransactions " +
+                "should be bigger than 0");
       }
       return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
-          isTracingEnabled, indexToTerm);
+          isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount);
     }
   }
 
   private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
       OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
       boolean isRatisEnabled, boolean isTracingEnabled,
-      Function<Long, Long> indexToTerm) {
+      Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
     this.currentBuffer = new ConcurrentLinkedQueue<>();
     this.readyBuffer = new ConcurrentLinkedQueue<>();
 
@@ -175,7 +186,7 @@
       this.currentFutureQueue = null;
       this.readyFutureQueue = null;
     }
-
+    this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
     this.omMetadataManager = omMetadataManager;
     this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
     this.ozoneManagerDoubleBufferMetrics =
@@ -190,6 +201,22 @@
 
   }
 
+  /**
+   * Acquires the given number of permits from unFlushedTransactions,
+   * blocking until all are available, or the thread is interrupted.
+   */
+  public void acquireUnFlushedTransactions(int n) throws InterruptedException {
+    unFlushedTransactions.acquire(n);
+  }
+
+  /**
+   * Releases the given number of permits,
+   * returning them to the unFlushedTransactions semaphore.
+   */
+  public void releaseUnFlushedTransactions(int n) {
+    unFlushedTransactions.release(n);
+  }
+
   // TODO: pass the trace id further down and trace all methods of DBStore.
 
   /**
@@ -289,7 +316,7 @@
 
             long startTime = Time.monotonicNow();
             flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                (SupplierWithIOException<Void>) () -> {
+                () -> {
                   omMetadataManager.getStore().commitBatchOperation(
                       batchOperation);
                   return null;
@@ -336,14 +363,16 @@
 
           readyBuffer.clear();
 
+          if (isRatisEnabled) {
+            releaseUnFlushedTransactions(flushedTransactionsSize);
+          }
+
           // update the last updated index in OzoneManagerStateMachine.
           ozoneManagerRatisSnapShot.updateLastAppliedIndex(
               flushedEpochs);
 
           // set metrics.
           updateMetrics(flushedTransactionsSize);
-
-
         }
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 5305ce3..761c8e1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -559,7 +559,7 @@
   public void stop() {
     try {
       server.close();
-      omStateMachine.stop();
+      omStateMachine.close();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index e45c52c..78cbabf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -53,7 +54,6 @@
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -118,6 +118,7 @@
     this.handler = new OzoneManagerRequestHandler(ozoneManager,
         ozoneManagerDoubleBuffer);
 
+
     ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
     this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
@@ -308,6 +309,10 @@
       CompletableFuture<Message> ratisFuture =
           new CompletableFuture<>();
       applyTransactionMap.put(trxLogIndex, trx.getLogEntry().getTerm());
+
+      // If too many requests are pending, wait for the double buffer to flush.
+      ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1);
+
       CompletableFuture<OMResponse> future = CompletableFuture.supplyAsync(
           () -> runCommand(request, trxLogIndex), executorService);
       future.thenApply(omResponse -> {
@@ -395,9 +400,13 @@
   }
 
   public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
+    int maxUnflushedTransactionSize = ozoneManager.getConfiguration()
+        .getInt(OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT,
+            OMConfigKeys.OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT);
     return new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(ozoneManager.getMetadataManager())
         .setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
+        .setmaxUnFlushedTransactionCount(maxUnflushedTransactionSize)
         .setIndexToTerm(this::getTermForIndex)
         .enableRatis(true)
         .enableTracing(isTracingEnabled)
@@ -470,8 +479,11 @@
     // OM should be shutdown as the StateMachine has shutdown.
     LOG.info("StateMachine has shutdown. Shutdown OzoneManager if not " +
         "already shutdown.");
-    ozoneManager.shutdown(new RaftException("RaftServer called shutdown on " +
-        "StateMachine"));
+    if (!ozoneManager.isStopped()) {
+      ozoneManager.shutDown("OM state machine is shutdown by Ratis server");
+    } else {
+      stop();
+    }
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index 01fade0..3ef2661 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -114,7 +114,7 @@
         + "-" + snapshotTime;
     String snapshotFilePath = Paths.get(omSnapshotDir.getAbsolutePath(),
         snapshotFileName).toFile().getAbsolutePath();
-    File targetFile = new File(snapshotFileName + ".tar.gz");
+    File targetFile = new File(snapshotFilePath + ".tar.gz");
 
     String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
         .getOMDBCheckpointEnpointUrl(httpPolicy.isHttpEnabled());
@@ -135,6 +135,13 @@
 
       try (InputStream inputStream = httpURLConnection.getInputStream()) {
         FileUtils.copyInputStreamToFile(inputStream, targetFile);
+      } catch (IOException ex) {
+        LOG.error("OM snapshot {} cannot be downloaded.", targetFile, ex);
+        boolean deleted = FileUtils.deleteQuietly(targetFile);
+        if (!deleted) {
+          LOG.error("OM snapshot which failed to download {} cannot be deleted",
+              targetFile);
+        }
       }
       return null;
     });
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
index 02718a6..4089a07 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -27,6 +27,8 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +74,11 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(TestKeyDeletingService.class);
 
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit(); // presumably avoids a real JVM exit; verify
+  }
+
   private OzoneConfiguration createConfAndInitValues() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     File newFolder = folder.newFolder();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index 826571e..748519e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -70,9 +70,11 @@
 import org.apache.ozone.test.GenericTestUtils;
 
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.ExitUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -99,9 +101,14 @@
 
   private OzoneManagerProtocol writeClient;
   private OzoneManager om;
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit(); // presumably avoids a real JVM exit; verify
+  }
   
   @Before
-  public void setup() throws Exception {
+  public void init() throws Exception {
     configuration = new OzoneConfiguration();
     testDir = GenericTestUtils.getRandomizedTestDir();
     configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
index c62e3cc..2544f32 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestTrashService.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ratis.util.ExitUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -66,6 +67,7 @@
 
   @Before
   public void setup() throws IOException, AuthenticationException {
+    ExitUtils.disableSystemExit();
     OzoneConfiguration configuration = new OzoneConfiguration();
 
     File folder = tempFolder.newFolder();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index 16dc322..a00acdf 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -81,6 +81,7 @@
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
         .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+        .setmaxUnFlushedTransactionCount(10000)
         .enableRatis(true)
         .setIndexToTerm((val) -> term)
         .build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 92d5c62..21e67d0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -107,9 +107,10 @@
     ozoneManagerRatisSnapshot = index -> {
       lastAppliedIndex = index.get(index.size() - 1);
     };
-    doubleBuffer = new OzoneManagerDoubleBuffer.Builder().
-        setOmMetadataManager(omMetadataManager).
-        setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+    doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+        .setOmMetadataManager(omMetadataManager)
+        .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
+        .setmaxUnFlushedTransactionCount(1)
         .enableRatis(true)
         .setIndexToTerm((i) -> term)
         .build();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 351f524..365eb60 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -46,10 +46,12 @@
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -82,6 +84,11 @@
   private SecurityConfig secConfig;
   private OMCertificateClient certClient;
 
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit(); // presumably avoids a real JVM exit; verify
+  }
+  
   @Before
   public void init() throws Exception {
     conf = new OzoneConfiguration();
@@ -112,6 +119,7 @@
     initialTermIndex = TermIndex.valueOf(0, 0);
     RatisSnapshotInfo omRatisSnapshotInfo = new RatisSnapshotInfo();
     when(ozoneManager.getSnapshotInfo()).thenReturn(omRatisSnapshotInfo);
+    when(ozoneManager.getConfiguration()).thenReturn(conf);
     secConfig = new SecurityConfig(conf);
     certClient = new OMCertificateClient(secConfig);
     omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index a0a7cd8..48dca13 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -84,6 +84,7 @@
     when(ozoneManagerRatisServer.getOzoneManager()).thenReturn(ozoneManager);
     when(ozoneManager.getSnapshotInfo()).thenReturn(
         Mockito.mock(RatisSnapshotInfo.class));
+    when(ozoneManager.getConfiguration()).thenReturn(conf);
     ozoneManagerStateMachine =
         new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
     ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher
new file mode 100644
index 0000000..6e86731
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.hadoop.fs.ozone.O3fsDtFetcher
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
new file mode 100644
index 0000000..e0292bc
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.ozone.security.OzoneTokenIdentifier
+org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier
\ No newline at end of file
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
new file mode 100644
index 0000000..ac1b3cf
--- /dev/null
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl$Renewer
+org.apache.hadoop.fs.ozone.BasicRootedOzoneClientAdapterImpl$Renewer
diff --git a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
index bbb8221..ac1b3cf 100644
--- a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
+++ b/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -17,3 +17,4 @@
 #
 
 org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl$Renewer
+org.apache.hadoop.fs.ozone.BasicRootedOzoneClientAdapterImpl$Renewer
diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml
index 2cfead4..bf03b18 100644
--- a/hadoop-ozone/recon/pom.xml
+++ b/hadoop-ozone/recon/pom.xml
@@ -103,7 +103,7 @@
               <goal>install-node-and-npm</goal>
             </goals>
             <configuration>
-              <nodeVersion>v16.2.0</nodeVersion>
+              <nodeVersion>v16.14.2</nodeVersion>
             </configuration>
           </execution>
           <execution>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index e57bb70..5f7af83 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -95,7 +95,7 @@
     super(conf, scmStorageConfig, eventPublisher, networkTopology,
         SCMContext.emptyContext(), scmLayoutVersionManager);
     this.reconDatanodeOutdatedTime = reconStaleDatanodeMultiplier *
-        HddsServerUtil.getScmHeartbeatInterval(conf);
+        HddsServerUtil.getReconHeartbeatInterval(conf);
     this.nodeDB = nodeDB;
     loadExistingNodes();
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c72d2e4..c693bd2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -83,6 +83,7 @@
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 
+import org.apache.ratis.util.ExitUtils;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -313,6 +314,12 @@
     }
   }
 
+  @Override
+  public void shutDown(String message) {
+    stop(); // stop this instance before exiting
+    ExitUtils.terminate(1, message, LOG); // presumably exits with status 1
+  }
+
   public ReconDatanodeProtocolServer getDatanodeProtocolServer() {
     return datanodeProtocolServer;
   }
diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml
index ba9a7b5..ec751ed 100644
--- a/hadoop-ozone/s3gateway/pom.xml
+++ b/hadoop-ozone/s3gateway/pom.xml
@@ -192,6 +192,10 @@
       <artifactId>spotbugs</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java
new file mode 100644
index 0000000..921b18d
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/ClientIpFilter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Priority;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.container.PreMatching;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.ext.Provider;
+import java.io.IOException;
+
+/**
+ * Request filter that resolves the client IP address of an incoming S3
+ * request and publishes it as the {@link #CLIENT_IP_HEADER} request header.
+ * <p>
+ * The address is taken from the first non-blank source, in order:
+ * the {@code x-real-ip} header, the first entry of the
+ * {@code x-forwarded-for} header, and finally the remote address of the
+ * underlying {@link HttpServletRequest}.
+ */
+@Provider
+@PreMatching
+@Priority(ClientIpFilter.PRIORITY)
+public class ClientIpFilter implements ContainerRequestFilter {
+
+  /** Value passed to {@link Priority} for this filter. */
+  public static final int PRIORITY = 200;
+
+  /** Name of the request header that receives the resolved client IP. */
+  public static final String CLIENT_IP_HEADER = "client_ip";
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ClientIpFilter.class);
+
+  @Context
+  private HttpServletRequest httpServletRequest;
+
+  /**
+   * Resolves the client IP and stores it in the request headers.
+   *
+   * @param request the request whose headers receive the client IP
+   * @throws IOException declared by the overridden method; not thrown directly
+   */
+  @Override
+  public void filter(ContainerRequestContext request) throws IOException {
+    String clientIp = trimToEmpty(httpServletRequest.getHeader("x-real-ip"));
+
+    if (clientIp.isEmpty()) {
+      // x-forwarded-for may hold a comma-separated list: use only the first
+      // entry and strip the whitespace around it.
+      String forwarded =
+          trimToEmpty(httpServletRequest.getHeader("x-forwarded-for"));
+      int comma = forwarded.indexOf(',');
+      clientIp = comma < 0 ? forwarded : forwarded.substring(0, comma).trim();
+    }
+
+    if (clientIp.isEmpty()) {
+      // No usable header value: fall back to the servlet's remote address.
+      clientIp = httpServletRequest.getRemoteAddr();
+    }
+    LOG.trace("Real Ip[{}]", clientIp);
+    request.getHeaders().putSingle(CLIENT_IP_HEADER, clientIp);
+  }
+
+  /**
+   * Null-safe trim.
+   *
+   * @param value header value, may be null
+   * @return the trimmed value, or an empty string when value is null
+   */
+  private static String trimToEmpty(String value) {
+    return value == null ? "" : value.trim();
+  }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
index e02c888..d71a99b 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/BucketEndpoint.java
@@ -21,6 +21,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.audit.S3GAction;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.apache.hadoop.ozone.s3.util.ContinueToken;
 import org.apache.hadoop.ozone.s3.util.S3StorageType;
+import org.apache.hadoop.ozone.s3.util.S3Utils;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.http.HttpStatus;
 import org.slf4j.Logger;
@@ -63,6 +65,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
@@ -87,7 +90,7 @@
    */
   @GET
   @SuppressFBWarnings
-  @SuppressWarnings("parameternumber")
+  @SuppressWarnings({"parameternumber", "methodlength"})
   public Response get(
       @PathParam("bucket") String bucketName,
       @QueryParam("delimiter") String delimiter,
@@ -100,33 +103,46 @@
       @QueryParam("uploads") String uploads,
       @QueryParam("acl") String aclMarker,
       @Context HttpHeaders hh) throws OS3Exception, IOException {
-
-    if (aclMarker != null) {
-      S3BucketAcl result = getAcl(bucketName);
-      getMetrics().incGetAclSuccess();
-      return Response.ok(result, MediaType.APPLICATION_XML_TYPE).build();
-    }
-
-    if (uploads != null) {
-      return listMultipartUploads(bucketName, prefix);
-    }
-
-    if (prefix == null) {
-      prefix = "";
-    }
-
-    OzoneBucket bucket = getBucket(bucketName);
-
+    S3GAction s3GAction = S3GAction.GET_BUCKET;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName,
+        "delimiter", delimiter,
+        "encoding-type", encodingType,
+        "marker", marker,
+        "max-keys", String.valueOf(maxKeys),
+        "prefix", prefix,
+        "continuation-token", continueToken,
+        "start-after", startAfter
+    );
     Iterator<? extends OzoneKey> ozoneKeyIterator;
-
     ContinueToken decodedToken =
         ContinueToken.decodeFromString(continueToken);
 
-    // Assign marker to startAfter. for the compatibility of aws api v1
-    if (startAfter == null && marker != null) {
-      startAfter = marker;
-    }
     try {
+      if (aclMarker != null) {
+        s3GAction = S3GAction.GET_ACL;
+        S3BucketAcl result = getAcl(bucketName);
+        getMetrics().incGetAclSuccess();
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(s3GAction, auditParams));
+        return Response.ok(result, MediaType.APPLICATION_XML_TYPE).build();
+      }
+
+      if (uploads != null) {
+        s3GAction = S3GAction.LIST_MULTIPART_UPLOAD;
+        return listMultipartUploads(bucketName, prefix);
+      }
+
+      if (prefix == null) {
+        prefix = "";
+      }
+
+      // Assign marker to startAfter for compatibility with AWS API v1.
+      if (startAfter == null && marker != null) {
+        startAfter = marker;
+      }
+      
+      OzoneBucket bucket = getBucket(bucketName);
       if (startAfter != null && continueToken != null) {
         // If continuation token and start after both are provided, then we
         // ignore start After
@@ -139,12 +155,19 @@
         ozoneKeyIterator = bucket.listKeys(prefix);
       }
     } catch (OMException ex) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       getMetrics().incGetBucketFailure();
       if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
       } else {
         throw ex;
       }
+    } catch (Exception ex) {
+      getMetrics().incGetBucketFailure();
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
 
     ListObjectResponse response = new ListObjectResponse();
@@ -215,6 +238,7 @@
       response.setTruncated(false);
     }
 
+    AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
     getMetrics().incGetBucketSuccess();
     response.setKeyCount(
         response.getCommonPrefixes().size() + response.getContents().size());
@@ -223,19 +247,30 @@
 
   @PUT
   public Response put(@PathParam("bucket") String bucketName,
-      @QueryParam("acl") String aclMarker,
-      @Context HttpHeaders httpHeaders,
-      InputStream body) throws IOException, OS3Exception {
-    if (aclMarker != null) {
-      return putAcl(bucketName, httpHeaders, body);
-    }
+                      @QueryParam("acl") String aclMarker,
+                      @Context HttpHeaders httpHeaders,
+                      InputStream body) throws IOException, OS3Exception {
+    S3GAction s3GAction = S3GAction.CREATE_BUCKET;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName,
+        "acl", aclMarker
+    );
+
     try {
+      if (aclMarker != null) {
+        s3GAction = S3GAction.PUT_ACL;
+        return putAcl(bucketName, httpHeaders, body);
+      }
       String location = createS3Bucket(bucketName);
       LOG.info("Location is {}", location);
+      AUDIT.logWriteSuccess(
+          buildAuditMessageForSuccess(s3GAction, auditParams));
       getMetrics().incCreateBucketSuccess();
       return Response.status(HttpStatus.SC_OK).header("Location", location)
           .build();
     } catch (OMException exception) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, exception));
       getMetrics().incCreateBucketFailure();
       if (exception.getResult() == ResultCodes.INVALID_BUCKET_NAME) {
         throw newError(S3ErrorTable.INVALID_BUCKET_NAME, bucketName, exception);
@@ -243,6 +278,10 @@
       LOG.error("Error in Create Bucket Request for bucket: {}", bucketName,
           exception);
       throw exception;
+    } catch (Exception ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
   }
 
@@ -250,33 +289,46 @@
       @PathParam("bucket") String bucketName,
       @QueryParam("prefix") String prefix)
       throws OS3Exception, IOException {
+    S3GAction s3GAction = S3GAction.LIST_MULTIPART_UPLOAD;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName,
+        "prefix", prefix
+    );
 
     OzoneBucket bucket = getBucket(bucketName);
 
-    OzoneMultipartUploadList ozoneMultipartUploadList;
     try {
-      ozoneMultipartUploadList = bucket.listMultipartUploads(prefix);
+      OzoneMultipartUploadList ozoneMultipartUploadList =
+          bucket.listMultipartUploads(prefix);
+
+      ListMultipartUploadsResult result = new ListMultipartUploadsResult();
+      result.setBucket(bucketName);
+
+      ozoneMultipartUploadList.getUploads().forEach(upload -> result.addUpload(
+          new ListMultipartUploadsResult.Upload(
+              upload.getKeyName(),
+              upload.getUploadId(),
+              upload.getCreationTime(),
+              S3StorageType.fromReplicationConfig(upload.getReplicationConfig())
+          )));
+      AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
+      getMetrics().incListMultipartUploadsSuccess();
+      return Response.ok(result).build();
     } catch (OMException exception) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, exception));
       getMetrics().incListMultipartUploadsFailure();
       if (exception.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, prefix, exception);
       }
       throw exception;
+    } catch (Exception ex) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
-
-    ListMultipartUploadsResult result = new ListMultipartUploadsResult();
-    result.setBucket(bucketName);
-
-    ozoneMultipartUploadList.getUploads().forEach(upload -> result.addUpload(
-        new ListMultipartUploadsResult.Upload(
-            upload.getKeyName(),
-            upload.getUploadId(),
-            upload.getCreationTime(),
-            S3StorageType.fromReplicationConfig(upload.getReplicationConfig())
-        )));
-    getMetrics().incListMultipartUploadsSuccess();
-    return Response.ok(result).build();
   }
+
   /**
    * Rest endpoint to check the existence of a bucket.
    * <p>
@@ -286,9 +338,20 @@
   @HEAD
   public Response head(@PathParam("bucket") String bucketName)
       throws OS3Exception, IOException {
-    getBucket(bucketName);
-    getMetrics().incHeadBucketSuccess();
-    return Response.ok().build();
+    S3GAction s3GAction = S3GAction.HEAD_BUCKET;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName);
+    try {
+      getBucket(bucketName);
+      AUDIT.logReadSuccess(
+          buildAuditMessageForSuccess(s3GAction, auditParams));
+      getMetrics().incHeadBucketSuccess();
+      return Response.ok().build();
+    } catch (Exception e) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, e));
+      throw e;
+    }
   }
 
   /**
@@ -300,10 +363,16 @@
   @DELETE
   public Response delete(@PathParam("bucket") String bucketName)
       throws IOException, OS3Exception {
+    S3GAction s3GAction = S3GAction.DELETE_BUCKET;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName
+    );
 
     try {
       deleteS3Bucket(bucketName);
     } catch (OMException ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       getMetrics().incDeleteBucketFailure();
       if (ex.getResult() == ResultCodes.BUCKET_NOT_EMPTY) {
         throw newError(S3ErrorTable.BUCKET_NOT_EMPTY, bucketName, ex);
@@ -314,8 +383,13 @@
       } else {
         throw ex;
       }
+    } catch (Exception ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
 
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
     getMetrics().incDeleteBucketSuccess();
     return Response
         .status(HttpStatus.SC_NO_CONTENT)
@@ -332,8 +406,15 @@
   @POST
   @Produces(MediaType.APPLICATION_XML)
   public MultiDeleteResponse multiDelete(@PathParam("bucket") String bucketName,
-      @QueryParam("delete") String delete,
-      MultiDeleteRequest request) throws OS3Exception, IOException {
+                                         @QueryParam("delete") String delete,
+                                         MultiDeleteRequest request)
+      throws OS3Exception, IOException {
+    S3GAction s3GAction = S3GAction.MULTI_DELETE;
+    Map<String, String> auditParams = S3Utils.genAuditParam(
+        "bucket", bucketName,
+        "delete", delete
+    );
+
     OzoneBucket bucket = getBucket(bucketName);
     MultiDeleteResponse result = new MultiDeleteResponse();
     if (request.getObjects() != null) {
@@ -363,6 +444,13 @@
         }
       }
     }
+    if (result.getErrors().size() != 0) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(s3GAction, auditParams,
+          new Exception("MultiDelete Exception")));
+    } else {
+      AUDIT.logWriteSuccess(
+          buildAuditMessageForSuccess(s3GAction, auditParams));
+    }
     return result;
   }
 
@@ -406,6 +494,9 @@
         LOG.error("Failed to get acl of Bucket " + bucketName, ex);
         throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
       }
+    } catch (OS3Exception ex) {
+      getMetrics().incGetAclFailure();
+      throw ex;
     }
   }
 
@@ -415,7 +506,7 @@
    * see: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html
    */
   public Response putAcl(String bucketName, HttpHeaders httpHeaders,
-      InputStream body) throws IOException, OS3Exception {
+                         InputStream body) throws IOException, OS3Exception {
     String grantReads = httpHeaders.getHeaderString(S3Acl.GRANT_READ);
     String grantWrites = httpHeaders.getHeaderString(S3Acl.GRANT_WRITE);
     String grantReadACP = httpHeaders.getHeaderString(S3Acl.GRANT_READ_CAP);
@@ -433,7 +524,7 @@
           && grantWriteACP == null && grantFull == null) {
         S3BucketAcl putBucketAclRequest =
             new PutBucketAclRequestUnmarshaller().readFrom(
-            null, null, null, null, null, body);
+                null, null, null, null, null, body);
         // Handle grants in body
         ozoneAclListOnBucket.addAll(
             S3Acl.s3AclToOzoneNativeAclOnBucket(putBucketAclRequest));
@@ -473,7 +564,6 @@
               S3Acl.ACLType.FULL_CONTROL.getValue()));
         }
       }
-
       // A put request will reset all previous ACLs on bucket
       bucket.setAcl(ozoneAclListOnBucket);
       // A put request will reset input user/group's permission on volume
@@ -506,24 +596,28 @@
       LOG.error("Error in set ACL Request for bucket: {}", bucketName,
           exception);
       throw exception;
+    } catch (OS3Exception ex) {
+      getMetrics().incPutAclFailure();
+      throw ex;
     }
     getMetrics().incPutAclSuccess();
     return Response.status(HttpStatus.SC_OK).build();
   }
 
   /**
-   *  Example: x-amz-grant-write: \
-   *  uri="http://acs.amazonaws.com/groups/s3/LogDelivery", id="111122223333", \
-   *  id="555566667777".
+   * Example: x-amz-grant-write: \
+   * uri="http://acs.amazonaws.com/groups/s3/LogDelivery", id="111122223333", \
+   * id="555566667777".
    */
   private List<OzoneAcl> getAndConvertAclOnBucket(String value,
-      String permission) throws OS3Exception {
+                                                  String permission)
+      throws OS3Exception {
     List<OzoneAcl> ozoneAclList = new ArrayList<>();
     if (StringUtils.isEmpty(value)) {
       return ozoneAclList;
     }
     String[] subValues = value.split(",");
-    for (String acl: subValues) {
+    for (String acl : subValues) {
       String[] part = acl.split("=");
       if (part.length != 2) {
         throw newError(S3ErrorTable.INVALID_ARGUMENT, acl);
@@ -550,13 +644,14 @@
   }
 
   private List<OzoneAcl> getAndConvertAclOnVolume(String value,
-      String permission) throws OS3Exception {
+                                                  String permission)
+      throws OS3Exception {
     List<OzoneAcl> ozoneAclList = new ArrayList<>();
     if (StringUtils.isEmpty(value)) {
       return ozoneAclList;
     }
     String[] subValues = value.split(",");
-    for (String acl: subValues) {
+    for (String acl : subValues) {
       String[] part = acl.split("=");
       if (part.length != 2) {
         throw newError(S3ErrorTable.INVALID_ARGUMENT, acl);
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index a9411dd..b6a928f 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -19,12 +19,21 @@
 
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Context;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.ozone.audit.AuditAction;
+import org.apache.hadoop.ozone.audit.AuditEventStatus;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -39,20 +48,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.s3.ClientIpFilter.CLIENT_IP_HEADER;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 
 /**
  * Basic helpers for all the REST endpoints.
  */
-public abstract class EndpointBase {
+public abstract class EndpointBase implements Auditor {
 
   @Inject
   private OzoneClient client;
   @Inject
   private S3Auth s3Auth;
+  @Context
+  private ContainerRequestContext context;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(EndpointBase.class);
 
+  protected static final AuditLogger AUDIT =
+      new AuditLogger(AuditLoggerType.S3GLOGGER);
+
   protected OzoneBucket getBucket(OzoneVolume volume, String bucketName)
       throws OS3Exception, IOException {
     OzoneBucket bucket;
@@ -119,6 +135,7 @@
     try {
       client.getObjectStore().createS3Bucket(bucketName);
     } catch (OMException ex) {
+      getMetrics().incCreateBucketFailure();
       if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
       } else if (ex.getResult() != ResultCodes.BUCKET_ALREADY_EXISTS) {
@@ -191,6 +208,40 @@
     }
   }
 
+  /** Starts an audit message for op, adding user and client IP if known. */
+  private AuditMessage.Builder auditMessageBaseBuilder(AuditAction op,
+      Map<String, String> auditMap) {
+    AuditMessage.Builder builder =
+        new AuditMessage.Builder().forOperation(op).withParams(auditMap);
+    String accessId = s3Auth == null ? null : s3Auth.getAccessID();
+    if (accessId != null && !accessId.isEmpty()) {
+      builder.setUser(accessId);
+    }
+    // context is @Context-injected; skip the IP when it was never set.
+    if (context != null) {
+      builder.atIp(getClientIpAddress());
+    }
+    return builder;
+  }
+
+  /** Builds a SUCCESS audit message for op with the given parameters. */
+  @Override
+  public AuditMessage buildAuditMessageForSuccess(AuditAction op,
+      Map<String, String> auditMap) {
+    return auditMessageBaseBuilder(op, auditMap)
+        .withResult(AuditEventStatus.SUCCESS).build();
+  }
+
+  /** Builds a FAILURE audit message for op, attaching the throwable. */
+  @Override
+  public AuditMessage buildAuditMessageForFailure(AuditAction op,
+      Map<String, String> auditMap, Throwable throwable) {
+    return auditMessageBaseBuilder(op, auditMap)
+        .withResult(AuditEventStatus.FAILURE)
+        .withException(throwable).build();
+  }
+
+
   @VisibleForTesting
   public void setClient(OzoneClient ozoneClient) {
     this.client = ozoneClient;
@@ -204,4 +255,8 @@
   public S3GatewayMetrics getMetrics() {
     return S3GatewayMetrics.create();
   }
+
+  public String getClientIpAddress() {
+    return context.getHeaderString(CLIENT_IP_HEADER);
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 5b37d1f..8824e98 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.S3GAction;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
@@ -110,6 +111,7 @@
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_SUPPORTED_UNIT;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.genAuditParam;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.urlDecode;
 
 import org.apache.http.HttpStatus;
@@ -166,16 +168,29 @@
       @QueryParam("uploadId") @DefaultValue("") String uploadID,
       InputStream body) throws IOException, OS3Exception {
 
-    OzoneOutputStream output = null;
-
-    if (uploadID != null && !uploadID.equals("")) {
-      // If uploadID is specified, it is a request for upload part
-      return createMultipartKey(bucketName, keyPath, length,
-          partNumber, uploadID, body);
+    S3GAction s3GAction = S3GAction.CREATE_KEY;
+    boolean auditSuccess = true;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucketName,
+        "path", keyPath,
+        "Content-Length", String.valueOf(length),
+        "partNumber", String.valueOf(partNumber)
+    );
+    if (partNumber != 0) {
+      auditParams.put("uploadId", uploadID);
     }
 
+    OzoneOutputStream output = null;
+
     String copyHeader = null, storageType = null;
     try {
+      if (uploadID != null && !uploadID.equals("")) {
+        s3GAction = S3GAction.CREATE_MULTIPART_KEY;
+        // If uploadID is specified, it is a request for upload part
+        return createMultipartKey(bucketName, keyPath, length,
+            partNumber, uploadID, body);
+      }
+
       copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
       storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
       boolean storageTypeDefault = StringUtils.isEmpty(storageType);
@@ -187,6 +202,7 @@
 
       if (copyHeader != null) {
         //Copy object, as copy source available.
+        s3GAction = S3GAction.COPY_OBJECT;
         CopyObjectResponse copyObjectResponse = copyObject(
             copyHeader, bucket, keyPath, replicationConfig, storageTypeDefault);
         return Response.status(Status.OK).entity(copyObjectResponse).header(
@@ -207,6 +223,9 @@
       return Response.ok().status(HttpStatus.SC_OK)
           .build();
     } catch (OMException ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       if (copyHeader != null) {
         getMetrics().incCopyObjectFailure();
       } else {
@@ -225,7 +244,16 @@
       }
       LOG.error("Exception occurred in PutObject", ex);
       throw ex;
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(s3GAction, auditParams));
+      }
       if (output != null) {
         output.close();
       }
@@ -249,10 +277,21 @@
       @QueryParam("max-parts") @DefaultValue("1000") int maxParts,
       @QueryParam("part-number-marker") String partNumberMarker,
       InputStream body) throws IOException, OS3Exception {
-    try {
 
+    S3GAction s3GAction = S3GAction.GET_KEY;
+    boolean auditSuccess = true;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucketName,
+        "path", keyPath,
+        "uploadId", uploadId,
+        "max-parts", String.valueOf(maxParts),
+        "part-number-marker", partNumberMarker
+    );
+
+    try {
       if (uploadId != null) {
         // When we have uploadId, this is the request for list Parts.
+        s3GAction = S3GAction.LIST_PARTS;
         int partMarker = parsePartNumberMarker(partNumberMarker);
         return listParts(bucketName, keyPath, uploadId,
             partMarker, maxParts);
@@ -327,6 +366,10 @@
       getMetrics().incGetKeySuccess();
       return responseBuilder.build();
     } catch (OMException ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex)
+      );
       if (uploadId != null) {
         getMetrics().incListPartsFailure();
       } else {
@@ -339,6 +382,18 @@
       } else {
         throw ex;
       }
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(s3GAction, auditParams)
+        );
+      }
     }
   }
 
@@ -364,12 +419,19 @@
       @PathParam("bucket") String bucketName,
       @PathParam("path") String keyPath) throws IOException, OS3Exception {
 
-    OzoneKey key;
+    S3GAction s3GAction = S3GAction.HEAD_KEY;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucketName,
+        "keyPath", keyPath
+    );
 
+    OzoneKey key;
     try {
       key = getBucket(bucketName).headObject(keyPath);
       // TODO: return the specified range bytes of this object.
     } catch (OMException ex) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       getMetrics().incHeadKeyFailure();
       if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
         // Just return 404 with no content
@@ -379,6 +441,10 @@
       } else {
         throw ex;
       }
+    } catch (Exception ex) {
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
 
     ResponseBuilder response = Response.ok().status(HttpStatus.SC_OK)
@@ -387,6 +453,7 @@
         .header("Content-Type", "binary/octet-stream");
     addLastModifiedDate(response, key);
     getMetrics().incHeadKeySuccess();
+    AUDIT.logReadSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
     return response.build();
   }
 
@@ -433,14 +500,24 @@
       @QueryParam("uploadId") @DefaultValue("") String uploadId) throws
       IOException, OS3Exception {
 
+    S3GAction s3GAction = S3GAction.DELETE_KEY;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucketName,
+        "path", keyPath,
+        "uploadId", uploadId
+    );
+
     try {
       if (uploadId != null && !uploadId.equals("")) {
+        s3GAction = S3GAction.ABORT_MULTIPART_UPLOAD;
         return abortMultipartUpload(bucketName, keyPath, uploadId);
       }
       OzoneBucket bucket = getBucket(bucketName);
       bucket.getKey(keyPath);
       bucket.deleteKey(keyPath);
     } catch (OMException ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       if (uploadId != null && !uploadId.equals("")) {
         getMetrics().incAbortMultiPartUploadFailure();
       } else {
@@ -461,9 +538,13 @@
       } else {
         throw ex;
       }
-
+    } catch (Exception ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
     getMetrics().incDeleteKeySuccess();
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(s3GAction, auditParams));
     return Response
         .status(Status.NO_CONTENT)
         .build();
@@ -482,6 +563,12 @@
       @PathParam("path") String key
   )
       throws IOException, OS3Exception {
+    S3GAction s3GAction = S3GAction.INIT_MULTIPART_UPLOAD;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucket,
+        "path", key
+    );
+
     try {
       OzoneBucket ozoneBucket = getBucket(bucket);
       String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
@@ -499,10 +586,14 @@
       multipartUploadInitiateResponse.setKey(key);
       multipartUploadInitiateResponse.setUploadID(multipartInfo.getUploadID());
 
+      AUDIT.logWriteSuccess(
+          buildAuditMessageForSuccess(s3GAction, auditParams));
       getMetrics().incInitMultiPartUploadSuccess();
       return Response.status(Status.OK).entity(
           multipartUploadInitiateResponse).build();
     } catch (OMException ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       getMetrics().incInitMultiPartUploadFailure();
       if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, key, ex);
@@ -510,6 +601,10 @@
       LOG.error("Error in Initiate Multipart Upload Request for bucket: {}, " +
           "key: {}", bucket, key, ex);
       throw ex;
+    } catch (Exception ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
   }
 
@@ -537,6 +632,12 @@
       @QueryParam("uploadId") @DefaultValue("") String uploadID,
       CompleteMultipartUploadRequest multipartUploadRequest)
       throws IOException, OS3Exception {
+    S3GAction s3GAction = S3GAction.COMPLETE_MULTIPART_UPLOAD;
+    Map<String, String> auditParams = genAuditParam(
+        "bucket", bucket,
+        "path", key,
+        "uploadId", uploadID
+    );
     OzoneBucket ozoneBucket = getBucket(bucket);
     // Using LinkedHashMap to preserve ordering of parts list.
     Map<Integer, String> partsMap = new LinkedHashMap<>();
@@ -562,10 +663,14 @@
           .getHash());
       // Location also setting as bucket name.
       completeMultipartUploadResponse.setLocation(bucket);
+      AUDIT.logWriteSuccess(
+          buildAuditMessageForSuccess(s3GAction, auditParams));
       getMetrics().incCompleteMultiPartUploadSuccess();
       return Response.status(Status.OK).entity(completeMultipartUploadResponse)
           .build();
     } catch (OMException ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
       getMetrics().incCompleteMultiPartUploadFailure();
       if (ex.getResult() == ResultCodes.INVALID_PART) {
         throw newError(S3ErrorTable.INVALID_PART, key, ex);
@@ -593,6 +698,10 @@
       LOG.error("Error in Complete Multipart Upload Request for bucket: {}, " +
           ", key: {}", bucket, key, ex);
       throw ex;
+    } catch (Exception ex) {
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(s3GAction, auditParams, ex));
+      throw ex;
     }
   }
 
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
index 615fb01..57d0d12 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/RootEndpoint.java
@@ -21,8 +21,10 @@
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 
+import org.apache.hadoop.ozone.audit.S3GAction;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.s3.commontypes.BucketMetadata;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
@@ -48,26 +50,43 @@
   @GET
   public Response get()
       throws OS3Exception, IOException {
-    ListBucketResponse response = new ListBucketResponse();
-
-    Iterator<? extends OzoneBucket> bucketIterator;
+    boolean auditSuccess = true;
     try {
-      bucketIterator = listS3Buckets(null);
-    } catch (Exception e) {
-      getMetrics().incListS3BucketsFailure();
-      throw e;
-    }
+      ListBucketResponse response = new ListBucketResponse();
 
-    while (bucketIterator.hasNext()) {
-      OzoneBucket next = bucketIterator.next();
-      BucketMetadata bucketMetadata = new BucketMetadata();
-      bucketMetadata.setName(next.getName());
-      bucketMetadata.setCreationDate(next.getCreationTime());
-      response.addBucket(bucketMetadata);
-    }
+      Iterator<? extends OzoneBucket> bucketIterator;
+      try {
+        bucketIterator = listS3Buckets(null);
+      } catch (Exception e) {
+        getMetrics().incListS3BucketsFailure();
+        throw e;
+      }
 
-    getMetrics().incListS3BucketsSuccess();
-    return Response.ok(response).build();
+      while (bucketIterator.hasNext()) {
+        OzoneBucket next = bucketIterator.next();
+        BucketMetadata bucketMetadata = new BucketMetadata();
+        bucketMetadata.setName(next.getName());
+        bucketMetadata.setCreationDate(next.getCreationTime());
+        response.addBucket(bucketMetadata);
+      }
+
+      getMetrics().incListS3BucketsSuccess();
+      return Response.ok(response).build();
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(S3GAction.LIST_S3_BUCKETS,
+              Collections.emptyMap(), ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(S3GAction.LIST_S3_BUCKETS,
+                Collections.emptyMap())
+        );
+      }
+    }
   }
 
   @Override
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
index 012da2a..b8b4ce7 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
@@ -92,9 +92,9 @@
   }
 
   /**
-   * Create and returns SCMPipelineMetrics instance.
+   * Create and returns S3 Gateway Metrics instance.
    *
-   * @return SCMPipelineMetrics
+   * @return S3GatewayMetrics
    */
   public static synchronized S3GatewayMetrics create() {
     if (instance != null) {
@@ -317,4 +317,52 @@
   public long getHeadKeySuccess() {
     return headKeySuccess.value();
   }
+
+  public long getGetBucketSuccess() {
+    return getBucketSuccess.value();
+  }
+
+  public long getGetBucketFailure() {
+    return getBucketFailure.value();
+  }
+
+  public long getCreateBucketSuccess() {
+    return createBucketSuccess.value();
+  }
+
+  public long getCreateBucketFailure() {
+    return createBucketFailure.value();
+  }
+
+  public long getDeleteBucketSuccess() {
+    return deleteBucketSuccess.value();
+  }
+
+  public long getDeleteBucketFailure() {
+    return deleteBucketFailure.value();
+  }
+
+  public long getGetAclSuccess() {
+    return getAclSuccess.value();
+  }
+
+  public long getGetAclFailure() {
+    return getAclFailure.value();
+  }
+
+  public long getPutAclSuccess() {
+    return putAclSuccess.value();
+  }
+
+  public long getPutAclFailure() {
+    return putAclFailure.value();
+  }
+
+  public long getListMultipartUploadsSuccess() {
+    return listMultipartUploadsSuccess.value();
+  }
+
+  public long getListMultipartUploadsFailure() {
+    return listMultipartUploadsFailure.value();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
index d644162..5f110e8 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/util/S3Utils.java
@@ -22,10 +22,13 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.util.Map;
+import java.util.TreeMap;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_ARGUMENT;
@@ -46,6 +49,22 @@
     return URLEncoder.encode(str, UTF_8.name());
   }
 
+  /** Maps alternating key/value args; pairs with an empty side are skipped. */
+  public static Map<String, String> genAuditParam(String... strs) {
+    if (strs.length % 2 == 1) {
+      throw new IllegalArgumentException("Unexpected number of parameters: "
+          + strs.length);
+    }
+    Map<String, String> auditParams = new TreeMap<>();
+    // Step by two so each iteration consumes exactly one key/value pair.
+    for (int i = 0; i < strs.length; i += 2) {
+      if (!StringUtils.isEmpty(strs[i]) && !StringUtils.isEmpty(strs[i + 1])) {
+        auditParams.put(strs[i], strs[i + 1]);
+      }
+    }
+    return auditParams;
+  }
+
   private S3Utils() {
     // no instances
   }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
new file mode 100644
index 0000000..a598bde
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestS3GatewayAuditLog.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.s3;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for S3Gateway Audit Log.
+ */
+public class TestS3GatewayAuditLog {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3GatewayAuditLog.class.getName());
+
+  static {
+    System.setProperty("log4j.configurationFile", "auditlog.properties");
+    System.setProperty("log4j2.contextSelector",
+        "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+  }
+
+  private String bucketName = OzoneConsts.BUCKET;
+  private OzoneClient clientStub;
+  private BucketEndpoint bucketEndpoint;
+  private RootEndpoint rootEndpoint;
+  private ObjectEndpoint keyEndpoint;
+  private OzoneBucket bucket;
+
+  @Before
+  public void setup() throws Exception {
+
+    clientStub = new OzoneClientStub();
+    clientStub.getObjectStore().createS3Bucket(bucketName);
+    bucket = clientStub.getObjectStore().getS3Bucket(bucketName);
+
+    bucketEndpoint = new BucketEndpoint();
+    bucketEndpoint.setClient(clientStub);
+
+    rootEndpoint = new RootEndpoint();
+    rootEndpoint.setClient(clientStub);
+
+    keyEndpoint = new ObjectEndpoint();
+    keyEndpoint.setClient(clientStub);
+    keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    File file = new File("audit.log");
+    if (FileUtils.deleteQuietly(file)) {
+      LOG.info("{} has been deleted as all tests have completed.",
+          file.getName());
+    } else {
+      LOG.info("audit.log could not be deleted.");
+    }
+  }
+
+  @Test
+  public void testHeadBucket() throws Exception {
+    bucketEndpoint.head(bucketName);
+    String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+        "op=HEAD_BUCKET {bucket=bucket} | ret=SUCCESS";
+    verifyLog(expected);
+  }
+
+  @Test
+  public void testListBucket() throws Exception {
+
+    rootEndpoint.get().getEntity();
+    String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+        "op=LIST_S3_BUCKETS {} | ret=SUCCESS";
+    verifyLog(expected);
+  }
+
+  @Test
+  public void testHeadObject() throws Exception {
+    String value = RandomStringUtils.randomAlphanumeric(32);
+    OzoneOutputStream out = bucket.createKey("key1",
+        value.getBytes(UTF_8).length, ReplicationType.RATIS,
+        ReplicationFactor.ONE, new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+
+
+    keyEndpoint.head(bucketName, "key1");
+    String expected = "INFO  | S3GAudit | ? | user=null | ip=null | " +
+        "op=HEAD_KEY {bucket=bucket, keyPath=key1} | ret=SUCCESS";
+    verifyLog(expected);
+
+  }
+
+  /** Asserts audit.log's first line is expectedString, then truncates it. */
+  private void verifyLog(String expectedString) throws IOException {
+    File file = new File("audit.log");
+    List<String> lines = FileUtils.readLines(file, (String)null);
+    final int retry = 5;
+    for (int i = 0; lines.isEmpty() && i < retry; i++) {
+      try {
+        // Back off first, then re-read, so no wait goes unchecked.
+        Thread.sleep(500 * (i + 1));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+      lines = FileUtils.readLines(file, (String)null);
+    }
+    // Fails as an assertion (not IndexOutOfBounds) if the log is still empty.
+    assertEquals(expectedString, lines.isEmpty() ? null : lines.get(0));
+
+    lines.clear();
+    FileUtils.writeLines(file, lines, false);
+  }
+
+}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
index c5d6606..ccf36a7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
@@ -31,13 +31,23 @@
 import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
 import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
 import org.apache.hadoop.ozone.s3.endpoint.RootEndpoint;
+import org.apache.hadoop.ozone.s3.endpoint.TestBucketAcl;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 
+import static java.net.HttpURLConnection.HTTP_OK;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link S3GatewayMetrics}.
@@ -50,9 +60,11 @@
   private RootEndpoint rootEndpoint;
   private ObjectEndpoint keyEndpoint;
   private OzoneBucket bucket;
-
+  private HttpHeaders headers;
+  private static final String ACL_MARKER = "acl";
   private S3GatewayMetrics metrics;
 
+
   @Before
   public void setup() throws Exception {
     clientStub = new OzoneClientStub();
@@ -69,6 +81,7 @@
     keyEndpoint.setClient(clientStub);
     keyEndpoint.setOzoneConfiguration(new OzoneConfiguration());
 
+    headers = Mockito.mock(HttpHeaders.class);
     metrics = bucketEndpoint.getMetrics();
   }
 
@@ -110,4 +123,160 @@
     long curMetric = metrics.getHeadKeySuccess();
     assertEquals(1L, curMetric - oriMetric);
   }
-}
+
+  @Test
+  public void testGetBucketSuccess() throws Exception {
+    // Snapshot the counter so only the delta caused by this test is checked.
+    final long before = metrics.getGetBucketSuccess();
+
+    // Put one key into the bucket, then call the bucket GET endpoint.
+    clientStub = createClientWithKeys("file1");
+    bucketEndpoint.setClient(clientStub);
+    bucketEndpoint.get(bucketName, null, null, null, 1000, null, null,
+        "random", null, null, null).getEntity();
+
+    final long after = metrics.getGetBucketSuccess();
+    assertEquals(1L, after - before);
+  }
+
+  @Test
+  public void testGetBucketFailure() throws Exception {
+    final long before = metrics.getGetBucketFailure();
+
+    try {
+      // Searching for a bucket that does not exist
+      bucketEndpoint.get("newBucket", null, null, null, 1000, null, null,
+          "random", null, null, null);
+      fail();
+    } catch (OS3Exception expected) {
+      // Expected path: only the failure counter is of interest here.
+    }
+
+    // The call above should have been counted as exactly one failure.
+    final long after = metrics.getGetBucketFailure();
+    assertEquals(1L, after - before);
+  }
+
+  @Test
+  public void testCreateBucketSuccess() throws Exception {
+    // Snapshot the counter so the assertion only sees this test's delta.
+    final long before = metrics.getCreateBucketSuccess();
+
+    // A plain PUT: no ACL marker, no headers and no request body.
+    bucketEndpoint.put(bucketName, null, null, null);
+
+    final long after = metrics.getCreateBucketSuccess();
+    assertEquals(1L, after - before);
+  }
+
+  @Test
+  public void testCreateBucketFailure() throws Exception {
+    final long before = metrics.getCreateBucketFailure();
+
+    // Re-creating a bucket that already exists is the failure trigger;
+    // NOTE(review): assumes setup() already created bucketName -- confirm.
+    bucketEndpoint.put(bucketName, null, null, null);
+
+    assertEquals(1L, metrics.getCreateBucketFailure() - before);
+  }
+
+  @Test
+  public void testDeleteBucketSuccess() throws Exception {
+    final long before = metrics.getDeleteBucketSuccess();
+
+    // Deleting the test bucket should count as one successful delete.
+    bucketEndpoint.delete(bucketName);
+    final long delta = metrics.getDeleteBucketSuccess() - before;
+    assertEquals(1L, delta);
+  }
+
+  @Test
+  public void testDeleteBucketFailure() throws Exception {
+    final long before = metrics.getDeleteBucketFailure();
+    // The first delete removes the bucket, so the second finds nothing.
+    bucketEndpoint.delete(bucketName);
+    try {
+      bucketEndpoint.delete(bucketName);
+      fail();
+    } catch (OS3Exception notFound) {
+      assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), notFound.getCode());
+      assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+          notFound.getErrorMessage());
+    }
+
+    final long after = metrics.getDeleteBucketFailure();
+    assertEquals(1L, after - before);
+  }
+
+  @Test
+  public void testGetAclSuccess() throws Exception {
+    long oriMetric = metrics.getGetAclSuccess();
+
+    // ACL_MARKER (the constant the putAcl tests also use) marks this GET
+    // as an ACL request.
+    Response response = bucketEndpoint.get(bucketName, null, null, null, 0,
+        null, null, null, null, ACL_MARKER, null);
+    long curMetric = metrics.getGetAclSuccess();
+    assertEquals(HTTP_OK, response.getStatus());
+    assertEquals(1L, curMetric - oriMetric);
+  }
+
+  @Test
+  public void testGetAclFailure() throws Exception {
+    long oriMetric = metrics.getGetAclFailure();
+    try {
+      // Requesting the ACL of a bucket that does not exist must fail.
+      bucketEndpoint.get("random_bucket", null, null, null, 0, null, null,
+          null, null, ACL_MARKER, null);
+      fail("getAcl on a non-existent bucket should throw OS3Exception");
+    } catch (OS3Exception ex) {
+      assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getCode(), ex.getCode());
+      assertEquals(S3ErrorTable.NO_SUCH_BUCKET.getErrorMessage(),
+          ex.getErrorMessage());
+    }
+    // Exactly one getAcl failure should have been recorded.
+    long curMetric = metrics.getGetAclFailure();
+    assertEquals(1L, curMetric - oriMetric);
+  }
+
+  @Test
+  public void testPutAclSuccess() throws Exception {
+    long oriMetric = metrics.getPutAclSuccess();
+
+    clientStub.getObjectStore().createS3Bucket("b1");
+    // try-with-resources closes the ACL document even if put() throws.
+    try (InputStream inputBody = TestBucketAcl.class.getClassLoader()
+        .getResourceAsStream("userAccessControlList.xml")) {
+      bucketEndpoint.put("b1", ACL_MARKER, headers, inputBody);
+    }
+    long curMetric = metrics.getPutAclSuccess();
+    assertEquals(1L, curMetric - oriMetric);
+  }
+
+  @Test
+  public void testPutAclFailure() throws Exception {
+    // Failing the putACL endpoint by applying ACL on a non-Existent Bucket
+    long oriMetric = metrics.getPutAclFailure();
+
+    // The stream is closed automatically, whether or not put() throws.
+    try (InputStream inputBody = TestBucketAcl.class.getClassLoader()
+        .getResourceAsStream("userAccessControlList.xml")) {
+      bucketEndpoint.put("unknown_bucket", ACL_MARKER, headers, inputBody);
+      fail("putAcl on a non-existent bucket should throw OS3Exception");
+    } catch (OS3Exception expected) {
+      // Expected: only the failure counter matters for this test.
+    }
+
+    // Exactly one putAcl failure should have been recorded.
+    long curMetric = metrics.getPutAclFailure();
+    assertEquals(1L, curMetric - oriMetric);
+  }
+
+  private OzoneClient createClientWithKeys(String... keys) throws IOException {
+    OzoneBucket target = clientStub.getObjectStore().getS3Bucket(bucketName);
+    for (int idx = 0; idx < keys.length; idx++) { // one key per given name
+      target.createKey(keys[idx], 0).close(); // size 0, closed right away
+    }
+    return clientStub; // same stub, now holding the new keys
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties b/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties
new file mode 100644
index 0000000..18a6b47
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/resources/auditlog.properties
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership.  The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=5
+
+filter=read, write
+# filter.read.onMatch = DENY avoids logging all READ events
+# filter.read.onMatch = ACCEPT permits logging all READ events
+# The above two settings ignore the log levels in configuration
+# filter.read.onMatch = NEUTRAL permits logging of only those READ events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.read.type = MarkerFilter
+filter.read.marker = READ
+filter.read.onMatch = ACCEPT
+filter.read.onMismatch = NEUTRAL
+
+# filter.write.onMatch = DENY avoids logging all WRITE events
+# filter.write.onMatch = ACCEPT permits logging all WRITE events
+# The above two settings ignore the log levels in configuration
+# filter.write.onMatch = NEUTRAL permits logging of only those WRITE events
+# which are attempted at log level equal or greater than log level specified
+# in the configuration
+filter.write.type = MarkerFilter
+filter.write.marker = WRITE
+filter.write.onMatch = NEUTRAL
+filter.write.onMismatch = NEUTRAL
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+appenders = console, audit
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %-5level | %c{1} | %msg%n
+
+appender.audit.type = File
+appender.audit.name = AUDITLOG
+appender.audit.fileName=audit.log
+appender.audit.layout.type=PatternLayout
+appender.audit.layout.pattern= %-5level | %c{1} | %C | %msg%n
+
+loggers=audit
+logger.audit.type=AsyncLogger
+logger.audit.name=S3GAudit
+logger.audit.level = INFO
+logger.audit.appenderRefs = audit
+logger.audit.appenderRef.file.ref = AUDITLOG
+
+rootLogger.level = INFO
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
index 6c6948c..cfb9f5a 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/InfoBucketHandler.java
@@ -58,6 +58,7 @@
     private String sourceBucket;
     private Instant creationTime;
     private Instant modificationTime;
+    private String owner;
 
     LinkBucket(OzoneBucket ozoneBucket) {
       this.volumeName = ozoneBucket.getVolumeName();
@@ -66,6 +67,7 @@
       this.sourceBucket = ozoneBucket.getSourceBucket();
       this.creationTime = ozoneBucket.getCreationTime();
       this.modificationTime = ozoneBucket.getModificationTime();
+      this.owner = ozoneBucket.getOwner();
     }
 
     public String getVolumeName() {
@@ -91,6 +93,10 @@
     public Instant getModificationTime() {
       return modificationTime;
     }
+
+    public String getOwner() {
+      return owner; // owner captured from the OzoneBucket in the constructor
+    }
   }
 
 }