HDDS-7053. Add client-side pipelines distribution metrics (#3626)
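
The client now tracks how writeChunk calls and bytes are distributed across
pipelines and pipeline leaders. A new reference-counted singleton,
ContainerClientMetrics, registers per-pipeline and per-leader counters on
first acquire() and unregisters its metrics source when the last holder
calls release(). RpcClient acquires the metrics on construction, releases
them on close(), and threads them through KeyOutputStream and the
BlockOutputStreamEntry pools down to the Ratis and EC block output streams,
which record every chunk write.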

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
new file mode 100644
index 0000000..1045f7a
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Container client metrics that describe how data writes are distributed to
+ * pipelines.
+ */
+@Metrics(about = "Client Metrics", context = OzoneConsts.OZONE)
+public final class ContainerClientMetrics {
+  private static ContainerClientMetrics instance;
+  @VisibleForTesting
+  static int referenceCount = 0;
+
+  private static final String SOURCE_NAME =
+      ContainerClientMetrics.class.getSimpleName();
+  private static int instanceCount = 0;
+
+  @Metric
+  private MutableCounterLong totalWriteChunkCalls;
+  @Metric
+  private MutableCounterLong totalWriteChunkBytes;
+  private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
+  private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
+  private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
+  private final MetricsRegistry registry;
+
+  public static synchronized ContainerClientMetrics acquire() {
+    if (instance == null) {
+      instanceCount++;
+      instance = DefaultMetricsSystem.instance().register(
+          SOURCE_NAME + instanceCount,
+          "Ozone Client Metrics", new ContainerClientMetrics());
+    }
+    referenceCount++;
+    return instance;
+  }
+
+  public static synchronized void release() {
+    if (instance == null) {
+      throw new IllegalStateException("ContainerClientMetrics not acquired.");
+    }
+    referenceCount--;
+    if (referenceCount == 0) {
+      DefaultMetricsSystem.instance().unregisterSource(
+          SOURCE_NAME + instanceCount);
+      instance = null;
+    }
+  }
+
+  private ContainerClientMetrics() {
+    this.registry = new MetricsRegistry(SOURCE_NAME);
+    writeChunkCallsByPipeline = new ConcurrentHashMap<>();
+    writeChunkBytesByPipeline = new ConcurrentHashMap<>();
+    writeChunksCallsByLeaders = new ConcurrentHashMap<>();
+  }
+
+  public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
+    writeChunkCallsByPipeline.computeIfAbsent(pipeline.getId(),
+        pipelineID -> registry.newCounter(
+            Interns.info("writeChunkCallsPipeline-" + pipelineID.getId(),
+                "Number of writeChunk calls on a pipelines"),
+            0L)
+    ).incr();
+    writeChunkBytesByPipeline.computeIfAbsent(pipeline.getId(),
+        pipelineID -> registry.newCounter(
+            Interns.info("writeChunkBytesPipeline-" + pipelineID.getId(),
+                "Number of bytes written on a pipelines"),
+            0L)
+    ).incr(chunkSizeBytes);
+    if (pipeline.getLeaderId() != null) {
+      writeChunksCallsByLeaders.computeIfAbsent(pipeline.getLeaderId(),
+          leader -> registry.newCounter(
+              Interns.info("writeChunkCallsLeader-" + leader,
+                  "Number of writeChunk calls on a leader node"),
+              0L)
+      ).incr();
+    }
+    totalWriteChunkCalls.incr();
+    totalWriteChunkBytes.incr(chunkSizeBytes);
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTotalWriteChunkBytes() {
+    return totalWriteChunkBytes;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTotalWriteChunkCalls() {
+    return totalWriteChunkCalls;
+  }
+
+  @VisibleForTesting
+  public Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
+    return writeChunkBytesByPipeline;
+  }
+
+  @VisibleForTesting
+  public Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
+    return writeChunkCallsByPipeline;
+  }
+
+  @VisibleForTesting
+  public Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
+    return writeChunksCallsByLeaders;
+  }
+}
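A minimal lifecycle sketch for review context (hypothetical caller; the
Pipeline and chunk length would come from the write path, as they do in
BlockOutputStream below):

```java
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/** Hypothetical caller illustrating the acquire/record/release lifecycle. */
class MetricsLifecycleSketch {
  void writeOneChunk(Pipeline pipeline, long chunkLen) {
    // acquire() registers the metrics source on first use and bumps the
    // reference count on every call.
    ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
    try {
      // Increments the totals plus the per-pipeline counters and, when the
      // pipeline reports a leader, the per-leader counter.
      metrics.recordWriteChunk(pipeline, chunkLen);
    } finally {
      // release() unregisters the source once the count drops back to zero.
      ContainerClientMetrics.release();
    }
  }
}
```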
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 599f68a..09fe30e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -120,6 +121,8 @@ public class BlockOutputStream extends OutputStream {
   private ChunkBuffer currentBuffer;
   private final Token<? extends TokenIdentifier> token;
   private int replicationIndex;
+  private Pipeline pipeline;
+  private final ContainerClientMetrics clientMetrics;
 
   /**
    * Creates a new BlockOutputStream.
@@ -135,7 +138,8 @@ public BlockOutputStream(
       Pipeline pipeline,
       BufferPool bufferPool,
       OzoneClientConfig config,
-      Token<? extends TokenIdentifier> token
+      Token<? extends TokenIdentifier> token,
+      ContainerClientMetrics clientMetrics
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
@@ -177,6 +181,8 @@ public BlockOutputStream(
     ioException = new AtomicReference<>(null);
     checksum = new Checksum(config.getChecksumType(),
         config.getBytesPerChecksum());
+    this.clientMetrics = clientMetrics;
+    this.pipeline = pipeline;
   }
 
   void refreshCurrentBuffer() {
@@ -694,7 +700,6 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
       LOG.debug("Writing chunk {} length {} at offset {}",
           chunkInfo.getChunkName(), effectiveChunkSize, offset);
     }
-
     try {
       XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
           blockID.get(), data, token, replicationIndex);
@@ -717,6 +722,7 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
             throw ce;
           });
       containerBlockData.addChunks(chunkInfo);
+      clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
       return validateFuture;
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 9f6bf05..9806fd7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -66,10 +67,11 @@ public ECBlockOutputStream(
       Pipeline pipeline,
       BufferPool bufferPool,
       OzoneClientConfig config,
-      Token<? extends TokenIdentifier> token
+      Token<? extends TokenIdentifier> token,
+      ContainerClientMetrics clientMetrics
   ) throws IOException {
     super(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token);
+        pipeline, bufferPool, config, token, clientMetrics);
     // In EC stream, there will be only one node in pipeline.
     this.datanodeDetails = pipeline.getClosestNode();
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index 802adc1..7c2d87d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -74,9 +75,11 @@ public RatisBlockOutputStream(
       Pipeline pipeline,
       BufferPool bufferPool,
       OzoneClientConfig config,
-      Token<? extends TokenIdentifier> token
+      Token<? extends TokenIdentifier> token,
+      ContainerClientMetrics clientMetrics
   ) throws IOException {
-    super(blockID, xceiverClientManager, pipeline, bufferPool, config, token);
+    super(blockID, xceiverClientManager, pipeline,
+        bufferPool, config, token, clientMetrics);
     this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java
new file mode 100644
index 0000000..b0939ce
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test ContainerClientMetrics.
+ */
+public class TestContainerClientMetrics {
+  @Before
+  public void setup() {
+    while (ContainerClientMetrics.referenceCount > 0) {
+      ContainerClientMetrics.release();
+    }
+  }
+
+  @Test
+  public void testRecordChunkMetrics() {
+    ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
+    PipelineID pipelineId1 = PipelineID.randomId();
+    UUID leaderId1 = UUID.randomUUID();
+    PipelineID pipelineId2 = PipelineID.randomId();
+    UUID leaderId2 = UUID.randomUUID();
+    PipelineID pipelineId3 = PipelineID.randomId();
+
+    metrics.recordWriteChunk(createPipeline(pipelineId1, leaderId1), 10);
+    metrics.recordWriteChunk(createPipeline(pipelineId2, leaderId2), 20);
+    metrics.recordWriteChunk(createPipeline(pipelineId3, leaderId1), 30);
+
+    assertEquals(3, metrics.getTotalWriteChunkCalls().value());
+    assertEquals(60, metrics.getTotalWriteChunkBytes().value());
+
+    assertEquals(3, metrics.getWriteChunkBytesByPipeline().size());
+    assertEquals(10,
+        metrics.getWriteChunkBytesByPipeline().get(pipelineId1).value());
+    assertEquals(20,
+        metrics.getWriteChunkBytesByPipeline().get(pipelineId2).value());
+    assertEquals(30,
+        metrics.getWriteChunkBytesByPipeline().get(pipelineId3).value());
+
+    assertEquals(3, metrics.getWriteChunkCallsByPipeline().size());
+    assertEquals(1,
+        metrics.getWriteChunkCallsByPipeline().get(pipelineId1).value());
+    assertEquals(1,
+        metrics.getWriteChunkCallsByPipeline().get(pipelineId2).value());
+    assertEquals(1,
+        metrics.getWriteChunkCallsByPipeline().get(pipelineId3).value());
+
+    assertEquals(2, metrics.getWriteChunksCallsByLeaders().size());
+    assertEquals(2,
+        metrics.getWriteChunksCallsByLeaders().get(leaderId1).value());
+    assertEquals(1,
+        metrics.getWriteChunksCallsByLeaders().get(leaderId2).value());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testReleaseWithoutUse() {
+    ContainerClientMetrics.release();
+  }
+
+  @Test
+  public void testAcquireAndRelease() {
+    Assertions.assertNotNull(ContainerClientMetrics.acquire());
+    Assertions.assertEquals(1, ContainerClientMetrics.referenceCount);
+    ContainerClientMetrics.release();
+    Assertions.assertEquals(0, ContainerClientMetrics.referenceCount);
+
+    Assertions.assertNotNull(ContainerClientMetrics.acquire());
+    Assertions.assertNotNull(ContainerClientMetrics.acquire());
+    Assertions.assertEquals(2, ContainerClientMetrics.referenceCount);
+    ContainerClientMetrics.release();
+    ContainerClientMetrics.release();
+    Assertions.assertEquals(0, ContainerClientMetrics.referenceCount);
+
+    ContainerClientMetrics.acquire();
+    Assertions.assertNotNull(ContainerClientMetrics.acquire());
+  }
+
+  private Pipeline createPipeline(PipelineID pipelineId, UUID leaderId) {
+    return Pipeline.newBuilder()
+        .setId(pipelineId)
+        .setReplicationConfig(Mockito.mock(ReplicationConfig.class))
+        .setState(Pipeline.PipelineState.OPEN)
+        .setNodes(Collections.emptyList())
+        .setLeaderId(leaderId)
+        .build();
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 635f1f7..2b0cf12 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -110,7 +111,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
         pipeline,
         bufferPool,
         config,
-        null);
+        null,
+        ContainerClientMetrics.acquire());
     return outputStream;
   }
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index dc28102..b233254 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -87,6 +88,7 @@ public static Pipeline createRatisPipeline() {
         .setReplicationConfig(
             RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
         .setNodes(nodes)
+        .setLeaderId(UUID.randomUUID())
         .build();
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 0c79cbc..b9da5cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -100,9 +101,11 @@ public class ECReconstructionCoordinator implements Closeable {
 
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final TokenHelper tokenHelper;
+  private final ContainerClientMetrics clientMetrics;
 
   public ECReconstructionCoordinator(ConfigurationSource conf,
-      CertificateClient certificateClient) throws IOException {
+                                     CertificateClient certificateClient)
+      throws IOException {
     this.containerOperationClient = new ECContainerOperationClient(conf,
         certificateClient);
     this.byteBufferPool = new ElasticByteBufferPool();
@@ -117,6 +120,7 @@ public ECReconstructionCoordinator(ConfigurationSource conf,
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, () -> ecReconstructExecutor);
     tokenHelper = new TokenHelper(conf, certificateClient);
+    this.clientMetrics = ContainerClientMetrics.acquire();
   }
 
   public void reconstructECContainerGroup(long containerID,
@@ -242,7 +246,7 @@ void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
                 this.containerOperationClient.getXceiverClientManager(),
                 this.containerOperationClient
                     .singleNodePipeline(datanodeDetails, repConfig), bufferPool,
-                configuration, blockLocationInfo.getToken());
+                configuration, blockLocationInfo.getToken(), clientMetrics);
         bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
         // Make sure it's clean. Don't want to reuse the erroneously returned
         // buffers from the pool.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2b3b756..7431a17 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -58,6 +59,7 @@ public class BlockOutputStreamEntry extends OutputStream {
   private final Token<OzoneBlockTokenIdentifier> token;
 
   private BufferPool bufferPool;
+  private ContainerClientMetrics clientMetrics;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   BlockOutputStreamEntry(
@@ -67,7 +69,8 @@ public class BlockOutputStreamEntry extends OutputStream {
       long length,
       BufferPool bufferPool,
       Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config
+      OzoneClientConfig config,
+      ContainerClientMetrics clientMetrics
   ) {
     this.config = config;
     this.outputStream = null;
@@ -79,6 +82,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     this.length = length;
     this.currentPosition = 0;
     this.bufferPool = bufferPool;
+    this.clientMetrics = clientMetrics;
   }
 
   /**
@@ -100,7 +104,11 @@ void checkStream() throws IOException {
    */
   void createOutputStream() throws IOException {
     outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token);
+        pipeline, bufferPool, config, token, clientMetrics);
+  }
+
+  ContainerClientMetrics getClientMetrics() {
+    return clientMetrics;
   }
 
   @Override
@@ -331,6 +339,7 @@ public static class Builder {
     private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
+    private ContainerClientMetrics clientMetrics;
 
     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -372,6 +381,10 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
       this.token = bToken;
       return this;
     }
+    public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
+      this.clientMetrics = clientMetrics;
+      return this;
+    }
 
     public BlockOutputStreamEntry build() {
       return new BlockOutputStreamEntry(blockID,
@@ -380,7 +393,7 @@ public BlockOutputStreamEntry build() {
           pipeline,
           length,
           bufferPool,
-          token, config);
+          token, config, clientMetrics);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index bdfa3f2..5b81696 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -79,6 +80,7 @@ public class BlockOutputStreamEntryPool {
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
+  private final ContainerClientMetrics clientMetrics;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockOutputStreamEntryPool(
@@ -88,7 +90,8 @@ public BlockOutputStreamEntryPool(
       String uploadID, int partNumber,
       boolean isMultipart, OmKeyInfo info,
       boolean unsafeByteBufferConversion,
-      XceiverClientFactory xceiverClientFactory, long openID
+      XceiverClientFactory xceiverClientFactory, long openID,
+      ContainerClientMetrics clientMetrics
   ) {
     this.config = config;
     this.xceiverClientFactory = xceiverClientFactory;
@@ -110,19 +113,14 @@ public BlockOutputStreamEntryPool(
                 .getStreamBufferSize()),
             ByteStringConversion
                 .createByteBufferConversion(unsafeByteBufferConversion));
+    this.clientMetrics = clientMetrics;
   }
 
   ExcludeList createExcludeList() {
     return new ExcludeList();
   }
 
-  /**
-   * A constructor for testing purpose only.
-   *
-   * @see KeyOutputStream#KeyOutputStream()
-   */
-  @VisibleForTesting
-  BlockOutputStreamEntryPool() {
+  BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics) {
     streamEntries = new ArrayList<>();
     omClient = null;
     keyArgs = null;
@@ -140,6 +138,7 @@ ExcludeList createExcludeList() {
     currentStreamIndex = 0;
     openID = -1;
     excludeList = new ExcludeList();
+    this.clientMetrics = clientMetrics;
   }
 
   /**
@@ -185,6 +184,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
             .setLength(subKeyInfo.getLength())
             .setBufferPool(bufferPool)
             .setToken(subKeyInfo.getToken())
+            .setClientMetrics(clientMetrics)
             .build();
   }
 
@@ -247,6 +247,10 @@ OzoneClientConfig getConfig() {
     return config;
   }
 
+  ContainerClientMetrics getClientMetrics() {
+    return clientMetrics;
+  }
+
   /**
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 13c6a86..06466af 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -75,9 +76,9 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config) {
+      OzoneClientConfig config, ContainerClientMetrics clientMetrics) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
-        token, config);
+        token, config, clientMetrics);
     assertInstanceOf(
         pipeline.getReplicationConfig(), ECReplicationConfig.class);
     this.replicationConfig =
@@ -96,7 +97,7 @@ void checkStream() throws IOException {
         blockOutputStreams[i] =
             new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
                 createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
-                getBufferPool(), getConf(), getToken());
+                getBufferPool(), getConf(), getToken(), getClientMetrics());
       }
     }
   }
@@ -395,6 +396,7 @@ public static class Builder {
     private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
+    private ContainerClientMetrics clientMetrics;
 
     public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -440,6 +442,12 @@ public ECBlockOutputStreamEntry.Builder setToken(
       return this;
     }
 
+    public ECBlockOutputStreamEntry.Builder setClientMetrics(
+        ContainerClientMetrics containerClientMetrics) {
+      this.clientMetrics = containerClientMetrics;
+      return this;
+    }
+
     public ECBlockOutputStreamEntry build() {
       return new ECBlockOutputStreamEntry(blockID,
           key,
@@ -447,7 +455,7 @@ public ECBlockOutputStreamEntry build() {
           pipeline,
           length,
           bufferPool,
-          token, config);
+          token, config, clientMetrics);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index 4651896..687bb84 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -54,10 +55,11 @@ public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
       OmKeyInfo info,
       boolean unsafeByteBufferConversion,
       XceiverClientFactory xceiverClientFactory,
-      long openID) {
+      long openID,
+      ContainerClientMetrics clientMetrics) {
     super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
         isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
-        openID);
+        openID, clientMetrics);
     assert replicationConfig instanceof ECReplicationConfig;
   }
 
@@ -79,6 +81,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
             .setLength(subKeyInfo.getLength())
             .setBufferPool(getBufferPool())
             .setToken(subKeyInfo.getToken())
+            .setClientMetrics(getClientMetrics())
             .build();
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index bcc6761..2246b33 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -92,6 +92,7 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
   }
 
   private ECKeyOutputStream(Builder builder) {
+    super(builder.getClientMetrics());
     this.config = builder.getClientConfig();
     this.bufferPool = builder.getByteBufferPool();
     // For EC, cell/chunk size and buffer size can be same for now.
@@ -111,7 +112,8 @@ private ECKeyOutputStream(Builder builder) {
             builder.getMultipartUploadID(), builder.getMultipartNumber(),
             builder.isMultipartKey(),
             info, builder.isUnsafeByteBufferConversionEnabled(),
-            builder.getXceiverManager(), builder.getOpenHandler().getId());
+            builder.getXceiverManager(), builder.getOpenHandler().getId(),
+            builder.getClientMetrics());
 
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index ee69ca1..b60b59c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -89,11 +90,7 @@ enum StreamAction {
 
   private long clientID;
 
-  /**
-   * A constructor for testing purpose only.
-   */
-  @VisibleForTesting
-  public KeyOutputStream() {
+  public KeyOutputStream(ContainerClientMetrics clientMetrics) {
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
@@ -101,7 +98,7 @@ public KeyOutputStream() {
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
+    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(clientMetrics);
   }
 
   @VisibleForTesting
@@ -137,7 +134,8 @@ public KeyOutputStream(
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion
+      boolean unsafeByteBufferConversion,
+      ContainerClientMetrics clientMetrics
   ) {
     this.config = config;
     blockOutputStreamEntryPool =
@@ -149,7 +147,8 @@ public KeyOutputStream(
             isMultipart, handler.getKeyInfo(),
             unsafeByteBufferConversion,
             xceiverClientManager,
-            handler.getId());
+            handler.getId(),
+            clientMetrics);
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
@@ -552,6 +551,7 @@ public static class Builder {
     private boolean unsafeByteBufferConversion;
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
+    private ContainerClientMetrics clientMetrics;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -652,6 +652,15 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
       return this;
     }
 
+    public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
+      this.clientMetrics = clientMetrics;
+      return this;
+    }
+
+    public ContainerClientMetrics getClientMetrics() {
+      return clientMetrics;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(
           clientConfig,
@@ -664,7 +673,8 @@ public KeyOutputStream build() {
           multipartUploadID,
           multipartNumber,
           isMultipartKey,
-          unsafeByteBufferConversion);
+          unsafeByteBufferConversion,
+          clientMetrics);
     }
 
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index adbf5d7..a588859 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -204,6 +205,7 @@ public class RpcClient implements ClientProtocol {
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final OzoneManagerVersion omVersion;
   private volatile ExecutorService ecReconstructExecutor;
+  private final ContainerClientMetrics clientMetrics;
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -332,6 +334,7 @@ public void onRemoval(
     this.byteBufferPool = new ElasticByteBufferPool();
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, this::getECReconstructExecutor);
+    this.clientMetrics = ContainerClientMetrics.acquire();
   }
 
   public XceiverClientFactory getXceiverClientManager() {
@@ -1446,6 +1449,7 @@ public void close() throws IOException {
     IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
     keyProviderCache.invalidateAll();
     keyProviderCache.cleanUp();
+    ContainerClientMetrics.release();
   }
 
   @Deprecated
@@ -1940,7 +1944,8 @@ private KeyOutputStream.Builder createKeyOutputStream(OpenKeySession openKey,
         .setOmClient(ozoneManagerClient)
         .setRequestID(requestId)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-        .setConfig(clientConfig);
+        .setConfig(clientConfig)
+        .setClientMetrics(clientMetrics);
   }
 
   @Override
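For reference, the counter names produced by the Interns.info calls above
render roughly as follows (illustrative placeholders and values; the exact
layout depends on the configured metrics sink):

```text
ContainerClientMetrics1
  totalWriteChunkCalls                       3
  totalWriteChunkBytes                       60
  writeChunkCallsPipeline-<pipeline-uuid>    1
  writeChunkBytesPipeline-<pipeline-uuid>    10
  writeChunkCallsLeader-<datanode-uuid>      2
```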