Merge remote-tracking branch 'origin/master' into HDDS-4440-s3-performance
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 55b518d..d45f68f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -228,6 +228,26 @@
   }
 
   /**
+   * Retrieve a number, trying the supplied config keys in order.
+   * Each config value may be absent.
+   *
+   * @param conf Conf
+   * @param keys a list of configuration key names.
+   *
+   * @return first number found from the given keys, or absent.
+   */
+  public static OptionalInt getNumberFromConfigKeys(
+      ConfigurationSource conf, String... keys) {
+    for (final String key : keys) {
+      final String value = conf.getTrimmed(key);
+      if (value != null) {
+        return OptionalInt.of(Integer.parseInt(value));
+      }
+    }
+    return OptionalInt.empty();
+  }
+
+  /**
    * Retrieve the port number, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
    * host:port (the :port part is optional).
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index d673586..709cf45 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3087,7 +3087,6 @@
     <description>The S3Gateway service principal.
       Ex: s3g/_HOST@REALM.COM</description>
   </property>
-
   <property>
     <name>hdds.container.checksum.verification.enabled</name>
     <value>true</value>
@@ -3105,7 +3104,14 @@
       OM/SCM/DN/S3GATEWAY Server connection timeout in milliseconds.
     </description>
   </property>
-
+  <property>
+    <name>ozone.om.grpc.maximum.response.length</name>
+    <value>134217728</value>
+    <tag>OZONE, OM, S3GATEWAY</tag>
+    <description>
+      OM/S3GATEWAY OMRequest, OMResponse over grpc max message length (bytes).
+    </description>
+  </property>
   <property>
     <name>ozone.default.bucket.layout</name>
     <value/>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index fd8aa28..6700101 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -36,6 +36,8 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
+
 import static org.hamcrest.core.Is.is;
 import org.junit.Assert;
 import static org.junit.Assert.assertThat;
@@ -216,4 +218,39 @@
 
   }
 
-}
\ No newline at end of file
+  @Test
+  public void testGetNumberFromConfigKeys() {
+    final String testnum1 = "8";
+    final String testnum2 = "7";
+    final String serviceId = "id1";
+    final String nodeId = "scm1";
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+        testnum1);
+    Assert.assertTrue(Integer.parseInt(testnum1) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+    /* Test that the first non-empty key's number is returned from the list */
+    /* first key is absent */
+    Assert.assertTrue(Integer.parseInt(testnum1) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+                serviceId, nodeId),
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+    /* now set the previously absent key and ensure its value is returned */
+    conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+            serviceId, nodeId),
+        testnum2);
+    Assert.assertTrue(Integer.parseInt(testnum2) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+                serviceId, nodeId),
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+  }
+}
diff --git a/hadoop-hdds/interface-client/pom.xml b/hadoop-hdds/interface-client/pom.xml
index eed1807..8086643 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -86,7 +86,7 @@
               <clearOutputDirectory>false</clearOutputDirectory>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>
-                io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+                io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
               </pluginArtifact>
             </configuration>
           </execution>
diff --git a/hadoop-hdds/interface-server/pom.xml b/hadoop-hdds/interface-server/pom.xml
index e8c96b8..e79e8c7 100644
--- a/hadoop-hdds/interface-server/pom.xml
+++ b/hadoop-hdds/interface-server/pom.xml
@@ -74,7 +74,7 @@
               <clearOutputDirectory>false</clearOutputDirectory>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>
-                io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+                io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
               </pluginArtifact>
             </configuration>
           </execution>
diff --git a/hadoop-ozone/common/pom.xml b/hadoop-ozone/common/pom.xml
index 9108137..701e6d5 100644
--- a/hadoop-ozone/common/pom.xml
+++ b/hadoop-ozone/common/pom.xml
@@ -31,6 +31,56 @@
   <dependencies>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <version>${io.grpc.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler-proxy</artifactId>
+    </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-tcnative-boringssl-static</artifactId>
+        <version>${tcnative.version}</version>
+        <scope>runtime</scope>
+      </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-tcnative</artifactId>
+      <version>${tcnative.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
     </dependency>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c8af3e6..bc1ed8d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -59,7 +59,8 @@
   public static final String OZONE_OM_BIND_HOST_DEFAULT =
       "0.0.0.0";
   public static final int OZONE_OM_PORT_DEFAULT = 9862;
-
+  public static final String OZONE_OM_GRPC_PORT_KEY =
+      "ozone.om.grpc.port";
   public static final String OZONE_OM_HTTP_ENABLED_KEY =
       "ozone.om.http.enabled";
   public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
@@ -310,6 +311,16 @@
       "ozone.path.deleting.limit.per.task";
   public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
 
+  public static final String OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH =
+      "ozone.om.grpc.maximum.response.length";
+  /** Default value for GRPC_MAXIMUM_RESPONSE_LENGTH. */
+  public static final int OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
+      128 * 1024 * 1024;
+
+  public static final String OZONE_OM_S3_GPRC_SERVER_ENABLED =
+      "ozone.om.s3.grpc.server_enabled";
+  public static final boolean OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT =
+      false;
   /**
    * Configuration properties for OMAdminProtcol service.
    */
@@ -320,7 +331,11 @@
       "ozone.om.admin.protocol.wait.between.retries";
   public static final long OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
       = 1000;
-
+  public static final String OZONE_OM_TRANSPORT_CLASS =
+      "ozone.om.transport.class";
+  public static final String OZONE_OM_TRANSPORT_CLASS_DEFAULT =
+      "org.apache.hadoop.ozone.om.protocolPB"
+          + ".Hadoop3OmTransportFactory";
   public static final String OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT =
       "ozone.om.unflushed.transaction.max.count";
   public static final int OZONE_OM_UNFLUSHED_TRANSACTION_MAX_COUNT_DEFAULT
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000..498f935
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * The Grpc s3gateway om transport failover proxy provider implementation
+ * extending the ozone client OM failover proxy provider.  This implementation
+ * allows the Grpc OMTransport to reuse OM failover retry policies and
+ * getRetryAction methods.  In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class GrpcOMFailoverProxyProvider<T> extends
+    OMFailoverProxyProvider<T> {
+
+  private Map<String, String> omAddresses;
+
+  public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
+                                     UserGroupInformation ugi,
+                                     String omServiceId,
+                                     Class<T> protocol) throws IOException {
+    super(configuration, ugi, omServiceId, protocol);
+  }
+
+  @Override
+  protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
+      throws IOException {
+    // to be used for base class omProxies,
+    // ProxyInfo not applicable for gRPC, just need key set
+    Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
+    // to be used for base class omProxyInfos
+    // OMProxyInfo not applicable for gRPC, just need key set
+    Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
+    List<String> omNodeIDList = new ArrayList<>();
+    omAddresses = new HashMap<>();
+
+    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omSvcId, nodeId);
+
+      Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+          rpcAddrKey);
+
+      OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
+          ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+              omSvcId, nodeId),
+          OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+      if (nodeId == null) {
+        nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+      }
+      omProxiesNodeIdKeyset.put(nodeId, null);
+      omProxyInfosNodeIdKeyset.put(nodeId, null);
+      if (hostaddr.isPresent()) {
+        omAddresses.put(nodeId,
+            hostaddr.get() + ":"
+                + hostport.orElse(config
+                .getObject(GrpcOmTransport
+                    .GrpcOmTransportConfig.class)
+                .getPort()));
+      } else {
+        LOG.error("expected host address not defined for: {}", rpcAddrKey);
+        throw new ConfigurationException(rpcAddrKey + "is not defined");
+      }
+      omNodeIDList.add(nodeId);
+    }
+
+    if (omProxiesNodeIdKeyset.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for OM. Please configure the system with "
+          + OZONE_OM_ADDRESS_KEY);
+    }
+
+    // set base class omProxies, omProxyInfos, omNodeIDList
+
+    // omProxies needed in base class
+    // omProxies.size == number of om nodes
+    // omProxies key needs to be valid nodeid
+    // omProxyInfos keyset needed in base class
+    setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+  }
+
+  @Override
+  protected Text computeDelegationTokenService() {
+    return new Text();
+  }
+
+  // need to throw if nodeID not in omAddresses
+  public String getGrpcProxyAddress(String nodeId) throws IOException {
+    if (omAddresses.containsKey(nodeId)) {
+      return omAddresses.get(nodeId);
+    } else {
+      LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+          nodeId);
+      throw new IOException(
+          "expected nodeId not found in omAddresses for proxyhost");
+    }
+
+  }
+
+  public List<String> getGrpcOmNodeIDList() {
+    return getOmNodeIDList();
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 001322e..985b3e7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -149,8 +149,6 @@
           rpcAddrStr);
 
       if (omProxyInfo.getAddress() != null) {
-
-
         // For a non-HA OM setup, nodeId might be null. If so, we assign it
         // the default value
         if (nodeId == null) {
@@ -551,14 +549,18 @@
     return null;
   }
 
-  @VisibleForTesting
-  protected void setProxiesForTesting(
-      Map<String, ProxyInfo<T>> testOMProxies,
-      Map<String, OMProxyInfo> testOMProxyInfos,
-      List<String> testOMNodeIDList) {
-    this.omProxies = testOMProxies;
-    this.omProxyInfos = testOMProxyInfos;
-    this.omNodeIDList = testOMNodeIDList;
+  protected void setProxies(
+      Map<String, ProxyInfo<T>> setOMProxies,
+      Map<String, OMProxyInfo> setOMProxyInfos,
+      List<String> setOMNodeIDList) {
+    this.omProxies = setOMProxies;
+    this.omProxyInfos = setOMProxyInfos;
+    this.omNodeIDList = setOMNodeIDList;
   }
+
+  protected List<String> getOmNodeIDList() {
+    return omNodeIDList;
+  }
+
 }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
new file mode 100644
index 0000000..d32926a
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.net.HostAndPort;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import org.apache.hadoop.ipc.RemoteException;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContextBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+
+/**
+ * Grpc transport for grpc between s3g and om.
+ */
+public class GrpcOmTransport implements OmTransport {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcOmTransport.class);
+
+  private static final String CLIENT_NAME = "GrpcOmTransport";
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+  // gRPC specific
+  private static List<X509Certificate> caCerts = null;
+
+  private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+  private Map<String,
+      OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
+  private Map<String, ManagedChannel> channels;
+  private int lastVisited = -1;
+  private ConfigurationSource conf;
+
+  private AtomicReference<String> host;
+  private final int maxSize;
+  private SecurityConfig secConfig;
+
+  public static void setCaCerts(List<X509Certificate> x509Certificates) {
+    caCerts = x509Certificates;
+  }
+
+  private List<String> oms;
+  private RetryPolicy retryPolicy;
+  private int failoverCount = 0;
+  private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+      omFailoverProxyProvider;
+
+  public GrpcOmTransport(ConfigurationSource conf,
+                          UserGroupInformation ugi, String omServiceId)
+      throws IOException {
+
+    this.channels = new HashMap<>();
+    this.clients = new HashMap<>();
+    this.conf = conf;
+    this.host = new AtomicReference();
+
+    secConfig =  new SecurityConfig(conf);
+    maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+        OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+
+    omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+        conf,
+        ugi,
+        omServiceId,
+        OzoneManagerProtocolPB.class);
+
+    start();
+  }
+
+  public void start() throws IOException {
+    host.set(omFailoverProxyProvider
+        .getGrpcProxyAddress(
+            omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+
+    if (!isRunning.compareAndSet(false, true)) {
+      LOG.info("Ignore. already started.");
+      return;
+    }
+
+    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+    for (String nodeId : nodes) {
+      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+      HostAndPort hp = HostAndPort.fromString(hostaddr);
+
+      NettyChannelBuilder channelBuilder =
+          NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
+              .usePlaintext()
+              .maxInboundMessageSize(maxSize);
+
+      if (secConfig.isGrpcTlsEnabled()) {
+        try {
+          SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+          if (secConfig.isSecurityEnabled()) {
+            if (caCerts != null) {
+              sslContextBuilder.trustManager(caCerts);
+            } else {
+              LOG.error("x509Certicates empty");
+            }
+            channelBuilder.useTransportSecurity().
+                sslContext(sslContextBuilder.build());
+          } else {
+            LOG.error("ozone.security not enabled when TLS specified," +
+                " using plaintext");
+          }
+        } catch (Exception ex) {
+          LOG.error("cannot establish TLS for grpc om transport client");
+        }
+      } else {
+        channelBuilder.usePlaintext();
+      }
+
+      channels.put(hostaddr, channelBuilder.build());
+      clients.put(hostaddr,
+          OzoneManagerServiceGrpc
+              .newBlockingStub(channels.get(hostaddr)));
+    }
+    int maxFailovers = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+
+    retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
+    LOG.info("{}: started", CLIENT_NAME);
+  }
+
+  @Override
+  public OMResponse submitRequest(OMRequest payload) throws IOException {
+    OMResponse resp = null;
+    boolean tryOtherHost = true;
+    ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+    while (tryOtherHost) {
+      tryOtherHost = false;
+      try {
+        resp = clients.get(host.get()).submitRequest(payload);
+      } catch (StatusRuntimeException e) {
+        if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+          resultCode = ResultCodes.TIMEOUT;
+        }
+        Exception exp = new Exception(e);
+        tryOtherHost = shouldRetry(unwrapException(exp));
+        if (!tryOtherHost) {
+          throw new OMException(resultCode);
+        }
+      }
+    }
+    return resp;
+  }
+
+  private Exception unwrapException(Exception ex) {
+    Exception grpcException = null;
+    try {
+      StatusRuntimeException srexp =
+          (StatusRuntimeException)ex.getCause();
+      Status status = srexp.getStatus();
+      LOG.debug("GRPC exception wrapped: {}", status.getDescription());
+      if (status.getCode() == Status.Code.INTERNAL) {
+        // exception potentially generated by OzoneManagerServiceGrpc
+        Class<?> realClass = Class.forName(status.getDescription()
+            .substring(0, status.getDescription()
+                .indexOf(":")));
+        Class<? extends Exception> cls = realClass
+            .asSubclass(Exception.class);
+        Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+        cn.setAccessible(true);
+        grpcException = cn.newInstance(status.getDescription());
+        IOException remote = null;
+        try {
+          String cause = status.getDescription();
+          cause = cause.substring(cause.indexOf(":") + 2);
+          remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
+              cause.substring(cause.indexOf(":") + 1));
+          grpcException.initCause(remote);
+        } catch (Exception e) {
+          LOG.error("cannot get cause for remote exception");
+        }
+      } else {
+        // exception generated by connection failure, gRPC
+        grpcException = ex;
+      }
+    } catch (Exception e) {
+      grpcException = new IOException(e);
+      LOG.error("error unwrapping exception from OMResponse {}");
+    }
+    return grpcException;
+  }
+
+  private boolean shouldRetry(Exception ex) {
+    boolean retry = false;
+    RetryPolicy.RetryAction action = null;
+    try {
+      action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true);
+      LOG.debug("grpc failover retry action {}", action.action);
+      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+        retry = false;
+        LOG.error("Retry request failed. " + action.reason, ex);
+      } else {
+        if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
+            (action.action == RetryPolicy.RetryAction.RetryDecision
+                .FAILOVER_AND_RETRY)) {
+          if (action.delayMillis > 0) {
+            try {
+              Thread.sleep(action.delayMillis);
+            } catch (Exception e) {
+              LOG.error("Error trying sleep thread for {}", action.delayMillis);
+            }
+          }
+          // switch om host to current proxy OMNodeId
+          omFailoverProxyProvider.performFailover(null);
+          host.set(omFailoverProxyProvider
+              .getGrpcProxyAddress(
+                  omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+          retry = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed failover exception {}", e);
+    }
+    return retry;
+  }
+
+  // stub implementation for interface
+  @Override
+  public Text getDelegationTokenService() {
+    return new Text();
+  }
+
+  public void shutdown() {
+    for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) {
+      ManagedChannel channel = entry.getValue();
+      channel.shutdown();
+      try {
+        channel.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}",
+            entry.getKey(), e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdown();
+  }
+
+  /**
+   * GrpcOmTransport configuration in Java style configuration class.
+   */
+  @ConfigGroup(prefix = "ozone.om.grpc")
+  public static final class GrpcOmTransportConfig {
+    @Config(key = "port", defaultValue = "8981",
+        description = "Port used for"
+            + " the GrpcOmTransport OzoneManagerServiceGrpc server",
+        tags = {ConfigTag.MANAGEMENT})
+    private int port;
+
+    public int getPort() {
+      return port;
+    }
+
+    public GrpcOmTransportConfig setPort(int portParam) {
+      this.port = portParam;
+      return this;
+    }
+  }
+
+  @VisibleForTesting
+  public void startClient(ManagedChannel testChannel) throws IOException {
+    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+    for (String nodeId : nodes) {
+      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+
+      clients.put(hostaddr,
+          OzoneManagerServiceGrpc
+              .newBlockingStub(testChannel));
+    }
+    LOG.info("{}: started", CLIENT_NAME);
+  }
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
new file mode 100644
index 0000000..5f34a3c
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransportFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Factory to create the default GrpcOm transport.
+ */
+public class GrpcOmTransportFactory implements OmTransportFactory {
+  @Override
+  public OmTransport createOmTransport(ConfigurationSource source,
+                                       UserGroupInformation ugi,
+                                       String omServiceId) throws IOException {
+    return new GrpcOmTransport(source, ugi, omServiceId);
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
index 1ffc861..2ba8536 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OmTransportFactory.java
@@ -24,6 +24,9 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS_DEFAULT;
+
 /**
  * Factory pattern to create object for RPC communication with OM.
  */
@@ -34,24 +37,31 @@
 
   static OmTransport create(ConfigurationSource conf,
       UserGroupInformation ugi, String omServiceId) throws IOException {
-    OmTransportFactory factory = createFactory();
+    OmTransportFactory factory = createFactory(conf);
 
     return factory.createOmTransport(conf, ugi, omServiceId);
   }
 
-  static OmTransportFactory createFactory() throws IOException {
-    ServiceLoader<OmTransportFactory> transportFactoryServiceLoader =
-        ServiceLoader.load(OmTransportFactory.class);
-    Iterator<OmTransportFactory> iterator =
-        transportFactoryServiceLoader.iterator();
-    if (iterator.hasNext()) {
-      return iterator.next();
-    }
+  static OmTransportFactory createFactory(ConfigurationSource conf)
+      throws IOException {
     try {
+      // if configured transport class is different than the default
+      // OmTransportFactory (Hadoop3OmTransportFactory), then
+      // check service loader for transport class and instantiate it
+      if (!conf
+          .get(OZONE_OM_TRANSPORT_CLASS,
+              OZONE_OM_TRANSPORT_CLASS_DEFAULT)
+          .equals(OZONE_OM_TRANSPORT_CLASS_DEFAULT)) {
+        ServiceLoader<OmTransportFactory> transportFactoryServiceLoader =
+            ServiceLoader.load(OmTransportFactory.class);
+        Iterator<OmTransportFactory> iterator =
+            transportFactoryServiceLoader.iterator();
+        if (iterator.hasNext()) {
+          return iterator.next();
+        }
+      }
       return OmTransportFactory.class.getClassLoader()
-          .loadClass(
-              "org.apache.hadoop.ozone.om.protocolPB"
-                  + ".Hadoop3OmTransportFactory")
+          .loadClass(OZONE_OM_TRANSPORT_CLASS_DEFAULT)
           .asSubclass(OmTransportFactory.class)
           .newInstance();
     } catch (Exception ex) {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 10ca155..21b31e1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -188,7 +188,7 @@
   private OmTransport transport;
   private ThreadLocal<S3Auth> threadLocalS3Auth
       = new ThreadLocal<>();
-
+
   private boolean s3AuthCheck;
   public OzoneManagerProtocolClientSideTranslatorPB(OmTransport omTransport,
       String clientId) {
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
new file mode 100644
index 0000000..1ca5b64
--- /dev/null
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.ManagedChannel;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+import com.google.protobuf.ServiceException;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for GrpcOmTransport client.
+ */
+public class TestS3GrpcOmTransport {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3GrpcOmTransport.class);
+
+  private final String leaderOMNodeId = "TestOM";
+
+  private final OMResponse omResponse = OMResponse.newBuilder()
+                  .setSuccess(true)
+                  .setStatus(org.apache.hadoop.ozone.protocol
+                      .proto.OzoneManagerProtocolProtos.Status.OK)
+                  .setLeaderOMNodeId(leaderOMNodeId)
+                  .setCmdType(Type.AllocateBlock)
+                  .build();
+
+  private boolean doFailover = false;
+
+  private OzoneConfiguration conf;
+
+  private String omServiceId;
+  private UserGroupInformation ugi;
+  private ManagedChannel channel;
+
+
+  private ServiceException createNotLeaderException() {
+    RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
+
+    // TODO: Set suggest leaderID. Right now, client is not using suggest
+    // leaderID. Need to fix this.
+    OMNotLeaderException notLeaderException =
+        new OMNotLeaderException(raftPeerId);
+    LOG.debug(notLeaderException.getMessage());
+    return new ServiceException(notLeaderException);
+  }
+
+  private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
+      serviceImpl =
+        mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+            delegatesTo(
+              new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+                @Override
+                public void submitRequest(org.apache.hadoop.ozone.protocol.proto
+                                              .OzoneManagerProtocolProtos
+                                              .OMRequest request,
+                                          io.grpc.stub.StreamObserver<org.apache
+                                              .hadoop.ozone.protocol.proto
+                                              .OzoneManagerProtocolProtos
+                                              .OMResponse>
+                                          responseObserver) {
+                  try {
+                    if (doFailover) {
+                      doFailover = false;
+                      throw createNotLeaderException();
+                    } else {
+                      responseObserver.onNext(omResponse);
+                      responseObserver.onCompleted();
+                    }
+                  } catch (Throwable e) {
+                    IOException ex = new IOException(e.getCause());
+                    responseObserver.onError(io.grpc.Status
+                        .INTERNAL
+                        .withDescription(ex.getMessage())
+                        .asRuntimeException());
+                  }
+                }
+              }));
+
+  private GrpcOmTransport client;
+
+  @Before
+  public void setUp() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start,
+    // and register for automatic graceful shutdown.
+    grpcCleanup.register(InProcessServerBuilder
+        .forName(serverName)
+        .directExecutor()
+        .addService(serviceImpl)
+        .build()
+        .start());
+
+    // Create a client channel and register for automatic graceful shutdown.
+    channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    omServiceId = "";
+    conf = new OzoneConfiguration();
+    ugi = UserGroupInformation.getCurrentUser();
+    doFailover = false;
+  }
+
+  @Test
+  public void testSubmitRequestToServer() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    final OMResponse resp = client.submitRequest(omRequest);
+    Assert.assertEquals(org.apache.hadoop.ozone.protocol
+        .proto.OzoneManagerProtocolProtos.Status.OK, resp.getStatus());
+    Assert.assertEquals(leaderOMNodeId, resp.getLeaderOMNodeId());
+  }
+
+  @Test
+  public void testGrpcFailoverProxy() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    doFailover = true;
+    // first invocation generates a NotALeaderException
+    // failover is performed and request is internally retried
+    // second invocation request to server succeeds
+    final OMResponse resp = client.submitRequest(omRequest);
+    Assert.assertEquals(org.apache.hadoop.ozone.protocol
+        .proto.OzoneManagerProtocolProtos.Status.OK, resp.getStatus());
+    Assert.assertEquals(leaderOMNodeId, resp.getLeaderOMNodeId());
+  }
+
+  @Test
+  public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0);
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    doFailover = true;
+    // first invocation generates a NotALeaderException
+    // failover is performed and request is internally retried
+    // OMFailoverProvider returns Fail retry due to #attempts >
+    // max failovers
+
+    try {
+      client.submitRequest(omRequest);
+      fail();
+    } catch (Exception e) {
+      // expected: failover retry attempts exhausted (max attempts = 0)
+    }
+  }
+}
diff --git a/hadoop-ozone/common/src/test/resources/log4j.properties b/hadoop-ozone/common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b8ad21d
--- /dev/null
+++ b/hadoop-ozone/common/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
diff --git a/hadoop-ozone/csi/pom.xml b/hadoop-ozone/csi/pom.xml
index 67dd38f..bb8fa56 100644
--- a/hadoop-ozone/csi/pom.xml
+++ b/hadoop-ozone/csi/pom.xml
@@ -71,7 +71,7 @@
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
-      <version>${grpc-compile.version}</version>
+      <version>${io.grpc.version}</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
@@ -79,6 +79,14 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
+      <artifactId>netty-handler-proxy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-unix-common</artifactId>
     </dependency>
     <dependency>
@@ -92,7 +100,7 @@
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-protobuf</artifactId>
-      <version>${grpc-compile.version}</version>
+      <version>${io.grpc.version}</version>
       <exclusions>
         <exclusion>
           <groupId>com.google.protobuf</groupId>
@@ -103,7 +111,7 @@
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-stub</artifactId>
-      <version>${grpc-compile.version}</version>
+      <version>${io.grpc.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
@@ -171,7 +179,7 @@
             <configuration>
               <pluginId>grpc-java</pluginId>
               <pluginArtifact>
-                io.grpc:protoc-gen-grpc-java:${grpc-compile.version}:exe:${os.detected.classifier}
+                io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
               </pluginArtifact>
             </configuration>
           </execution>
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
index fa38aad..c22505f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
@@ -38,6 +38,7 @@
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
 OZONE-SITE.XML_ozone.recon.address=recon:9891
 OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 69f4e52..4642680 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -36,6 +36,7 @@
 OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 ASYNC_PROFILER_HOME=/opt/profiler
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index 2733e07..f63a784 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -39,6 +39,7 @@
 OZONE-SITE.XML_ozone.datanode.pipeline.limit=1
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
 OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
 OZONE-SITE.XML_hdds.heartbeat.interval=5s
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 4baaca5..014583c 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -51,6 +51,7 @@
 OZONE-SITE.XML_ozone.replication=3
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 
 OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index ad445a0..f3c70af 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -72,6 +72,7 @@
 OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
 OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 
 HDFS-SITE.XML_dfs.datanode.kerberos.principal=dn/dn@EXAMPLE.COM
 HDFS-SITE.XML_dfs.datanode.kerberos.keytab.file=/etc/security/keytabs/dn.keytab
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index 3439317..20281ea 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -309,6 +309,8 @@
    io.netty:netty-handler
    io.netty:netty-handler-proxy
    io.netty:netty-resolver
+   io.netty:netty-tcnative-boringssl-static
+   io.netty:netty-tcnative
    io.netty:netty-transport
    io.netty:netty-transport-native-epoll
    io.netty:netty-transport-native-unix-common
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 3911c4d..3cb582b 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -173,6 +173,8 @@
 share/ozone/lib/netty-handler.Final.jar
 share/ozone/lib/netty-handler-proxy.Final.jar
 share/ozone/lib/netty-resolver.Final.jar
+share/ozone/lib/netty-tcnative-boringssl-static.Final.jar
+share/ozone/lib/netty-tcnative.Final.jar
 share/ozone/lib/netty-transport.Final.jar
 share/ozone/lib/netty-transport-native-epoll.Final.jar
 share/ozone/lib/netty-transport-native-unix-common.Final.jar
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index b239621..3b9d3df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -95,6 +95,7 @@
         ScmConfigKeys.OZONE_SCM_ADDRESS_KEY,
         OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY,
         OMConfigKeys.OZONE_FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+        OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED,
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
         OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
         OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
@@ -114,7 +115,9 @@
         ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
         ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
-        OMConfigKeys.OZONE_OM_HA_PREFIX
+        OMConfigKeys.OZONE_OM_HA_PREFIX,
+        OMConfigKeys.OZONE_OM_TRANSPORT_CLASS,
+        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY
         // TODO HDDS-2856
     ));
   }
diff --git a/hadoop-ozone/interface-client/pom.xml b/hadoop-ozone/interface-client/pom.xml
index 292571b..276ffdc 100644
--- a/hadoop-ozone/interface-client/pom.xml
+++ b/hadoop-ozone/interface-client/pom.xml
@@ -41,6 +41,24 @@
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-interface-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler-proxy</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -55,16 +73,24 @@
         <extensions>true</extensions>
         <executions>
           <execution>
-            <id>compile-protoc</id>
+            <id>compile-protoc-OmGrpc</id>
             <goals>
               <goal>compile</goal>
               <goal>test-compile</goal>
+              <goal>compile-custom</goal>
+              <goal>test-compile-custom</goal>
             </goals>
             <configuration>
-              <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
               <protocArtifact>
                 com.google.protobuf:protoc:${proto2.hadooprpc.protobuf.version}:exe:${os.detected.classifier}
               </protocArtifact>
+              <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
+              <outputDirectory>target/generated-sources/protobuf/java</outputDirectory>
+              <clearOutputDirectory>false</clearOutputDirectory>
+              <pluginId>grpc-java</pluginId>
+              <pluginArtifact>
+                io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
+              </pluginArtifact>
             </configuration>
           </execution>
           <execution>
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 5e42414..6291fb8 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -87,6 +87,17 @@
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk15on</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-tcnative</artifactId>
+      <version>${tcnative.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-tcnative-boringssl-static</artifactId>
+      <version>${tcnative.version}</version>
+      <scope>runtime</scope>
+    </dependency>
 
     <dependency>
       <groupId>org.mockito</groupId>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
new file mode 100644
index 0000000..2231c28
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.OptionalInt;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.grpc.Server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_GRPC_TLS_PROVIDER_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+
+/**
+ * Separated network server for gRPC transport OzoneManagerService s3g->OM.
+ */
+public class GrpcOzoneManagerServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcOzoneManagerServer.class);
+
+  private Server server;
+  private int port = 8981;
+  private final int maxSize;
+
+  public GrpcOzoneManagerServer(OzoneConfiguration config,
+                                OzoneManagerProtocolServerSideTranslatorPB
+                                    omTranslator,
+                                OzoneDelegationTokenSecretManager
+                                    delegationTokenMgr,
+                                CertificateClient caClient) {
+    maxSize = config.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+        OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+    OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config,
+        ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+            config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY),
+            config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)),
+        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+    if (haPort.isPresent()) {
+      this.port = haPort.getAsInt();
+    } else {
+      this.port = config.getObject(
+          GrpcOmTransport.GrpcOmTransportConfig.class).
+          getPort();
+    }
+
+    init(omTranslator,
+        delegationTokenMgr,
+        config,
+        caClient);
+  }
+
+  public void init(OzoneManagerProtocolServerSideTranslatorPB omTranslator,
+                   OzoneDelegationTokenSecretManager delegationTokenMgr,
+                   OzoneConfiguration omServerConfig,
+                   CertificateClient caClient) {
+    NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
+        .maxInboundMessageSize(maxSize)
+        .addService(new OzoneManagerServiceGrpc(omTranslator,
+            delegationTokenMgr,
+            omServerConfig));
+
+    SecurityConfig secConf = new SecurityConfig(omServerConfig);
+    if (secConf.isGrpcTlsEnabled()) {
+      try {
+        if (secConf.isSecurityEnabled()) {
+          SslContextBuilder sslClientContextBuilder =
+              SslContextBuilder.forServer(caClient.getPrivateKey(),
+                  caClient.getCertificate());
+          SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
+              sslClientContextBuilder,
+              SslProvider.valueOf(omServerConfig.get(HDDS_GRPC_TLS_PROVIDER,
+                  HDDS_GRPC_TLS_PROVIDER_DEFAULT)));
+          nettyServerBuilder.sslContext(sslContextBuilder.build());
+        } else {
+          LOG.error("ozone.security not enabled when TLS specified," +
+                            " creating Om S3g GRPC channel using plaintext");
+        }
+      } catch (Exception ex) {
+        LOG.error("Unable to setup TLS for secure Om S3g GRPC channel.", ex);
+      }
+    }
+
+    server = nettyServerBuilder.build();
+  }
+
+  public void start() throws IOException {
+    server.start();
+    LOG.info("{} is started using port {}", getClass().getSimpleName(),
+        server.getPort());
+    port = server.getPort();
+  }
+
+  public void stop() {
+    try {
+      server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+      LOG.info("Server {} is shutdown", getClass().getSimpleName());
+    } catch (InterruptedException ex) {
+      LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
+    }
+  }
+  public int getPort() {
+    return port;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b733d89..c0bd8cc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -236,6 +236,8 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
@@ -296,6 +298,7 @@
   private final Text omRpcAddressTxt;
   private OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
+  private GrpcOzoneManagerServer omS3gGrpcServer;
   private InetSocketAddress omRpcAddress;
   private String omId;
 
@@ -334,7 +337,9 @@
   private JvmPauseMonitor jvmPauseMonitor;
   private final SecurityConfig secConfig;
   private S3SecretManager s3SecretManager;
+  private final boolean isOmGrpcServerEnabled;
   private volatile boolean isOmRpcServerRunning = false;
+  private volatile boolean isOmGrpcServerRunning = false;
   private String omComponent;
   private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
 
@@ -456,6 +461,9 @@
 
     this.isSpnegoEnabled = conf.get(OZONE_OM_HTTP_AUTH_TYPE, "simple")
         .equals("kerberos");
+    this.isOmGrpcServerEnabled = conf.getBoolean(
+        OZONE_OM_S3_GPRC_SERVER_ENABLED,
+        OZONE_OM_S3_GRPC_SERVER_ENABLED_DEFAULT);
     this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
         OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
     this.preallocateBlocksMax = conf.getInt(
@@ -559,6 +567,10 @@
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
 
+    // Start S3g Om gRPC Server.
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+    }
     shutdownHook = () -> {
       saveOmMetrics();
     };
@@ -1086,6 +1098,21 @@
     return rpcServer;
   }
 
+  /**
+   * Starts an s3g OmGrpc server.
+   *
+   * @param conf         configuration
+   * @return gRPC server
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private GrpcOzoneManagerServer startGrpcServer(OzoneConfiguration conf)
+          throws IOException {
+    return new GrpcOzoneManagerServer(conf,
+            this.omServerProtocol,
+            this.delegationTokenMgr,
+            this.certClient);
+  }
+
   private static boolean isOzoneSecurityEnabled() {
     return securityEnabled;
   }
@@ -1431,7 +1458,10 @@
     isOmRpcServerRunning = true;
 
     startTrashEmptier(configuration);
-
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer.start();
+      isOmGrpcServerRunning = true;
+    }
     registerMXBean();
 
     startJVMPauseMonitor();
@@ -1492,7 +1522,9 @@
     }
 
     omRpcServer = getRpcServer(configuration);
-
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer = getOmS3gGrpcServer(configuration);
+    }
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
       httpServer.start();
@@ -1506,6 +1538,10 @@
     startTrashEmptier(configuration);
     registerMXBean();
 
+    if (isOmGrpcServerEnabled) {
+      omS3gGrpcServer.start();
+      isOmGrpcServerRunning = true;
+    }
     startJVMPauseMonitor();
     setStartTime();
     omState = State.RUNNING;
@@ -1816,6 +1852,19 @@
   }
 
   /**
+   * Creates a new instance of gRPC OzoneManagerServiceGrpc transport server
+   * for serving s3g OmRequests.  If an earlier instance is already running
+   * then returns the same.
+   */
+  private GrpcOzoneManagerServer getOmS3gGrpcServer(OzoneConfiguration conf)
+      throws IOException {
+    if (isOmGrpcServerRunning) {
+      return omS3gGrpcServer;
+    }
+    return startGrpcServer(conf);
+  }
+
+  /**
    * Creates an instance of ratis server.
    */
   /**
@@ -1908,6 +1957,9 @@
         scheduleOMMetricsWriteTask = null;
       }
       omRpcServer.stop();
+      if (isOmGrpcServerEnabled) {
+        omS3gGrpcServer.stop();
+      }
       // When ratis is not enabled, we need to call stop() to stop
       // OzoneManageDoubleBuffer in OM server protocol.
       if (!isRatisEnabled) {
@@ -1918,6 +1970,9 @@
         omRatisServer = null;
       }
       isOmRpcServerRunning = false;
+      if (isOmGrpcServerEnabled) {
+        isOmGrpcServerRunning = false;
+      }
       keyManager.stop();
       stopSecretManager();
       if (httpServer != null) {
@@ -3621,7 +3676,7 @@
       if (isAclEnabled) {
         InetAddress remoteIp = Server.getRemoteIp();
         resolved = resolveBucketLink(requested, new HashSet<>(),
-            Server.getRemoteUser(),
+            getRemoteUser(),
             remoteIp,
             remoteIp != null ? remoteIp.getHostName() :
                 omRpcAddress.getHostName());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
new file mode 100644
index 0000000..a88e259
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import io.grpc.Status;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Grpc Service for handling S3 gateway OzoneManagerProtocol client requests.
+ */
+public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerServiceGrpc.class);
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+  private OzoneManagerProtocolServerSideTranslatorPB omTranslator;
+  private OzoneDelegationTokenSecretManager delegationTokenMgr;
+  private final SecurityConfig secConfig;
+
+  OzoneManagerServiceGrpc(
+      OzoneManagerProtocolServerSideTranslatorPB omTranslator,
+      OzoneDelegationTokenSecretManager delegationTokenMgr,
+      OzoneConfiguration configuration) {
+    this.omTranslator = omTranslator;
+    this.delegationTokenMgr = delegationTokenMgr;
+    this.secConfig = new SecurityConfig(configuration);
+  }
+
+  @Override
+  public void submitRequest(OMRequest request,
+                            io.grpc.stub.StreamObserver<OMResponse>
+                                responseObserver) {
+    LOG.debug("OzoneManagerServiceGrpc: OzoneManagerServiceImplBase " +
+        "processing s3g client submit request - for command {}",
+        request.getCmdType().name());
+    AtomicInteger callCount = new AtomicInteger(0);
+
+    org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
+        callCount.incrementAndGet(),
+        null,
+        null,
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ClientId.getClientId()));
+    // TODO: currently require setting the Server class for each request
+    // with thread context (Server.Call()) that includes retries
+    // and importantly random ClientId.  This is currently necessary for
+    // Om Ratis Server to create createWriteRaftClientRequest.
+    // Look to remove Server class requirement for issuing ratis transactions
+    // for OMRequests.  Test through successful ratis-enabled OMRequest
+    // handling without dependency on hadoop IPC based Server.
+    try {
+      OMResponse omResponse = this.omTranslator.
+          submitRequest(NULL_RPC_CONTROLLER, request);
+      responseObserver.onNext(omResponse);
+    } catch (Throwable e) {
+      IOException ex = new IOException(e);
+      responseObserver.onError(Status
+          .INTERNAL
+          .withDescription(ex.getMessage())
+          .asRuntimeException());
+    }
+    responseObserver.onCompleted();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 266024a..1eec75e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -141,6 +141,11 @@
       userInfo.setUserName(user.getUserName());
     }
 
+    // for gRPC s3g omRequests that contain user name
+    if (user == null && omRequest.hasUserInfo()) {
+      userInfo.setUserName(omRequest.getUserInfo().getUserName());
+    }
+
     if (remoteAddress != null) {
       userInfo.setHostName(remoteAddress.getHostName());
       userInfo.setRemoteAddress(remoteAddress.getHostAddress()).build();
@@ -356,7 +361,6 @@
     if (userGroupInformation != null) {
       return userGroupInformation;
     }
-
     if (omRequest.hasUserInfo() &&
         !StringUtils.isBlank(omRequest.getUserInfo().getUserName())) {
       userGroupInformation = UserGroupInformation.createRemoteUser(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
new file mode 100644
index 0000000..687ed21
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestGrpcOzoneManagerServer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.mockito.Mockito;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+
+/**
+ * Tests for GrpcOzoneManagerServer.
+ */
+public class TestGrpcOzoneManagerServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOzoneManagerServer.class);
+  private OzoneManager ozoneManager;
+  private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;
+  private GrpcOzoneManagerServer server;
+
+  @Rule
+  public Timeout timeout = Timeout.seconds(30);
+
+  @Test
+  public void testStartStop() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    omServerProtocol = ozoneManager.getOmServerProtocol();
+
+    server = new GrpcOzoneManagerServer(conf,
+        omServerProtocol,
+        ozoneManager.getDelegationTokenMgr(),
+        ozoneManager.getCertificateClient());
+
+    try {
+      server.start();
+    } finally {
+      server.stop();
+    }
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 0160166..fe7f6f4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -143,7 +143,7 @@
         omProxyInfos.put(nodeId, null);
         omNodeIDList.add(nodeId);
       }
-      setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
+      setProxies(omProxies, omProxyInfos, omNodeIDList);
     }
 
     @Override
diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml
index ec751ed..ce1a4cf 100644
--- a/hadoop-ozone/s3gateway/pom.xml
+++ b/hadoop-ozone/s3gateway/pom.xml
@@ -98,7 +98,29 @@
       <groupId>javax.activation</groupId>
       <artifactId>activation</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-hadoop-dependency-server</artifactId>
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java
new file mode 100644
index 0000000..4a7c4a2
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS_DEFAULT;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
+
+/**
+ * Cached ozone client for s3 requests.
+ */
+@ApplicationScoped
+public final class OzoneClientCache {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneClientCache.class);
+  // single, cached OzoneClient established on first connection
+  // for s3g gRPC OmTransport, OmRequest - OmResponse channel
+  private static OzoneClientCache instance;
+  private OzoneClient client;
+  private SecurityConfig secConfig;
+
+  private OzoneClientCache(OzoneConfiguration ozoneConfiguration)
+      throws IOException {
+    // Set the expected OM version if not set via config.
+    ozoneConfiguration.setIfUnset(OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
+        OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
+    String omServiceID = OmUtils.getOzoneManagerServiceId(ozoneConfiguration);
+    secConfig = new SecurityConfig(ozoneConfiguration);
+    client = null;
+    try {
+      if (secConfig.isGrpcTlsEnabled()) {
+        if (!ozoneConfiguration
+            .get(OZONE_OM_TRANSPORT_CLASS,
+                OZONE_OM_TRANSPORT_CLASS_DEFAULT)
+            .equals(OZONE_OM_TRANSPORT_CLASS_DEFAULT)) {
+          // Grpc transport selected
+          // need to get certificate for TLS through
+          // hadoop rpc first via ServiceInfo
+          setCertificate(omServiceID,
+              ozoneConfiguration);
+        }
+      }
+      if (omServiceID == null) {
+        client = OzoneClientFactory.getRpcClient(ozoneConfiguration);
+      } else {
+        // As in HA case, we need to pass om service ID.
+        client = OzoneClientFactory.getRpcClient(omServiceID,
+            ozoneConfiguration);
+      }
+    } catch (IOException e) {
+      LOG.warn("cannot create OzoneClient", e);
+      throw e;
+    }
+    // S3 Gateway should always set the S3 Auth.
+    ozoneConfiguration.setBoolean(S3Auth.S3_AUTH_CHECK, true);
+  }
+
+  public static OzoneClient getOzoneClientInstance(OzoneConfiguration
+                                                      ozoneConfiguration)
+      throws IOException {
+    if (instance == null) {
+      instance = new OzoneClientCache(ozoneConfiguration);
+    }
+    return instance.client;
+  }
+
+  public static void closeClient() throws IOException {
+    if (instance != null) {
+      instance.client.close();
+      instance = null;
+    }
+  }
+
+  private void setCertificate(String omServiceID,
+                              OzoneConfiguration conf)
+      throws IOException {
+
+    // create local copy of config in case exception occurs
+    // with certificate OmRequest
+    OzoneConfiguration config = new OzoneConfiguration(conf);
+    OzoneClient certClient;
+
+    if (secConfig.isGrpcTlsEnabled()) {
+      // set OmTransport to hadoop rpc to securely,
+      // get certificates with service list request
+      config.set(OZONE_OM_TRANSPORT_CLASS,
+          OZONE_OM_TRANSPORT_CLASS_DEFAULT);
+
+      if (omServiceID == null) {
+        certClient = OzoneClientFactory.getRpcClient(config);
+      } else {
+        // As in HA case, we need to pass om service ID.
+        certClient = OzoneClientFactory.getRpcClient(omServiceID,
+            config);
+      }
+      try {
+        ServiceInfoEx serviceInfoEx = certClient
+            .getObjectStore()
+            .getClientProxy()
+            .getOzoneManagerClient()
+            .getServiceInfo();
+
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          String caCertPem = null;
+          List<String> caCertPems = null;
+          caCertPem = serviceInfoEx.getCaCertificate();
+          caCertPems = serviceInfoEx.getCaCertPemList();
+          if (caCertPems == null || caCertPems.isEmpty()) {
+            if (caCertPem == null) {
+              LOG.error("S3g received empty caCertPems from serviceInfo");
+              throw new CertificateException("No caCerts found; caCertPem can" +
+                  " not be null when caCertPems is empty or null");
+            }
+            caCertPems = Collections.singletonList(caCertPem);
+          }
+          GrpcOmTransport.setCaCerts(OzoneSecurityUtil
+              .convertToX509(caCertPems));
+        }
+      } catch (CertificateException ce) {
+        throw new IOException(ce);
+      } catch (IOException e) {
+        throw e;
+      } finally {
+        if (certClient != null) {
+          certClient.close();
+        }
+      }
+    }
+  }
+
+
+  @PreDestroy
+  public void destroy() throws IOException {
+    OzoneClientCache.closeClient();
+  }
+}
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index d2118f5..e9ddc08 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -29,9 +29,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.om.protocol.S3Auth;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
@@ -39,12 +37,9 @@
 import org.apache.hadoop.ozone.s3.signature.SignatureInfo.Version;
 import org.apache.hadoop.ozone.s3.signature.SignatureProcessor;
 import org.apache.hadoop.ozone.s3.signature.StringToSignProducer;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ACCESS_DENIED;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
 
@@ -57,7 +52,7 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneClientProducer.class);
 
-  private static OzoneClient client;
+  private OzoneClient client;
 
   @Inject
   private SignatureProcessor signatureProcessor;
@@ -71,9 +66,7 @@
   @Produces
   public synchronized OzoneClient createClient() throws WebApplicationException,
       IOException {
-    if (client == null) {
-      client = createOzoneClient();
-    }
+    client = getClient(ozoneConfiguration);
     return client;
   }
 
@@ -112,25 +105,24 @@
     }
   }
 
-  @NotNull
-  @VisibleForTesting
-  OzoneClient createOzoneClient() throws IOException {
-    // S3 Gateway should always set the S3 Auth.
-    ozoneConfiguration.setBoolean(S3Auth.S3_AUTH_CHECK, true);
-    // Set the expected OM version if not set via config.
-    ozoneConfiguration.setIfUnset(OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY,
-        OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
-    String omServiceID = OmUtils.getOzoneManagerServiceId(ozoneConfiguration);
-    if (omServiceID == null) {
-      return OzoneClientFactory.getRpcClient(ozoneConfiguration);
-    } else {
-      // As in HA case, we need to pass om service ID.
-      return OzoneClientFactory.getRpcClient(omServiceID,
-          ozoneConfiguration);
+  private OzoneClient getClient(OzoneConfiguration config)
+      throws IOException {
+    OzoneClient ozoneClient = null;
+    try {
+      ozoneClient =
+          OzoneClientCache.getOzoneClientInstance(ozoneConfiguration);
+    } catch (Exception e) {
+      // For any other critical errors during object creation throw Internal
+      // error.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Error during Client Creation: ", e);
+      }
+      throw e;
     }
+    return ozoneClient;
   }
 
-  public void setOzoneConfiguration(OzoneConfiguration config) {
+  public synchronized void setOzoneConfiguration(OzoneConfiguration config) {
     this.ozoneConfiguration = config;
   }
 
@@ -138,7 +130,7 @@
   public void setSignatureParser(SignatureProcessor awsSignatureProcessor) {
     this.signatureProcessor = awsSignatureProcessor;
   }
-
+
   private WebApplicationException wrapOS3Exception(OS3Exception os3Exception) {
     return new WebApplicationException(os3Exception.getErrorMessage(),
         os3Exception,
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 9a0efc7..b08bfdb 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -79,6 +79,12 @@
     } catch (OMException ex) {
       if (ex.getResult() == ResultCodes.KEY_NOT_FOUND) {
         throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
+      } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3Auth.getAccessID(), ex);
+      } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+          ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+        throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
       } else {
         throw ex;
       }
@@ -110,8 +116,14 @@
       if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND
           || ex.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
         throw newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName, ex);
+      } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3Auth.getAccessID(), ex);
       } else if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
+      } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+          ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+        throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
       } else {
         throw ex;
       }
@@ -140,6 +152,12 @@
       getMetrics().incCreateBucketFailure();
       if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
         throw newError(S3ErrorTable.ACCESS_DENIED, bucketName, ex);
+      } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3Auth.getAccessID(), ex);
+      } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+          ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+        throw newError(S3ErrorTable.INTERNAL_ERROR, bucketName, ex);
       } else if (ex.getResult() != ResultCodes.BUCKET_ALREADY_EXISTS) {
         // S3 does not return error for bucket already exists, it just
         // returns the location.
@@ -160,9 +178,17 @@
       client.getObjectStore().deleteS3Bucket(s3BucketName);
     } catch (OMException ex) {
       if (ex.getResult() == ResultCodes.PERMISSION_DENIED) {
-        throw newError(S3ErrorTable.ACCESS_DENIED, s3BucketName, ex);
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3BucketName, ex);
+      } else if (ex.getResult() == ResultCodes.INVALID_TOKEN) {
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3Auth.getAccessID(), ex);
+      } else if (ex.getResult() == ResultCodes.TIMEOUT ||
+          ex.getResult() == ResultCodes.INTERNAL_ERROR) {
+        throw newError(S3ErrorTable.INTERNAL_ERROR, s3BucketName, ex);
+      } else {
+        throw ex;
       }
-      throw ex;
     }
   }
 
@@ -203,7 +229,15 @@
       if (e.getResult() == ResultCodes.VOLUME_NOT_FOUND) {
         return Collections.emptyIterator();
       } else  if (e.getResult() == ResultCodes.PERMISSION_DENIED) {
-        throw newError(S3ErrorTable.ACCESS_DENIED, "listBuckets", e);
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            "listBuckets", e);
+      } else if (e.getResult() == ResultCodes.INVALID_TOKEN) {
+        throw newError(S3ErrorTable.ACCESS_DENIED,
+            s3Auth.getAccessID(), e);
+      } else if (e.getResult() == ResultCodes.TIMEOUT ||
+          e.getResult() == ResultCodes.INTERNAL_ERROR) {
+        throw newError(S3ErrorTable.INTERNAL_ERROR,
+            "listBuckets", e);
       } else {
         throw e;
       }
diff --git a/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory b/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory
new file mode 100644
index 0000000..254933b
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/resources/META-INF/services/org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
new file mode 100644
index 0000000..304a2bf
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
+
+/**
+ * Tests for GrpcOmTransport.
+ */
+public class TestGrpcOmTransport {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOmTransport.class);
+  @Rule
+  public Timeout timeout = Timeout.seconds(30);
+
+
+  @Test
+  public void testGrpcOmTransportFactory() throws Exception {
+    String omServiceId = "";
+    String transportCls = GrpcOmTransport.class.getName();
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_OM_TRANSPORT_CLASS,
+        transportCls);
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+    Assert.assertEquals(GrpcOmTransport.class.getSimpleName(),
+        omTransport.getClass().getSimpleName());
+
+  }
+
+  @Test
+  public void testHrpcOmTransportFactory() throws Exception {
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+    // OmTransport should be Hadoop Rpc and
+    // fail equality GrpcOmTransport equality test
+    Assert.assertNotEquals(GrpcOmTransport.class.getSimpleName(),
+        omTransport.getClass().getSimpleName());
+  }
+
+  @Test
+  public void testStartStop() throws Exception {
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    GrpcOmTransport client = new GrpcOmTransport(conf, ugi, omServiceId);
+
+    try {
+      client.start();
+    } finally {
+      client.shutdown();
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index fd0578c..e69803f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,10 +196,13 @@
     <!-- Maven protoc compiler -->
     <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
     <grpc.protobuf-compile.version>3.12.0</grpc.protobuf-compile.version>
-    <grpc-compile.version>1.33.0</grpc-compile.version>
     <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
 
     <netty.version>4.1.63.Final</netty.version>
+    <io.grpc.version>1.38.0</io.grpc.version>
+    <tcnative.version>2.0.38.Final</tcnative.version> <!-- See table for correct version -->
+    <!-- Table for netty, grpc & tcnative version combinations  -->
+    <!-- https://github.com/grpc/grpc-java/blob/master/SECURITY.md#netty -->
 
     <!-- define the Java language version used by the compiler -->
     <javac.version>1.8</javac.version>