Merge remote-tracking branch 'origin/master' into HDDS-4440-s3-performance
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 073bd9d..9f2116c 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -46,6 +46,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-erasurecode</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
similarity index 95%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
index d1bf7f3..6703216 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -22,8 +22,8 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
 import java.util.function.Function;
@@ -47,7 +47,7 @@
    * @return BlockExtendedInputStream of the correct type.
    */
   BlockExtendedInputStream create(ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, Pipeline pipeline,
+      BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
        XceiverClientFactory xceiverFactory,
        Function<BlockID, Pipeline> refreshFunction);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
similarity index 96%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 104e5bc..ba05ec2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -25,10 +25,10 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
 import java.util.concurrent.ExecutorService;
@@ -75,7 +75,7 @@
    * @return BlockExtendedInputStream of the correct type.
    */
   public BlockExtendedInputStream create(ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, Pipeline pipeline,
+      BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index b0e9755..70bd384 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -28,8 +28,8 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +53,7 @@
   private final boolean verifyChecksum;
   private final XceiverClientFactory xceiverClientFactory;
   private final Function<BlockID, Pipeline> refreshFunction;
-  private final OmKeyLocationInfo blockInfo;
+  private final BlockLocationInfo blockInfo;
   private final DatanodeDetails[] dataLocations;
   private final BlockExtendedInputStream[] blockStreams;
   private final int maxLocations;
@@ -62,10 +62,6 @@
   private boolean closed = false;
   private boolean seeked = false;
 
-  protected OmKeyLocationInfo getBlockInfo() {
-    return blockInfo;
-  }
-
   protected ECReplicationConfig getRepConfig() {
     return repConfig;
   }
@@ -109,7 +105,7 @@
   }
 
   public ECBlockInputStream(ECReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory, Function<BlockID,
       Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
     this.repConfig = repConfig;
@@ -174,7 +170,7 @@
           .setState(Pipeline.PipelineState.CLOSED)
           .build();
 
-      OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
+      BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
           .setBlockID(blockInfo.getBlockID())
           .setLength(internalBlockLength(locationIndex + 1))
           .setPipeline(blockInfo.getPipeline())
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
similarity index 95%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index 6c39e93..c9d2b76 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 
 import java.util.List;
 import java.util.function.Function;
@@ -52,7 +52,7 @@
    */
   BlockExtendedInputStream create(boolean missingLocations,
       List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, Pipeline> refreshFunction);
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 470df0c..efc3b31 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -24,8 +24,8 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -75,7 +75,7 @@
    */
   public BlockExtendedInputStream create(boolean missingLocations,
       List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
       Function<BlockID, Pipeline> refreshFunction) {
     if (missingLocations) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index ecde9c6..49ee7c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -23,8 +23,8 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +51,7 @@
   private final boolean verifyChecksum;
   private final XceiverClientFactory xceiverClientFactory;
   private final Function<BlockID, Pipeline> refreshFunction;
-  private final OmKeyLocationInfo blockInfo;
+  private final BlockLocationInfo blockInfo;
   private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
 
   private BlockExtendedInputStream blockReader;
@@ -96,7 +96,7 @@
   }
 
   public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory, Function<BlockID,
       Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
     this.repConfig = repConfig;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
similarity index 99%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index d5ec6db..dc7daf8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -25,9 +25,9 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
 import org.apache.ratis.util.Preconditions;
@@ -116,7 +116,7 @@
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory, Function<BlockID,
       Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
       ByteBufferPool byteBufferPool,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
new file mode 100644
index 0000000..493ece8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+/**
+ * This package contains Ozone I/O classes.
+ */
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 58c0788..cf65d8b 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -71,6 +71,10 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-jsr310</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-config</artifactId>
     </dependency>
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 2a962d7..fc83469 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -403,6 +403,12 @@
   public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT =
       "120s";
 
+  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
+      "ozone.scm.pipeline.scrub.interval";
+  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
+      "5m";
+
+
   // Allow SCM to auto create factor ONE ratis pipeline.
   public static final String OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE =
       "ozone.scm.pipeline.creation.auto.factor.one";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
index 2f2a7bf..60e61b2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
@@ -107,7 +107,7 @@
     for (HddsProtos.KeyContainerIDList sample : proto.getStatSampleList()) {
       report.setSample(sample.getKey(), sample.getContainerList()
           .stream()
-          .map(c -> ContainerID.getFromProtobuf(c))
+          .map(ContainerID::getFromProtobuf)
           .collect(Collectors.toList()));
     }
     return report;
@@ -147,6 +147,34 @@
   }
 
   /**
+   * Return a map of all stats and their value as a long.
+   * @return
+   */
+  public Map<String, Long> getStats() {
+    Map<String, Long> result = new HashMap<>();
+    for (Map.Entry<String, LongAdder> e : stats.entrySet()) {
+      result.put(e.getKey(), e.getValue().longValue());
+    }
+    return result;
+  }
+
+  /**
+   * Return a map of all samples, with the stat as the key and the samples
+   * for the stat as a List of Long.
+   * @return
+   */
+  public Map<String, List<Long>> getSamples() {
+    Map<String, List<Long>> result = new HashMap<>();
+    for (Map.Entry<String, List<ContainerID>> e : containerSample.entrySet()) {
+      result.put(e.getKey(),
+          e.getValue().stream()
+              .map(c -> c.getId())
+              .collect(Collectors.toList()));
+    }
+    return result;
+  }
+
+  /**
    * Get the stat for the given LifeCycleState. If there is no stat available
    * for that stat -1 is returned.
    * @param stat The requested stat.
@@ -184,7 +212,11 @@
   }
 
   protected void setStat(String stat, long value) {
-    LongAdder adder = getStatAndEnsurePresent(stat);
+    LongAdder adder = stats.get(stat);
+    if (adder == null) {
+      // this is an unknown stat, so ignore it.
+      return;
+    }
     if (adder.longValue() != 0) {
       throw new IllegalStateException(stat + " is expected to be zero");
     }
@@ -192,9 +224,11 @@
   }
 
   protected void setSample(String stat, List<ContainerID> sample) {
-    // First get the stat, as we should not receive a sample for a stat which
-    // does not exist.
-    getStatAndEnsurePresent(stat);
+    LongAdder adder = stats.get(stat);
+    if (adder == null) {
+      // this is an unknown stat, so ignore it.
+      return;
+    }
     // Now check there is not already a sample for this stat
     List<ContainerID> existingSample = containerSample.get(stat);
     if (existingSample != null) {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
new file mode 100644
index 0000000..286762d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.Objects;
+
+/**
+ * One key can be too huge to fit in one container. In which case it gets split
+ * into a number of subkeys. This class represents one such subkey instance.
+ */
+public class BlockLocationInfo {
+  private final BlockID blockID;
+  private long length;
+  private final long offset;
+  // Block token, required for client authentication when security is enabled.
+  private Token<OzoneBlockTokenIdentifier> token;
+  // the version number indicating when this block was added
+  private long createVersion;
+
+  private Pipeline pipeline;
+
+  // PartNumber is set for Multipart upload Keys.
+  private int partNumber;
+
+  protected BlockLocationInfo(Builder builder) {
+    this.blockID = builder.blockID;
+    this.pipeline = builder.pipeline;
+    this.length = builder.length;
+    this.offset = builder.offset;
+    this.token = builder.token;
+    this.partNumber = builder.partNumber;
+    this.createVersion = builder.createVersion;
+  }
+
+  public void setCreateVersion(long version) {
+    createVersion = version;
+  }
+
+  public long getCreateVersion() {
+    return createVersion;
+  }
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
+
+  public long getContainerID() {
+    return blockID.getContainerID();
+  }
+
+  public long getLocalID() {
+    return blockID.getLocalID();
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public void setLength(long length) {
+    this.length = length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getBlockCommitSequenceId() {
+    return blockID.getBlockCommitSequenceId();
+  }
+
+  public Token<OzoneBlockTokenIdentifier> getToken() {
+    return token;
+  }
+
+  public void setToken(Token<OzoneBlockTokenIdentifier> token) {
+    this.token = token;
+  }
+
+  public void setPipeline(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  public void setPartNumber(int partNumber) {
+    this.partNumber = partNumber;
+  }
+
+  public int getPartNumber() {
+    return partNumber;
+  }
+
+  /**
+   * Builder of BlockLocationInfo.
+   */
+  public static class Builder {
+    private BlockID blockID;
+    private long length;
+    private long offset;
+    private Token<OzoneBlockTokenIdentifier> token;
+    private Pipeline pipeline;
+    private int partNumber;
+    private long createVersion;
+
+    public Builder setBlockID(BlockID blockId) {
+      this.blockID = blockId;
+      return this;
+    }
+
+    public Builder setPipeline(Pipeline pipeline) {
+      this.pipeline = pipeline;
+      return this;
+    }
+
+    public Builder setLength(long len) {
+      this.length = len;
+      return this;
+    }
+
+    public Builder setOffset(long off) {
+      this.offset = off;
+      return this;
+    }
+
+    public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
+      this.token = bToken;
+      return this;
+    }
+
+    public Builder setPartNumber(int partNum) {
+      this.partNumber = partNum;
+      return this;
+    }
+
+    public Builder setCreateVersion(long version) {
+      this.createVersion = version;
+      return this;
+    }
+
+    public BlockLocationInfo build() {
+      return new BlockLocationInfo(this);
+    }
+  }
+
+  @Override
+  public String  toString() {
+    return "{blockID={containerID=" + blockID.getContainerID() +
+        ", localID=" + blockID.getLocalID() + "}" +
+        ", length=" + length +
+        ", offset=" + offset +
+        ", token=" + token +
+        ", pipeline=" + pipeline +
+        ", createVersion=" + createVersion +
+        ", partNumber=" + partNumber
+        + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BlockLocationInfo that = (BlockLocationInfo) o;
+    return length == that.length &&
+        offset == that.offset &&
+        createVersion == that.createVersion &&
+        Objects.equals(blockID, that.blockID) &&
+        Objects.equals(token, that.token) &&
+        Objects.equals(pipeline, that.pipeline);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(blockID, length, offset, token, createVersion,
+        pipeline);
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
similarity index 100%
rename from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6c76ca6..709cf45 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1369,6 +1369,15 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.pipeline.scrub.interval</name>
+    <value>5m</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      SCM schedules a fixed interval job using the configured interval to
+      scrub pipelines.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.pipeline.creation.auto.factor.one</name>
     <value>true</value>
     <tag>OZONE, SCM, PIPELINE</tag>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
index a05f9ab..a68f70c 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
@@ -17,16 +17,22 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.JsonUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static com.fasterxml.jackson.databind.node.JsonNodeType.ARRAY;
+
 /**
  * Tests for the ReplicationManagerReport class.
  */
@@ -64,6 +70,54 @@
         report.getStat(HddsProtos.LifeCycleState.QUASI_CLOSED));
   }
 
+
+  @Test
+  public void testJsonOutput() throws IOException {
+    report.increment(HddsProtos.LifeCycleState.OPEN);
+    report.increment(HddsProtos.LifeCycleState.CLOSED);
+    report.increment(HddsProtos.LifeCycleState.CLOSED);
+
+    report.incrementAndSample(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+        new ContainerID(1));
+    report.incrementAndSample(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+        new ContainerID(2));
+    report.incrementAndSample(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED,
+        new ContainerID(3));
+    report.setComplete();
+
+    String jsonString = JsonUtils.toJsonStringWithDefaultPrettyPrinter(report);
+
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode json = mapper.readTree(jsonString);
+
+    Assert.assertTrue(json.get("reportTimeStamp").longValue() > 0);
+    JsonNode stats = json.get("stats");
+    Assert.assertEquals(1, stats.get("OPEN").longValue());
+    Assert.assertEquals(0, stats.get("CLOSING").longValue());
+    Assert.assertEquals(0, stats.get("QUASI_CLOSED").longValue());
+    Assert.assertEquals(2, stats.get("CLOSED").longValue());
+    Assert.assertEquals(0, stats.get("DELETING").longValue());
+    Assert.assertEquals(0, stats.get("DELETED").longValue());
+
+    Assert.assertEquals(2, stats.get("UNDER_REPLICATED").longValue());
+    Assert.assertEquals(1, stats.get("OVER_REPLICATED").longValue());
+    Assert.assertEquals(0, stats.get("MIS_REPLICATED").longValue());
+    Assert.assertEquals(0, stats.get("MISSING").longValue());
+    Assert.assertEquals(0, stats.get("UNHEALTHY").longValue());
+    Assert.assertEquals(0, stats.get("EMPTY").longValue());
+    Assert.assertEquals(0, stats.get("OPEN_UNHEALTHY").longValue());
+    Assert.assertEquals(0, stats.get("QUASI_CLOSED_STUCK").longValue());
+
+    JsonNode samples = json.get("samples");
+    Assert.assertEquals(ARRAY, samples.get("UNDER_REPLICATED").getNodeType());
+    Assert.assertEquals(1, samples.get("UNDER_REPLICATED").get(0).longValue());
+    Assert.assertEquals(2, samples.get("UNDER_REPLICATED").get(1).longValue());
+    Assert.assertEquals(3, samples.get("OVER_REPLICATED").get(0).longValue());
+  }
+
   @Test
   public void testContainerIDsCanBeSampled() {
     report.incrementAndSample(
@@ -146,6 +200,35 @@
     }
   }
 
+  @Test
+  public void testDeSerializeCanHandleUnknownMetric() {
+    HddsProtos.ReplicationManagerReportProto.Builder proto =
+        HddsProtos.ReplicationManagerReportProto.newBuilder();
+    proto.setTimestamp(12345);
+
+    proto.addStat(HddsProtos.KeyIntValue.newBuilder()
+        .setKey("unknownValue")
+        .setValue(15)
+        .build());
+
+    proto.addStat(HddsProtos.KeyIntValue.newBuilder()
+        .setKey(ReplicationManagerReport.HealthState.UNDER_REPLICATED
+            .toString())
+        .setValue(20)
+        .build());
+
+    HddsProtos.KeyContainerIDList.Builder sample
+        = HddsProtos.KeyContainerIDList.newBuilder();
+    sample.setKey("unknownValue");
+    sample.addContainer(ContainerID.valueOf(1).getProtobuf());
+    proto.addStatSample(sample.build());
+    // Ensure no exception is thrown
+    ReplicationManagerReport newReport =
+        ReplicationManagerReport.fromProtobuf(proto.build());
+    Assert.assertEquals(20, newReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
   @Test(expected = IllegalStateException.class)
   public void testStatCannotBeSetTwice() {
     report.setStat(HddsProtos.LifeCycleState.CLOSED.toString(), 10);
diff --git a/hadoop-hdds/docs/pom.xml b/hadoop-hdds/docs/pom.xml
index fded922..9a6a330 100644
--- a/hadoop-hdds/docs/pom.xml
+++ b/hadoop-hdds/docs/pom.xml
@@ -28,9 +28,10 @@
   <name>Apache Ozone/HDDS Documentation</name>
   <packaging>jar</packaging>
 
-  <dependencies>
+  <properties>
+    <skipDocs>false</skipDocs>
+  </properties>
 
-  </dependencies>
   <build>
     <plugins>
       <plugin>
@@ -42,11 +43,12 @@
               <goal>exec</goal>
             </goals>
             <phase>compile</phase>
+            <configuration>
+              <executable>../../hadoop-ozone/dev-support/checks/docs.sh</executable>
+              <skip>${skipDocs}</skip>
+            </configuration>
           </execution>
         </executions>
-        <configuration>
-          <executable>dev-support/bin/generate-site.sh</executable>
-        </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.rat</groupId>
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index 35248b7..d981032 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -109,6 +109,12 @@
 
       <dependency>
         <groupId>org.apache.ozone</groupId>
+        <artifactId>hdds-erasurecode</artifactId>
+        <version>${hdds.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.ozone</groupId>
         <artifactId>hdds-client</artifactId>
         <version>${hdds.version}</version>
       </dependency>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 2e7b04f..09d289e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -461,62 +461,44 @@
    */
   private void checkIterationMoveResults(Set<DatanodeDetails> selectedTargets) {
     this.countDatanodesInvolvedPerIteration = 0;
-    this.sizeMovedPerIteration = 0;
-    for (Map.Entry<ContainerMoveSelection,
-            CompletableFuture<ReplicationManager.MoveResult>>
-        futureEntry : moveSelectionToFutureMap.entrySet()) {
-      ContainerMoveSelection moveSelection = futureEntry.getKey();
-      CompletableFuture<ReplicationManager.MoveResult> future =
-          futureEntry.getValue();
-      try {
-        ReplicationManager.MoveResult result = future.get(
-            config.getMoveTimeout().toMillis(), TimeUnit.MILLISECONDS);
-        if (result == ReplicationManager.MoveResult.COMPLETED) {
-          try {
-            ContainerInfo container =
-                containerManager.getContainer(moveSelection.getContainerID());
-            this.sizeMovedPerIteration += container.getUsedBytes();
-            metrics.incrementNumContainerMovesInLatestIteration(1);
-            LOG.info("Container move completed for container {} to target {}",
-                container.containerID(),
-                moveSelection.getTargetNode().getUuidString());
-          } catch (ContainerNotFoundException e) {
-            LOG.warn("Could not find Container {} while " +
-                    "checking move results in ContainerBalancer",
-                moveSelection.getContainerID(), e);
-          }
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Container move for container {} to target {} failed: {}",
-                moveSelection.getContainerID(),
-                moveSelection.getTargetNode().getUuidString(), result);
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while waiting for container move result for " +
-                "container {}.",
-            moveSelection.getContainerID(), e);
-        Thread.currentThread().interrupt();
-      } catch (ExecutionException e) {
-        LOG.warn("Container move for container {} completed exceptionally.",
-            moveSelection.getContainerID(), e);
-      } catch (TimeoutException e) {
-        LOG.warn("Container move for container {} timed out.",
-            moveSelection.getContainerID(), e);
-      }
+
+    CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
+        moveSelectionToFutureMap.values()
+            .toArray(new CompletableFuture[moveSelectionToFutureMap.size()]));
+    try {
+      allFuturesResult.get(config.getMoveTimeout().toMillis(),
+          TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (TimeoutException e) {
+      long timeoutCounts = moveSelectionToFutureMap.entrySet().stream()
+          .filter(entry -> !entry.getValue().isDone())
+          .peek(entry -> {
+            LOG.warn("Container move canceled for container {} to target {} " +
+                "due to timeout.", entry.getKey().getContainerID(),
+                entry.getKey().getTargetNode().getUuidString());
+            entry.getValue().cancel(true);
+          }).count();
+      LOG.warn("{} Container moves are canceled.", timeoutCounts);
+      metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts);
+    } catch (ExecutionException e) {
+      LOG.error("Got exception while checkIterationMoveResults", e);
     }
+
     countDatanodesInvolvedPerIteration =
         sourceToTargetMap.size() + selectedTargets.size();
     metrics.incrementNumDatanodesInvolvedInLatestIteration(
         countDatanodesInvolvedPerIteration);
-    sizeMovedPerIteration /= OzoneConsts.GB;
-    metrics.incrementDataSizeMovedGBInLatestIteration(sizeMovedPerIteration);
-    metrics.incrementNumContainerMoves(
-        metrics.getNumContainerMovesInLatestIteration());
-    metrics.incrementDataSizeMovedGB(sizeMovedPerIteration);
+    metrics.incrementNumContainerMovesCompleted(
+        metrics.getNumContainerMovesCompletedInLatestIteration());
+    metrics.incrementNumContainerMovesTimeout(
+        metrics.getNumContainerMovesTimeoutInLatestIteration());
+    metrics.incrementDataSizeMovedGB(
+        metrics.getDataSizeMovedGBInLatestIteration());
     LOG.info("Number of datanodes involved in this iteration: {}. Size moved " +
             "in this iteration: {}GB.",
-        countDatanodesInvolvedPerIteration, sizeMovedPerIteration);
+        countDatanodesInvolvedPerIteration,
+        metrics.getDataSizeMovedGBInLatestIteration());
   }
 
   /**
@@ -603,25 +585,49 @@
    */
   private boolean moveContainer(DatanodeDetails source,
                                 ContainerMoveSelection moveSelection) {
-    ContainerID container = moveSelection.getContainerID();
+    ContainerID containerID = moveSelection.getContainerID();
     CompletableFuture<ReplicationManager.MoveResult> future;
     try {
+      ContainerInfo containerInfo = containerManager.getContainer(containerID);
       future = replicationManager
-          .move(container, source, moveSelection.getTargetNode());
+          .move(containerID, source, moveSelection.getTargetNode())
+          .whenComplete((result, ex) -> {
+            if (ex != null) {
+              LOG.info("Container move for container {} from source {} to " +
+                      "target {} failed with exceptions {}",
+                  containerID.toString(),
+                  source.getUuidString(),
+                  moveSelection.getTargetNode().getUuidString(), ex);
+            } else {
+              if (result == ReplicationManager.MoveResult.COMPLETED) {
+                metrics.incrementDataSizeMovedGBInLatestIteration(
+                    containerInfo.getUsedBytes() / OzoneConsts.GB);
+                metrics.incrementNumContainerMovesCompletedInLatestIteration(1);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(
+                      "Container move completed for container {} to target {}",
+                      containerID,
+                      moveSelection.getTargetNode().getUuidString());
+                }
+              } else {
+                LOG.warn(
+                    "Container move for container {} to target {} failed: {}",
+                    moveSelection.getContainerID(),
+                    moveSelection.getTargetNode().getUuidString(), result);
+              }
+            }
+          });
     } catch (ContainerNotFoundException e) {
-      LOG.warn("Could not find Container {} for container move", container, e);
+      LOG.warn("Could not find Container {} for container move",
+          containerID, e);
       return false;
     } catch (NodeNotFoundException e) {
-      LOG.warn("Container move failed for container {}", container, e);
+      LOG.warn("Container move failed for container {}", containerID, e);
       return false;
     }
+
     if (future.isDone()) {
       if (future.isCompletedExceptionally()) {
-        LOG.info("Container move for container {} from source {} to target {}" +
-                "failed with exceptions",
-            container.toString(),
-            source.getUuidString(),
-            moveSelection.getTargetNode().getUuidString());
         return false;
       } else {
         ReplicationManager.MoveResult result = future.join();
@@ -785,7 +791,8 @@
     this.countDatanodesInvolvedPerIteration = 0;
     this.sizeMovedPerIteration = 0;
     metrics.resetDataSizeMovedGBInLatestIteration();
-    metrics.resetNumContainerMovesInLatestIteration();
+    metrics.resetNumContainerMovesCompletedInLatestIteration();
+    metrics.resetNumContainerMovesTimeoutInLatestIteration();
     metrics.resetNumDatanodesInvolvedInLatestIteration();
     metrics.resetDataSizeUnbalancedGB();
     metrics.resetNumDatanodesUnbalanced();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
index f40dd44..3a7ce49 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
@@ -40,9 +40,13 @@
       " in the latest iteration.")
   private MutableCounterLong dataSizeMovedGBInLatestIteration;
 
-  @Metric(about = "Number of container moves performed by Container Balancer " +
-      "in the latest iteration.")
-  private MutableCounterLong numContainerMovesInLatestIteration;
+  @Metric(about = "Number of completed container moves performed by " +
+      "Container Balancer in the latest iteration.")
+  private MutableCounterLong numContainerMovesCompletedInLatestIteration;
+
+  @Metric(about = "Number of timeout container moves performed by " +
+      "Container Balancer in the latest iteration.")
+  private MutableCounterLong numContainerMovesTimeoutInLatestIteration;
 
   @Metric(about = "Number of iterations that Container Balancer has run for.")
   private MutableCounterLong numIterations;
@@ -57,9 +61,13 @@
   @Metric(about = "Number of unbalanced datanodes.")
   private MutableCounterLong numDatanodesUnbalanced;
 
-  @Metric(about = "Total number of container moves across all iterations of " +
-      "Container Balancer.")
-  private MutableCounterLong numContainerMoves;
+  @Metric(about = "Total number of completed container moves across all " +
+      "iterations of Container Balancer.")
+  private MutableCounterLong numContainerMovesCompleted;
+
+  @Metric(about = "Total number of timeout container moves across " +
+      "all iterations of Container Balancer.")
+  private MutableCounterLong numContainerMovesTimeout;
 
   @Metric(about = "Total data size in GB moved across all iterations of " +
       "Container Balancer.")
@@ -104,17 +112,37 @@
    * latest iteration.
    * @return number of container moves
    */
-  public long getNumContainerMovesInLatestIteration() {
-    return numContainerMovesInLatestIteration.value();
+  public long getNumContainerMovesCompletedInLatestIteration() {
+    return numContainerMovesCompletedInLatestIteration.value();
   }
 
-  public void incrementNumContainerMovesInLatestIteration(long valueToAdd) {
-    this.numContainerMovesInLatestIteration.incr(valueToAdd);
+  public void incrementNumContainerMovesCompletedInLatestIteration(
+      long valueToAdd) {
+    this.numContainerMovesCompletedInLatestIteration.incr(valueToAdd);
   }
 
-  public void resetNumContainerMovesInLatestIteration() {
-    numContainerMovesInLatestIteration.incr(
-        -getNumContainerMovesInLatestIteration());
+  public void resetNumContainerMovesCompletedInLatestIteration() {
+    numContainerMovesCompletedInLatestIteration.incr(
+        -getNumContainerMovesCompletedInLatestIteration());
+  }
+
+  /**
+   * Gets the number of timeout container moves performed by
+   * Container Balancer in the latest iteration.
+   * @return number of timeout container moves
+   */
+  public long getNumContainerMovesTimeoutInLatestIteration() {
+    return numContainerMovesTimeoutInLatestIteration.value();
+  }
+
+  public void incrementNumContainerMovesTimeoutInLatestIteration(
+      long valueToAdd) {
+    this.numContainerMovesTimeoutInLatestIteration.incr(valueToAdd);
+  }
+
+  public void resetNumContainerMovesTimeoutInLatestIteration() {
+    numContainerMovesTimeoutInLatestIteration.incr(
+        -getNumContainerMovesTimeoutInLatestIteration());
   }
 
   /**
@@ -179,12 +207,20 @@
     numDatanodesUnbalanced.incr(-getNumDatanodesUnbalanced());
   }
 
-  public long getNumContainerMoves() {
-    return numContainerMoves.value();
+  public long getNumContainerMovesCompleted() {
+    return numContainerMovesCompleted.value();
   }
 
-  public void incrementNumContainerMoves(long valueToAdd) {
-    numContainerMoves.incr(valueToAdd);
+  public void incrementNumContainerMovesCompleted(long valueToAdd) {
+    numContainerMovesCompleted.incr(valueToAdd);
+  }
+
+  public long getNumContainerMovesTimeout() {
+    return numContainerMovesTimeout.value();
+  }
+
+  public void incrementNumContainerMovesTimeout(long valueToAdd) {
+    numContainerMovesTimeout.incr(valueToAdd);
   }
 
   public long getDataSizeMovedGB() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6fed1e2..c9eb683 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -88,9 +87,7 @@
 
 
   BackgroundPipelineCreator(PipelineManager pipelineManager,
-                              ConfigurationSource conf,
-                              SCMServiceManager serviceManager,
-                              SCMContext scmContext) {
+      ConfigurationSource conf, SCMContext scmContext) {
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.scmContext = scmContext;
@@ -109,9 +106,6 @@
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
 
-    // register BackgroundPipelineCreator to SCMServiceManager
-    serviceManager.register(this);
-
     // start RatisPipelineUtilsThread
     start();
   }
@@ -223,13 +217,6 @@
         continue;
       }
       list.add(replicationConfig);
-      if (!pipelineManager.getSafeModeStatus()) {
-        try {
-          pipelineManager.scrubPipeline(replicationConfig);
-        } catch (IOException e) {
-          LOG.error("Error while scrubbing pipelines.", e);
-        }
-      }
     }
 
     LoopingIterator it = new LoopingIterator(list);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
new file mode 100644
index 0000000..2063ac3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Background service to clean up pipelines with following conditions.
+ * - CLOSED
+ * - ALLOCATED for too long
+ */
+public class BackgroundPipelineScrubber implements SCMService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
+
+  private static final String THREAD_NAME = "PipelineScrubberThread";
+
+  private final PipelineManager pipelineManager;
+  private final ConfigurationSource conf;
+  private final SCMContext scmContext;
+
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private Thread scrubThread;
+  private final long intervalInMillis;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+
+  public BackgroundPipelineScrubber(PipelineManager pipelineManager,
+      ConfigurationSource conf, SCMContext scmContext) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scmContext = scmContext;
+
+    this.intervalInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    start();
+  }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          LOG.info("Service {} transitions to RUNNING.", getServiceName());
+          serviceStatus = ServiceStatus.RUNNING;
+          lastTimeToBeReadyInMillis = Time.monotonicNow();
+        }
+      } else {
+        if (serviceStatus != ServiceStatus.PAUSING) {
+          LOG.info("Service {} transitions to PAUSING.", getServiceName());
+          serviceStatus = ServiceStatus.PAUSING;
+        }
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return BackgroundPipelineScrubber.class.getSimpleName();
+  }
+
+  @Override
+  public void start() {
+    if (!running.compareAndSet(false, true)) {
+      LOG.info("Pipeline Scrubber Service is already running, skip start.");
+      return;
+    }
+    LOG.info("Starting Pipeline Scrubber Service.");
+
+    scrubThread = new Thread(this::run);
+    scrubThread.setName(THREAD_NAME);
+    scrubThread.setDaemon(true);
+    scrubThread.start();
+  }
+
+  @Override
+  public void stop() {
+    synchronized (this) {
+      if (!running.compareAndSet(true, false)) {
+        LOG.info("Pipeline Scrubber Service is not running, skip stop.");
+        return;
+      }
+      notifyAll();
+    }
+    LOG.info("Stopping Pipeline Scrubber Service.");
+  }
+
+  @VisibleForTesting
+  public boolean getRunning() {
+    return running.get();
+  }
+
+  private void run() {
+    while (running.get()) {
+      try {
+        if (shouldRun()) {
+          scrubAllPipelines();
+        }
+        synchronized (this) {
+          wait(intervalInMillis);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} is interrupted, exit", THREAD_NAME);
+        Thread.currentThread().interrupt();
+        running.set(false);
+      }
+    }
+  }
+
+  private void scrubAllPipelines() {
+    try {
+      pipelineManager.scrubPipelines();
+    } catch (IOException e) {
+      LOG.error("Unexpected error during pipeline scrubbing", e);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a50876..ffce314 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -111,8 +111,7 @@
 
   void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
 
-  void scrubPipeline(ReplicationConfig replicationConfig)
-      throws IOException;
+  void scrubPipelines() throws IOException;
 
   void startPipelineCreator();
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 19b7a51..83b037c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -76,6 +76,7 @@
   private PipelineFactory pipelineFactory;
   private PipelineStateManager stateManager;
   private BackgroundPipelineCreator backgroundPipelineCreator;
+  private BackgroundPipelineScrubber backgroundPipelineScrubber;
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
   // Pipeline Manager MXBean
@@ -141,10 +142,16 @@
 
     // Create background thread.
     BackgroundPipelineCreator backgroundPipelineCreator =
-        new BackgroundPipelineCreator(
-            pipelineManager, conf, serviceManager, scmContext);
+        new BackgroundPipelineCreator(pipelineManager, conf, scmContext);
+
+    BackgroundPipelineScrubber backgroundPipelineScrubber =
+        new BackgroundPipelineScrubber(pipelineManager, conf, scmContext);
 
     pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+    pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
+
+    serviceManager.register(backgroundPipelineCreator);
+    serviceManager.register(backgroundPipelineScrubber);
 
     return pipelineManager;
   }
@@ -408,15 +415,14 @@
    * Scrub pipelines.
    */
   @Override
-  public void scrubPipeline(ReplicationConfig config)
-      throws IOException {
+  public void scrubPipelines() throws IOException {
     Instant currentTime = Instant.now();
     Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
 
-    List<Pipeline> candidates = stateManager.getPipelines(config);
+    List<Pipeline> candidates = stateManager.getPipelines();
 
     for (Pipeline p : candidates) {
       // scrub pipelines who stay ALLOCATED for too long.
@@ -437,7 +443,6 @@
         removePipeline(p);
       }
     }
-    return;
   }
 
   /**
@@ -591,6 +596,9 @@
     if (backgroundPipelineCreator != null) {
       backgroundPipelineCreator.stop();
     }
+    if (backgroundPipelineScrubber != null) {
+      backgroundPipelineScrubber.stop();
+    }
 
     if (pmInfoBean != null) {
       MBeans.unregister(this.pmInfoBean);
@@ -639,6 +647,16 @@
     return this.backgroundPipelineCreator;
   }
 
+  private void setBackgroundPipelineScrubber(
+      BackgroundPipelineScrubber backgroundPipelineScrubber) {
+    this.backgroundPipelineScrubber = backgroundPipelineScrubber;
+  }
+
+  @VisibleForTesting
+  public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+    return this.backgroundPipelineScrubber;
+  }
+
   @VisibleForTesting
   public PipelineFactory getPipelineFactory() {
     return pipelineFactory;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 0163152..48069c5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -642,6 +642,40 @@
     containerBalancer.stop();
   }
 
+  @Test
+  public void checkIterationResultTimeout()
+      throws NodeNotFoundException, ContainerNotFoundException {
+
+    Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+            Mockito.any(DatanodeDetails.class),
+            Mockito.any(DatanodeDetails.class)))
+        .thenReturn(genCompletableFuture(500), genCompletableFuture(2000));
+
+    balancerConfiguration.setThreshold(10);
+    balancerConfiguration.setIterations(1);
+    balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
+    balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
+    balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
+    balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
+
+    startBalancer(balancerConfiguration);
+    sleepWhileBalancing(2000);
+
+    /*
+    According to the setup and configurations, this iteration's result should
+    be ITERATION_COMPLETED.
+     */
+    Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
+        containerBalancer.getIterationResult());
+    Assert.assertEquals(1,
+        containerBalancer.getMetrics()
+            .getNumContainerMovesCompletedInLatestIteration());
+    Assert.assertTrue(containerBalancer.getMetrics()
+            .getNumContainerMovesTimeoutInLatestIteration() > 1);
+    containerBalancer.stop();
+
+  }
+
   /**
    * Determines unBalanced nodes, that is, over and under utilized nodes,
    * according to the generated utilization values for nodes and the threshold.
@@ -834,4 +868,16 @@
     }
   }
 
+  private CompletableFuture<ReplicationManager.MoveResult>
+      genCompletableFuture(int sleepMilSec) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        Thread.sleep(sleepMilSec);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      return ReplicationManager.MoveResult.COMPLETED;
+    });
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 523efc0..932d2d5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -218,8 +218,7 @@
   }
 
   @Override
-  public void scrubPipeline(ReplicationConfig replicationConfig)
-      throws IOException {
+  public void scrubPipelines() {
 
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
new file mode 100644
index 0000000..dfa5284
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for {@link BackgroundPipelineScrubber}.
+ */
+public class TestBackgroundPipelineScrubber {
+
+  private BackgroundPipelineScrubber scrubber;
+  private SCMContext scmContext;
+  private PipelineManager pipelineManager;
+  private OzoneConfiguration conf;
+
+  @Before
+  public void setup() throws IOException {
+    this.scmContext = SCMContext.emptyContext();
+    this.pipelineManager = mock(PipelineManager.class);
+    doNothing().when(pipelineManager).scrubPipelines();
+
+    // no initial delay after exit safe mode
+    this.conf = new OzoneConfiguration();
+    conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0ms");
+
+    this.scrubber = new BackgroundPipelineScrubber(pipelineManager, conf,
+        scmContext);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    scrubber.stop();
+  }
+
+  @Test
+  public void testStop() {
+    assertTrue(scrubber.getRunning());
+    scrubber.stop();
+    assertFalse(scrubber.getRunning());
+  }
+
+  @Test
+  public void testNotifyStatusChanged() {
+    // init at PAUSING
+    assertFalse(scrubber.shouldRun());
+
+    // out of safe mode, PAUSING -> RUNNING
+    scrubber.notifyStatusChanged();
+    assertTrue(scrubber.shouldRun());
+
+    // go into safe mode, RUNNING -> PAUSING
+    scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
+    scrubber.notifyStatusChanged();
+    assertFalse(scrubber.shouldRun());
+  }
+
+  @Test
+  public void testRun() throws IOException {
+    assertFalse(scrubber.shouldRun());
+    // kick a run
+    synchronized (scrubber) {
+      scrubber.notifyStatusChanged();
+      assertTrue(scrubber.shouldRun());
+      scrubber.notifyAll();
+    }
+    verify(pipelineManager, timeout(3000).times(1)).scrubPipelines();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index fd29929..64ef02e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -535,7 +535,7 @@
   }
 
   @Test
-  public void testScrubPipeline() throws Exception {
+  public void testScrubPipelines() throws Exception {
     // No timeout for pipeline scrubber.
     conf.setTimeDuration(
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -543,33 +543,50 @@
 
     PipelineManagerImpl pipelineManager = createPipelineManager(true);
     pipelineManager.setScmContext(scmContext);
-    Pipeline pipeline = pipelineManager
+    Pipeline allocatedPipeline = pipelineManager
         .createPipeline(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE));
     // At this point, pipeline is not at OPEN stage.
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
-        pipeline.getPipelineState());
+        allocatedPipeline.getPipelineState());
 
     // pipeline should be seen in pipelineManager as ALLOCATED.
     Assert.assertTrue(pipelineManager
         .getPipelines(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE),
-            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
-    pipelineManager
-        .scrubPipeline(RatisReplicationConfig
-            .getInstance(ReplicationFactor.THREE));
+            Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
 
-    // pipeline should be scrubbed.
+    Pipeline closedPipeline = pipelineManager
+        .createPipeline(RatisReplicationConfig
+            .getInstance(ReplicationFactor.THREE));
+    pipelineManager.openPipeline(closedPipeline.getId());
+    pipelineManager.closePipeline(closedPipeline, true);
+
+    // pipeline should be seen in pipelineManager as CLOSED.
+    Assert.assertTrue(pipelineManager
+        .getPipelines(RatisReplicationConfig
+                .getInstance(ReplicationFactor.THREE),
+            Pipeline.PipelineState.CLOSED).contains(closedPipeline));
+
+    pipelineManager.scrubPipelines();
+
+    // The allocatedPipeline should be scrubbed.
     Assert.assertFalse(pipelineManager
         .getPipelines(RatisReplicationConfig
             .getInstance(ReplicationFactor.THREE),
-            Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+            Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+    // The closedPipeline should be scrubbed.
+    Assert.assertFalse(pipelineManager
+        .getPipelines(RatisReplicationConfig
+                .getInstance(ReplicationFactor.THREE),
+            Pipeline.PipelineState.CLOSED).contains(closedPipeline));
 
     pipelineManager.close();
   }
 
   @Test
-  public void testScrubPipelineShouldFailOnFollower() throws Exception {
+  public void testScrubPipelinesShouldFailOnFollower() throws Exception {
     // No timeout for pipeline scrubber.
     conf.setTimeDuration(
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -595,9 +612,7 @@
     ((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false);
 
     try {
-      pipelineManager
-          .scrubPipeline(RatisReplicationConfig
-              .getInstance(ReplicationFactor.THREE));
+      pipelineManager.scrubPipelines();
     } catch (NotLeaderException ex) {
       pipelineManager.close();
       return;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1062274..823c2dd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -354,6 +354,7 @@
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     pipelineManager.getBackgroundPipelineCreator().stop();
+    pipelineManager.getBackgroundPipelineScrubber().stop();
 
     for (int i = 0; i < pipelineCount; i++) {
       // Create pipeline
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
index 37bea18..25ed0e5 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
@@ -63,11 +63,6 @@
       description = "Format output as JSON")
   private boolean json;
 
-  @CommandLine.Option(names = { "--replicas" },
-      defaultValue = "false",
-      description = "Adds replica related details")
-  private boolean addReplicaDetails;
-
   @Parameters(description = "Decimal id of the container.")
   private long containerID;
 
@@ -108,7 +103,7 @@
       LOG.info("Datanodes: [{}]", machinesStr);
 
       // Print the replica details if available
-      if (addReplicaDetails && replicas != null) {
+      if (replicas != null) {
         String replicaStr = replicas.stream().map(
             InfoSubcommand::buildReplicaDetails)
             .collect(Collectors.joining(",\n"));
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
index 89b1a11..554316c 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.server.JsonUtils;
 import picocli.CommandLine;
 
 import java.io.IOException;
@@ -43,9 +44,20 @@
   @CommandLine.Spec
   private CommandLine.Model.CommandSpec spec;
 
+  @CommandLine.Option(names = { "--json" },
+      defaultValue = "false",
+      description = "Format output as JSON")
+  private boolean json;
+
   @Override
   public void execute(ScmClient scmClient) throws IOException {
     ReplicationManagerReport report = scmClient.getReplicationManagerReport();
+
+    if (json) {
+      output(JsonUtils.toJsonStringWithDefaultPrettyPrinter(report));
+      return;
+    }
+
     outputHeader(report.getReportTimeStamp());
     blankLine();
     outputContainerStats(report);
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index c604019..10b5758 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@ -97,7 +97,7 @@
         .thenReturn(getReplicas(includeIndex));
     cmd = new InfoSubcommand();
     CommandLine c = new CommandLine(cmd);
-    c.parseArgs("1", "--replicas");
+    c.parseArgs("1");
     cmd.execute(scmClient);
 
     // Ensure we have a line for Replicas:
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
index be0e2c8..df8dd95 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
@@ -17,14 +17,18 @@
  */
 package org.apache.hadoop.hdds.scm.cli.container;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import picocli.CommandLine;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -88,6 +92,26 @@
   }
 
   @Test
+  public void testValidJsonOutput() throws IOException {
+    // More complete testing of the Report JSON output is in
+    // TestReplicationManagerReport.
+    ScmClient scmClient = mock(ScmClient.class);
+    Mockito.when(scmClient.getReplicationManagerReport())
+        .thenAnswer(invocation -> new ReplicationManagerReport());
+
+    CommandLine c = new CommandLine(cmd);
+    c.parseArgs("--json");
+    cmd.execute(scmClient);
+
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode json = mapper.readTree(outContent.toString("UTF-8"));
+
+    Assert.assertTrue(json.get("reportTimeStamp") != null);
+    Assert.assertTrue(json.get("stats") != null);
+    Assert.assertTrue(json.get("samples") != null);
+  }
+
+  @Test
   public void testCorrectValuesAppearInReport() throws IOException {
     ScmClient scmClient = mock(ScmClient.class);
     Mockito.when(scmClient.getReplicationManagerReport())
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
index 9df7518..9ff1ca9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
@@ -19,156 +19,72 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
 import org.apache.hadoop.ozone.protocolPB.OzonePBHelper;
 import org.apache.hadoop.security.token.Token;
 
-import java.util.Objects;
-
 /**
  * One key can be too huge to fit in one container. In which case it gets split
  * into a number of subkeys. This class represents one such subkey instance.
  */
-public final class OmKeyLocationInfo {
-  private final BlockID blockID;
-  // the id of this subkey in all the subkeys.
-  private long length;
-  private final long offset;
-  // Block token, required for client authentication when security is enabled.
-  private Token<OzoneBlockTokenIdentifier> token;
-  // the version number indicating when this block was added
-  private long createVersion;
+public final class OmKeyLocationInfo extends BlockLocationInfo {
 
-  private Pipeline pipeline;
-
-  // PartNumber is set for Multipart upload Keys.
-  private int partNumber = -1;
-
-  private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
-                            long offset, int partNumber) {
-    this.blockID = blockID;
-    this.pipeline = pipeline;
-    this.length = length;
-    this.offset = offset;
-    this.partNumber = partNumber;
-  }
-
-  private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
-      long offset, Token<OzoneBlockTokenIdentifier> token, int partNumber) {
-    this.blockID = blockID;
-    this.pipeline = pipeline;
-    this.length = length;
-    this.offset = offset;
-    this.token = token;
-    this.partNumber = partNumber;
-  }
-
-  public void setCreateVersion(long version) {
-    createVersion = version;
-  }
-
-  public long getCreateVersion() {
-    return createVersion;
-  }
-
-  public BlockID getBlockID() {
-    return blockID;
-  }
-
-  public long getContainerID() {
-    return blockID.getContainerID();
-  }
-
-  public long getLocalID() {
-    return blockID.getLocalID();
-  }
-
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  public long getLength() {
-    return length;
-  }
-
-  public void setLength(long length) {
-    this.length = length;
-  }
-
-  public long getOffset() {
-    return offset;
-  }
-
-  public long getBlockCommitSequenceId() {
-    return blockID.getBlockCommitSequenceId();
-  }
-
-  public Token<OzoneBlockTokenIdentifier> getToken() {
-    return token;
-  }
-
-  public void setToken(Token<OzoneBlockTokenIdentifier> token) {
-    this.token = token;
-  }
-
-  public void setPipeline(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  public void setPartNumber(int partNumber) {
-    this.partNumber = partNumber;
-  }
-
-  public int getPartNumber() {
-    return partNumber;
+  private OmKeyLocationInfo(Builder builder) {
+    super(builder);
   }
 
   /**
    * Builder of OmKeyLocationInfo.
    */
-  public static class Builder {
-    private BlockID blockID;
-    private long length;
-    private long offset;
-    private Token<OzoneBlockTokenIdentifier> token;
-    private Pipeline pipeline;
-    private int partNumber;
+  public static class Builder extends BlockLocationInfo.Builder {
 
-    public Builder setBlockID(BlockID blockId) {
-      this.blockID = blockId;
+    @Override
+    public Builder setBlockID(BlockID blockID) {
+      super.setBlockID(blockID);
       return this;
     }
 
-    @SuppressWarnings("checkstyle:hiddenfield")
+    @Override
     public Builder setPipeline(Pipeline pipeline) {
-      this.pipeline = pipeline;
+      super.setPipeline(pipeline);
       return this;
     }
 
-    public Builder setLength(long len) {
-      this.length = len;
+    @Override
+    public Builder setLength(long length) {
+      super.setLength(length);
       return this;
     }
 
-    public Builder setOffset(long off) {
-      this.offset = off;
+    @Override
+    public Builder setOffset(long offset) {
+      super.setOffset(offset);
       return this;
     }
 
-    public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
-      this.token = bToken;
+    @Override
+    public Builder setToken(Token<OzoneBlockTokenIdentifier> token) {
+      super.setToken(token);
       return this;
     }
 
-    public Builder setPartNumber(int partNum) {
-      this.partNumber = partNum;
+    @Override
+    public Builder setPartNumber(int partNumber) {
+      super.setPartNumber(partNumber);
       return this;
     }
 
+    @Override
+    public Builder setCreateVersion(long version) {
+      super.setCreateVersion(version);
+      return this;
+    }
+
+    @Override
     public OmKeyLocationInfo build() {
-      return new OmKeyLocationInfo(blockID, pipeline, length, offset, token,
-          partNumber);
+      return new OmKeyLocationInfo(this);
     }
   }
 
@@ -178,13 +94,15 @@
 
   public KeyLocation getProtobuf(boolean ignorePipeline, int clientVersion) {
     KeyLocation.Builder builder = KeyLocation.newBuilder()
-        .setBlockID(blockID.getProtobuf())
-        .setLength(length)
-        .setOffset(offset)
-        .setCreateVersion(createVersion).setPartNumber(partNumber);
+        .setBlockID(getBlockID().getProtobuf())
+        .setLength(getLength())
+        .setOffset(getOffset())
+        .setCreateVersion(getCreateVersion())
+        .setPartNumber(getPartNumber());
     if (!ignorePipeline) {
       try {
-        if (this.token != null) {
+        Token<OzoneBlockTokenIdentifier> token = getToken();
+        if (token != null) {
           builder.setToken(OzonePBHelper.protoFromToken(token));
         }
 
@@ -200,7 +118,8 @@
         // TODO: this needs to be revisited when bucket versioning
         //  implementation is handled.
 
-        if (this.pipeline != null) {
+        Pipeline pipeline = getPipeline();
+        if (pipeline != null) {
           builder.setPipeline(pipeline.getProtobufMessage(clientVersion));
         }
       } catch (UnknownPipelineStateException e) {
@@ -220,51 +139,18 @@
   }
 
   public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
-    OmKeyLocationInfo info = new OmKeyLocationInfo(
-        BlockID.getFromProtobuf(keyLocation.getBlockID()),
-        getPipeline(keyLocation),
-        keyLocation.getLength(),
-        keyLocation.getOffset(), keyLocation.getPartNumber());
+    Builder builder = new Builder()
+        .setBlockID(BlockID.getFromProtobuf(keyLocation.getBlockID()))
+        .setLength(keyLocation.getLength())
+        .setOffset(keyLocation.getOffset())
+        .setPipeline(getPipeline(keyLocation))
+        .setCreateVersion(keyLocation.getCreateVersion())
+        .setPartNumber(keyLocation.getPartNumber());
     if (keyLocation.hasToken()) {
-      info.token = (Token<OzoneBlockTokenIdentifier>)
-              OzonePBHelper.tokenFromProto(keyLocation.getToken());
+      builder.setToken((Token<OzoneBlockTokenIdentifier>)
+          OzonePBHelper.tokenFromProto(keyLocation.getToken()));
     }
-    info.setCreateVersion(keyLocation.getCreateVersion());
-    return info;
+    return builder.build();
   }
 
-  @Override
-  public String  toString() {
-    return "{blockID={containerID=" + blockID.getContainerID() +
-        ", localID=" + blockID.getLocalID() + "}" +
-        ", length=" + length +
-        ", offset=" + offset +
-        ", token=" + token +
-        ", pipeline=" + pipeline +
-        ", createVersion=" + createVersion  + ", partNumber=" + partNumber
-        + '}';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    OmKeyLocationInfo that = (OmKeyLocationInfo) o;
-    return length == that.length &&
-        offset == that.offset &&
-        createVersion == that.createVersion &&
-        Objects.equals(blockID, that.blockID) &&
-        Objects.equals(token, that.token) &&
-        Objects.equals(pipeline, that.pipeline);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(blockID, length, offset, token, createVersion,
-        pipeline);
-  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 11dc5b2..f597150 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -585,6 +585,10 @@
     omLockMetrics.unRegister();
   }
 
+  public OMLockMetrics getOMLockMetrics() {
+    return omLockMetrics;
+  }
+
   /**
    * Resource defined in Ozone.
    */
diff --git a/hadoop-ozone/dev-support/checks/build.sh b/hadoop-ozone/dev-support/checks/build.sh
index 26d66b8..42edeb5 100755
--- a/hadoop-ozone/dev-support/checks/build.sh
+++ b/hadoop-ozone/dev-support/checks/build.sh
@@ -17,5 +17,5 @@
 cd "$DIR/../../.." || exit 1
 
 export MAVEN_OPTS="-Xmx4096m $MAVEN_OPTS"
-mvn -V -B -Dmaven.javadoc.skip=true -DskipTests clean install "$@"
+mvn -V -B -Dmaven.javadoc.skip=true -DskipTests -DskipDocs clean install "$@"
 exit $?
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 4c4bdc6..0bbf8c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -90,6 +90,7 @@
 
 import org.apache.ozone.test.LambdaTestUtils;
 import org.apache.ozone.test.TestClock;
+import org.apache.ozone.test.tag.Flaky;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -1297,6 +1298,7 @@
    * since fs.rename(src,dst,options) is enabled.
    */
   @Test
+  @Flaky("HDDS-6646")
   public void testRenameToTrashEnabled() throws Exception {
     // Create a file
     String testKeyName = "testKey1";
@@ -1326,6 +1328,7 @@
    * 2.Verify that the key gets deleted by the trash emptier.
    */
   @Test
+  @Flaky("HDDS-6645")
   public void testTrash() throws Exception {
     String testKeyName = "testKey2";
     Path path = new Path(OZONE_URI_DELIMITER, testKeyName);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 29f3326..695d22d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
+import org.apache.ozone.test.tag.Flaky;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -1260,6 +1261,7 @@
    * fs.rename(src, dst, options).
    */
   @Test
+  @Flaky({"HDDS-5819", "HDDS-6451"})
   public void testRenameToTrashEnabled() throws IOException {
     // Create a file
     String testKeyName = "testKey2";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index 0503dbe..f3649ea 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -26,10 +26,10 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
@@ -57,7 +57,7 @@
   private ECStreamTestUtil() {
   }
 
-  public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+  public static BlockLocationInfo createKeyInfo(ReplicationConfig repConf,
       long blockLength, Map<DatanodeDetails, Integer> dnMap) {
 
     Pipeline pipeline = Pipeline.newBuilder()
@@ -68,7 +68,7 @@
         .setReplicationConfig(repConf)
         .build();
 
-    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+    BlockLocationInfo keyInfo = new BlockLocationInfo.Builder()
         .setBlockID(new BlockID(1, 1))
         .setLength(blockLength)
         .setOffset(0)
@@ -78,7 +78,7 @@
     return keyInfo;
   }
 
-  public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+  public static BlockLocationInfo createKeyInfo(ReplicationConfig repConf,
       int nodeCount, long blockLength) {
     Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
     for (int i = 0; i < nodeCount; i++) {
@@ -257,7 +257,7 @@
 
     public synchronized BlockExtendedInputStream create(
         ReplicationConfig repConfig,
-        OmKeyLocationInfo blockInfo, Pipeline pipeline,
+        BlockLocationInfo blockInfo, Pipeline pipeline,
         Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
         XceiverClientFactory xceiverFactory,
         Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 17a0a6f..a9f9407 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -25,12 +25,12 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.client.io.BadDataLocationException;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,7 +65,7 @@
   @Test
   public void testSufficientLocations() {
     // EC-3-2, 5MB block, so all 3 data locations are needed
-    OmKeyLocationInfo keyInfo = ECStreamTestUtil
+    BlockLocationInfo keyInfo = ECStreamTestUtil
         .createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -116,7 +116,7 @@
   public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -133,7 +133,7 @@
   public void testCorrectBlockSizePassedToBlockStreamTwoCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -149,7 +149,7 @@
   public void testCorrectBlockSizePassedToBlockStreamThreeCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -166,7 +166,7 @@
   public void testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -183,7 +183,7 @@
   public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -198,7 +198,7 @@
   public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
       throws IOException {
     ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB);
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -213,7 +213,7 @@
 
   @Test
   public void testSimpleRead() throws IOException {
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -236,7 +236,7 @@
    */
   @Test
   public void testSimpleReadUnderOneChunk() throws IOException {
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 1, ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -255,7 +255,7 @@
 
   @Test
   public void testReadPastEOF() throws IOException {
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 50);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -274,7 +274,7 @@
     // EC-3-2, 5MB block, so all 3 data locations are needed
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         100);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -309,7 +309,7 @@
   public void testSeekPastBlockLength() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -321,7 +321,7 @@
   public void testSeekToLength() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -333,7 +333,7 @@
   public void testSeekToLengthZeroLengthBlock() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 0);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -347,7 +347,7 @@
   public void testSeekToValidPosition() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -376,7 +376,7 @@
   public void testErrorReadingBlockReportsBadLocation() throws IOException {
     repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
         ONEMB);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
@@ -417,7 +417,7 @@
     }
 
     public synchronized BlockExtendedInputStream create(
-        ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+        ReplicationConfig repConfig, BlockLocationInfo blockInfo,
         Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
         boolean verifyChecksum, XceiverClientFactory xceiverFactory,
         Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
index fd4e8ad..a45ab43 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
@@ -24,10 +24,10 @@
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.ozone.client.io.BadDataLocationException;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -93,7 +93,7 @@
   public void testAvailableDataLocations() {
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
     Assert.assertEquals(1, ECBlockInputStreamProxy.availableDataLocations(
         blockInfo.getPipeline(), 1));
@@ -120,7 +120,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -135,7 +135,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -150,7 +150,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     dataGenerator = new SplittableRandom(randomSeed);
@@ -172,7 +172,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -203,7 +203,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -231,7 +231,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -259,7 +259,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -301,7 +301,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo blockInfo =
+    BlockLocationInfo blockInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
 
     ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -352,7 +352,7 @@
   }
 
   private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
-      OmKeyLocationInfo blockInfo) {
+      BlockLocationInfo blockInfo) {
     return new ECBlockInputStreamProxy(
         rConfig, blockInfo, true, null, null, streamFactory);
   }
@@ -382,7 +382,7 @@
     @Override
     public BlockExtendedInputStream create(boolean missingLocations,
         List<DatanodeDetails> failedDatanodes,
-        ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+        ReplicationConfig repConfig, BlockLocationInfo blockInfo,
         boolean verifyChecksum, XceiverClientFactory xceiverFactory,
         Function<BlockID, Pipeline> refreshFunction) {
       this.failedLocations = failedDatanodes;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index ee3f50a..823ddd5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -20,11 +20,11 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -72,7 +72,7 @@
 
   private ECBlockReconstructedStripeInputStream createStripeInputStream(
       Map<DatanodeDetails, Integer> dnMap, long blockLength) {
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 0eff8f9..b111b12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
@@ -26,7 +27,6 @@
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
 import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -82,7 +82,7 @@
   @Test
   public void testSufficientLocations() throws IOException {
     // One chunk, only 1 location.
-    OmKeyLocationInfo keyInfo = ECStreamTestUtil
+    BlockLocationInfo keyInfo = ECStreamTestUtil
         .createKeyInfo(repConfig, 1, ONEMB);
     try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
       Assert.assertTrue(ecb.hasSufficientLocations());
@@ -168,7 +168,7 @@
       streamFactory = new TestBlockInputStreamFactory();
       addDataStreamsToFactory(dataBufs, parity);
 
-      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+      BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -223,7 +223,7 @@
     // from the parity and padded blocks 2 and 3.
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(4, 5);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     dataGen = new SplittableRandom(randomSeed);
@@ -265,7 +265,7 @@
     // from the parity and padded blocks 2 and 3.
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(4, 5);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     dataGen = new SplittableRandom(randomSeed);
@@ -322,7 +322,7 @@
       addDataStreamsToFactory(dataBufs, parity);
       ByteBuffer[] bufs = allocateByteBuffers(repConfig);
 
-      OmKeyLocationInfo keyInfo =
+      BlockLocationInfo keyInfo =
           ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
       dataGen = new SplittableRandom(randomSeed);
@@ -365,7 +365,7 @@
     // from the parity and padded blocks 2 and 3.
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(4, 5);
-    OmKeyLocationInfo keyInfo =
+    BlockLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     try (ECBlockReconstructedStripeInputStream ecb =
@@ -407,7 +407,7 @@
       streamFactory = new TestBlockInputStreamFactory();
       addDataStreamsToFactory(dataBufs, parity);
 
-      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+      BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -461,7 +461,7 @@
   public void testSeekToPartialOffsetFails() {
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 4, 5);
-    OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+    BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
         stripeSize() * 3, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -503,7 +503,7 @@
       // Data block index 3 is missing and needs recovered initially.
       Map<DatanodeDetails, Integer> dnMap =
           ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
-      OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+      BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
           stripeSize() * 3 + partialStripeSize, dnMap);
       streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -583,7 +583,7 @@
     // when containers are reported by SCM.
     Map<DatanodeDetails, Integer> dnMap =
           ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+    BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
         blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -609,7 +609,7 @@
 
     Map<DatanodeDetails, Integer> dnMap =
         ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
-    OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+    BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
         stripeSize() * 3 + partialStripeSize, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
@@ -643,7 +643,7 @@
   }
 
   private ECBlockReconstructedStripeInputStream createInputStream(
-      OmKeyLocationInfo keyInfo) {
+      BlockLocationInfo keyInfo) {
     return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
         null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java
new file mode 100644
index 0000000..a87b6a6
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.lock.OMLockMetrics;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test for OmBucketReadWriteFileOps.
+ */
+public class TestOmBucketReadWriteFileOps {
+
+  private String path;
+  private OzoneConfiguration conf = null;
+  private MiniOzoneCluster cluster = null;
+  private ObjectStore store = null;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOmBucketReadWriteFileOps.class);
+
+  @Before
+  public void setup() {
+    path = GenericTestUtils
+        .getTempPath(TestOmBucketReadWriteFileOps.class.getSimpleName());
+    GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  private void shutdown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      FileUtils.deleteDirectory(new File(path));
+    }
+  }
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * @throws IOException
+   */
+  private void startCluster() throws Exception {
+    conf = getOzoneConfiguration();
+    conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
+        BucketLayout.LEGACY.name());
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitTobeOutOfSafeMode();
+
+    store = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+  }
+
+  protected OzoneConfiguration getOzoneConfiguration() {
+    return new OzoneConfiguration();
+  }
+
+  @Test
+  public void testOmBucketReadWriteFileOps() throws Exception {
+    try {
+      startCluster();
+      FileOutputStream out = FileUtils.openOutputStream(new File(path,
+          "conf"));
+      cluster.getConf().writeXml(out);
+      out.getFD().sync();
+      out.close();
+
+      verifyFreonCommand(new ParameterBuilder().setTotalThreadCount(10)
+          .setNumOfReadOperations(10).setNumOfWriteOperations(5)
+          .setFileCountForRead(10).setFileCountForWrite(5));
+      verifyFreonCommand(
+          new ParameterBuilder().setVolumeName("vol2").setBucketName("bucket1")
+              .setPrefixFilePath("/dir1/dir2/dir3").setTotalThreadCount(10)
+              .setNumOfReadOperations(10).setNumOfWriteOperations(5)
+              .setFileCountForRead(10).setFileCountForWrite(5));
+      verifyFreonCommand(
+          new ParameterBuilder().setVolumeName("vol3").setBucketName("bucket1")
+              .setPrefixFilePath("/").setTotalThreadCount(15)
+              .setNumOfReadOperations(5).setNumOfWriteOperations(3)
+              .setFileCountForRead(5).setFileCountForWrite(3));
+      verifyFreonCommand(
+          new ParameterBuilder().setVolumeName("vol4").setBucketName("bucket1")
+              .setPrefixFilePath("/dir1/").setTotalThreadCount(10)
+              .setNumOfReadOperations(5).setNumOfWriteOperations(3)
+              .setFileCountForRead(5).setFileCountForWrite(3).
+              setFileSizeInBytes(64).setBufferSize(16));
+      verifyFreonCommand(
+          new ParameterBuilder().setVolumeName("vol5").setBucketName("bucket1")
+              .setPrefixFilePath("/dir1/dir2/dir3").setTotalThreadCount(10)
+              .setNumOfReadOperations(5).setNumOfWriteOperations(0)
+              .setFileCountForRead(5));
+      verifyFreonCommand(
+          new ParameterBuilder().setVolumeName("vol6").setBucketName("bucket1")
+              .setPrefixFilePath("/dir1/dir2/dir3/dir4").setTotalThreadCount(20)
+              .setNumOfReadOperations(0).setNumOfWriteOperations(5)
+              .setFileCountForRead(0).setFileCountForWrite(5));
+    } finally {
+      shutdown();
+    }
+  }
+
+  private void verifyFreonCommand(ParameterBuilder parameterBuilder)
+      throws IOException {
+    store.createVolume(parameterBuilder.volumeName);
+    OzoneVolume volume = store.getVolume(parameterBuilder.volumeName);
+    volume.createBucket(parameterBuilder.bucketName);
+    String rootPath = "o3fs://" + parameterBuilder.bucketName + "." +
+            parameterBuilder.volumeName + parameterBuilder.prefixFilePath;
+    String confPath = new File(path, "conf").getAbsolutePath();
+    new Freon().execute(
+        new String[]{"-conf", confPath, "obrwf", "-P", rootPath,
+            "-r", String.valueOf(parameterBuilder.fileCountForRead),
+            "-w", String.valueOf(parameterBuilder.fileCountForWrite),
+            "-g", String.valueOf(parameterBuilder.fileSizeInBytes),
+            "-b", String.valueOf(parameterBuilder.bufferSize),
+            "-l", String.valueOf(parameterBuilder.length),
+            "-c", String.valueOf(parameterBuilder.totalThreadCount),
+            "-T", String.valueOf(parameterBuilder.readThreadPercentage),
+            "-R", String.valueOf(parameterBuilder.numOfReadOperations),
+            "-W", String.valueOf(parameterBuilder.numOfWriteOperations),
+            "-n", String.valueOf(1)});
+
+    LOG.info("Started verifying OM bucket read/write ops file generation...");
+    FileSystem fileSystem = FileSystem.get(URI.create(rootPath),
+        conf);
+    Path rootDir = new Path(rootPath.concat(OzoneConsts.OM_KEY_PREFIX));
+    FileStatus[] fileStatuses = fileSystem.listStatus(rootDir);
+    verifyFileCreation(2, fileStatuses, true);
+
+    Path readDir = new Path(rootPath.concat("/readPath"));
+    FileStatus[] readFileStatuses = fileSystem.listStatus(readDir);
+    verifyFileCreation(parameterBuilder.fileCountForRead, readFileStatuses,
+        false);
+
+    int readThreadCount = (parameterBuilder.readThreadPercentage *
+        parameterBuilder.totalThreadCount) / 100;
+    int writeThreadCount = parameterBuilder.totalThreadCount - readThreadCount;
+
+    Path writeDir = new Path(rootPath.concat("/writePath"));
+    FileStatus[] writeFileStatuses = fileSystem.listStatus(writeDir);
+    verifyFileCreation(writeThreadCount * parameterBuilder.fileCountForWrite *
+        parameterBuilder.numOfWriteOperations, writeFileStatuses, false);
+
+    verifyOMLockMetrics(cluster.getOzoneManager().getMetadataManager().getLock()
+        .getOMLockMetrics());
+  }
+
+  private void verifyFileCreation(int expectedCount, FileStatus[] fileStatuses,
+                          boolean checkDirectoryCount) {
+    int actual = 0;
+    if (checkDirectoryCount) {
+      for (FileStatus fileStatus : fileStatuses) {
+        if (fileStatus.isDirectory()) {
+          ++actual;
+        }
+      }
+    } else {
+      for (FileStatus fileStatus : fileStatuses) {
+        if (fileStatus.isFile()) {
+          ++actual;
+        }
+      }
+    }
+    Assert.assertEquals("Mismatch Count!", expectedCount, actual);
+  }
+
+  private void verifyOMLockMetrics(OMLockMetrics omLockMetrics) {
+    String readLockWaitingTimeMsStat =
+        omLockMetrics.getReadLockWaitingTimeMsStat();
+    LOG.info("Read Lock Waiting Time Stat: " + readLockWaitingTimeMsStat);
+    LOG.info("Longest Read Lock Waiting Time (ms): " +
+        omLockMetrics.getLongestReadLockWaitingTimeMs());
+    int readWaitingSamples =
+        Integer.parseInt(readLockWaitingTimeMsStat.split(" ")[2]);
+    Assert.assertTrue("Read Lock Waiting Samples should be positive",
+        readWaitingSamples > 0);
+
+    String readLockHeldTimeMsStat = omLockMetrics.getReadLockHeldTimeMsStat();
+    LOG.info("Read Lock Held Time Stat: " + readLockHeldTimeMsStat);
+    LOG.info("Longest Read Lock Held Time (ms): " +
+        omLockMetrics.getLongestReadLockHeldTimeMs());
+    int readHeldSamples =
+        Integer.parseInt(readLockHeldTimeMsStat.split(" ")[2]);
+    Assert.assertTrue("Read Lock Held Samples should be positive",
+        readHeldSamples > 0);
+
+    String writeLockWaitingTimeMsStat =
+        omLockMetrics.getWriteLockWaitingTimeMsStat();
+    LOG.info("Write Lock Waiting Time Stat: " + writeLockWaitingTimeMsStat);
+    LOG.info("Longest Write Lock Waiting Time (ms): " +
+        omLockMetrics.getLongestWriteLockWaitingTimeMs());
+    int writeWaitingSamples =
+        Integer.parseInt(writeLockWaitingTimeMsStat.split(" ")[2]);
+    Assert.assertTrue("Write Lock Waiting Samples should be positive",
+        writeWaitingSamples > 0);
+
+    String writeLockHeldTimeMsStat = omLockMetrics.getWriteLockHeldTimeMsStat();
+    LOG.info("Write Lock Held Time Stat: " + writeLockHeldTimeMsStat);
+    LOG.info("Longest Write Lock Held Time (ms): " +
+        omLockMetrics.getLongestWriteLockHeldTimeMs());
+    int writeHeldSamples =
+        Integer.parseInt(writeLockHeldTimeMsStat.split(" ")[2]);
+    Assert.assertTrue("Write Lock Held Samples should be positive",
+        writeHeldSamples > 0);
+  }
+
+  private static class ParameterBuilder {
+
+    private String volumeName = "vol1";
+    private String bucketName = "bucket1";
+    private String prefixFilePath = "/dir1/dir2";
+    private int fileCountForRead = 100;
+    private int fileCountForWrite = 10;
+    private long fileSizeInBytes = 256;
+    private int bufferSize = 64;
+    private int length = 10;
+    private int totalThreadCount = 100;
+    private int readThreadPercentage = 90;
+    private int numOfReadOperations = 50;
+    private int numOfWriteOperations = 10;
+
+    private ParameterBuilder setVolumeName(String volumeNameParam) {
+      volumeName = volumeNameParam;
+      return this;
+    }
+
+    private ParameterBuilder setBucketName(String bucketNameParam) {
+      bucketName = bucketNameParam;
+      return this;
+    }
+
+    private ParameterBuilder setPrefixFilePath(String prefixFilePathParam) {
+      prefixFilePath = prefixFilePathParam;
+      return this;
+    }
+
+    private ParameterBuilder setFileCountForRead(int fileCountForReadParam) {
+      fileCountForRead = fileCountForReadParam;
+      return this;
+    }
+
+    private ParameterBuilder setFileCountForWrite(int fileCountForWriteParam) {
+      fileCountForWrite = fileCountForWriteParam;
+      return this;
+    }
+
+    private ParameterBuilder setFileSizeInBytes(long fileSizeInBytesParam) {
+      fileSizeInBytes = fileSizeInBytesParam;
+      return this;
+    }
+
+    private ParameterBuilder setBufferSize(int bufferSizeParam) {
+      bufferSize = bufferSizeParam;
+      return this;
+    }
+
+    private ParameterBuilder setLength(int lengthParam) {
+      length = lengthParam;
+      return this;
+    }
+
+    private ParameterBuilder setTotalThreadCount(int totalThreadCountParam) {
+      totalThreadCount = totalThreadCountParam;
+      return this;
+    }
+
+    private ParameterBuilder setReadThreadPercentage(
+        int readThreadPercentageParam) {
+      readThreadPercentage = readThreadPercentageParam;
+      return this;
+    }
+
+    private ParameterBuilder setNumOfReadOperations(
+        int numOfReadOperationsParam) {
+      numOfReadOperations = numOfReadOperationsParam;
+      return this;
+    }
+
+    private ParameterBuilder setNumOfWriteOperations(
+        int numOfWriteOperationsParam) {
+      numOfWriteOperations = numOfWriteOperationsParam;
+      return this;
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index 0bb5011..e1874c3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.log4j.Logger;
+import org.apache.ozone.test.tag.Flaky;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -281,6 +282,7 @@
   }
 
   @Test
+  @Flaky("HDDS-6644")
   public void testReadRequest() throws Exception {
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
     ObjectStore objectStore = getObjectStore();
@@ -318,6 +320,7 @@
   }
 
   @Test
+  @Flaky("HDDS-6642")
   public void testListVolumes() throws Exception {
     String userName = UserGroupInformation.getCurrentUser().getUserName();
     ObjectStore objectStore = getObjectStore();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
index 8862b0f..82dd430 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.tag.Flaky;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -129,6 +130,7 @@
   // LIST        LIST         READ      (V1 LIST=>READ)
   // READ_ACL    READ_ACL     READ      (V1 READ_ACL=>READ)
   @Test
+  @Flaky("HDDS-6335")
   public void testKeyAcl()
       throws IOException {
     OzoneObj keyObj;
diff --git a/hadoop-ozone/ozonefs-shaded/pom.xml b/hadoop-ozone/ozonefs-shaded/pom.xml
index 46419e1..9d26a77 100644
--- a/hadoop-ozone/ozonefs-shaded/pom.xml
+++ b/hadoop-ozone/ozonefs-shaded/pom.xml
@@ -53,6 +53,10 @@
           <artifactId>hadoop-annotations</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.hadoop.thirdparty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
         </exclusion>
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index c5b9a3a..914b709 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -63,7 +63,8 @@
         GeneratorDatanode.class,
         ClosedContainerReplicator.class,
         StreamingGenerator.class,
-        SCMThroughputBenchmark.class},
+        SCMThroughputBenchmark.class,
+        OmBucketReadWriteFileOps.class},
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
 public class Freon extends GenericCli {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java
new file mode 100644
index 0000000..401e71b1
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+
+/**
+ * Synthetic read/write file operations workload generator tool.
+ */
+@Command(name = "obrwf",
+    aliases = "om-bucket-read-write-file-ops",
+    description = "Creates files, performs respective read/write " +
+        "operations to measure lock performance.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+
+public class OmBucketReadWriteFileOps extends BaseFreonGenerator
+    implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmBucketReadWriteFileOps.class);
+
+  @Option(names = {"-P", "--root-path"},
+      description = "Root path",
+      defaultValue = "o3fs://bucket1.vol1/dir1/dir2")
+  private String rootPath;
+
+  @Option(names = {"-r", "--file-count-for-read"},
+      description = "Number of files to be written in the read directory.",
+      defaultValue = "100")
+  private int fileCountForRead;
+
+  @Option(names = {"-w", "--file-count-for-write"},
+      description = "Number of files to be written in the write directory.",
+      defaultValue = "10")
+  private int fileCountForWrite;
+
+  @Option(names = {"-g", "--file-size"},
+      description = "Generated data size (in bytes) of each file to be " +
+          "written in each directory.",
+      defaultValue = "256")
+  private long fileSizeInBytes;
+
+  @Option(names = {"-b", "--buffer"},
+      description = "Size of buffer used to generated the file content.",
+      defaultValue = "64")
+  private int bufferSize;
+
+  @Option(names = {"-l", "--name-len"},
+      description = "Length of the random name of directory you want to " +
+          "create.",
+      defaultValue = "10")
+  private int length;
+
+  @Option(names = {"-c", "--total-thread-count"},
+      description = "Total number of threads to be executed.",
+      defaultValue = "100")
+  private int totalThreadCount;
+
+  @Option(names = {"-T", "--read-thread-percentage"},
+      description = "Percentage of the total number of threads to be " +
+          "allocated for read operations. The remaining percentage of " +
+          "threads will be allocated for write operations.",
+      defaultValue = "90")
+  private int readThreadPercentage;
+
+  @Option(names = {"-R", "--num-of-read-operations"},
+      description = "Number of read operations to be performed by each thread.",
+      defaultValue = "50")
+  private int numOfReadOperations;
+
+  @Option(names = {"-W", "--num-of-write-operations"},
+      description = "Number of write operations to be performed by each " +
+          "thread.",
+      defaultValue = "10")
+  private int numOfWriteOperations;
+
+  private Timer timer;
+
+  private ContentGenerator contentGenerator;
+
+  private FileSystem fileSystem;
+
+  private int readThreadCount;
+  private int writeThreadCount;
+
+  @Override
+  public Void call() throws Exception {
+    init();
+
+    readThreadCount = (readThreadPercentage * totalThreadCount) / 100;
+    writeThreadCount = totalThreadCount - readThreadCount;
+
+    print("rootPath: " + rootPath);
+    print("fileCountForRead: " + fileCountForRead);
+    print("fileCountForWrite: " + fileCountForWrite);
+    print("fileSizeInBytes: " + fileSizeInBytes);
+    print("bufferSize: " + bufferSize);
+    print("totalThreadCount: " + totalThreadCount);
+    print("readThreadPercentage: " + readThreadPercentage);
+    print("writeThreadPercentage: " + (100 - readThreadPercentage));
+    print("readThreadCount: " + readThreadCount);
+    print("writeThreadCount: " + writeThreadCount);
+    print("numOfReadOperations: " + numOfReadOperations);
+    print("numOfWriteOperations: " + numOfWriteOperations);
+
+    OzoneConfiguration configuration = createOzoneConfiguration();
+    fileSystem = FileSystem.get(URI.create(rootPath), configuration);
+
+    contentGenerator = new ContentGenerator(fileSizeInBytes, bufferSize);
+    timer = getMetrics().timer("file-create");
+
+    runTests(this::mainMethod);
+    return null;
+  }
+
+  private void mainMethod(long counter) throws Exception {
+
+    int readResult = readOperations();
+    int writeResult = writeOperations();
+
+    print("Total Files Read: " + readResult);
+    print("Total Files Written: " + writeResult * fileCountForWrite);
+
+    // TODO: print read/write lock metrics (HDDS-6435, HDDS-6436).
+  }
+
+  private int readOperations() throws Exception {
+
+    // Create fileCountForRead (defaultValue = 1000) files under
+    // rootPath/readPath directory
+    String readPath =
+        rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("readPath");
+    fileSystem.mkdirs(new Path(readPath));
+    createFiles(readPath, fileCountForRead);
+
+    // Start readThreadCount (defaultValue = 90) concurrent read threads
+    // performing numOfReadOperations (defaultValue = 50) iterations
+    // of read operations (fileSystem.listStatus(rootPath/readPath))
+    ExecutorService readService = Executors.newFixedThreadPool(readThreadCount);
+    CompletionService<Integer> readExecutorCompletionService =
+        new ExecutorCompletionService<>(readService);
+    List<Future<Integer>> readFutures = new ArrayList<>();
+    for (int i = 0; i < readThreadCount; i++) {
+      readFutures.add(readExecutorCompletionService.submit(() -> {
+        int readCount = 0;
+        try {
+          for (int j = 0; j < numOfReadOperations; j++) {
+            FileStatus[] status =
+                fileSystem.listStatus(new Path(readPath));
+            readCount += status.length;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception while listing status ", e);
+        }
+        return readCount;
+      }));
+    }
+
+    int readResult = 0;
+    for (int i = 0; i < readFutures.size(); i++) {
+      try {
+        readResult += readExecutorCompletionService.take().get();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ExecutionException e) {
+        e.printStackTrace();
+      }
+    }
+    readService.shutdown();
+
+    return readResult;
+  }
+
+  private int writeOperations() throws Exception {
+
+    // Start writeThreadCount (defaultValue = 10) concurrent write threads
+    // performing numOfWriteOperations (defaultValue = 10) iterations
+    // of write operations (createFiles(rootPath/writePath))
+    String writePath =
+        rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("writePath");
+    fileSystem.mkdirs(new Path(writePath));
+
+    ExecutorService writeService =
+        Executors.newFixedThreadPool(writeThreadCount);
+    CompletionService<Integer> writeExecutorCompletionService =
+        new ExecutorCompletionService<>(writeService);
+    List<Future<Integer>> writeFutures = new ArrayList<>();
+    for (int i = 0; i < writeThreadCount; i++) {
+      writeFutures.add(writeExecutorCompletionService.submit(() -> {
+        int writeCount = 0;
+        try {
+          for (int j = 0; j < numOfWriteOperations; j++) {
+            createFiles(writePath, fileCountForWrite);
+            writeCount++;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception while creating file ", e);
+        }
+        return writeCount;
+      }));
+    }
+
+    int writeResult = 0;
+    for (int i = 0; i < writeFutures.size(); i++) {
+      try {
+        writeResult += writeExecutorCompletionService.take().get();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ExecutionException e) {
+        e.printStackTrace();
+      }
+    }
+    writeService.shutdown();
+
+    return writeResult;
+  }
+
+  private void createFile(String dir, long counter) throws Exception {
+    String fileName = dir.concat(OzoneConsts.OM_KEY_PREFIX)
+        .concat(RandomStringUtils.randomAlphanumeric(length));
+    Path file = new Path(fileName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("FilePath:{}", file);
+    }
+    timer.time(() -> {
+      try (FSDataOutputStream output = fileSystem.create(file)) {
+        contentGenerator.write(output);
+      }
+      return null;
+    });
+  }
+
+  private void createFiles(String dir, int fileCount) throws Exception {
+    for (int i = 0; i < fileCount; i++) {
+      createFile(dir, i);
+    }
+  }
+}