Merge remote-tracking branch 'origin/master' into HDDS-2939
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 370292f..7492a3a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -449,6 +449,11 @@
   public static final long OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10); // 10 days
 
+  public static final String OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION =
+      "ozone.client.key.latest.version.location";
+  public static final boolean OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION_DEFAULT =
+      true;
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2dea94e..81d174c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2849,7 +2849,6 @@
     </description>
   </property>
 
-
   <property>
     <name>ozone.scm.ca.list.retry.interval</name>
     <tag>OZONE, SCM, OM, DATANODE</tag>
@@ -2864,6 +2863,14 @@
   </property>
 
   <property>
+    <name>ozone.client.key.latest.version.location</name>
+    <tag>OZONE, CLIENT</tag>
+    <value>true</value>
+    <description>If true, the Ozone client fetches only the latest
+      version of a key's block locations instead of all versions.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.om.metadata.layout</name>
     <tag>OZONE, OM</tag>
     <value>SIMPLE</value>
@@ -2894,5 +2901,4 @@
       directory deleting service per time interval.
     </description>
   </property>
-
 </configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
index e6b4106..8d4820e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 
@@ -80,7 +81,12 @@
    */
   private void publishReport() {
     try {
-      context.addReport(getReport());
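+      // CommandStatusReports are incremental and queued per endpoint;
+      // every other report type published here replaces the cached
+      // full report of its type.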
+      GeneratedMessage report = getReport();
+      if (report instanceof CommandStatusReportsProto) {
+        context.addIncrementalReport(report);
+      } else {
+        context.refreshFullReport(report);
+      }
     } catch (IOException e) {
       LOG.error("Exception while publishing report.", e);
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 5c3aaa9..ca73468 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -40,7 +41,6 @@
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
 import com.google.protobuf.Descriptors.Descriptor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
@@ -63,10 +63,11 @@
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import static java.lang.Math.min;
-import org.apache.commons.collections.CollectionUtils;
-
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
+
+import org.apache.commons.collections.CollectionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,10 +94,6 @@
   @VisibleForTesting
   static final String CRL_STATUS_REPORT_PROTO_NAME =
       CRLStatusReport.getDescriptor().getFullName();
-  // Accepted types of reports that can be queued to incrementalReportsQueue
-  private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
-      Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
-          INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
 
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
@@ -121,6 +118,14 @@
   private boolean shutdownOnError = false;
   private boolean shutdownGracefully = false;
   private final AtomicLong threadPoolNotAvailableCount;
+  // Endpoint -> ReportType -> whether the full report of that type should
+  // be queued on the next getFullReports call for the endpoint.
+  private final Map<InetSocketAddress,
+      Map<String, AtomicBoolean>> fullReportSendIndicator;
+  // List of supported full report types.
+  private final List<String> fullReportTypeList;
+  // ReportType -> the latest full report of that type.
+  private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;
 
   /**
    * term of latest leader SCM, extract from SCMCommand.
@@ -167,6 +172,24 @@
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
     threadPoolNotAvailableCount = new AtomicLong(0);
+    fullReportSendIndicator = new HashMap<>();
+    fullReportTypeList = new ArrayList<>();
+    type2Reports = new HashMap<>();
+    initReportTypeCollection();
+  }
+
+  /**
+   * Initializes the supported full report types and their report holders.
+   */
+  private void initReportTypeCollection() {
+    fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME);
+    type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports);
+    fullReportTypeList.add(NODE_REPORT_PROTO_NAME);
+    type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport);
+    fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME);
+    type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
+    fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
+    type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
   }
 
   /**
@@ -254,7 +277,7 @@
    *
    * @param report report to be added
    */
-  public void addReport(GeneratedMessage report) {
+  public void addIncrementalReport(GeneratedMessage report) {
     if (report == null) {
       return;
     }
@@ -262,23 +285,38 @@
     Preconditions.checkState(descriptor != null);
     final String reportType = descriptor.getFullName();
     Preconditions.checkState(reportType != null);
-    if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
-      containerReports.set(report);
-    } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
-      nodeReport.set(report);
-    } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
-      pipelineReports.set(report);
-    } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
-      synchronized (incrementalReportsQueue) {
-        for (InetSocketAddress endpoint : endpoints) {
-          incrementalReportsQueue.get(endpoint).add(report);
-        }
+    // In some cases a full-report-type message is queued as an incremental
+    // message, e.g. XceiverServerRatis#sendPipelineReport.
+    synchronized (incrementalReportsQueue) {
+      for (InetSocketAddress endpoint : endpoints) {
+        incrementalReportsQueue.get(endpoint).add(report);
       }
-    } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
-      crlStatusReport.set(report);
-    } else {
+    }
+  }
+
+  /**
+   * Refreshes the cached full report of the given type and marks it to be
+   * sent to every known endpoint.
+   *
+   * @param report full report to be refreshed
+   */
+  public void refreshFullReport(GeneratedMessage report) {
+    if (report == null) {
+      return;
+    }
+    final Descriptor descriptor = report.getDescriptorForType();
+    Preconditions.checkState(descriptor != null);
+    final String reportType = descriptor.getFullName();
+    Preconditions.checkState(reportType != null);
+    if (!fullReportTypeList.contains(reportType)) {
       throw new IllegalArgumentException(
-          "Unidentified report message type: " + reportType);
+          "not full report message type: " + reportType);
+    }
+    type2Reports.get(reportType).set(report);
+    if (fullReportSendIndicator != null) {
+      for (Map<String, AtomicBoolean> mp : fullReportSendIndicator.values()) {
+        mp.get(reportType).set(true);
+      }
     }
   }
 
@@ -301,10 +339,6 @@
       Preconditions.checkState(descriptor != null);
       final String reportType = descriptor.getFullName();
       Preconditions.checkState(reportType != null);
-      if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
-        throw new IllegalArgumentException(
-            "Unaccepted report message type: " + reportType);
-      }
     }
     synchronized (incrementalReportsQueue) {
       if (incrementalReportsQueue.containsKey(endpoint)){
@@ -340,23 +374,27 @@
     return reportsToReturn;
   }
 
-  List<GeneratedMessage> getNonIncrementalReports() {
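+  /**
+   * Returns the full reports currently flagged for the given endpoint and
+   * resets their send indicators, so a refreshed full report is delivered
+   * to each endpoint only once.
+   */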
+  List<GeneratedMessage> getFullReports(
+      InetSocketAddress endpoint) {
+    Map<String, AtomicBoolean> mp = fullReportSendIndicator.get(endpoint);
     List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
-    GeneratedMessage report = containerReports.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = nodeReport.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = pipelineReports.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
-    }
-    report = crlStatusReport.get();
-    if (report != null) {
-      nonIncrementalReports.add(report);
+    if (mp != null) {
+      for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
+        if (kv.getValue().get()) {
+          String reportType = kv.getKey();
+          final AtomicReference<GeneratedMessage> ref =
+              type2Reports.get(reportType);
+          if (ref == null) {
+            throw new RuntimeException(reportType + " is not a valid full "
+                + "report type!");
+          }
+          final GeneratedMessage msg = ref.get();
+          if (msg != null) {
+            nonIncrementalReports.add(msg);
+            mp.get(reportType).set(false);
+          }
+        }
+      }
     }
     return nonIncrementalReports;
   }
@@ -372,7 +410,7 @@
     if (maxLimit < 0) {
       throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit);
     }
-    List<GeneratedMessage> reports = getNonIncrementalReports();
+    List<GeneratedMessage> reports = getFullReports(endpoint);
     if (maxLimit <= reports.size()) {
       return reports.subList(0, maxLimit);
     } else {
@@ -800,6 +838,11 @@
       this.containerActions.put(endpoint, new LinkedList<>());
       this.pipelineActions.put(endpoint, new LinkedList<>());
       this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
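+      // A newly added endpoint starts with every full report type flagged,
+      // so it receives the current full reports on its first heartbeat.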
+      Map<String, AtomicBoolean> mp = new HashMap<>();
+      fullReportTypeList.forEach(e -> {
+        mp.putIfAbsent(e, new AtomicBoolean(true));
+      });
+      this.fullReportSendIndicator.putIfAbsent(endpoint, mp);
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 60b7978..cb65e37 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -150,7 +150,7 @@
     } catch (IOException ex) {
       Preconditions.checkState(requestBuilder != null);
       // put back the reports which failed to be sent
-      putBackReports(requestBuilder);
+      putBackIncrementalReports(requestBuilder);
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -159,7 +159,8 @@
   }
 
   // TODO: Make it generic.
-  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+  private void putBackIncrementalReports(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
     List<GeneratedMessage> reports = new LinkedList<>();
     // We only put back CommandStatusReports and IncrementalContainerReport
     // because those are incremental. Container/Node/PipelineReport are
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 6fd2706..3a2aec9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -917,7 +917,8 @@
   private void sendPipelineReport() {
     if (context !=  null) {
       // TODO: Send IncrementalPipelineReport instead of full PipelineReport
-      context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.addIncrementalReport(
+          context.getParent().getContainer().getPipelineReport());
       context.getParent().triggerHeartbeat();
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 736e3d6..2312c0b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -130,7 +130,7 @@
           .newBuilder()
           .addReport(containerReplicaProto)
           .build();
-      context.addReport(icr);
+      context.addIncrementalReport(icr);
       context.getParent().triggerHeartbeat();
     };
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index c97d703..83e44d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -133,7 +133,7 @@
     Thread.sleep(150);
     executorService.shutdown();
     Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
-    verify(dummyContext, times(1)).addReport(null);
+    verify(dummyContext, times(1)).refreshFullReport(null);
     // After executor shutdown, no new reports should be published
     Thread.sleep(100);
     Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d152f4d..6d0ad16 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -27,7 +27,6 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -37,6 +36,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
@@ -106,35 +106,7 @@
     // getReports dequeues incremental reports
     expectedReportCount.clear();
 
-    // Case 2: Attempt to put back a full report
-
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-    try {
-      ctx.putBackReports(Collections.singletonList(
-          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-
-    // Case 3: Put back mixed types of incremental reports
-
+    // Case 2: Put back mixed types of incremental reports
     ctx.putBackReports(Arrays.asList(
         newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
         newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
@@ -152,31 +124,6 @@
     checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
     // getReports dequeues incremental reports
     expectedReportCount.clear();
-
-    // Case 4: Attempt to put back mixed types of full reports
-
-    try {
-      ctx.putBackReports(Arrays.asList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
-          newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)
-      ), scm1);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
-
-    // Case 5: Attempt to put back mixed full and incremental reports
-
-    try {
-      ctx.putBackReports(Arrays.asList(
-          newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
-          newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
-      ), scm2);
-      fail("Should throw exception when putting back unaccepted reports!");
-    } catch (IllegalArgumentException ignored) {
-    }
   }
 
   @Test
@@ -197,8 +144,48 @@
 
     Map<String, Integer> expectedReportCount = new HashMap<>();
 
+    // Add a bunch of ContainerReports
+    batchRefreshFullReports(ctx,
+        StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // Every time getAllAvailableReports is called, a full report of a given
+    // type is only returned again after batchRefreshFullReports refreshes
+    // that type.
+    expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+
+    // Add a bunch of NodeReport
+    batchRefreshFullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME);
+
+    // Add a bunch of PipelineReports
+    batchRefreshFullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+    // Should only keep the latest one
+    expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+
+    // Add a bunch of CommandStatusReports
+    batchAddIncrementalReport(ctx,
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    expectedReportCount.put(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+    // Should keep all of them
+    checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+    checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+    // getReports dequeues incremental reports
+    expectedReportCount.remove(
+        StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
     // Add a bunch of IncrementalContainerReport
-    batchAddReports(ctx,
+    batchAddIncrementalReport(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
     // Should keep all of them
     expectedReportCount.put(
@@ -210,9 +197,16 @@
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
   }
 
-  void batchAddReports(StateContext ctx, String reportName, int count) {
+  void batchRefreshFullReports(StateContext ctx, String reportName, int count) {
     for (int i = 0; i < count; i++) {
-      ctx.addReport(newMockReport(reportName));
+      ctx.refreshFullReport(newMockReport(reportName));
+    }
+  }
+
+  void batchAddIncrementalReport(StateContext ctx,
+                                 String reportName, int count) {
+    for (int i = 0; i < count; i++) {
+      ctx.addIncrementalReport(newMockReport(reportName));
     }
   }
 
@@ -243,7 +237,7 @@
     assertNull(context1.getPipelineReports());
     GeneratedMessage containerReports =
         newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
-    context1.addReport(containerReports);
+    context1.refreshFullReport(containerReports);
 
     assertNotNull(context1.getContainerReports());
     assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
@@ -255,7 +249,7 @@
     StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
     GeneratedMessage nodeReport =
         newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
-    context2.addReport(nodeReport);
+    context2.refreshFullReport(nodeReport);
 
     assertNull(context2.getContainerReports());
     assertNotNull(context2.getNodeReport());
@@ -267,7 +261,7 @@
     StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
     GeneratedMessage pipelineReports =
         newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
-    context3.addReport(pipelineReports);
+    context3.refreshFullReport(pipelineReports);
 
     assertNull(context3.getContainerReports());
     assertNull(context3.getNodeReport());
@@ -311,7 +305,7 @@
         newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
 
     // Try to add report with zero endpoint. Should not be stored.
-    stateContext.addReport(generatedMessage);
+    stateContext.addIncrementalReport(generatedMessage);
     assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
 
     // Add 2 scm endpoints.
@@ -319,7 +313,7 @@
     stateContext.addEndpoint(scm2);
 
     // Add report. Should be added to all endpoints.
-    stateContext.addReport(generatedMessage);
+    stateContext.addIncrementalReport(generatedMessage);
     List<GeneratedMessage> allAvailableReports =
         stateContext.getAllAvailableReports(scm1);
     assertEquals(1, allAvailableReports.size());
@@ -462,9 +456,9 @@
 
     // task num greater than pool size
     for (int i = 0; i < threadPoolSize; i++) {
-      executorService.submit(() -> futureOne.get());
+      executorService.submit((Callable<String>) futureOne::get);
     }
-    executorService.submit(() -> futureTwo.get());
+    executorService.submit((Callable<String>) futureTwo::get);
 
     Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
 
@@ -483,8 +477,8 @@
 
     ExecutorService executorService = Executors.newFixedThreadPool(1);
     CompletableFuture<String> future = new CompletableFuture<>();
-    executorService.submit(() -> future.get());
-    executorService.submit(() -> future.get());
+    executorService.submit((Callable<String>) future::get);
+    executorService.submit((Callable<String>) future::get);
 
     StateContext subject = new StateContext(new OzoneConfiguration(),
         DatanodeStates.INIT, mock(DatanodeStateMachine.class)) {
@@ -549,11 +543,13 @@
     Map<String, Integer> expectedReportCount = new HashMap<>();
 
     // Add a bunch of ContainerReports
-    batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
-    batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
-    batchAddReports(ctx,
+    batchRefreshFullReports(ctx,
+        StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+    batchRefreshFullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+    batchRefreshFullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+    batchRefreshFullReports(ctx,
+        StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128);
+    batchAddIncrementalReport(ctx,
         StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
 
     // Should only keep the latest one
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 9b238a1..0f215f4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -96,7 +96,7 @@
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(NodeReportProto.getDefaultInstance());
+    context.refreshFullReport(NodeReportProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -128,7 +128,7 @@
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(ContainerReportsProto.getDefaultInstance());
+    context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -160,7 +160,8 @@
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.addIncrementalReport(
+        CommandStatusReportsProto.getDefaultInstance());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
@@ -224,9 +225,10 @@
     HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
         conf, context, scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
-    context.addReport(NodeReportProto.getDefaultInstance());
-    context.addReport(ContainerReportsProto.getDefaultInstance());
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.refreshFullReport(NodeReportProto.getDefaultInstance());
+    context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
+    context.addIncrementalReport(
+        CommandStatusReportsProto.getDefaultInstance());
     context.addContainerAction(getContainerAction());
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1cee8d9..b49d05d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -160,6 +160,7 @@
   private final boolean checkKeyNameEnabled;
   private final OzoneClientConfig clientConfig;
   private final Cache<URI, KeyProvider> keyProviderCache;
+  private final boolean getLatestVersionLocation;
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -230,6 +231,9 @@
     checkKeyNameEnabled = conf.getBoolean(
         OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY,
         OMConfigKeys.OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT);
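+    // Whether key reads should ask OM for only the latest key location
+    // version instead of all location versions.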
+    getLatestVersionLocation = conf.getBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION,
+        OzoneConfigKeys.OZONE_CLIENT_KEY_LATEST_VERSION_LOCATION_DEFAULT);
 
     long keyProviderCacheExpiryMs = conf.getTimeDuration(
         OZONE_CLIENT_KEY_PROVIDER_CACHE_EXPIRY,
@@ -817,6 +821,7 @@
         .setKeyName(keyName)
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+        .setLatestVersionLocation(getLatestVersionLocation)
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
     return getInputStreamWithRetryFunction(keyInfo);
@@ -929,6 +934,7 @@
         .setKeyName(keyName)
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+        .setLatestVersionLocation(getLatestVersionLocation)
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
 
@@ -1148,6 +1154,7 @@
         .setKeyName(keyName)
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+        .setLatestVersionLocation(getLatestVersionLocation)
         .build();
     return ozoneManagerClient.getFileStatus(keyArgs);
   }
@@ -1171,6 +1178,7 @@
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+        .setLatestVersionLocation(getLatestVersionLocation)
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs);
     return getInputStreamWithRetryFunction(keyInfo);
@@ -1203,6 +1211,7 @@
             .setKeyName(omKeyInfo.getKeyName())
             .setRefreshPipeline(true)
             .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+            .setLatestVersionLocation(getLatestVersionLocation)
             .build();
         return ozoneManagerClient.lookupKey(omKeyArgs);
       } catch (IOException e) {
@@ -1240,6 +1249,7 @@
         .setKeyName(keyName)
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(topologyAwareReadEnabled)
+        .setLatestVersionLocation(getLatestVersionLocation)
         .build();
     return ozoneManagerClient
         .listStatus(keyArgs, recursive, startKey, numEntries);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 9a0bf6b..7b298d4 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -46,6 +46,7 @@
   private boolean refreshPipeline;
   private boolean sortDatanodesInPipeline;
   private List<OzoneAcl> acls;
+  private boolean latestVersionLocation;
   private boolean recursive;
 
   @SuppressWarnings("parameternumber")
@@ -54,7 +55,8 @@
       List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
       String uploadID, int partNumber,
       Map<String, String> metadataMap, boolean refreshPipeline,
-      List<OzoneAcl> acls, boolean sortDatanode, boolean recursive) {
+      List<OzoneAcl> acls, boolean sortDatanode,
+      boolean latestVersionLocation, boolean recursive) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -68,6 +70,7 @@
     this.refreshPipeline = refreshPipeline;
     this.acls = acls;
     this.sortDatanodesInPipeline = sortDatanode;
+    this.latestVersionLocation = latestVersionLocation;
     this.recursive = recursive;
   }
 
@@ -135,6 +138,10 @@
     return sortDatanodesInPipeline;
   }
 
+  public boolean getLatestVersionLocation() {
+    return latestVersionLocation;
+  }
+
   public boolean isRecursive() {
     return recursive;
   }
@@ -174,6 +181,7 @@
         .addAllMetadata(metadata)
         .setRefreshPipeline(refreshPipeline)
         .setSortDatanodesInPipeline(sortDatanodesInPipeline)
+        .setLatestVersionLocation(latestVersionLocation)
         .setAcls(acls);
   }
 
@@ -193,6 +201,7 @@
     private Map<String, String> metadata = new HashMap<>();
     private boolean refreshPipeline;
     private boolean sortDatanodesInPipeline;
+    private boolean latestVersionLocation;
     private List<OzoneAcl> acls;
     private boolean recursive;
 
@@ -266,6 +275,11 @@
       return this;
     }
 
+    public Builder setLatestVersionLocation(boolean latest) {
+      this.latestVersionLocation = latest;
+      return this;
+    }
+
     public Builder setRecursive(boolean isRecursive) {
       this.recursive = isRecursive;
       return this;
@@ -276,7 +290,7 @@
           replicationConfig, locationInfoList, isMultipartKey,
           multipartUploadID,
           multipartUploadPartNumber, metadata, refreshPipeline, acls,
-          sortDatanodesInPipeline, recursive);
+          sortDatanodesInPipeline, latestVersionLocation, recursive);
     }
 
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 92d5823..996e04f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,17 +82,6 @@
     this.bucketName = bucketName;
     this.keyName = keyName;
     this.dataSize = dataSize;
-    // it is important that the versions are ordered from old to new.
-    // Do this sanity check when versions got loaded on creating OmKeyInfo.
-    // TODO : this is not necessary, here only because versioning is still a
-    // work in-progress, remove this following check when versioning is
-    // complete and prove correctly functioning
-    long currentVersion = -1;
-    for (OmKeyLocationInfoGroup version : versions) {
-      Preconditions.checkArgument(
-            currentVersion + 1 == version.getVersion());
-      currentVersion = version.getVersion();
-    }
     this.keyLocationVersions = versions;
     this.creationTime = creationTime;
     this.modificationTime = modificationTime;
@@ -170,6 +158,11 @@
     return keyLocationVersions;
   }
 
+  public void setKeyLocationVersions(
+      List<OmKeyLocationInfoGroup> keyLocationVersions) {
+    this.keyLocationVersions = keyLocationVersions;
+  }
+
   public void updateModifcationTime() {
     this.modificationTime = Time.monotonicNow();
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index ec7e097..c13ef9f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -681,6 +681,7 @@
         .setKeyName(args.getKeyName())
         .setDataSize(args.getDataSize())
         .setSortDatanodes(args.getSortDatanodes())
+        .setLatestVersionLocation(args.getLatestVersionLocation())
         .build();
     req.setKeyArgs(keyArgs);
 
@@ -1215,6 +1216,7 @@
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .setSortDatanodes(args.getSortDatanodes())
+        .setLatestVersionLocation(args.getLatestVersionLocation())
         .build();
     GetFileStatusRequest req =
         GetFileStatusRequest.newBuilder()
@@ -1268,6 +1270,7 @@
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .setSortDatanodes(args.getSortDatanodes())
+        .setLatestVersionLocation(args.getLatestVersionLocation())
         .build();
     LookupFileRequest lookupFileRequest = LookupFileRequest.newBuilder()
             .setKeyArgs(keyArgs)
@@ -1432,6 +1435,7 @@
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .setSortDatanodes(args.getSortDatanodes())
+        .setLatestVersionLocation(args.getLatestVersionLocation())
         .build();
     ListStatusRequest listStatusRequest =
         ListStatusRequest.newBuilder()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 5ff95ce..addd15c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -393,7 +393,8 @@
 
     logCapturer.clearOutput();
     cluster.getHddsDatanodes().get(0)
-        .getDatanodeStateMachine().getContext().addReport(dummyReport);
+        .getDatanodeStateMachine().getContext()
+        .addIncrementalReport(dummyReport);
     cluster.getHddsDatanodes().get(0)
         .getDatanodeStateMachine().triggerHeartbeat();
     // wait for event to be handled by event handler
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index e27afa7..a06a48b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -819,6 +819,107 @@
   }
 
   @Test
+  public void testLatestLocationVersion() throws IOException {
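+    // Verifies that latestVersionLocation=true returns only the newest
+    // location version via lookupKey, listStatus, getFileStatus and
+    // lookupFile, that false returns all versions, and that listKeys
+    // always returns only the latest version.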
+    String keyName = RandomStringUtils.randomAlphabetic(5);
+    OmKeyArgs keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .setLatestVersionLocation(true)
+        .build();
+
+    // lookup for a non-existent key
+    try {
+      keyManager.lookupKey(keyArgs, null);
+      Assert.fail("Lookup key should fail for non existent key");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) {
+        throw ex;
+      }
+    }
+
+    // create a key
+    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    // randomly select 3 datanodes
+    List<DatanodeDetails> nodeList = new ArrayList<>();
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        0, null, null, null, null, 0));
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        1, null, null, null, null, 0));
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        2, null, null, null, null, 0));
+    Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(1)));
+    Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(2)));
+    // create a pipeline using 3 datanodes
+    Pipeline pipeline = scm.getPipelineManager().createPipeline(
+        new RatisReplicationConfig(ReplicationFactor.THREE), nodeList);
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    List<OmKeyLocationInfo> locationList =
+        keySession.getKeyInfo().getLatestVersionLocations().getLocationList();
+    Assert.assertEquals(1, locationList.size());
+    locationInfoList.add(
+        new OmKeyLocationInfo.Builder().setPipeline(pipeline)
+            .setBlockID(new BlockID(locationList.get(0).getContainerID(),
+                locationList.get(0).getLocalID())).build());
+    keyArgs.setLocationInfoList(locationInfoList);
+
+    keyManager.commitKey(keyArgs, keySession.getId());
+    OmKeyInfo key = keyManager.lookupKey(keyArgs, null);
+    Assert.assertEquals(1, key.getKeyLocationVersions().size());
+
+    keySession = keyManager.createFile(keyArgs, true, true);
+    keyManager.commitKey(keyArgs, keySession.getId());
+
+    // Test lookupKey (latestLocationVersion == true)
+    key = keyManager.lookupKey(keyArgs, null);
+    Assert.assertEquals(1, key.getKeyLocationVersions().size());
+
+    // Test ListStatus (latestLocationVersion == true)
+    List<OzoneFileStatus> fileStatuses =
+        keyManager.listStatus(keyArgs, false, "", 1);
+    Assert.assertEquals(1, fileStatuses.size());
+    Assert.assertEquals(1, fileStatuses.get(0).getKeyInfo()
+        .getKeyLocationVersions().size());
+
+    // Test GetFileStatus (latestLocationVersion == true)
+    OzoneFileStatus ozoneFileStatus = keyManager.getFileStatus(keyArgs, null);
+    Assert.assertEquals(1, ozoneFileStatus.getKeyInfo()
+        .getKeyLocationVersions().size());
+
+    // Test LookupFile (latestLocationVersion == true)
+    key = keyManager.lookupFile(keyArgs, null);
+    Assert.assertEquals(1, key.getKeyLocationVersions().size());
+
+    keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .setLatestVersionLocation(false)
+        .build();
+
+    // Test lookupKey (latestLocationVersion == false)
+    key = keyManager.lookupKey(keyArgs, null);
+    Assert.assertEquals(2, key.getKeyLocationVersions().size());
+
+    // Test ListStatus (latestLocationVersion == false)
+    fileStatuses = keyManager.listStatus(keyArgs, false, "", 100);
+    Assert.assertEquals(1, fileStatuses.size());
+    Assert.assertEquals(2, fileStatuses.get(0).getKeyInfo()
+        .getKeyLocationVersions().size());
+
+    // Test GetFileStatus (latestLocationVersion == false)
+    ozoneFileStatus = keyManager.getFileStatus(keyArgs, null);
+    Assert.assertEquals(2, ozoneFileStatus.getKeyInfo()
+        .getKeyLocationVersions().size());
+
+    // Test LookupFile (latestLocationVersion == false)
+    key = keyManager.lookupFile(keyArgs, null);
+    Assert.assertEquals(2, key.getKeyLocationVersions().size());
+
+    // Test ListKeys (latestLocationVersion is always true for ListKeys)
+    List<OmKeyInfo> keyInfos = keyManager.listKeys(keyArgs.getVolumeName(),
+        keyArgs.getBucketName(), "", keyArgs.getKeyName(), 100);
+    Assert.assertEquals(1, keyInfos.size());
+    Assert.assertEquals(1, keyInfos.get(0).getKeyLocationVersions().size());
+  }
+
+  @Test
   public void testListStatusWithTableCache() throws Exception {
     // Inspired by TestOmMetadataManager#testListKeys
     String prefixKeyInDB = "key-d";
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 295d2ad..650e9bb 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -741,9 +741,10 @@
 
     // This will be set by leader OM in HA and update the original request.
     optional FileEncryptionInfoProto fileEncryptionInfo = 15;
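+    // When true, OM returns only the latest key location version.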
+    optional bool latestVersionLocation = 16;
 
     // This will be set when user performs delete directory recursively.
-    optional bool recursive = 16;
+    optional bool recursive = 17;
 }
 
 message KeyLocation {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ac65aab..bcb0ccf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -676,6 +676,10 @@
       throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND);
     }
 
+    if (args.getLatestVersionLocation()) {
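+      // Keep only the latest key location version in the response.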
+      slimLocationVersion(value);
+    }
+
     // add block token for read.
     addBlockToken4Read(value);
 
@@ -947,6 +951,11 @@
     List<OmKeyInfo> keyList = metadataManager.listKeys(volumeName, bucketName,
         startKey, keyPrefix, maxKeys);
 
+    // listKeys always returns only the latest key location version.
+    for (OmKeyInfo omKeyInfo : keyList) {
+      slimLocationVersion(omKeyInfo);
+    }
+
     return keyList;
   }
 
@@ -1903,10 +1912,12 @@
 
     if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
       return getOzoneFileStatusFSO(volumeName, bucketName, keyName,
-              args.getSortDatanodes(), clientAddress, false);
+              args.getSortDatanodes(), clientAddress,
+              args.getLatestVersionLocation(), false);
     }
     return getOzoneFileStatus(volumeName, bucketName, keyName,
-            args.getRefreshPipeline(), args.getSortDatanodes(), clientAddress);
+        args.getRefreshPipeline(), args.getSortDatanodes(),
+        args.getLatestVersionLocation(), clientAddress);
   }
 
   private OzoneFileStatus getOzoneFileStatus(String volumeName,
@@ -1914,6 +1925,7 @@
                                              String keyName,
                                              boolean refreshPipeline,
                                              boolean sortDatanodes,
+                                             boolean latestLocationVersion,
                                              String clientAddress)
       throws IOException {
     OmKeyInfo fileKeyInfo = null;
@@ -1947,6 +1959,9 @@
 
       // if the key is a file then do refresh pipeline info in OM by asking SCM
       if (fileKeyInfo != null) {
+        if (latestLocationVersion) {
+          slimLocationVersion(fileKeyInfo);
+        }
         // refreshPipeline flag check has been removed as part of
         // https://issues.apache.org/jira/browse/HDDS-3658.
         // Please refer this jira for more details.
@@ -1972,7 +1987,8 @@
 
   private OzoneFileStatus getOzoneFileStatusFSO(String volumeName,
       String bucketName, String keyName, boolean sortDatanodes,
-      String clientAddress, boolean skipFileNotFoundError) throws IOException {
+      String clientAddress, boolean latestLocationVersion,
+      boolean skipFileNotFoundError) throws IOException {
     OzoneFileStatus fileStatus = null;
     metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
             bucketName);
@@ -1994,9 +2010,10 @@
     if (fileStatus != null) {
       // if the key is a file then do refresh pipeline info in OM by asking SCM
       if (fileStatus.isFile()) {
-
         OmKeyInfo fileKeyInfo = fileStatus.getKeyInfo();
-
+        if (latestLocationVersion) {
+          slimLocationVersion(fileKeyInfo);
+        }
         // refreshPipeline flag check has been removed as part of
         // https://issues.apache.org/jira/browse/HDDS-3658.
         // Please refer this jira for more details.
@@ -2172,11 +2189,12 @@
     OzoneFileStatus fileStatus;
     if (OzoneManagerRatisUtils.isBucketFSOptimized()) {
       fileStatus = getOzoneFileStatusFSO(volumeName, bucketName, keyName,
-              args.getSortDatanodes(), clientAddress, false);
+              args.getSortDatanodes(), clientAddress,
+              args.getLatestVersionLocation(), false);
     } else {
       fileStatus = getOzoneFileStatus(volumeName, bucketName,
               keyName, args.getRefreshPipeline(), args.getSortDatanodes(),
-              clientAddress);
+              args.getLatestVersionLocation(), clientAddress);
     }
     //if key is not of type file or if key is not found we throw an exception
     if (fileStatus.isFile()) {
@@ -2409,6 +2427,9 @@
     for (OzoneFileStatus fileStatus : fileStatusList) {
       keyInfoList.add(fileStatus.getKeyInfo());
     }
+    if (args.getLatestVersionLocation()) {
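+      // Trim each listed key to its latest location version.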
+      slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
+    }
     refreshPipeline(keyInfoList);
 
     if (args.getSortDatanodes()) {
@@ -2534,7 +2555,8 @@
         }
 
         OzoneFileStatus fileStatusInfo = getOzoneFileStatusFSO(volumeName,
-                bucketName, startKey, false, null, true);
+                bucketName, startKey, false, null,
+                args.getLatestVersionLocation(), true);
 
         if (fileStatusInfo != null) {
           prefixKeyInDB = fileStatusInfo.getKeyInfo().getParentObjectID();
@@ -2587,7 +2609,9 @@
     for (OzoneFileStatus fileStatus : cacheDirMap.values()) {
       fileStatusFinalList.add(fileStatus);
     }
-
+    if (args.getLatestVersionLocation()) {
+      slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
+    }
     // refreshPipeline flag check has been removed as part of
     // https://issues.apache.org/jira/browse/HDDS-3658.
     // Please refer this jira for more details.
@@ -2953,6 +2977,23 @@
     }
     return nodeSet;
   }
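+
+  /**
+   * Keeps only the latest key location version of each given key, dropping
+   * the older version groups.
+   */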
+  private void slimLocationVersion(OmKeyInfo... keyInfos) {
+    if (keyInfos != null) {
+      for (OmKeyInfo keyInfo : keyInfos) {
+        OmKeyLocationInfoGroup key = keyInfo.getLatestVersionLocations();
+        if (key == null) {
+          LOG.warn("No location version for key {}", keyInfo);
+          continue;
+        }
+        int keyLocationVersionLength = keyInfo.getKeyLocationVersions().size();
+        if (keyLocationVersionLength <= 1) {
+          continue;
+        }
+        keyInfo.setKeyLocationVersions(keyInfo.getKeyLocationVersions()
+            .subList(keyLocationVersionLength - 1, keyLocationVersionLength));
+      }
+    }
+  }
 
   @Override
   public OmKeyInfo getPendingDeletionDir() throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index c190768..4cbbd7f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -362,6 +362,7 @@
         .setKeyName(keyArgs.getKeyName())
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
+        .setLatestVersionLocation(keyArgs.getLatestVersionLocation())
         .build();
     OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
     resp.setKeyInfo(keyInfo.getProtobuf(false, clientVersion));
@@ -566,6 +567,7 @@
         .setKeyName(keyArgs.getKeyName())
         .setRefreshPipeline(true)
         .setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
+        .setLatestVersionLocation(keyArgs.getLatestVersionLocation())
         .build();
     return LookupFileResponse.newBuilder()
         .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf(clientVersion))
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
index 472efb4..be0b2ce 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.recon.fsck;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -68,6 +69,11 @@
     return replicaDelta == 0 && !isMisReplicated();
   }
 
+  public boolean isDeleted() {
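+    // DELETING and DELETED both mean the container is being or has been
+    // removed by SCM.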
+    return container.getState() == HddsProtos.LifeCycleState.DELETED ||
+        container.getState() == HddsProtos.LifeCycleState.DELETING;
+  }
+
   public boolean isOverReplicated() {
     return replicaDelta < 0;
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 04afed2..5f4df00 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -18,18 +18,24 @@
 
 package org.apache.hadoop.ozone.recon.fsck;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import org.apache.hadoop.util.Time;
 import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
@@ -49,6 +55,7 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerHealthTask.class);
 
+  private StorageContainerServiceProvider scmClient;
   private ContainerManagerV2 containerManager;
   private ContainerHealthSchemaManager containerHealthSchemaManager;
   private PlacementPolicy placementPolicy;
@@ -57,11 +64,13 @@
 
   public ContainerHealthTask(
       ContainerManagerV2 containerManager,
+      StorageContainerServiceProvider scmClient,
       ReconTaskStatusDao reconTaskStatusDao,
       ContainerHealthSchemaManager containerHealthSchemaManager,
       PlacementPolicy placementPolicy,
       ReconTaskConfig reconTaskConfig) {
     super(reconTaskStatusDao);
+    this.scmClient = scmClient;
     this.containerHealthSchemaManager = containerHealthSchemaManager;
     this.placementPolicy = placementPolicy;
     this.containerManager = containerManager;
@@ -147,6 +156,11 @@
           }
           if (ContainerHealthRecords
               .retainOrUpdateRecord(currentContainer, rec)) {
+            // If a missing container has been deleted in SCM, drop its
+            // unhealthy-container record rather than keep reporting it.
+            if (currentContainer.isMissing() &&
+                containerDeletedInSCM(currentContainer.getContainer())) {
+              rec.delete();
+            }
             existingRecords.add(rec.getContainerState());
             if (rec.changed()) {
               rec.update();
@@ -174,7 +188,11 @@
           containerManager.getContainerReplicas(container.containerID());
       ContainerHealthStatus h = new ContainerHealthStatus(
           container, containerReplicas, placementPolicy);
-      if (h.isHealthy()) {
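+      // Healthy and deleted containers need no unhealthy-container records.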
+      if (h.isHealthy() || h.isDeleted()) {
+        return;
+      }
+      // A missing container may have been deleted in SCM; if so, sync
+      // Recon's container state and skip recording it as unhealthy.
+      if (h.isMissing() && containerDeletedInSCM(container)) {
         return;
       }
       containerHealthSchemaManager.insertUnhealthyContainerRecords(
@@ -185,6 +203,35 @@
     }
   }
 
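+  /**
+   * Queries SCM for the latest state of the given container. If SCM reports
+   * it as DELETED, Recon's copy is moved through the matching lifecycle
+   * events (DELETE for a CLOSED container, CLEANUP for a DELETING container
+   * with no remaining replicas) so that both views converge.
+   *
+   * @return true if the container is DELETED in SCM, false otherwise.
+   */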
+  private boolean containerDeletedInSCM(ContainerInfo containerInfo) {
+    try {
+      ContainerWithPipeline containerWithPipeline =
+          scmClient.getContainerWithPipeline(containerInfo.getContainerID());
+      if (containerWithPipeline.getContainerInfo().getState() ==
+          HddsProtos.LifeCycleState.DELETED) {
+        if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
+          containerManager.updateContainerState(containerInfo.containerID(),
+              HddsProtos.LifeCycleEvent.DELETE);
+        }
+        if (containerInfo.getState() == HddsProtos.LifeCycleState.DELETING &&
+            containerManager.getContainerReplicas(containerInfo.containerID())
+                .isEmpty()) {
+          containerManager.updateContainerState(containerInfo.containerID(),
+              HddsProtos.LifeCycleEvent.CLEANUP);
+        }
+        return true;
+      }
+    } catch (InvalidStateTransitionException e) {
+      LOG.error("Failed to transition Container state while processing " +
+          "container in Container Health task", e);
+    } catch (IOException e) {
+      LOG.error("Got exception while processing container in" +
+          " Container Health task", e);
+    }
+    return false;
+  }
+
   /**
    * Helper methods to generate and update the required database records for
    * unhealthy containers.
@@ -245,7 +292,7 @@
         ContainerHealthStatus container, Set<String> recordForStateExists,
         long time) {
       List<UnhealthyContainers> records = new ArrayList<>();
-      if (container.isHealthy()) {
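+      // Deleted containers are not unhealthy; generate no records for them.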
+      if (container.isHealthy() || container.isDeleted()) {
         return records;
       }
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c7f4e22..ebef4d8 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -195,6 +195,7 @@
         reconTaskConfig));
     reconScmTasks.add(new ContainerHealthTask(
         containerManager,
+        scmServiceProvider,
         reconTaskStatusDao, containerHealthSchemaManager,
         containerPlacementPolicy,
         reconTaskConfig));
@@ -334,4 +335,8 @@
   public EventQueue getEventQueue() {
     return eventQueue;
   }
+
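+  /**
+   * Returns the SCM service provider, used to query SCM directly, for
+   * example by the container health task.
+   */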
+  public StorageContainerServiceProvider getScmServiceProvider() {
+    return scmServiceProvider;
+  }
 }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index 18f9fa4..18125ea 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -42,9 +42,11 @@
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
@@ -73,6 +75,8 @@
         mock(ReconStorageContainerManagerFacade.class);
     MockPlacementPolicy placementMock = new MockPlacementPolicy();
     ContainerManagerV2 containerManagerMock = mock(ContainerManagerV2.class);
+    StorageContainerServiceProvider scmClientMock =
+        mock(StorageContainerServiceProvider.class);
     ContainerReplica unhealthyReplicaMock = mock(ContainerReplica.class);
     when(unhealthyReplicaMock.getState()).thenReturn(State.UNHEALTHY);
     ContainerReplica healthyReplicaMock = mock(ContainerReplica.class);
@@ -81,10 +85,13 @@
     // Create 6 containers. The first 5 will have various unhealthy states
     // defined below. The container with ID=6 will be healthy.
     List<ContainerInfo> mockContainers = getMockContainers(6);
+    when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
     when(containerManagerMock.getContainers()).thenReturn(mockContainers);
     for (ContainerInfo c : mockContainers) {
       when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
+      when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
+          .thenReturn(new ContainerWithPipeline(c, null));
     }
     // Under replicated
     when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(1L)))
@@ -125,6 +132,7 @@
     reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
     ContainerHealthTask containerHealthTask =
         new ContainerHealthTask(scmMock.getContainerManager(),
+            scmMock.getScmServiceProvider(),
             reconTaskStatusDao, containerHealthSchemaManager,
             placementMock, reconTaskConfig);
     containerHealthTask.start();
@@ -206,6 +214,76 @@
         unHealthyContainersTableHandle.fetchByContainerId(5L).size());
   }
 
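+  /**
+   * A container that SCM reports as DELETED must not be flagged as missing.
+   * Container 1 (OPEN in both Recon and SCM, no replicas) is recorded as
+   * MISSING, while container 2 (CLOSED in Recon, DELETED in SCM) is synced
+   * and produces no unhealthy-container record.
+   */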
+  @Test
+  public void testDeletedContainer() throws Exception {
+    UnhealthyContainersDao unHealthyContainersTableHandle =
+        getDao(UnhealthyContainersDao.class);
+
+    ContainerHealthSchemaManager containerHealthSchemaManager =
+        new ContainerHealthSchemaManager(
+            getSchemaDefinition(ContainerSchemaDefinition.class),
+            unHealthyContainersTableHandle);
+    ReconStorageContainerManagerFacade scmMock =
+        mock(ReconStorageContainerManagerFacade.class);
+    MockPlacementPolicy placementMock = new MockPlacementPolicy();
+    ContainerManagerV2 containerManagerMock = mock(ContainerManagerV2.class);
+    StorageContainerServiceProvider scmClientMock =
+        mock(StorageContainerServiceProvider.class);
+
+    // Create 2 containers. The first is OPEN with no replicas, the second is
+    // CLOSED with no replicas.
+    List<ContainerInfo> mockContainers = getMockContainers(2);
+    when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
+    when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
+    when(containerManagerMock.getContainers()).thenReturn(mockContainers);
+    for (ContainerInfo c : mockContainers) {
+      when(containerManagerMock.getContainer(c.containerID())).thenReturn(c);
+      when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
+          .thenReturn(new ContainerWithPipeline(c, null));
+    }
+    // Container State OPEN with no replicas
+    when(containerManagerMock.getContainer(ContainerID.valueOf(1L)).getState())
+        .thenReturn(HddsProtos.LifeCycleState.OPEN);
+    when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(1L)))
+        .thenReturn(Collections.emptySet());
+    when(scmClientMock.getContainerWithPipeline(1L))
+        .thenReturn(new ContainerWithPipeline(mockContainers.get(0), null));
+
+    // Container State CLOSED with no replicas
+    when(containerManagerMock.getContainer(ContainerID.valueOf(2L)).getState())
+        .thenReturn(HddsProtos.LifeCycleState.CLOSED);
+    when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(2L)))
+        .thenReturn(Collections.emptySet());
+    ContainerInfo mockDeletedContainer = getMockDeletedContainer(2);
+    when(scmClientMock.getContainerWithPipeline(2L))
+        .thenReturn(new ContainerWithPipeline(mockDeletedContainer, null));
+
+    List<UnhealthyContainers> all = unHealthyContainersTableHandle.findAll();
+    Assert.assertTrue(all.isEmpty());
+
+    long currentTime = System.currentTimeMillis();
+    ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
+    ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
+    reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
+    ContainerHealthTask containerHealthTask =
+        new ContainerHealthTask(scmMock.getContainerManager(),
+            scmMock.getScmServiceProvider(),
+            reconTaskStatusDao, containerHealthSchemaManager,
+            placementMock, reconTaskConfig);
+    containerHealthTask.start();
+    LambdaTestUtils.await(6000, 1000, () ->
+        (unHealthyContainersTableHandle.count() == 1));
+    UnhealthyContainers rec =
+        unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
+    assertEquals("MISSING", rec.getContainerState());
+    assertEquals(3, rec.getReplicaDelta().intValue());
+
+    ReconTaskStatus taskStatus =
+        reconTaskStatusDao.findById(containerHealthTask.getTaskName());
+    Assert.assertTrue(taskStatus.getLastUpdatedTimestamp() >
+        currentTime);
+  }
+
   private Set<ContainerReplica> getMockReplicas(
       long containerId, State...states) {
     Set<ContainerReplica> replicas = new HashSet<>();
@@ -233,6 +311,16 @@
     return containers;
   }
 
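+  /**
+   * Returns a mock ContainerInfo in the DELETED state, simulating SCM's
+   * view of a container that Recon still considers CLOSED.
+   */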
+  private ContainerInfo getMockDeletedContainer(int containerID) {
+    ContainerInfo c = mock(ContainerInfo.class);
+    when(c.getContainerID()).thenReturn((long) containerID);
+    when(c.getReplicationFactor())
+        .thenReturn(HddsProtos.ReplicationFactor.THREE);
+    when(c.containerID()).thenReturn(ContainerID.valueOf(containerID));
+    when(c.getState()).thenReturn(HddsProtos.LifeCycleState.DELETED);
+    return c;
+  }
+
   /**
    * This is a simple implementation of PlacementPolicy, so that when
    * validateContainerPlacement() is called, by default it will return a value