HDDS-7053. Add client-side pipelines distribution metrics (#3626)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
new file mode 100644
index 0000000..1045f7a
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Container client metrics that describe how data writes are distributed to
+ * pipelines.
+ */
+@Metrics(about = "Client Metrics", context = OzoneConsts.OZONE)
+public final class ContainerClientMetrics {
+ private static ContainerClientMetrics instance;
+ @VisibleForTesting
+ static int referenceCount = 0;
+
+ private static final String SOURCE_NAME =
+ ContainerClientMetrics.class.getSimpleName();
+ private static int instanceCount = 0;
+
+ @Metric
+ private MutableCounterLong totalWriteChunkCalls;
+ @Metric
+ private MutableCounterLong totalWriteChunkBytes;
+ private final Map<PipelineID, MutableCounterLong> writeChunkCallsByPipeline;
+ private final Map<PipelineID, MutableCounterLong> writeChunkBytesByPipeline;
+ private final Map<UUID, MutableCounterLong> writeChunksCallsByLeaders;
+ private final MetricsRegistry registry;
+
+ public static synchronized ContainerClientMetrics acquire() {
+ if (instance == null) {
+ instanceCount++;
+ instance = DefaultMetricsSystem.instance().register(
+ SOURCE_NAME + instanceCount,
+ "Ozone Client Metrics", new ContainerClientMetrics());
+ }
+ referenceCount++;
+ return instance;
+ }
+
+ public static synchronized void release() {
+ if (instance == null) {
+ throw new IllegalStateException("This metrics class is not used.");
+ }
+ referenceCount--;
+ if (referenceCount == 0) {
+ DefaultMetricsSystem.instance().unregisterSource(
+ SOURCE_NAME + instanceCount);
+ instance = null;
+ }
+ }
+
+ private ContainerClientMetrics() {
+ this.registry = new MetricsRegistry(SOURCE_NAME);
+ writeChunkCallsByPipeline = new ConcurrentHashMap<>();
+ writeChunkBytesByPipeline = new ConcurrentHashMap<>();
+ writeChunksCallsByLeaders = new ConcurrentHashMap<>();
+ }
+
+ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
+ writeChunkCallsByPipeline.computeIfAbsent(pipeline.getId(),
+ pipelineID -> registry.newCounter(
+ Interns.info("writeChunkCallsPipeline-" + pipelineID.getId(),
+ "Number of writeChunk calls on a pipelines"),
+ 0L)
+ ).incr();
+ writeChunkBytesByPipeline.computeIfAbsent(pipeline.getId(),
+ pipelineID -> registry.newCounter(
+ Interns.info("writeChunkBytesPipeline-" + pipelineID.getId(),
+ "Number of bytes written on a pipelines"),
+ 0L)
+ ).incr(chunkSizeBytes);
+ if (pipeline.getLeaderId() != null) {
+ writeChunksCallsByLeaders.computeIfAbsent(pipeline.getLeaderId(),
+ leader -> registry.newCounter(
+ Interns.info("writeChunkCallsLeader-" + leader,
+ "Number of writeChunk calls on a leader node"),
+ 0L)
+ ).incr();
+ }
+ totalWriteChunkCalls.incr();
+ totalWriteChunkBytes.incr(chunkSizeBytes);
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getTotalWriteChunkBytes() {
+ return totalWriteChunkBytes;
+ }
+
+ @VisibleForTesting
+ public MutableCounterLong getTotalWriteChunkCalls() {
+ return totalWriteChunkCalls;
+ }
+
+ @VisibleForTesting
+ public Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
+ return writeChunkBytesByPipeline;
+ }
+
+ @VisibleForTesting
+ public Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
+ return writeChunkCallsByPipeline;
+ }
+
+ @VisibleForTesting
+ public Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
+ return writeChunksCallsByLeaders;
+ }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 599f68a..09fe30e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -120,6 +121,8 @@ public class BlockOutputStream extends OutputStream {
private ChunkBuffer currentBuffer;
private final Token<? extends TokenIdentifier> token;
private int replicationIndex;
+ private Pipeline pipeline;
+ private final ContainerClientMetrics clientMetrics;
/**
* Creates a new BlockOutputStream.
@@ -135,7 +138,8 @@ public BlockOutputStream(
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
- Token<? extends TokenIdentifier> token
+ Token<? extends TokenIdentifier> token,
+ ContainerClientMetrics clientMetrics
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
@@ -177,6 +181,8 @@ public BlockOutputStream(
ioException = new AtomicReference<>(null);
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
+ this.clientMetrics = clientMetrics;
+ this.pipeline = pipeline;
}
void refreshCurrentBuffer() {
@@ -694,7 +700,6 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
LOG.debug("Writing chunk {} length {} at offset {}",
chunkInfo.getChunkName(), effectiveChunkSize, offset);
}
-
try {
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, token, replicationIndex);
@@ -717,6 +722,7 @@ CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
throw ce;
});
containerBlockData.addChunks(chunkInfo);
+ clientMetrics.recordWriteChunk(pipeline, chunkInfo.getLen());
return validateFuture;
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 9f6bf05..9806fd7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -66,10 +67,11 @@ public ECBlockOutputStream(
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
- Token<? extends TokenIdentifier> token
+ Token<? extends TokenIdentifier> token,
+ ContainerClientMetrics clientMetrics
) throws IOException {
super(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token);
+ pipeline, bufferPool, config, token, clientMetrics);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index 802adc1..7c2d87d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -74,9 +75,11 @@ public RatisBlockOutputStream(
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
- Token<? extends TokenIdentifier> token
+ Token<? extends TokenIdentifier> token,
+ ContainerClientMetrics clientMetrics
) throws IOException {
- super(blockID, xceiverClientManager, pipeline, bufferPool, config, token);
+ super(blockID, xceiverClientManager, pipeline,
+ bufferPool, config, token, clientMetrics);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java
new file mode 100644
index 0000000..b0939ce
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestContainerClientMetrics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test ContainerClientMetrics.
+ */
+public class TestContainerClientMetrics {
+ @Before
+ public void setup() {
+ while (ContainerClientMetrics.referenceCount > 0) {
+ ContainerClientMetrics.release();
+ }
+ }
+
+ @Test
+ public void testRecordChunkMetrics() {
+ ContainerClientMetrics metrics = ContainerClientMetrics.acquire();
+ PipelineID pipelineId1 = PipelineID.randomId();
+ UUID leaderId1 = UUID.randomUUID();
+ PipelineID pipelineId2 = PipelineID.randomId();
+ UUID leaderId2 = UUID.randomUUID();
+ PipelineID pipelineId3 = PipelineID.randomId();
+
+ metrics.recordWriteChunk(createPipeline(pipelineId1, leaderId1), 10);
+ metrics.recordWriteChunk(createPipeline(pipelineId2, leaderId2), 20);
+ metrics.recordWriteChunk(createPipeline(pipelineId3, leaderId1), 30);
+
+ assertEquals(3, metrics.getTotalWriteChunkCalls().value());
+ assertEquals(60, metrics.getTotalWriteChunkBytes().value());
+
+ assertEquals(3, metrics.getWriteChunkBytesByPipeline().size());
+ assertEquals(10,
+ metrics.getWriteChunkBytesByPipeline().get(pipelineId1).value());
+ assertEquals(20,
+ metrics.getWriteChunkBytesByPipeline().get(pipelineId2).value());
+ assertEquals(30,
+ metrics.getWriteChunkBytesByPipeline().get(pipelineId3).value());
+
+ assertEquals(3, metrics.getWriteChunkCallsByPipeline().size());
+ assertEquals(1,
+ metrics.getWriteChunkCallsByPipeline().get(pipelineId1).value());
+ assertEquals(1,
+ metrics.getWriteChunkCallsByPipeline().get(pipelineId2).value());
+ assertEquals(1,
+ metrics.getWriteChunkCallsByPipeline().get(pipelineId3).value());
+
+ assertEquals(2, metrics.getWriteChunksCallsByLeaders().size());
+ assertEquals(2,
+ metrics.getWriteChunksCallsByLeaders().get(leaderId1).value());
+ assertEquals(1,
+ metrics.getWriteChunksCallsByLeaders().get(leaderId2).value());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testReleaseWithoutUse() {
+ ContainerClientMetrics.release();
+ }
+
+ @Test
+ public void testAcquireAndRelease() {
+ Assertions.assertNotNull(ContainerClientMetrics.acquire());
+ Assertions.assertEquals(1, ContainerClientMetrics.referenceCount);
+ ContainerClientMetrics.release();
+ Assertions.assertEquals(0, ContainerClientMetrics.referenceCount);
+
+ Assertions.assertNotNull(ContainerClientMetrics.acquire());
+ Assertions.assertNotNull(ContainerClientMetrics.acquire());
+ Assertions.assertEquals(2, ContainerClientMetrics.referenceCount);
+ ContainerClientMetrics.release();
+ ContainerClientMetrics.release();
+ Assertions.assertEquals(0, ContainerClientMetrics.referenceCount);
+
+ ContainerClientMetrics.acquire();
+ Assertions.assertNotNull(ContainerClientMetrics.acquire());
+ }
+
+ private Pipeline createPipeline(PipelineID piplineId, UUID leaderId) {
+ return Pipeline.newBuilder()
+ .setId(piplineId)
+ .setReplicationConfig(Mockito.mock(ReplicationConfig.class))
+ .setState(Pipeline.PipelineState.OPEN)
+ .setNodes(Collections.emptyList())
+ .setLeaderId(leaderId)
+ .build();
+ }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 635f1f7..2b0cf12 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
@@ -110,7 +111,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
pipeline,
bufferPool,
config,
- null);
+ null,
+ ContainerClientMetrics.acquire());
return outputStream;
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index dc28102..b233254 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -87,6 +88,7 @@ public static Pipeline createRatisPipeline() {
.setReplicationConfig(
RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
.setNodes(nodes)
+ .setLeaderId(UUID.randomUUID())
.build();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 0c79cbc..b9da5cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -100,9 +101,11 @@ public class ECReconstructionCoordinator implements Closeable {
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
+ private final ContainerClientMetrics clientMetrics;
public ECReconstructionCoordinator(ConfigurationSource conf,
- CertificateClient certificateClient) throws IOException {
+ CertificateClient certificateClient)
+ throws IOException {
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
@@ -117,6 +120,7 @@ public ECReconstructionCoordinator(ConfigurationSource conf,
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructExecutor);
tokenHelper = new TokenHelper(conf, certificateClient);
+ this.clientMetrics = ContainerClientMetrics.acquire();
}
public void reconstructECContainerGroup(long containerID,
@@ -242,7 +246,7 @@ void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
this.containerOperationClient.getXceiverClientManager(),
this.containerOperationClient
.singleNodePipeline(datanodeDetails, repConfig), bufferPool,
- configuration, blockLocationInfo.getToken());
+ configuration, blockLocationInfo.getToken(), clientMetrics);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Make sure it's clean. Don't want to reuse the erroneously returned
// buffers from the pool.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2b3b756..7431a17 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -58,6 +59,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private final Token<OzoneBlockTokenIdentifier> token;
private BufferPool bufferPool;
+ private ContainerClientMetrics clientMetrics;
@SuppressWarnings({"parameternumber", "squid:S00107"})
BlockOutputStreamEntry(
@@ -67,7 +69,8 @@ public class BlockOutputStreamEntry extends OutputStream {
long length,
BufferPool bufferPool,
Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config
+ OzoneClientConfig config,
+ ContainerClientMetrics clientMetrics
) {
this.config = config;
this.outputStream = null;
@@ -79,6 +82,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.length = length;
this.currentPosition = 0;
this.bufferPool = bufferPool;
+ this.clientMetrics = clientMetrics;
}
/**
@@ -100,7 +104,11 @@ void checkStream() throws IOException {
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token);
+ pipeline, bufferPool, config, token, clientMetrics);
+ }
+
+ ContainerClientMetrics getClientMetrics() {
+ return clientMetrics;
}
@Override
@@ -331,6 +339,7 @@ public static class Builder {
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
+ private ContainerClientMetrics clientMetrics;
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -372,6 +381,10 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}
+ public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
+ this.clientMetrics = clientMetrics;
+ return this;
+ }
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID,
@@ -380,7 +393,7 @@ public BlockOutputStreamEntry build() {
pipeline,
length,
bufferPool,
- token, config);
+ token, config, clientMetrics);
}
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index bdfa3f2..5b81696 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -79,6 +80,7 @@ public class BlockOutputStreamEntryPool {
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
private final ExcludeList excludeList;
+ private final ContainerClientMetrics clientMetrics;
@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
@@ -88,7 +90,8 @@ public BlockOutputStreamEntryPool(
String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
boolean unsafeByteBufferConversion,
- XceiverClientFactory xceiverClientFactory, long openID
+ XceiverClientFactory xceiverClientFactory, long openID,
+ ContainerClientMetrics clientMetrics
) {
this.config = config;
this.xceiverClientFactory = xceiverClientFactory;
@@ -110,19 +113,14 @@ public BlockOutputStreamEntryPool(
.getStreamBufferSize()),
ByteStringConversion
.createByteBufferConversion(unsafeByteBufferConversion));
+ this.clientMetrics = clientMetrics;
}
ExcludeList createExcludeList() {
return new ExcludeList();
}
- /**
- * A constructor for testing purpose only.
- *
- * @see KeyOutputStream#KeyOutputStream()
- */
- @VisibleForTesting
- BlockOutputStreamEntryPool() {
+ BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics) {
streamEntries = new ArrayList<>();
omClient = null;
keyArgs = null;
@@ -140,6 +138,7 @@ ExcludeList createExcludeList() {
currentStreamIndex = 0;
openID = -1;
excludeList = new ExcludeList();
+ this.clientMetrics = clientMetrics;
}
/**
@@ -185,6 +184,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setLength(subKeyInfo.getLength())
.setBufferPool(bufferPool)
.setToken(subKeyInfo.getToken())
+ .setClientMetrics(clientMetrics)
.build();
}
@@ -247,6 +247,10 @@ OzoneClientConfig getConfig() {
return config;
}
+ ContainerClientMetrics getClientMetrics() {
+ return clientMetrics;
+ }
+
/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 13c6a86..06466af 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -75,9 +76,9 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config) {
+ OzoneClientConfig config, ContainerClientMetrics clientMetrics) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
- token, config);
+ token, config, clientMetrics);
assertInstanceOf(
pipeline.getReplicationConfig(), ECReplicationConfig.class);
this.replicationConfig =
@@ -96,7 +97,7 @@ void checkStream() throws IOException {
blockOutputStreams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
- getBufferPool(), getConf(), getToken());
+ getBufferPool(), getConf(), getToken(), getClientMetrics());
}
}
}
@@ -395,6 +396,7 @@ public static class Builder {
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
+ private ContainerClientMetrics clientMetrics;
public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -440,6 +442,12 @@ public ECBlockOutputStreamEntry.Builder setToken(
return this;
}
+ public ECBlockOutputStreamEntry.Builder setClientMetrics(
+ ContainerClientMetrics containerClientMetrics) {
+ this.clientMetrics = containerClientMetrics;
+ return this;
+ }
+
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
@@ -447,7 +455,7 @@ public ECBlockOutputStreamEntry build() {
pipeline,
length,
bufferPool,
- token, config);
+ token, config, clientMetrics);
}
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index 4651896..687bb84 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -54,10 +55,11 @@ public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
OmKeyInfo info,
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory,
- long openID) {
+ long openID,
+ ContainerClientMetrics clientMetrics) {
super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
- openID);
+ openID, clientMetrics);
assert replicationConfig instanceof ECReplicationConfig;
}
@@ -79,6 +81,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setLength(subKeyInfo.getLength())
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
+ .setClientMetrics(getClientMetrics())
.build();
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index bcc6761..2246b33 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -92,6 +92,7 @@ public List<OmKeyLocationInfo> getLocationInfoList() {
}
private ECKeyOutputStream(Builder builder) {
+ super(builder.getClientMetrics());
this.config = builder.getClientConfig();
this.bufferPool = builder.getByteBufferPool();
// For EC, cell/chunk size and buffer size can be same for now.
@@ -111,7 +112,8 @@ private ECKeyOutputStream(Builder builder) {
builder.getMultipartUploadID(), builder.getMultipartNumber(),
builder.isMultipartKey(),
info, builder.isUnsafeByteBufferConversionEnabled(),
- builder.getXceiverManager(), builder.getOpenHandler().getId());
+ builder.getXceiverManager(), builder.getOpenHandler().getId(),
+ builder.getClientMetrics());
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index ee69ca1..b60b59c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -89,11 +90,7 @@ enum StreamAction {
private long clientID;
- /**
- * A constructor for testing purpose only.
- */
- @VisibleForTesting
- public KeyOutputStream() {
+ public KeyOutputStream(ContainerClientMetrics clientMetrics) {
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
@@ -101,7 +98,7 @@ public KeyOutputStream() {
e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
retryCount = 0;
offset = 0;
- blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
+ blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(clientMetrics);
}
@VisibleForTesting
@@ -137,7 +134,8 @@ public KeyOutputStream(
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
- boolean unsafeByteBufferConversion
+ boolean unsafeByteBufferConversion,
+ ContainerClientMetrics clientMetrics
) {
this.config = config;
blockOutputStreamEntryPool =
@@ -149,7 +147,8 @@ public KeyOutputStream(
isMultipart, handler.getKeyInfo(),
unsafeByteBufferConversion,
xceiverClientManager,
- handler.getId());
+ handler.getId(),
+ clientMetrics);
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
@@ -552,6 +551,7 @@ public static class Builder {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
+ private ContainerClientMetrics clientMetrics;
public String getMultipartUploadID() {
return multipartUploadID;
@@ -652,6 +652,15 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}
+ public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
+ this.clientMetrics = clientMetrics;
+ return this;
+ }
+
+ public ContainerClientMetrics getClientMetrics() {
+ return clientMetrics;
+ }
+
public KeyOutputStream build() {
return new KeyOutputStream(
clientConfig,
@@ -664,7 +673,8 @@ public KeyOutputStream build() {
multipartUploadID,
multipartNumber,
isMultipartKey,
- unsafeByteBufferConversion);
+ unsafeByteBufferConversion,
+ clientMetrics);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index adbf5d7..a588859 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -204,6 +205,7 @@ public class RpcClient implements ClientProtocol {
private final BlockInputStreamFactory blockInputStreamFactory;
private final OzoneManagerVersion omVersion;
private volatile ExecutorService ecReconstructExecutor;
+ private final ContainerClientMetrics clientMetrics;
/**
* Creates RpcClient instance with the given configuration.
@@ -332,6 +334,7 @@ public void onRemoval(
this.byteBufferPool = new ElasticByteBufferPool();
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, this::getECReconstructExecutor);
+ this.clientMetrics = ContainerClientMetrics.acquire();
}
public XceiverClientFactory getXceiverClientManager() {
@@ -1446,6 +1449,7 @@ public void close() throws IOException {
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
keyProviderCache.invalidateAll();
keyProviderCache.cleanUp();
+ ContainerClientMetrics.release();
}
@Deprecated
@@ -1940,7 +1944,8 @@ private KeyOutputStream.Builder createKeyOutputStream(OpenKeySession openKey,
.setOmClient(ozoneManagerClient)
.setRequestID(requestId)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig);
+ .setConfig(clientConfig)
+ .setClientMetrics(clientMetrics);
}
@Override