HDDS-11416. refactor ratis submit request avoid code duplicate (#7166)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
index abc21ed..a173bd9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
@@ -80,6 +80,7 @@
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -233,7 +234,7 @@ public void setUp() throws IOException {
         ozoneManager.getMetadataManager().getMetaTable().put(
             OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v));
         return null;
-      }).when(omRatisServer).submitRequest(any(), any());
+      }).when(omRatisServer).submitRequest(any(), any(), anyLong());
     } catch (ServiceException e) {
       throw new RuntimeException(e);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b6903ca..0038bca 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2065,6 +2065,7 @@ private void addOMNodeToPeers(String newOMNodeId) throws IOException {
     } catch (IOException e) {
       LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(),
           newOMNodeId);
+      return;
     }
 
     if (omRatisSnapshotProvider == null) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
index 6e1c9da..bd46222 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.om;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcController;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -35,15 +34,12 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,8 +65,6 @@
  */
 public class TrashOzoneFileSystem extends FileSystem {
 
-  private static final RpcController NULL_RPC_CONTROLLER = null;
-
   private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100;
 
   private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000;
@@ -97,34 +91,15 @@ public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException {
     ozoneConfiguration = OzoneConfiguration.of(getConf());
   }
 
-  private RaftClientRequest getRatisRequest(
-      OzoneManagerProtocolProtos.OMRequest omRequest) {
-    return RaftClientRequest.newBuilder()
-        .setClientId(CLIENT_ID)
-        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
-        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
-        .setCallId(runCount.getAndIncrement())
-        .setMessage(
-            Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-        .setType(RaftClientRequest.writeRequestType())
-        .build();
-
-  }
-
   private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest)
       throws Exception {
     ozoneManager.getMetrics().incNumTrashWriteRequests();
     if (ozoneManager.isRatisEnabled()) {
-      OMClientRequest omClientRequest =
-          OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+      // perform preExecute as ratis submit do no perform preExecute
+      OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
       omRequest = omClientRequest.preExecute(ozoneManager);
-      RaftClientRequest req = getRatisRequest(omRequest);
-      ozoneManager.getOmRatisServer().submitRequest(omRequest, req);
-    } else {
-      ozoneManager.getOmServerProtocol().
-          submitRequest(NULL_RPC_CONTROLLER, omRequest);
     }
+    OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.getAndIncrement());
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 78d6ed8..af4d42a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -301,15 +301,23 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) {
   }
 
   /**
-   * API used internally from OzoneManager Server when requests needs to be
-   * submitted to ratis, where the crafted RaftClientRequest is passed along.
+   * API used internally from OzoneManager Server when requests need to be submitted.
    * @param omRequest
-   * @param raftClientRequest
+   * @param cliId
+   * @param callId
    * @return OMResponse
    * @throws ServiceException
    */
-  public OMResponse submitRequest(OMRequest omRequest,
-      RaftClientRequest raftClientRequest) throws ServiceException {
+  public OMResponse submitRequest(OMRequest omRequest, ClientId cliId, long callId) throws ServiceException {
+    RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+        .setClientId(cliId)
+        .setServerId(getRaftPeerId())
+        .setGroupId(getRaftGroupId())
+        .setCallId(callId)
+        .setMessage(Message.valueOf(
+            OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
     RaftClientReply raftClientReply =
         submitRequestToRatis(raftClientRequest);
     return createOmResponse(omRequest, raftClientReply);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 5dc640c..ffaedaa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import java.io.File;
 import java.nio.file.InvalidPathException;
@@ -98,6 +99,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.ClientId;
 import org.rocksdb.RocksDBException;
 
 import java.io.IOException;
@@ -117,6 +119,7 @@
 public final class OzoneManagerRatisUtils {
   private static final Logger LOG = LoggerFactory
       .getLogger(OzoneManagerRatisUtils.class);
+  private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private OzoneManagerRatisUtils() {
   }
@@ -502,4 +505,13 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,
 
     return null;
   }
+
+  public static OzoneManagerProtocolProtos.OMResponse submitRequest(
+      OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
+    if (om.isRatisEnabled()) {
+      return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+    } else {
+      return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 154bd47..2c2d16b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -33,11 +33,11 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
@@ -48,8 +48,6 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
@@ -247,10 +245,7 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
 
     // Submit PurgeKeys request to OM
     try {
-      RaftClientRequest raftClientRequest =
-          createRaftClientRequestForPurge(omRequest);
-      ozoneManager.getOmRatisServer().submitRequest(omRequest,
-          raftClientRequest);
+      OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
     } catch (ServiceException e) {
       LOG.error("PurgeKey request failed. Will retry at next run.");
       return 0;
@@ -259,20 +254,6 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
     return deletedCount;
   }
 
-  protected RaftClientRequest createRaftClientRequestForPurge(
-      OMRequest omRequest) {
-    return RaftClientRequest.newBuilder()
-        .setClientId(clientId)
-        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
-        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
-        .setCallId(runCount.get())
-        .setMessage(
-            Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-        .setType(RaftClientRequest.writeRequestType())
-        .build();
-  }
-
   /**
    * Parse Volume and Bucket Name from ObjectKey and add it to given map of
    * keys to be purged per bucket.
@@ -311,15 +292,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,
 
     // Submit Purge paths request to OM
     try {
-      if (isRatisEnabled()) {
-        RaftClientRequest raftClientRequest =
-            createRaftClientRequestForPurge(omRequest);
-        ozoneManager.getOmRatisServer().submitRequest(omRequest,
-            raftClientRequest);
-      } else {
-        getOzoneManager().getOmServerProtocol()
-            .submitRequest(null, omRequest);
-      }
+      OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
     } catch (ServiceException e) {
       LOG.error("PurgePaths request failed. Will retry at next run.");
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index aa2eb67..5e622cb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -42,8 +42,7 @@
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
@@ -67,8 +66,6 @@
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -481,24 +478,7 @@ private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
 
     public void submitRequest(OMRequest omRequest, ClientId clientId) {
       try {
-        if (isRatisEnabled()) {
-          OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer();
-
-          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-              .setClientId(clientId)
-              .setServerId(server.getRaftPeerId())
-              .setGroupId(server.getRaftGroupId())
-              .setCallId(getRunCount().get())
-              .setMessage(Message.valueOf(
-                  OMRatisHelper.convertRequestToByteString(omRequest)))
-              .setType(RaftClientRequest.writeRequestType())
-              .build();
-
-          server.submitRequest(omRequest, raftClientRequest);
-        } else {
-          getOzoneManager().getOmServerProtocol()
-              .submitRequest(null, omRequest);
-        }
+        OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
       } catch (ServiceException e) {
         LOG.error("Snapshot deep cleaning request failed. " +
             "Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
index 1199a0c..f108415 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
@@ -29,16 +29,13 @@
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -208,24 +205,7 @@ private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>
 
     private void submitRequest(OMRequest omRequest) {
       try {
-        if (isRatisEnabled()) {
-          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
-          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-              .setClientId(clientId)
-              .setServerId(server.getRaftPeerId())
-              .setGroupId(server.getRaftGroupId())
-              .setCallId(runCount.get())
-              .setMessage(Message.valueOf(
-                  OMRatisHelper.convertRequestToByteString(omRequest)))
-              .setType(RaftClientRequest.writeRequestType())
-              .build();
-
-          server.submitRequest(omRequest, raftClientRequest);
-        } else {
-          ozoneManager.getOmServerProtocol().submitRequest(null,
-              omRequest);
-        }
+        OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
       } catch (ServiceException e) {
         LOG.error("Expired multipart info delete request failed. " +
             "Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
index 4511203..768c77a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
@@ -55,12 +54,11 @@
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -375,19 +373,6 @@ long getRangerOzoneServicePolicyVersion() throws IOException {
     return policyVersion;
   }
 
-  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
-    return RaftClientRequest.newBuilder()
-        .setClientId(CLIENT_ID)
-        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
-        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
-        .setCallId(runCount.get())
-        .setMessage(
-            Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-        .setType(RaftClientRequest.writeRequestType())
-        .build();
-  }
-
   public void setOMDBRangerServiceVersion(long version)
           throws ServiceException {
     // OM DB update goes through Ratis
@@ -402,9 +387,7 @@ public void setOMDBRangerServiceVersion(long version)
         .build();
 
     try {
-      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
-      ozoneManager.getOmRatisServer().submitRequest(omRequest,
-          raftClientRequest);
+      OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.get());
     } catch (ServiceException e) {
       LOG.error("SetRangerServiceVersion request failed. "
           + "Will retry at next run.");
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
index ab55623..c0d958f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
@@ -31,8 +31,7 @@
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -41,8 +40,6 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,24 +265,7 @@ private OMRequest createDeleteOpenKeysRequest(
 
     private OMResponse submitRequest(OMRequest omRequest) {
       try {
-        if (isRatisEnabled()) {
-          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
-          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-              .setClientId(clientId)
-              .setServerId(server.getRaftPeerId())
-              .setGroupId(server.getRaftGroupId())
-              .setCallId(runCount.get())
-              .setMessage(Message.valueOf(
-                  OMRatisHelper.convertRequestToByteString(omRequest)))
-              .setType(RaftClientRequest.writeRequestType())
-              .build();
-
-          return server.submitRequest(omRequest, raftClientRequest);
-        } else {
-          return ozoneManager.getOmServerProtocol().submitRequest(
-              null, omRequest);
-        }
+        return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
       } catch (ServiceException e) {
         LOG.error("Open key " + omRequest.getCmdType()
             + " request failed. Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index c043a6a..1a29ee8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -51,14 +51,11 @@
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -194,22 +191,7 @@ private void repairActiveDb(
   private OzoneManagerProtocolProtos.OMResponse submitRequest(
       OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) throws Exception {
     try {
-      if (om.isRatisEnabled()) {
-        OzoneManagerRatisServer server = om.getOmRatisServer();
-        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-            .setClientId(clientId)
-            .setServerId(om.getOmRatisServer().getRaftPeerId())
-            .setGroupId(om.getOmRatisServer().getRaftGroupId())
-            .setCallId(RUN_CNT.getAndIncrement())
-            .setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
-            .setType(RaftClientRequest.writeRequestType())
-            .build();
-        return server.submitRequest(omRequest, raftClientRequest);
-      } else {
-        RUN_CNT.getAndIncrement();
-        return om.getOmServerProtocol().submitRequest(
-            null, omRequest);
-      }
+      return OzoneManagerRatisUtils.submitRequest(om, omRequest, clientId, RUN_CNT.getAndIncrement());
     } catch (ServiceException e) {
       LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e);
       throw e;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index a98081c..f85bd78 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -40,7 +40,6 @@
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,7 +47,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
@@ -58,8 +57,6 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -562,23 +559,7 @@ public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
 
     public void submitRequest(OMRequest omRequest) {
       try {
-        if (isRatisEnabled()) {
-          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
-          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-              .setClientId(clientId)
-              .setServerId(server.getRaftPeerId())
-              .setGroupId(server.getRaftGroupId())
-              .setCallId(getRunCount().get())
-              .setMessage(Message.valueOf(
-                  OMRatisHelper.convertRequestToByteString(omRequest)))
-              .setType(RaftClientRequest.writeRequestType())
-              .build();
-
-          server.submitRequest(omRequest, raftClientRequest);
-        } else {
-          ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
-        }
+        OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, getRunCount().get());
       } catch (ServiceException e) {
         LOG.error("Snapshot Deleting request failed. " +
             "Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
index 9746b44..26d5d24 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
@@ -34,13 +34,12 @@
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -48,8 +47,6 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -436,25 +433,7 @@ private void updateDeepCleanSnapshotDir(String snapshotKeyTable) {
 
   public void submitRequest(OMRequest omRequest, ClientId clientId) {
     try {
-      if (isRatisEnabled()) {
-        OzoneManagerRatisServer server =
-            getOzoneManager().getOmRatisServer();
-
-        RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
-            .setClientId(clientId)
-            .setServerId(server.getRaftPeerId())
-            .setGroupId(server.getRaftGroupId())
-            .setCallId(getRunCount().get())
-            .setMessage(Message.valueOf(
-                OMRatisHelper.convertRequestToByteString(omRequest)))
-            .setType(RaftClientRequest.writeRequestType())
-            .build();
-
-        server.submitRequest(omRequest, raftClientRequest);
-      } else {
-        getOzoneManager().getOmServerProtocol()
-            .submitRequest(null, omRequest);
-      }
+      OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
     } catch (ServiceException e) {
       LOG.error("Snapshot deep cleaning request failed. " +
           "Will retry at next run.", e);