HDDS-8073. Replace Usages of LegacyReplicationManager.MoveResult with MoveManager.MoveResult (#4339)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
index 72c1cda..61e0761 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hdds.scm.container.balancer;
 
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -131,8 +130,7 @@ public void incrementNumContainerMovesCompletedInLatestIteration(
   }
 
   public void incrementCurrentIterationContainerMoveMetric(
-      MoveResult result,
-      long valueToAdd) {
+      MoveManager.MoveResult result, long valueToAdd) {
     if (result == null) {
       return;
     }
@@ -145,9 +143,8 @@ public void incrementCurrentIterationContainerMoveMetric(
       this.numContainerMovesTimeoutInLatestIteration.incr(valueToAdd);
       break;
     // TODO: Add metrics for other errors that need to be tracked.
-    case FAIL_NOT_RUNNING:
+    case FAIL_LEADER_NOT_READY:
     case REPLICATION_FAIL_INFLIGHT_REPLICATION:
-    case FAIL_NOT_LEADER:
     case REPLICATION_FAIL_NOT_EXIST_IN_SOURCE:
     case REPLICATION_FAIL_EXIST_IN_TARGET:
     case REPLICATION_FAIL_CONTAINER_NOT_CLOSED:
@@ -157,10 +154,10 @@ public void incrementCurrentIterationContainerMoveMetric(
     case REPLICATION_FAIL_NODE_UNHEALTHY:
     case DELETION_FAIL_NODE_UNHEALTHY:
     case DELETE_FAIL_POLICY:
-    case PLACEMENT_POLICY_NOT_SATISFIED:
-    case UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION:
-    case UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION:
-    case FAIL_CAN_NOT_RECORD_TO_DB:
+    case REPLICATION_NOT_HEALTHY_BEFORE_MOVE:
+    case REPLICATION_NOT_HEALTHY_AFTER_MOVE:
+    case FAIL_CONTAINER_ALREADY_BEING_MOVED:
+    case FAIL_UNEXPECTED_ERROR:
     default:
       break;
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java
index 234ac78..fc1ddff 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerTask.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -110,8 +109,7 @@ public class ContainerBalancerTask implements Runnable {
   private Set<DatanodeDetails> selectedSources;
   private FindTargetStrategy findTargetStrategy;
   private FindSourceStrategy findSourceStrategy;
-  private Map<ContainerMoveSelection,
-      CompletableFuture<LegacyReplicationManager.MoveResult>>
+  private Map<ContainerMoveSelection, CompletableFuture<MoveManager.MoveResult>>
       moveSelectionToFutureMap;
   private IterationResult iterationResult;
   private int nextIterationIndex;
@@ -756,7 +754,7 @@ private boolean adaptOnReachingIterationLimits() {
   private boolean moveContainer(DatanodeDetails source,
                                 ContainerMoveSelection moveSelection) {
     ContainerID containerID = moveSelection.getContainerID();
-    CompletableFuture<LegacyReplicationManager.MoveResult> future;
+    CompletableFuture<MoveManager.MoveResult> future;
     try {
       ContainerInfo containerInfo = containerManager.getContainer(containerID);
       future = replicationManager
@@ -772,7 +770,7 @@ private boolean moveContainer(DatanodeDetails source,
                   moveSelection.getTargetNode().getUuidString(), ex);
               metrics.incrementNumContainerMovesFailedInLatestIteration(1);
             } else {
-              if (result == LegacyReplicationManager.MoveResult.COMPLETED) {
+              if (result == MoveManager.MoveResult.COMPLETED) {
                 sizeActuallyMovedInLatestIteration +=
                     containerInfo.getUsedBytes();
                 if (LOG.isDebugEnabled()) {
@@ -805,9 +803,9 @@ private boolean moveContainer(DatanodeDetails source,
       if (future.isCompletedExceptionally()) {
         return false;
       } else {
-        LegacyReplicationManager.MoveResult result = future.join();
+        MoveManager.MoveResult result = future.join();
         moveSelectionToFutureMap.put(moveSelection, future);
-        return result == LegacyReplicationManager.MoveResult.COMPLETED;
+        return result == MoveManager.MoveResult.COMPLETED;
       }
     } else {
       moveSelectionToFutureMap.put(moveSelection, future);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index c17452f..7cdd48c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -247,63 +248,12 @@ List<DatanodeDetails> getDatanodeDetails(ContainerID id) {
    */
   private final InflightMap inflightDeletion;
 
-
-  /**
-   * This is used for indicating the result of move option and
-   * the corresponding reason. this is useful for tracking
-   * the result of move option
-   */
-  public enum MoveResult {
-    // both replication and deletion are completed
-    COMPLETED,
-    // RM is not running
-    FAIL_NOT_RUNNING,
-    // RM is not ratis leader
-    FAIL_NOT_LEADER,
-    // replication fail because the container does not exist in src
-    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
-    // replication fail because the container exists in target
-    REPLICATION_FAIL_EXIST_IN_TARGET,
-    // replication fail because the container is not cloesed
-    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
-    // replication fail because the container is in inflightDeletion
-    REPLICATION_FAIL_INFLIGHT_DELETION,
-    // replication fail because the container is in inflightReplication
-    REPLICATION_FAIL_INFLIGHT_REPLICATION,
-    // replication fail because of timeout
-    REPLICATION_FAIL_TIME_OUT,
-    // replication fail because of node is not in service
-    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
-    // replication fail because node is unhealthy
-    REPLICATION_FAIL_NODE_UNHEALTHY,
-    // deletion fail because of node is not in service
-    DELETION_FAIL_NODE_NOT_IN_SERVICE,
-    // replication succeed, but deletion fail because of timeout
-    DELETION_FAIL_TIME_OUT,
-    // replication succeed, but deletion fail because because
-    // node is unhealthy
-    DELETION_FAIL_NODE_UNHEALTHY,
-    // replication succeed, but if we delete the container from
-    // the source datanode , the policy(eg, replica num or
-    // rack location) will not be satisfied, so we should not delete
-    // the container
-    DELETE_FAIL_POLICY,
-    //  replicas + target - src does not satisfy placement policy
-    PLACEMENT_POLICY_NOT_SATISFIED,
-    //unexpected action, remove src at inflightReplication
-    UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
-    //unexpected action, remove target at inflightDeletion
-    UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION,
-    //write DB error
-    FAIL_CAN_NOT_RECORD_TO_DB
-  }
-
   /**
    * This is used for tracking container move commands
    * which are not yet complete.
    */
   private final Map<ContainerID,
-      CompletableFuture<MoveResult>> inflightMoveFuture;
+      CompletableFuture<MoveManager.MoveResult>> inflightMoveFuture;
 
   /**
    * ReplicationManager specific configuration.
@@ -698,14 +648,20 @@ private void updateMoveIfNeeded(final boolean isUnhealthy,
       //but inflightMoveFuture not. so there will be a case that
       //container is in inflightMove, but not in inflightMoveFuture.
       compleleteMoveFutureWithResult(id,
-          MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
+          MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR);
+      LOG.info("Move failed because replication for container {} " +
+              "unexpectedly happened at the source {}, not the target {}.",
+          container, kv.getSrc().getUuidString(), kv.getTgt().getUuidString());
       moveScheduler.completeMove(id.getProtobuf());
       return;
     }
 
     if (isTarget && !isInflightReplication) {
       compleleteMoveFutureWithResult(id,
-          MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
+          MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR);
+      LOG.info("Move failed because deletion for container {} unexpectedly " +
+              "happened at the target {}, not the source {}.", container,
+          kv.getTgt().getUuidString(), kv.getSrc().getUuidString());
       moveScheduler.completeMove(id.getProtobuf());
       return;
     }
@@ -714,27 +670,26 @@ private void updateMoveIfNeeded(final boolean isUnhealthy,
       if (isInflightReplication) {
         if (isUnhealthy) {
           compleleteMoveFutureWithResult(id,
-              MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+              MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
         } else if (isNotInService) {
           compleleteMoveFutureWithResult(id,
-              MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+              MoveManager.MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
         } else {
           compleleteMoveFutureWithResult(id,
-              MoveResult.REPLICATION_FAIL_TIME_OUT);
+              MoveManager.MoveResult.REPLICATION_FAIL_TIME_OUT);
         }
       } else {
         if (isUnhealthy) {
           compleleteMoveFutureWithResult(id,
-              MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+              MoveManager.MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
         } else if (isTimeout) {
           compleleteMoveFutureWithResult(id,
-              MoveResult.DELETION_FAIL_TIME_OUT);
+              MoveManager.MoveResult.DELETION_FAIL_TIME_OUT);
         } else if (isNotInService) {
           compleleteMoveFutureWithResult(id,
-              MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
+              MoveManager.MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
         } else {
-          compleleteMoveFutureWithResult(id,
-              MoveResult.COMPLETED);
+          compleleteMoveFutureWithResult(id, MoveManager.MoveResult.COMPLETED);
         }
       }
       moveScheduler.completeMove(id.getProtobuf());
@@ -751,7 +706,7 @@ private void updateMoveIfNeeded(final boolean isUnhealthy,
    * @param src source datanode
    * @param tgt target datanode
    */
-  public CompletableFuture<MoveResult> move(ContainerID cid,
+  public CompletableFuture<MoveManager.MoveResult> move(ContainerID cid,
              DatanodeDetails src, DatanodeDetails tgt)
       throws ContainerNotFoundException, NodeNotFoundException,
       TimeoutException {
@@ -764,13 +719,13 @@ public CompletableFuture<MoveResult> move(ContainerID cid,
    * @param cid Container to move
    * @param mp MoveDataNodePair which contains source and target datanodes
    */
-  private CompletableFuture<MoveResult> move(ContainerID cid,
+  private CompletableFuture<MoveManager.MoveResult> move(ContainerID cid,
       MoveDataNodePair mp) throws ContainerNotFoundException,
       NodeNotFoundException, TimeoutException {
-    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+    CompletableFuture<MoveManager.MoveResult> ret = new CompletableFuture<>();
 
     if (!scmContext.isLeader()) {
-      ret.complete(MoveResult.FAIL_NOT_LEADER);
+      ret.complete(MoveManager.MoveResult.FAIL_LEADER_NOT_READY);
       return ret;
     }
 
@@ -798,11 +753,15 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
     NodeOperationalState operationalState =
         currentNodeStat.getOperationalState();
     if (healthStat != NodeState.HEALTHY) {
-      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      LOG.info("Failing move for container {} because source {} is {}", cid,
+          srcDn.getUuidString(), healthStat.toString());
       return ret;
     }
     if (operationalState != NodeOperationalState.IN_SERVICE) {
-      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      LOG.info("Failing move for container {} because source {} is {}", cid,
+          srcDn.getUuidString(), operationalState.toString());
       return ret;
     }
 
@@ -810,11 +769,15 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
     healthStat = currentNodeStat.getHealth();
     operationalState = currentNodeStat.getOperationalState();
     if (healthStat != NodeState.HEALTHY) {
-      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      LOG.info("Failing move for container {} because target {} is {}", cid,
+          targetDn.getUuidString(), healthStat.toString());
       return ret;
     }
     if (operationalState != NodeOperationalState.IN_SERVICE) {
-      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      LOG.info("Failing move for container {} because target {} is {}", cid,
+          targetDn.getUuidString(), operationalState.toString());
       return ret;
     }
 
@@ -829,11 +792,12 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
             .map(ContainerReplica::getDatanodeDetails)
             .collect(Collectors.toSet());
       if (replicas.contains(targetDn)) {
-        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
         return ret;
       }
       if (!replicas.contains(srcDn)) {
-        ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+        ret.complete(
+            MoveManager.MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
         return ret;
       }
 
@@ -846,11 +810,12 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
       * */
 
       if (inflightReplication.containsKey(cid)) {
-        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        ret.complete(
+            MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
         return ret;
       }
       if (inflightDeletion.containsKey(cid)) {
-        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        ret.complete(MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
         return ret;
       }
 
@@ -865,7 +830,8 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
 
       LifeCycleState currentContainerStat = cif.getState();
       if (currentContainerStat != LifeCycleState.CLOSED) {
-        ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+        ret.complete(
+            MoveManager.MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
         return ret;
       }
 
@@ -873,7 +839,7 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
       // satisfies current placement policy
       if (!isPolicySatisfiedAfterMove(cif, srcDn, targetDn,
           new ArrayList<>(currentReplicas))) {
-        ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED);
+        ret.complete(MoveManager.MoveResult.REPLICATION_NOT_HEALTHY_AFTER_MOVE);
         return ret;
       }
 
@@ -881,8 +847,8 @@ private CompletableFuture<MoveResult> move(ContainerID cid,
         moveScheduler.startMove(cid.getProtobuf(),
             mp.getProtobufMessage(ClientVersion.CURRENT_VERSION));
       } catch (IOException e) {
-        LOG.warn("Exception while starting move {}", cid);
-        ret.complete(MoveResult.FAIL_CAN_NOT_RECORD_TO_DB);
+        LOG.warn("Exception while starting move for container {}", cid, e);
+        ret.complete(MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR);
         return ret;
       }
 
@@ -1359,7 +1325,7 @@ private void deleteSrcDnForMove(final ContainerInfo cif,
         .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))) {
       // if the target is present but source disappears somehow,
       // we can consider move is successful.
-      compleleteMoveFutureWithResult(cid, MoveResult.COMPLETED);
+      compleleteMoveFutureWithResult(cid, MoveManager.MoveResult.COMPLETED);
       moveScheduler.completeMove(cid.getProtobuf());
       return;
     }
@@ -1383,7 +1349,8 @@ private void deleteSrcDnForMove(final ContainerInfo cif,
       // we just complete the future without sending a delete command.
       LOG.info("can not remove source replica after successfully " +
           "replicated to target datanode");
-      compleleteMoveFutureWithResult(cid, MoveResult.DELETE_FAIL_POLICY);
+      compleleteMoveFutureWithResult(cid,
+          MoveManager.MoveResult.DELETE_FAIL_POLICY);
       moveScheduler.completeMove(cid.getProtobuf());
     }
   }
@@ -1695,7 +1662,8 @@ DatanodeDetails getFirstDatanode(InflightType type, ContainerID id) {
     return getInflightMap(type).get(id).get(0).getDatanode();
   }
 
-  public Map<ContainerID, CompletableFuture<MoveResult>> getInflightMove() {
+  public Map<ContainerID, CompletableFuture<MoveManager.MoveResult>>
+      getInflightMove() {
     return inflightMoveFuture;
   }
 
@@ -1939,9 +1907,10 @@ private void onLeaderReadyAndOutOfSafeMode() {
 
   /**
    * complete the CompletableFuture of the container in the given Map with
-   * a given MoveResult.
+   * the given MoveManager.MoveResult.
    */
-  private void compleleteMoveFutureWithResult(ContainerID cid, MoveResult mr) {
+  private void compleleteMoveFutureWithResult(ContainerID cid,
+      MoveManager.MoveResult mr) {
     if (inflightMoveFuture.containsKey(cid)) {
       inflightMoveFuture.get(cid).complete(mr);
       inflightMoveFuture.remove(cid);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index cfb6aa7..9833d7a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
 import org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler;
 import org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithUnhealthyReplicasHandler;
 import org.apache.hadoop.hdds.scm.container.replication.health.ClosingContainerHandler;
@@ -1127,22 +1128,23 @@ public ReplicationManagerConfiguration getConfig() {
   /**
   * following functions will be refactored in a separate jira.
   */
-  public CompletableFuture<LegacyReplicationManager.MoveResult> move(
+  public CompletableFuture<MoveManager.MoveResult> move(
       ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
       throws NodeNotFoundException, ContainerNotFoundException,
       TimeoutException {
-    CompletableFuture<LegacyReplicationManager.MoveResult> ret =
+    CompletableFuture<MoveManager.MoveResult> ret =
         new CompletableFuture<>();
     if (!isRunning()) {
-      ret.complete(LegacyReplicationManager.MoveResult.FAIL_NOT_RUNNING);
+      ret.complete(MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR);
+      LOG.warn("Failing move because Replication Monitor thread's " +
+          "running state is {}", isRunning());
       return ret;
     }
 
     return legacyReplicationManager.move(cid, src, tgt);
   }
 
-  public Map<ContainerID,
-      CompletableFuture<LegacyReplicationManager.MoveResult>>
+  public Map<ContainerID, CompletableFuture<MoveManager.MoveResult>>
       getInflightMove() {
     return legacyReplicationManager.getInflightMove();
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
index fb40561..1435bc3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancerTask.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -156,7 +155,8 @@ public void setup() throws IOException, NodeNotFoundException,
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
         Mockito.any(DatanodeDetails.class),
         Mockito.any(DatanodeDetails.class)))
-        .thenReturn(CompletableFuture.completedFuture(MoveResult.COMPLETED));
+        .thenReturn(CompletableFuture.
+            completedFuture(MoveManager.MoveResult.COMPLETED));
 
     when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class)))
         .thenAnswer(invocationOnMock -> {
@@ -721,7 +721,7 @@ public void checkIterationResult()
             Mockito.any(DatanodeDetails.class),
             Mockito.any(DatanodeDetails.class)))
         .thenReturn(CompletableFuture.completedFuture(
-            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
+            MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
     balancerConfiguration.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
 
     startBalancer(balancerConfiguration);
@@ -774,12 +774,12 @@ public void checkIterationResultTimeoutFromReplicationManager()
       throws NodeNotFoundException, IOException,
       IllegalContainerBalancerStateException,
       InvalidContainerBalancerConfigurationException, TimeoutException {
-    CompletableFuture<MoveResult> future
+    CompletableFuture<MoveManager.MoveResult> future
         = CompletableFuture.supplyAsync(() ->
-        MoveResult.REPLICATION_FAIL_TIME_OUT);
-    CompletableFuture<MoveResult> future2
+        MoveManager.MoveResult.REPLICATION_FAIL_TIME_OUT);
+    CompletableFuture<MoveManager.MoveResult> future2
         = CompletableFuture.supplyAsync(() ->
-        MoveResult.DELETION_FAIL_TIME_OUT);
+        MoveManager.MoveResult.DELETION_FAIL_TIME_OUT);
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
             Mockito.any(DatanodeDetails.class),
             Mockito.any(DatanodeDetails.class)))
@@ -806,14 +806,15 @@ public void checkIterationResultException()
       InvalidContainerBalancerConfigurationException,
       TimeoutException {
 
-    CompletableFuture<MoveResult> f = new CompletableFuture();
-    f.completeExceptionally(new RuntimeException("Runtime Exception"));
+    CompletableFuture<MoveManager.MoveResult> future =
+        new CompletableFuture<>();
+    future.completeExceptionally(new RuntimeException("Runtime Exception"));
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
             Mockito.any(DatanodeDetails.class),
             Mockito.any(DatanodeDetails.class)))
         .thenThrow(new ContainerNotFoundException("Test Container not found"),
             new NodeNotFoundException("Test Node not found"))
-        .thenReturn(f).thenReturn(CompletableFuture.supplyAsync(() -> {
+        .thenReturn(future).thenReturn(CompletableFuture.supplyAsync(() -> {
           try {
             Thread.sleep(200);
           } catch (Exception ex) {
@@ -1025,7 +1026,7 @@ private void stopBalancer() {
     // do nothing as testcase is not threaded
   }
 
-  private CompletableFuture<MoveResult>
+  private CompletableFuture<MoveManager.MoveResult>
       genCompletableFuture(int sleepMilSec) {
     return CompletableFuture.supplyAsync(() -> {
       try {
@@ -1033,7 +1034,7 @@ private void stopBalancer() {
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
-      return MoveResult.COMPLETED;
+      return MoveManager.MoveResult.COMPLETED;
     });
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 21d0045..fa1e474 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -41,12 +41,12 @@
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
 import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
 import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.LegacyReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -1849,7 +1849,7 @@ public void testMove() throws IOException, NodeNotFoundException,
       addReplica(container,
               new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
       DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-      CompletableFuture<MoveResult> cf =
+      CompletableFuture<MoveManager.MoveResult> cf =
               replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(scmLogs.getOutput().contains(
               "receive a move request about container"));
@@ -1874,7 +1874,8 @@ public void testMove() throws IOException, NodeNotFoundException,
       replicationManager.processAll();
       eventQueue.processAll(1000);
 
-      Assertions.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
+      Assertions.assertTrue(
+          cf.isDone() && cf.get() == MoveManager.MoveResult.COMPLETED);
     }
 
     /**
@@ -1989,7 +1990,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
       addReplica(container,
               new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
       DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-      CompletableFuture<MoveResult> cf =
+      CompletableFuture<MoveManager.MoveResult> cf =
               replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
       Assertions.assertTrue(scmLogs.getOutput().contains(
               "receive a move request about container"));
@@ -2014,7 +2015,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
               dn1.getDatanodeDetails()));
 
       Assertions.assertTrue(cf.isDone() &&
-              cf.get() == MoveResult.DELETE_FAIL_POLICY);
+              cf.get() == MoveManager.MoveResult.DELETE_FAIL_POLICY);
     }
 
 
@@ -2034,7 +2035,7 @@ public void testDnBecameUnhealthyWhenMoving() throws IOException,
       addReplica(container,
               new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
       DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-      CompletableFuture<MoveResult> cf =
+      CompletableFuture<MoveManager.MoveResult> cf =
               replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(scmLogs.getOutput().contains(
               "receive a move request about container"));
@@ -2044,7 +2045,7 @@ public void testDnBecameUnhealthyWhenMoving() throws IOException,
       eventQueue.processAll(1000);
 
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+              MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
 
       nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
@@ -2057,7 +2058,7 @@ public void testDnBecameUnhealthyWhenMoving() throws IOException,
       eventQueue.processAll(1000);
 
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+              MoveManager.MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
     }
 
     /**
@@ -2079,7 +2080,7 @@ public void testMovePrerequisites() throws IOException,
       ContainerReplica dn4 = addReplica(container,
               new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
 
-      CompletableFuture<MoveResult> cf;
+      CompletableFuture<MoveManager.MoveResult> cf;
       //the above move is executed successfully, so there may be some item in
       //inflightReplication or inflightDeletion. here we stop replication
       // manager to clear these states, which may impact the tests below.
@@ -2088,26 +2089,26 @@ public void testMovePrerequisites() throws IOException,
       Thread.sleep(100L);
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.FAIL_NOT_RUNNING);
+              MoveManager.MoveResult.FAIL_UNEXPECTED_ERROR);
       replicationManager.start();
       Thread.sleep(100L);
 
       //container in not in OPEN state
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+              MoveManager.MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
       //open -> closing
       containerStateManager.updateContainerState(id.getProtobuf(),
               LifeCycleEvent.FINALIZE);
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+              MoveManager.MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
       //closing -> quasi_closed
       containerStateManager.updateContainerState(id.getProtobuf(),
               LifeCycleEvent.QUASI_CLOSE);
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+              MoveManager.MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
 
       //quasi_closed -> closed
       containerStateManager.updateContainerState(id.getProtobuf(),
@@ -2122,10 +2123,10 @@ public void testMovePrerequisites() throws IOException,
                   new NodeStatus(IN_SERVICE, state));
           cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
           Assertions.assertTrue(cf.isDone() && cf.get() ==
-                  MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+                  MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
           cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
           Assertions.assertTrue(cf.isDone() && cf.get() ==
-                  MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+                  MoveManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
         }
       }
       nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
@@ -2138,10 +2139,10 @@ public void testMovePrerequisites() throws IOException,
                   new NodeStatus(state, HEALTHY));
           cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
           Assertions.assertTrue(cf.isDone() && cf.get() ==
-                  MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+                  MoveManager.MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
           cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
           Assertions.assertTrue(cf.isDone() && cf.get() ==
-                  MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+                  MoveManager.MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
         }
       }
       nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
@@ -2150,12 +2151,12 @@ public void testMovePrerequisites() throws IOException,
       cf = replicationManager.move(id, dn1.getDatanodeDetails(),
               dn2.getDatanodeDetails());
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+              MoveManager.MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
 
       //container does not exist in source datanode
       cf = replicationManager.move(id, dn3, dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+              MoveManager.MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
 
       //make container over relplicated to test the
       // case that container is in inflightDeletion
@@ -2166,7 +2167,7 @@ public void testMovePrerequisites() throws IOException,
       eventQueue.processAll(1000);
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+              MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
       resetReplicationManager();
 
       //make the replica num be 2 to test the case
@@ -2179,7 +2180,7 @@ public void testMovePrerequisites() throws IOException,
       eventQueue.processAll(1000);
       cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
       Assertions.assertTrue(cf.isDone() && cf.get() ==
-              MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+              MoveManager.MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
     }
   }