HDDS-11416. refactor ratis submit request avoid code duplicate (#7166)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
index abc21ed..a173bd9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java
@@ -80,6 +80,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -233,7 +234,7 @@ public void setUp() throws IOException {
ozoneManager.getMetadataManager().getMetaTable().put(
OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v));
return null;
- }).when(omRatisServer).submitRequest(any(), any());
+ }).when(omRatisServer).submitRequest(any(), any(), anyLong());
} catch (ServiceException e) {
throw new RuntimeException(e);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b6903ca..0038bca 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2065,6 +2065,7 @@ private void addOMNodeToPeers(String newOMNodeId) throws IOException {
} catch (IOException e) {
LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(),
newOMNodeId);
+ return;
}
if (omRatisSnapshotProvider == null) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
index 6e1c9da..bd46222 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.ozone.om;
import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcController;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -35,15 +34,12 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,8 +65,6 @@
*/
public class TrashOzoneFileSystem extends FileSystem {
- private static final RpcController NULL_RPC_CONTROLLER = null;
-
private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100;
private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000;
@@ -97,34 +91,15 @@ public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException {
ozoneConfiguration = OzoneConfiguration.of(getConf());
}
- private RaftClientRequest getRatisRequest(
- OzoneManagerProtocolProtos.OMRequest omRequest) {
- return RaftClientRequest.newBuilder()
- .setClientId(CLIENT_ID)
- .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
- .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
- .setCallId(runCount.getAndIncrement())
- .setMessage(
- Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- }
-
private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest)
throws Exception {
ozoneManager.getMetrics().incNumTrashWriteRequests();
if (ozoneManager.isRatisEnabled()) {
- OMClientRequest omClientRequest =
- OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ // perform preExecute as ratis submit do no perform preExecute
+ OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
omRequest = omClientRequest.preExecute(ozoneManager);
- RaftClientRequest req = getRatisRequest(omRequest);
- ozoneManager.getOmRatisServer().submitRequest(omRequest, req);
- } else {
- ozoneManager.getOmServerProtocol().
- submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.getAndIncrement());
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 78d6ed8..af4d42a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -301,15 +301,23 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) {
}
/**
- * API used internally from OzoneManager Server when requests needs to be
- * submitted to ratis, where the crafted RaftClientRequest is passed along.
+ * API used internally from OzoneManager Server when requests need to be submitted.
* @param omRequest
- * @param raftClientRequest
+ * @param cliId
+ * @param callId
* @return OMResponse
* @throws ServiceException
*/
- public OMResponse submitRequest(OMRequest omRequest,
- RaftClientRequest raftClientRequest) throws ServiceException {
+ public OMResponse submitRequest(OMRequest omRequest, ClientId cliId, long callId) throws ServiceException {
+ RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+ .setClientId(cliId)
+ .setServerId(getRaftPeerId())
+ .setGroupId(getRaftGroupId())
+ .setCallId(callId)
+ .setMessage(Message.valueOf(
+ OMRatisHelper.convertRequestToByteString(omRequest)))
+ .setType(RaftClientRequest.writeRequestType())
+ .build();
RaftClientReply raftClientReply =
submitRequestToRatis(raftClientRequest);
return createOmResponse(omRequest, raftClientReply);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 5dc640c..ffaedaa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.File;
import java.nio.file.InvalidPathException;
@@ -98,6 +99,7 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.ClientId;
import org.rocksdb.RocksDBException;
import java.io.IOException;
@@ -117,6 +119,7 @@
public final class OzoneManagerRatisUtils {
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerRatisUtils.class);
+ private static final RpcController NULL_RPC_CONTROLLER = null;
private OzoneManagerRatisUtils() {
}
@@ -502,4 +505,13 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,
return null;
}
+
+ public static OzoneManagerProtocolProtos.OMResponse submitRequest(
+ OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
+ if (om.isRatisEnabled()) {
+ return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+ } else {
+ return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
+ }
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 154bd47..2c2d16b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -33,11 +33,11 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
@@ -48,8 +48,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.Preconditions;
import java.io.IOException;
@@ -247,10 +245,7 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
// Submit PurgeKeys request to OM
try {
- RaftClientRequest raftClientRequest =
- createRaftClientRequestForPurge(omRequest);
- ozoneManager.getOmRatisServer().submitRequest(omRequest,
- raftClientRequest);
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.");
return 0;
@@ -259,20 +254,6 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
return deletedCount;
}
- protected RaftClientRequest createRaftClientRequestForPurge(
- OMRequest omRequest) {
- return RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
- .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
- .setCallId(runCount.get())
- .setMessage(
- Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
- }
-
/**
* Parse Volume and Bucket Name from ObjectKey and add it to given map of
* keys to be purged per bucket.
@@ -311,15 +292,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,
// Submit Purge paths request to OM
try {
- if (isRatisEnabled()) {
- RaftClientRequest raftClientRequest =
- createRaftClientRequestForPurge(omRequest);
- ozoneManager.getOmRatisServer().submitRequest(omRequest,
- raftClientRequest);
- } else {
- getOzoneManager().getOmServerProtocol()
- .submitRequest(null, omRequest);
- }
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.");
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index aa2eb67..5e622cb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -42,8 +42,7 @@
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
@@ -67,8 +66,6 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -481,24 +478,7 @@ private void updateDeepCleanedSnapshots(List<String> deepCleanedSnapshots) {
public void submitRequest(OMRequest omRequest, ClientId clientId) {
try {
- if (isRatisEnabled()) {
- OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer();
-
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server.getRaftPeerId())
- .setGroupId(server.getRaftGroupId())
- .setCallId(getRunCount().get())
- .setMessage(Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- server.submitRequest(omRequest, raftClientRequest);
- } else {
- getOzoneManager().getOmServerProtocol()
- .submitRequest(null, omRequest);
- }
+ OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
} catch (ServiceException e) {
LOG.error("Snapshot deep cleaning request failed. " +
"Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
index 1199a0c..f108415 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java
@@ -29,16 +29,13 @@
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,24 +205,7 @@ private OMRequest createRequest(List<ExpiredMultipartUploadsBucket>
private void submitRequest(OMRequest omRequest) {
try {
- if (isRatisEnabled()) {
- OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server.getRaftPeerId())
- .setGroupId(server.getRaftGroupId())
- .setCallId(runCount.get())
- .setMessage(Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- server.submitRequest(omRequest, raftClientRequest);
- } else {
- ozoneManager.getOmServerProtocol().submitRequest(null,
- omRequest);
- }
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("Expired multipart info delete request failed. " +
"Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
index 4511203..768c77a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java
@@ -47,7 +47,6 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
@@ -55,12 +54,11 @@
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -375,19 +373,6 @@ long getRangerOzoneServicePolicyVersion() throws IOException {
return policyVersion;
}
- private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
- return RaftClientRequest.newBuilder()
- .setClientId(CLIENT_ID)
- .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
- .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
- .setCallId(runCount.get())
- .setMessage(
- Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
- }
-
public void setOMDBRangerServiceVersion(long version)
throws ServiceException {
// OM DB update goes through Ratis
@@ -402,9 +387,7 @@ public void setOMDBRangerServiceVersion(long version)
.build();
try {
- RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
- ozoneManager.getOmRatisServer().submitRequest(omRequest,
- raftClientRequest);
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.get());
} catch (ServiceException e) {
LOG.error("SetRangerServiceVersion request failed. "
+ "Will retry at next run.");
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
index ab55623..c0d958f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
@@ -31,8 +31,7 @@
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -41,8 +40,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,24 +265,7 @@ private OMRequest createDeleteOpenKeysRequest(
private OMResponse submitRequest(OMRequest omRequest) {
try {
- if (isRatisEnabled()) {
- OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server.getRaftPeerId())
- .setGroupId(server.getRaftGroupId())
- .setCallId(runCount.get())
- .setMessage(Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- return server.submitRequest(omRequest, raftClientRequest);
- } else {
- return ozoneManager.getOmServerProtocol().submitRequest(
- null, omRequest);
- }
+ return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
} catch (ServiceException e) {
LOG.error("Open key " + omRequest.getCmdType()
+ " request failed. Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
index c043a6a..1a29ee8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java
@@ -51,14 +51,11 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,22 +191,7 @@ private void repairActiveDb(
private OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) throws Exception {
try {
- if (om.isRatisEnabled()) {
- OzoneManagerRatisServer server = om.getOmRatisServer();
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(om.getOmRatisServer().getRaftPeerId())
- .setGroupId(om.getOmRatisServer().getRaftGroupId())
- .setCallId(RUN_CNT.getAndIncrement())
- .setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
- return server.submitRequest(omRequest, raftClientRequest);
- } else {
- RUN_CNT.getAndIncrement();
- return om.getOmServerProtocol().submitRequest(
- null, omRequest);
- }
+ return OzoneManagerRatisUtils.submitRequest(om, omRequest, clientId, RUN_CNT.getAndIncrement());
} catch (ServiceException e) {
LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e);
throw e;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index a98081c..f85bd78 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,7 +47,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
@@ -58,8 +57,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -562,23 +559,7 @@ public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
public void submitRequest(OMRequest omRequest) {
try {
- if (isRatisEnabled()) {
- OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
-
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server.getRaftPeerId())
- .setGroupId(server.getRaftGroupId())
- .setCallId(getRunCount().get())
- .setMessage(Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- server.submitRequest(omRequest, raftClientRequest);
- } else {
- ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
- }
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, getRunCount().get());
} catch (ServiceException e) {
LOG.error("Snapshot Deleting request failed. " +
"Will retry at next run.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
index 9746b44..26d5d24 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
@@ -34,13 +34,12 @@
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -48,8 +47,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
import java.io.IOException;
import java.util.ArrayList;
@@ -436,25 +433,7 @@ private void updateDeepCleanSnapshotDir(String snapshotKeyTable) {
public void submitRequest(OMRequest omRequest, ClientId clientId) {
try {
- if (isRatisEnabled()) {
- OzoneManagerRatisServer server =
- getOzoneManager().getOmRatisServer();
-
- RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server.getRaftPeerId())
- .setGroupId(server.getRaftGroupId())
- .setCallId(getRunCount().get())
- .setMessage(Message.valueOf(
- OMRatisHelper.convertRequestToByteString(omRequest)))
- .setType(RaftClientRequest.writeRequestType())
- .build();
-
- server.submitRequest(omRequest, raftClientRequest);
- } else {
- getOzoneManager().getOmServerProtocol()
- .submitRequest(null, omRequest);
- }
+ OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get());
} catch (ServiceException e) {
LOG.error("Snapshot deep cleaning request failed. " +
"Will retry at next run.", e);