HDDS-824. WriteStateMachineData timesout leading to Datanode crash. Contributed by Mukul Kumar Singh.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index bb1a804..270e164 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -116,8 +116,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
- private final ConcurrentHashMap<Long, CompletableFuture<Message>>
- createContainerFutureMap;
+ private final ConcurrentHashMap<Long, Message> createContainerResponseMap;
private ExecutorService[] executors;
private final int numExecutors;
private final Map<Long, Long> containerCommandCompletionMap;
@@ -137,7 +136,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
this.numExecutors = executors.size();
this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
- this.createContainerFutureMap = new ConcurrentHashMap<>();
+ this.createContainerResponseMap = new ConcurrentHashMap<>();
containerCommandCompletionMap = new ConcurrentHashMap<>();
}
@@ -289,17 +288,8 @@ private ExecutorService getCommandExecutor(
private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex) {
final WriteChunkRequestProto write = requestProto.getWriteChunk();
- long containerID = write.getBlockID().getContainerID();
- CompletableFuture<Message> future =
- createContainerFutureMap.get(containerID);
- CompletableFuture<Message> writeChunkFuture;
- if (future != null) {
- writeChunkFuture = future.thenApplyAsync(
- v -> runCommand(requestProto), chunkExecutor);
- } else {
- writeChunkFuture = CompletableFuture.supplyAsync(
- () -> runCommand(requestProto), chunkExecutor);
- }
+ CompletableFuture<Message> writeChunkFuture = CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto), chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@@ -319,9 +309,9 @@ private CompletableFuture<Message> handleWriteChunk(
private CompletableFuture<Message> handleCreateContainer(
ContainerCommandRequestProto requestProto) {
long containerID = requestProto.getContainerID();
- createContainerFutureMap.
- computeIfAbsent(containerID, k -> new CompletableFuture<>());
- return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ Message response = runCommand(requestProto);
+ createContainerResponseMap.put(containerID, response);
+ return CompletableFuture.completedFuture(response);
}
/*
@@ -508,6 +498,10 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
future = CompletableFuture
.supplyAsync(() -> runCommand(containerCommandRequestProto),
getCommandExecutor(requestProto));
+ } else if (cmdType == Type.CreateContainer) {
+ long containerID = requestProto.getContainerID();
+ return CompletableFuture.completedFuture(
+ createContainerResponseMap.get(containerID));
} else {
// Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) {
@@ -517,18 +511,6 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
getCommandExecutor(requestProto));
}
- // Mark the createContainerFuture complete so that writeStateMachineData
- // for WriteChunk gets unblocked
- if (cmdType == Type.CreateContainer) {
- long containerID = requestProto.getContainerID();
- future.thenApply(
- r -> {
- createContainerFutureMap.remove(containerID).complete(null);
- LOG.info("create Container Transaction completed for container " +
- containerID + " log index " + index);
- return r;
- });
- }
future.thenAccept(m -> {
final Long previous =