Merge remote-tracking branch 'origin/master' into HDDS-4440-s3-performance
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 073bd9d..9f2116c 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -46,6 +46,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-erasurecode</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
similarity index 95%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
index d1bf7f3..6703216 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -22,8 +22,8 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import java.util.function.Function;
@@ -47,7 +47,7 @@
* @return BlockExtendedInputStream of the correct type.
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
similarity index 96%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 104e5bc..ba05ec2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -25,10 +25,10 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import java.util.concurrent.ExecutorService;
@@ -75,7 +75,7 @@
* @return BlockExtendedInputStream of the correct type.
*/
public BlockExtendedInputStream create(ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index b0e9755..70bd384 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -28,8 +28,8 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, Pipeline> refreshFunction;
- private final OmKeyLocationInfo blockInfo;
+ private final BlockLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
private final BlockExtendedInputStream[] blockStreams;
private final int maxLocations;
@@ -62,10 +62,6 @@
private boolean closed = false;
private boolean seeked = false;
- protected OmKeyLocationInfo getBlockInfo() {
- return blockInfo;
- }
-
protected ECReplicationConfig getRepConfig() {
return repConfig;
}
@@ -109,7 +105,7 @@
}
public ECBlockInputStream(ECReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
@@ -174,7 +170,7 @@
.setState(Pipeline.PipelineState.CLOSED)
.build();
- OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
+ BlockLocationInfo blkInfo = new BlockLocationInfo.Builder()
.setBlockID(blockInfo.getBlockID())
.setLength(internalBlockLength(locationIndex + 1))
.setPipeline(blockInfo.getPipeline())
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
similarity index 95%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index 6c39e93..c9d2b76 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import java.util.List;
import java.util.function.Function;
@@ -52,7 +52,7 @@
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction);
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 470df0c..efc3b31 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -24,8 +24,8 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -75,7 +75,7 @@
*/
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
if (missingLocations) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
similarity index 97%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index ecde9c6..49ee7c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -23,8 +23,8 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +51,7 @@
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, Pipeline> refreshFunction;
- private final OmKeyLocationInfo blockInfo;
+ private final BlockLocationInfo blockInfo;
private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
private BlockExtendedInputStream blockReader;
@@ -96,7 +96,7 @@
}
public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
similarity index 99%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index d5ec6db..dc7daf8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -25,9 +25,9 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.apache.ratis.util.Preconditions;
@@ -116,7 +116,7 @@
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
similarity index 100%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/InsufficientLocationsException.java
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
new file mode 100644
index 0000000..493ece8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains Ozone I/O classes.
+ */
+package org.apache.hadoop.ozone.client.io;
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 58c0788..cf65d8b 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -71,6 +71,10 @@
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-config</artifactId>
</dependency>
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 2a962d7..fc83469 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -403,6 +403,12 @@
public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT =
"120s";
+ public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
+ "ozone.scm.pipeline.scrub.interval";
+ public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
+ "5m";
+
// Allow SCM to auto create factor ONE ratis pipeline.
public static final String OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE =
"ozone.scm.pipeline.creation.auto.factor.one";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
index 2f2a7bf..60e61b2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java
@@ -107,7 +107,7 @@
for (HddsProtos.KeyContainerIDList sample : proto.getStatSampleList()) {
report.setSample(sample.getKey(), sample.getContainerList()
.stream()
- .map(c -> ContainerID.getFromProtobuf(c))
+ .map(ContainerID::getFromProtobuf)
.collect(Collectors.toList()));
}
return report;
@@ -147,6 +147,34 @@
}
/**
+ * Return a map of all stats and their value as a long.
+ * @return a map with the stat name as key and its current value as a long.
+ */
+ public Map<String, Long> getStats() {
+ Map<String, Long> result = new HashMap<>();
+ for (Map.Entry<String, LongAdder> e : stats.entrySet()) {
+ result.put(e.getKey(), e.getValue().longValue());
+ }
+ return result;
+ }
+
+ /**
+ * Return a map of all samples, with the stat as the key and the samples
+ * for the stat as a List of Long.
+ * @return a map of stat name to the sampled container IDs as longs.
+ */
+ public Map<String, List<Long>> getSamples() {
+ Map<String, List<Long>> result = new HashMap<>();
+ for (Map.Entry<String, List<ContainerID>> e : containerSample.entrySet()) {
+ result.put(e.getKey(),
+ e.getValue().stream()
+ .map(ContainerID::getId)
+ .collect(Collectors.toList()));
+ }
+ return result;
+ }
+
+ /**
* Get the stat for the given LifeCycleState. If there is no stat available
* for that stat -1 is returned.
* @param stat The requested stat.
@@ -184,7 +212,11 @@
}
protected void setStat(String stat, long value) {
- LongAdder adder = getStatAndEnsurePresent(stat);
+ LongAdder adder = stats.get(stat);
+ if (adder == null) {
+ // this is an unknown stat, so ignore it.
+ return;
+ }
if (adder.longValue() != 0) {
throw new IllegalStateException(stat + " is expected to be zero");
}
@@ -192,9 +224,11 @@
}
protected void setSample(String stat, List<ContainerID> sample) {
- // First get the stat, as we should not receive a sample for a stat which
- // does not exist.
- getStatAndEnsurePresent(stat);
+ LongAdder adder = stats.get(stat);
+ if (adder == null) {
+ // this is an unknown stat, so ignore it.
+ return;
+ }
// Now check there is not already a sample for this stat
List<ContainerID> existingSample = containerSample.get(stat);
if (existingSample != null) {
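Note: the new getStats() and getSamples() accessors expose the report's counters and samples as plain maps (feeding the JSON output exercised in the tests below), while setStat() and setSample() now ignore unknown stat names instead of throwing, so a report serialized by a newer SCM can still be deserialized by an older one. A minimal usage sketch, assuming a default-constructed report and using only methods visible in this diff:

    ReplicationManagerReport report = new ReplicationManagerReport();
    report.increment(HddsProtos.LifeCycleState.OPEN);
    report.incrementAndSample(
        ReplicationManagerReport.HealthState.UNDER_REPLICATED,
        ContainerID.valueOf(1));
    report.setComplete();

    Map<String, Long> stats = report.getStats();           // {"OPEN"=1, ...}
    Map<String, List<Long>> samples = report.getSamples(); // {"UNDER_REPLICATED"=[1]}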
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
new file mode 100644
index 0000000..286762d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockLocationInfo.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.Objects;
+
+/**
+ * One key can be too huge to fit in one container, in which case it gets
+ * split into a number of subkeys. This class represents one such subkey.
+ */
+public class BlockLocationInfo {
+ private final BlockID blockID;
+ private long length;
+ private final long offset;
+ // Block token, required for client authentication when security is enabled.
+ private Token<OzoneBlockTokenIdentifier> token;
+ // the version number indicating when this block was added
+ private long createVersion;
+
+ private Pipeline pipeline;
+
+ // PartNumber is set for Multipart upload Keys.
+ private int partNumber;
+
+ protected BlockLocationInfo(Builder builder) {
+ this.blockID = builder.blockID;
+ this.pipeline = builder.pipeline;
+ this.length = builder.length;
+ this.offset = builder.offset;
+ this.token = builder.token;
+ this.partNumber = builder.partNumber;
+ this.createVersion = builder.createVersion;
+ }
+
+ public void setCreateVersion(long version) {
+ createVersion = version;
+ }
+
+ public long getCreateVersion() {
+ return createVersion;
+ }
+
+ public BlockID getBlockID() {
+ return blockID;
+ }
+
+ public long getContainerID() {
+ return blockID.getContainerID();
+ }
+
+ public long getLocalID() {
+ return blockID.getLocalID();
+ }
+
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getBlockCommitSequenceId() {
+ return blockID.getBlockCommitSequenceId();
+ }
+
+ public Token<OzoneBlockTokenIdentifier> getToken() {
+ return token;
+ }
+
+ public void setToken(Token<OzoneBlockTokenIdentifier> token) {
+ this.token = token;
+ }
+
+ public void setPipeline(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ public void setPartNumber(int partNumber) {
+ this.partNumber = partNumber;
+ }
+
+ public int getPartNumber() {
+ return partNumber;
+ }
+
+ /**
+ * Builder of BlockLocationInfo.
+ */
+ public static class Builder {
+ private BlockID blockID;
+ private long length;
+ private long offset;
+ private Token<OzoneBlockTokenIdentifier> token;
+ private Pipeline pipeline;
+ private int partNumber;
+ private long createVersion;
+
+ public Builder setBlockID(BlockID blockId) {
+ this.blockID = blockId;
+ return this;
+ }
+
+ public Builder setPipeline(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ return this;
+ }
+
+ public Builder setLength(long len) {
+ this.length = len;
+ return this;
+ }
+
+ public Builder setOffset(long off) {
+ this.offset = off;
+ return this;
+ }
+
+ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
+ this.token = bToken;
+ return this;
+ }
+
+ public Builder setPartNumber(int partNum) {
+ this.partNumber = partNum;
+ return this;
+ }
+
+ public Builder setCreateVersion(long version) {
+ this.createVersion = version;
+ return this;
+ }
+
+ public BlockLocationInfo build() {
+ return new BlockLocationInfo(this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{blockID={containerID=" + blockID.getContainerID() +
+ ", localID=" + blockID.getLocalID() + "}" +
+ ", length=" + length +
+ ", offset=" + offset +
+ ", token=" + token +
+ ", pipeline=" + pipeline +
+ ", createVersion=" + createVersion +
+ ", partNumber=" + partNumber
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BlockLocationInfo that = (BlockLocationInfo) o;
+ return length == that.length &&
+ offset == that.offset &&
+ createVersion == that.createVersion &&
+ Objects.equals(blockID, that.blockID) &&
+ Objects.equals(token, that.token) &&
+ Objects.equals(pipeline, that.pipeline);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(blockID, length, offset, token, createVersion,
+ pipeline);
+ }
+}
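BlockLocationInfo is the new OM-independent representation of a block's location used throughout the client signatures above; its protected constructor indicates it is designed for subclassing on the OM side. A minimal construction sketch using the Builder defined in this file (the IDs and sizes are illustrative placeholder values):

    BlockLocationInfo info = new BlockLocationInfo.Builder()
        .setBlockID(new BlockID(1L, 100L)) // containerID=1, localID=100 (example)
        .setOffset(0L)
        .setLength(4096L)
        .build();
    // info.getContainerID() == 1 and info.getLength() == 4096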
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
similarity index 100%
rename from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/server/JsonUtils.java
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6c76ca6..709cf45 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1369,6 +1369,15 @@
</description>
</property>
<property>
+ <name>ozone.scm.pipeline.scrub.interval</name>
+ <value>5m</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>
+ Interval at which SCM schedules the background job that scrubs
+ pipelines, i.e. pipelines that are CLOSED or have stayed ALLOCATED
+ for too long.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.pipeline.creation.auto.factor.one</name>
<value>true</value>
<tag>OZONE, SCM, PIPELINE</tag>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
index a05f9ab..a68f70c 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
@@ -17,16 +17,22 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.server.JsonUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import static com.fasterxml.jackson.databind.node.JsonNodeType.ARRAY;
+
/**
* Tests for the ReplicationManagerReport class.
*/
@@ -64,6 +70,54 @@
report.getStat(HddsProtos.LifeCycleState.QUASI_CLOSED));
}
+
+ @Test
+ public void testJsonOutput() throws IOException {
+ report.increment(HddsProtos.LifeCycleState.OPEN);
+ report.increment(HddsProtos.LifeCycleState.CLOSED);
+ report.increment(HddsProtos.LifeCycleState.CLOSED);
+
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+ new ContainerID(1));
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+ new ContainerID(2));
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED,
+ new ContainerID(3));
+ report.setComplete();
+
+ String jsonString = JsonUtils.toJsonStringWithDefaultPrettyPrinter(report);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(jsonString);
+
+ Assert.assertTrue(json.get("reportTimeStamp").longValue() > 0);
+ JsonNode stats = json.get("stats");
+ Assert.assertEquals(1, stats.get("OPEN").longValue());
+ Assert.assertEquals(0, stats.get("CLOSING").longValue());
+ Assert.assertEquals(0, stats.get("QUASI_CLOSED").longValue());
+ Assert.assertEquals(2, stats.get("CLOSED").longValue());
+ Assert.assertEquals(0, stats.get("DELETING").longValue());
+ Assert.assertEquals(0, stats.get("DELETED").longValue());
+
+ Assert.assertEquals(2, stats.get("UNDER_REPLICATED").longValue());
+ Assert.assertEquals(1, stats.get("OVER_REPLICATED").longValue());
+ Assert.assertEquals(0, stats.get("MIS_REPLICATED").longValue());
+ Assert.assertEquals(0, stats.get("MISSING").longValue());
+ Assert.assertEquals(0, stats.get("UNHEALTHY").longValue());
+ Assert.assertEquals(0, stats.get("EMPTY").longValue());
+ Assert.assertEquals(0, stats.get("OPEN_UNHEALTHY").longValue());
+ Assert.assertEquals(0, stats.get("QUASI_CLOSED_STUCK").longValue());
+
+ JsonNode samples = json.get("samples");
+ Assert.assertEquals(ARRAY, samples.get("UNDER_REPLICATED").getNodeType());
+ Assert.assertEquals(1, samples.get("UNDER_REPLICATED").get(0).longValue());
+ Assert.assertEquals(2, samples.get("UNDER_REPLICATED").get(1).longValue());
+ Assert.assertEquals(3, samples.get("OVER_REPLICATED").get(0).longValue());
+ }
+
@Test
public void testContainerIDsCanBeSampled() {
report.incrementAndSample(
@@ -146,6 +200,35 @@
}
}
+ @Test
+ public void testDeSerializeCanHandleUnknownMetric() {
+ HddsProtos.ReplicationManagerReportProto.Builder proto =
+ HddsProtos.ReplicationManagerReportProto.newBuilder();
+ proto.setTimestamp(12345);
+
+ proto.addStat(HddsProtos.KeyIntValue.newBuilder()
+ .setKey("unknownValue")
+ .setValue(15)
+ .build());
+
+ proto.addStat(HddsProtos.KeyIntValue.newBuilder()
+ .setKey(ReplicationManagerReport.HealthState.UNDER_REPLICATED
+ .toString())
+ .setValue(20)
+ .build());
+
+ HddsProtos.KeyContainerIDList.Builder sample
+ = HddsProtos.KeyContainerIDList.newBuilder();
+ sample.setKey("unknownValue");
+ sample.addContainer(ContainerID.valueOf(1).getProtobuf());
+ proto.addStatSample(sample.build());
+ // Ensure no exception is thrown
+ ReplicationManagerReport newReport =
+ ReplicationManagerReport.fromProtobuf(proto.build());
+ Assert.assertEquals(20, newReport.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
@Test(expected = IllegalStateException.class)
public void testStatCannotBeSetTwice() {
report.setStat(HddsProtos.LifeCycleState.CLOSED.toString(), 10);
diff --git a/hadoop-hdds/docs/pom.xml b/hadoop-hdds/docs/pom.xml
index fded922..9a6a330 100644
--- a/hadoop-hdds/docs/pom.xml
+++ b/hadoop-hdds/docs/pom.xml
@@ -28,9 +28,10 @@
<name>Apache Ozone/HDDS Documentation</name>
<packaging>jar</packaging>
- <dependencies>
+ <properties>
+ <skipDocs>false</skipDocs>
+ </properties>
- </dependencies>
<build>
<plugins>
<plugin>
@@ -42,11 +43,12 @@
<goal>exec</goal>
</goals>
<phase>compile</phase>
+ <configuration>
+ <executable>../../hadoop-ozone/dev-support/checks/docs.sh</executable>
+ <skip>${skipDocs}</skip>
+ </configuration>
</execution>
</executions>
- <configuration>
- <executable>dev-support/bin/generate-site.sh</executable>
- </configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index 35248b7..d981032 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -109,6 +109,12 @@
<dependency>
<groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-erasurecode</artifactId>
+ <version>${hdds.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
<artifactId>hdds-client</artifactId>
<version>${hdds.version}</version>
</dependency>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 2e7b04f..09d289e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -461,62 +461,44 @@
*/
private void checkIterationMoveResults(Set<DatanodeDetails> selectedTargets) {
this.countDatanodesInvolvedPerIteration = 0;
- this.sizeMovedPerIteration = 0;
- for (Map.Entry<ContainerMoveSelection,
- CompletableFuture<ReplicationManager.MoveResult>>
- futureEntry : moveSelectionToFutureMap.entrySet()) {
- ContainerMoveSelection moveSelection = futureEntry.getKey();
- CompletableFuture<ReplicationManager.MoveResult> future =
- futureEntry.getValue();
- try {
- ReplicationManager.MoveResult result = future.get(
- config.getMoveTimeout().toMillis(), TimeUnit.MILLISECONDS);
- if (result == ReplicationManager.MoveResult.COMPLETED) {
- try {
- ContainerInfo container =
- containerManager.getContainer(moveSelection.getContainerID());
- this.sizeMovedPerIteration += container.getUsedBytes();
- metrics.incrementNumContainerMovesInLatestIteration(1);
- LOG.info("Container move completed for container {} to target {}",
- container.containerID(),
- moveSelection.getTargetNode().getUuidString());
- } catch (ContainerNotFoundException e) {
- LOG.warn("Could not find Container {} while " +
- "checking move results in ContainerBalancer",
- moveSelection.getContainerID(), e);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container move for container {} to target {} failed: {}",
- moveSelection.getContainerID(),
- moveSelection.getTargetNode().getUuidString(), result);
- }
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for container move result for " +
- "container {}.",
- moveSelection.getContainerID(), e);
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- LOG.warn("Container move for container {} completed exceptionally.",
- moveSelection.getContainerID(), e);
- } catch (TimeoutException e) {
- LOG.warn("Container move for container {} timed out.",
- moveSelection.getContainerID(), e);
- }
+
+ CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
+ moveSelectionToFutureMap.values()
+ .toArray(new CompletableFuture[moveSelectionToFutureMap.size()]));
+ try {
+ allFuturesResult.get(config.getMoveTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ long timeoutCounts = moveSelectionToFutureMap.entrySet().stream()
+ .filter(entry -> !entry.getValue().isDone())
+ .peek(entry -> {
+ LOG.warn("Container move canceled for container {} to target {} " +
+ "due to timeout.", entry.getKey().getContainerID(),
+ entry.getKey().getTargetNode().getUuidString());
+ entry.getValue().cancel(true);
+ }).count();
+ LOG.warn("{} Container moves are canceled.", timeoutCounts);
+ metrics.incrementNumContainerMovesTimeoutInLatestIteration(timeoutCounts);
+ } catch (ExecutionException e) {
+ LOG.error("Got exception while checkIterationMoveResults", e);
}
+
countDatanodesInvolvedPerIteration =
sourceToTargetMap.size() + selectedTargets.size();
metrics.incrementNumDatanodesInvolvedInLatestIteration(
countDatanodesInvolvedPerIteration);
- sizeMovedPerIteration /= OzoneConsts.GB;
- metrics.incrementDataSizeMovedGBInLatestIteration(sizeMovedPerIteration);
- metrics.incrementNumContainerMoves(
- metrics.getNumContainerMovesInLatestIteration());
- metrics.incrementDataSizeMovedGB(sizeMovedPerIteration);
+ metrics.incrementNumContainerMovesCompleted(
+ metrics.getNumContainerMovesCompletedInLatestIteration());
+ metrics.incrementNumContainerMovesTimeout(
+ metrics.getNumContainerMovesTimeoutInLatestIteration());
+ metrics.incrementDataSizeMovedGB(
+ metrics.getDataSizeMovedGBInLatestIteration());
LOG.info("Number of datanodes involved in this iteration: {}. Size moved " +
"in this iteration: {}GB.",
- countDatanodesInvolvedPerIteration, sizeMovedPerIteration);
+ countDatanodesInvolvedPerIteration,
+ metrics.getDataSizeMovedGBInLatestIteration());
}
/**
@@ -603,25 +585,49 @@
*/
private boolean moveContainer(DatanodeDetails source,
ContainerMoveSelection moveSelection) {
- ContainerID container = moveSelection.getContainerID();
+ ContainerID containerID = moveSelection.getContainerID();
CompletableFuture<ReplicationManager.MoveResult> future;
try {
+ ContainerInfo containerInfo = containerManager.getContainer(containerID);
future = replicationManager
- .move(container, source, moveSelection.getTargetNode());
+ .move(containerID, source, moveSelection.getTargetNode())
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ LOG.info("Container move for container {} from source {} to " +
+ "target {} failed with exceptions {}",
+ containerID.toString(),
+ source.getUuidString(),
+ moveSelection.getTargetNode().getUuidString(), ex);
+ } else {
+ if (result == ReplicationManager.MoveResult.COMPLETED) {
+ metrics.incrementDataSizeMovedGBInLatestIteration(
+ containerInfo.getUsedBytes() / OzoneConsts.GB);
+ metrics.incrementNumContainerMovesCompletedInLatestIteration(1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Container move completed for container {} to target {}",
+ containerID,
+ moveSelection.getTargetNode().getUuidString());
+ }
+ } else {
+ LOG.warn(
+ "Container move for container {} to target {} failed: {}",
+ moveSelection.getContainerID(),
+ moveSelection.getTargetNode().getUuidString(), result);
+ }
+ }
+ });
} catch (ContainerNotFoundException e) {
- LOG.warn("Could not find Container {} for container move", container, e);
+ LOG.warn("Could not find Container {} for container move",
+ containerID, e);
return false;
} catch (NodeNotFoundException e) {
- LOG.warn("Container move failed for container {}", container, e);
+ LOG.warn("Container move failed for container {}", containerID, e);
return false;
}
+
if (future.isDone()) {
if (future.isCompletedExceptionally()) {
- LOG.info("Container move for container {} from source {} to target {}" +
- "failed with exceptions",
- container.toString(),
- source.getUuidString(),
- moveSelection.getTargetNode().getUuidString());
return false;
} else {
ReplicationManager.MoveResult result = future.join();
@@ -785,7 +791,8 @@
this.countDatanodesInvolvedPerIteration = 0;
this.sizeMovedPerIteration = 0;
metrics.resetDataSizeMovedGBInLatestIteration();
- metrics.resetNumContainerMovesInLatestIteration();
+ metrics.resetNumContainerMovesCompletedInLatestIteration();
+ metrics.resetNumContainerMovesTimeoutInLatestIteration();
metrics.resetNumDatanodesInvolvedInLatestIteration();
metrics.resetDataSizeUnbalancedGB();
metrics.resetNumDatanodesUnbalanced();
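The rewritten checkIterationMoveResults() waits once on the whole batch of move futures with CompletableFuture.allOf(), and on timeout cancels only the futures that are still pending; per-move success accounting now happens in the whenComplete() callback attached in moveContainer(). A self-contained sketch of that allOf-then-cancel pattern, independent of the balancer classes:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public final class AllOfTimeoutDemo {
      public static void main(String[] args) throws Exception {
        List<CompletableFuture<String>> moves = List.of(
            CompletableFuture.supplyAsync(() -> sleepThen("fast", 100)),
            CompletableFuture.supplyAsync(() -> sleepThen("slow", 5000)));
        try {
          // One bounded wait for the whole batch, like the move timeout above.
          CompletableFuture.allOf(moves.toArray(new CompletableFuture[0]))
              .get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // Cancel only the moves that are still pending, and count them.
          long timedOut = moves.stream()
              .filter(f -> !f.isDone())
              .peek(f -> f.cancel(true))
              .count();
          System.out.println(timedOut + " move(s) canceled due to timeout");
        }
      }

      private static String sleepThen(String value, long millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        return value;
      }
    }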
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
index f40dd44..3a7ce49 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerMetrics.java
@@ -40,9 +40,13 @@
" in the latest iteration.")
private MutableCounterLong dataSizeMovedGBInLatestIteration;
- @Metric(about = "Number of container moves performed by Container Balancer " +
- "in the latest iteration.")
- private MutableCounterLong numContainerMovesInLatestIteration;
+ @Metric(about = "Number of completed container moves performed by " +
+ "Container Balancer in the latest iteration.")
+ private MutableCounterLong numContainerMovesCompletedInLatestIteration;
+
+ @Metric(about = "Number of timeout container moves performed by " +
+ "Container Balancer in the latest iteration.")
+ private MutableCounterLong numContainerMovesTimeoutInLatestIteration;
@Metric(about = "Number of iterations that Container Balancer has run for.")
private MutableCounterLong numIterations;
@@ -57,9 +61,13 @@
@Metric(about = "Number of unbalanced datanodes.")
private MutableCounterLong numDatanodesUnbalanced;
- @Metric(about = "Total number of container moves across all iterations of " +
- "Container Balancer.")
- private MutableCounterLong numContainerMoves;
+ @Metric(about = "Total number of completed container moves across all " +
+ "iterations of Container Balancer.")
+ private MutableCounterLong numContainerMovesCompleted;
+
+ @Metric(about = "Total number of timeout container moves across " +
+ "all iterations of Container Balancer.")
+ private MutableCounterLong numContainerMovesTimeout;
@Metric(about = "Total data size in GB moved across all iterations of " +
"Container Balancer.")
@@ -104,17 +112,37 @@
* latest iteration.
* @return number of container moves
*/
- public long getNumContainerMovesInLatestIteration() {
- return numContainerMovesInLatestIteration.value();
+ public long getNumContainerMovesCompletedInLatestIteration() {
+ return numContainerMovesCompletedInLatestIteration.value();
}
- public void incrementNumContainerMovesInLatestIteration(long valueToAdd) {
- this.numContainerMovesInLatestIteration.incr(valueToAdd);
+ public void incrementNumContainerMovesCompletedInLatestIteration(
+ long valueToAdd) {
+ this.numContainerMovesCompletedInLatestIteration.incr(valueToAdd);
}
- public void resetNumContainerMovesInLatestIteration() {
- numContainerMovesInLatestIteration.incr(
- -getNumContainerMovesInLatestIteration());
+ public void resetNumContainerMovesCompletedInLatestIteration() {
+ numContainerMovesCompletedInLatestIteration.incr(
+ -getNumContainerMovesCompletedInLatestIteration());
+ }
+
+ /**
+ * Gets the number of timeout container moves performed by
+ * Container Balancer in the latest iteration.
+ * @return number of timeout container moves
+ */
+ public long getNumContainerMovesTimeoutInLatestIteration() {
+ return numContainerMovesTimeoutInLatestIteration.value();
+ }
+
+ public void incrementNumContainerMovesTimeoutInLatestIteration(
+ long valueToAdd) {
+ this.numContainerMovesTimeoutInLatestIteration.incr(valueToAdd);
+ }
+
+ public void resetNumContainerMovesTimeoutInLatestIteration() {
+ numContainerMovesTimeoutInLatestIteration.incr(
+ -getNumContainerMovesTimeoutInLatestIteration());
}
/**
@@ -179,12 +207,20 @@
numDatanodesUnbalanced.incr(-getNumDatanodesUnbalanced());
}
- public long getNumContainerMoves() {
- return numContainerMoves.value();
+ public long getNumContainerMovesCompleted() {
+ return numContainerMovesCompleted.value();
}
- public void incrementNumContainerMoves(long valueToAdd) {
- numContainerMoves.incr(valueToAdd);
+ public void incrementNumContainerMovesCompleted(long valueToAdd) {
+ numContainerMovesCompleted.incr(valueToAdd);
+ }
+
+ public long getNumContainerMovesTimeout() {
+ return numContainerMovesTimeout.value();
+ }
+
+ public void incrementNumContainerMovesTimeout(long valueToAdd) {
+ numContainerMovesTimeout.incr(valueToAdd);
}
public long getDataSizeMovedGB() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6fed1e2..c9eb683 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@@ -88,9 +87,7 @@
BackgroundPipelineCreator(PipelineManager pipelineManager,
- ConfigurationSource conf,
- SCMServiceManager serviceManager,
- SCMContext scmContext) {
+ ConfigurationSource conf, SCMContext scmContext) {
this.pipelineManager = pipelineManager;
this.conf = conf;
this.scmContext = scmContext;
@@ -109,9 +106,6 @@
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
- // register BackgroundPipelineCreator to SCMServiceManager
- serviceManager.register(this);
-
// start RatisPipelineUtilsThread
start();
}
@@ -223,13 +217,6 @@
continue;
}
list.add(replicationConfig);
- if (!pipelineManager.getSafeModeStatus()) {
- try {
- pipelineManager.scrubPipeline(replicationConfig);
- } catch (IOException e) {
- LOG.error("Error while scrubbing pipelines.", e);
- }
- }
}
LoopingIterator it = new LoopingIterator(list);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
new file mode 100644
index 0000000..2063ac3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineScrubber.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Background service to clean up pipelines with the following conditions:
+ * - CLOSED
+ * - ALLOCATED for too long
+ */
+public class BackgroundPipelineScrubber implements SCMService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BackgroundPipelineScrubber.class);
+
+ private static final String THREAD_NAME = "PipelineScrubberThread";
+
+ private final PipelineManager pipelineManager;
+ private final ConfigurationSource conf;
+ private final SCMContext scmContext;
+
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private Thread scrubThread;
+ private final long intervalInMillis;
+ private final long waitTimeInMillis;
+ private long lastTimeToBeReadyInMillis = 0;
+
+ public BackgroundPipelineScrubber(PipelineManager pipelineManager,
+ ConfigurationSource conf, SCMContext scmContext) {
+ this.pipelineManager = pipelineManager;
+ this.conf = conf;
+ this.scmContext = scmContext;
+
+ this.intervalInMillis = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.waitTimeInMillis = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ start();
+ }
+
+ @Override
+ public void notifyStatusChanged() {
+ serviceLock.lock();
+ try {
+ if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+ if (serviceStatus != ServiceStatus.RUNNING) {
+ LOG.info("Service {} transitions to RUNNING.", getServiceName());
+ serviceStatus = ServiceStatus.RUNNING;
+ lastTimeToBeReadyInMillis = Time.monotonicNow();
+ }
+ } else {
+ if (serviceStatus != ServiceStatus.PAUSING) {
+ LOG.info("Service {} transitions to PAUSING.", getServiceName());
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ }
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ serviceLock.lock();
+ try {
+ // If safe mode is off, then this SCMService starts to run with a delay.
+ return serviceStatus == ServiceStatus.RUNNING &&
+ Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return BackgroundPipelineScrubber.class.getSimpleName();
+ }
+
+ @Override
+ public void start() {
+ if (!running.compareAndSet(false, true)) {
+ LOG.info("Pipeline Scrubber Service is already running, skip start.");
+ return;
+ }
+ LOG.info("Starting Pipeline Scrubber Service.");
+
+ scrubThread = new Thread(this::run);
+ scrubThread.setName(THREAD_NAME);
+ scrubThread.setDaemon(true);
+ scrubThread.start();
+ }
+
+ @Override
+ public void stop() {
+ synchronized (this) {
+ if (!running.compareAndSet(true, false)) {
+ LOG.info("Pipeline Scrubber Service is not running, skip stop.");
+ return;
+ }
+ notifyAll();
+ }
+ LOG.info("Stopping Pipeline Scrubber Service.");
+ }
+
+ @VisibleForTesting
+ public boolean getRunning() {
+ return running.get();
+ }
+
+ private void run() {
+ while (running.get()) {
+ try {
+ if (shouldRun()) {
+ scrubAllPipelines();
+ }
+ synchronized (this) {
+ wait(intervalInMillis);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} is interrupted, exit", THREAD_NAME);
+ Thread.currentThread().interrupt();
+ running.set(false);
+ }
+ }
+ }
+
+ private void scrubAllPipelines() {
+ try {
+ pipelineManager.scrubPipelines();
+ } catch (IOException e) {
+ LOG.error("Unexpected error during pipeline scrubbing", e);
+ }
+ }
+}
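The scrubber's run() loop sleeps for up to the configured interval between passes, and stop() wakes it early via notifyAll() so shutdown does not have to wait out the interval. A stripped-down, runnable sketch of that wait/notify pattern (class and variable names here are illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    public final class WaitLoopDemo {
      private final AtomicBoolean running = new AtomicBoolean(true);

      private void run() {
        while (running.get()) {
          System.out.println("scrub pass"); // scrubPipelines() in the real service
          synchronized (this) {
            try {
              wait(300_000); // up to ozone.scm.pipeline.scrub.interval
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              running.set(false);
            }
          }
        }
      }

      public void stop() {
        synchronized (this) {
          running.set(false);
          notifyAll(); // wake the sleeping worker so it exits promptly
        }
      }

      public static void main(String[] args) throws InterruptedException {
        WaitLoopDemo demo = new WaitLoopDemo();
        Thread worker = new Thread(demo::run, "scrubber-demo");
        worker.start();
        Thread.sleep(100); // let the worker reach wait()
        demo.stop();
        worker.join();     // returns promptly thanks to notifyAll()
      }
    }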
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 6a50876..ffce314 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -111,8 +111,7 @@
void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
- void scrubPipeline(ReplicationConfig replicationConfig)
- throws IOException;
+ void scrubPipelines() throws IOException;
void startPipelineCreator();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 19b7a51..83b037c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -76,6 +76,7 @@
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
+ private BackgroundPipelineScrubber backgroundPipelineScrubber;
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
// Pipeline Manager MXBean
@@ -141,10 +142,16 @@
// Create background thread.
BackgroundPipelineCreator backgroundPipelineCreator =
- new BackgroundPipelineCreator(
- pipelineManager, conf, serviceManager, scmContext);
+ new BackgroundPipelineCreator(pipelineManager, conf, scmContext);
+
+ BackgroundPipelineScrubber backgroundPipelineScrubber =
+ new BackgroundPipelineScrubber(pipelineManager, conf, scmContext);
pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+ pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
+
+ serviceManager.register(backgroundPipelineCreator);
+ serviceManager.register(backgroundPipelineScrubber);
return pipelineManager;
}
@@ -408,15 +415,14 @@
* Scrub pipelines.
*/
@Override
- public void scrubPipeline(ReplicationConfig config)
- throws IOException {
+ public void scrubPipelines() throws IOException {
Instant currentTime = Instant.now();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- List<Pipeline> candidates = stateManager.getPipelines(config);
+ List<Pipeline> candidates = stateManager.getPipelines();
for (Pipeline p : candidates) {
// scrub pipelines who stay ALLOCATED for too long.
@@ -437,7 +443,6 @@
removePipeline(p);
}
}
- return;
}
/**
@@ -591,6 +596,9 @@
if (backgroundPipelineCreator != null) {
backgroundPipelineCreator.stop();
}
+ if (backgroundPipelineScrubber != null) {
+ backgroundPipelineScrubber.stop();
+ }
if (pmInfoBean != null) {
MBeans.unregister(this.pmInfoBean);
@@ -639,6 +647,16 @@
return this.backgroundPipelineCreator;
}
+ private void setBackgroundPipelineScrubber(
+ BackgroundPipelineScrubber backgroundPipelineScrubber) {
+ this.backgroundPipelineScrubber = backgroundPipelineScrubber;
+ }
+
+ @VisibleForTesting
+ public BackgroundPipelineScrubber getBackgroundPipelineScrubber() {
+ return this.backgroundPipelineScrubber;
+ }
+
@VisibleForTesting
public PipelineFactory getPipelineFactory() {
return pipelineFactory;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 0163152..48069c5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -642,6 +642,40 @@
containerBalancer.stop();
}
+ @Test
+ public void checkIterationResultTimeout()
+ throws NodeNotFoundException, ContainerNotFoundException {
+
+ Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
+ Mockito.any(DatanodeDetails.class),
+ Mockito.any(DatanodeDetails.class)))
+ .thenReturn(genCompletableFuture(500), genCompletableFuture(2000));
+
+ balancerConfiguration.setThreshold(10);
+ balancerConfiguration.setIterations(1);
+ balancerConfiguration.setMaxSizeEnteringTarget(10 * OzoneConsts.GB);
+ balancerConfiguration.setMaxSizeToMovePerIteration(100 * OzoneConsts.GB);
+ balancerConfiguration.setMaxDatanodesPercentageToInvolvePerIteration(100);
+ balancerConfiguration.setMoveTimeout(Duration.ofMillis(1000));
+
+ startBalancer(balancerConfiguration);
+ sleepWhileBalancing(2000);
+
+ /*
+ According to the setup and configurations, this iteration's result should
+ be ITERATION_COMPLETED.
+ */
+ Assert.assertEquals(ContainerBalancer.IterationResult.ITERATION_COMPLETED,
+ containerBalancer.getIterationResult());
+ Assert.assertEquals(1,
+ containerBalancer.getMetrics()
+ .getNumContainerMovesCompletedInLatestIteration());
+ Assert.assertTrue(containerBalancer.getMetrics()
+ .getNumContainerMovesTimeoutInLatestIteration() > 1);
+ containerBalancer.stop();
+
+ }
+
/**
* Determines unBalanced nodes, that is, over and under utilized nodes,
* according to the generated utilization values for nodes and the threshold.
@@ -834,4 +868,16 @@
}
}
+ private CompletableFuture<ReplicationManager.MoveResult>
+ genCompletableFuture(int sleepMilSec) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(sleepMilSec);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return ReplicationManager.MoveResult.COMPLETED;
+ });
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 523efc0..932d2d5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -218,8 +218,7 @@
}
@Override
- public void scrubPipeline(ReplicationConfig replicationConfig)
- throws IOException {
+ public void scrubPipelines() {
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
new file mode 100644
index 0000000..dfa5284
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestBackgroundPipelineScrubber.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test for {@link BackgroundPipelineScrubber}.
+ */
+public class TestBackgroundPipelineScrubber {
+
+ private BackgroundPipelineScrubber scrubber;
+ private SCMContext scmContext;
+ private PipelineManager pipelineManager;
+ private OzoneConfiguration conf;
+
+ @Before
+ public void setup() throws IOException {
+ this.scmContext = SCMContext.emptyContext();
+ this.pipelineManager = mock(PipelineManager.class);
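+ // Stub scrubPipelines() as an explicit no-op; testRun() later verifies
+ // that the background thread actually invokes it.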
+ doNothing().when(pipelineManager).scrubPipelines();
+
+ // no initial delay after exiting safe mode
+ this.conf = new OzoneConfiguration();
+ conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0ms");
+
+ this.scrubber = new BackgroundPipelineScrubber(pipelineManager, conf,
+ scmContext);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ scrubber.stop();
+ }
+
+ @Test
+ public void testStop() {
+ assertTrue(scrubber.getRunning());
+ scrubber.stop();
+ assertFalse(scrubber.getRunning());
+ }
+
+ @Test
+ public void testNotifyStatusChanged() {
+ // init at PAUSING
+ assertFalse(scrubber.shouldRun());
+
+ // out of safe mode, PAUSING -> RUNNING
+ scrubber.notifyStatusChanged();
+ assertTrue(scrubber.shouldRun());
+
+ // go into safe mode, RUNNING -> PAUSING
+ scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
+ scrubber.notifyStatusChanged();
+ assertFalse(scrubber.shouldRun());
+ }
+
+ @Test
+ public void testRun() throws IOException {
+ assertFalse(scrubber.shouldRun());
+ // kick off a run
+ synchronized (scrubber) {
+ scrubber.notifyStatusChanged();
+ assertTrue(scrubber.shouldRun());
+ scrubber.notifyAll();
+ }
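+ // Mockito's verify(..., timeout(3000)) polls until the scrubber thread
+ // calls scrubPipelines(), failing if it has not happened within 3 seconds.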
+ verify(pipelineManager, timeout(3000).times(1)).scrubPipelines();
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index fd29929..64ef02e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -535,7 +535,7 @@
}
@Test
- public void testScrubPipeline() throws Exception {
+ public void testScrubPipelines() throws Exception {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -543,33 +543,50 @@
PipelineManagerImpl pipelineManager = createPipelineManager(true);
pipelineManager.setScmContext(scmContext);
- Pipeline pipeline = pipelineManager
+ Pipeline allocatedPipeline = pipelineManager
.createPipeline(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE));
// At this point, pipeline is not at OPEN stage.
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
- pipeline.getPipelineState());
+ allocatedPipeline.getPipelineState());
// pipeline should be seen in pipelineManager as ALLOCATED.
Assert.assertTrue(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
- pipelineManager
- .scrubPipeline(RatisReplicationConfig
- .getInstance(ReplicationFactor.THREE));
+ Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
- // pipeline should be scrubbed.
+ Pipeline closedPipeline = pipelineManager
+ .createPipeline(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE));
+ pipelineManager.openPipeline(closedPipeline.getId());
+ pipelineManager.closePipeline(closedPipeline, true);
+
+ // pipeline should be seen in pipelineManager as CLOSED.
+ Assert.assertTrue(pipelineManager
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.CLOSED).contains(closedPipeline));
+
+ pipelineManager.scrubPipelines();
+
+ // The allocatedPipeline should be scrubbed.
Assert.assertFalse(pipelineManager
.getPipelines(RatisReplicationConfig
.getInstance(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
+ Pipeline.PipelineState.ALLOCATED).contains(allocatedPipeline));
+
+ // The closedPipeline should be scrubbed.
+ Assert.assertFalse(pipelineManager
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.CLOSED).contains(closedPipeline));
pipelineManager.close();
}
@Test
- public void testScrubPipelineShouldFailOnFollower() throws Exception {
+ public void testScrubPipelinesShouldFailOnFollower() throws Exception {
// No timeout for pipeline scrubber.
conf.setTimeDuration(
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
@@ -595,9 +612,7 @@
((SCMHAManagerStub) pipelineManager.getScmhaManager()).setIsLeader(false);
try {
- pipelineManager
- .scrubPipeline(RatisReplicationConfig
- .getInstance(ReplicationFactor.THREE));
+ pipelineManager.scrubPipelines();
} catch (NotLeaderException ex) {
pipelineManager.close();
return;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1062274..823c2dd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -354,6 +354,7 @@
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
pipelineManager.getBackgroundPipelineCreator().stop();
+ pipelineManager.getBackgroundPipelineScrubber().stop();
for (int i = 0; i < pipelineCount; i++) {
// Create pipeline
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
index 37bea18..25ed0e5 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java
@@ -63,11 +63,6 @@
description = "Format output as JSON")
private boolean json;
- @CommandLine.Option(names = { "--replicas" },
- defaultValue = "false",
- description = "Adds replica related details")
- private boolean addReplicaDetails;
-
@Parameters(description = "Decimal id of the container.")
private long containerID;
@@ -108,7 +103,7 @@
LOG.info("Datanodes: [{}]", machinesStr);
// Print the replica details if available
- if (addReplicaDetails && replicas != null) {
+ if (replicas != null) {
String replicaStr = replicas.stream().map(
InfoSubcommand::buildReplicaDetails)
.collect(Collectors.joining(",\n"));
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
index 89b1a11..554316c 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReportSubcommand.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.server.JsonUtils;
import picocli.CommandLine;
import java.io.IOException;
@@ -43,9 +44,20 @@
@CommandLine.Spec
private CommandLine.Model.CommandSpec spec;
+ @CommandLine.Option(names = { "--json" },
+ defaultValue = "false",
+ description = "Format output as JSON")
+ private boolean json;
+
@Override
public void execute(ScmClient scmClient) throws IOException {
ReplicationManagerReport report = scmClient.getReplicationManagerReport();
+
+ if (json) {
+ output(JsonUtils.toJsonStringWithDefaultPrettyPrinter(report));
+ return;
+ }
+
outputHeader(report.getReportTimeStamp());
blankLine();
outputContainerStats(report);
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index c604019..10b5758 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@ -97,7 +97,7 @@
.thenReturn(getReplicas(includeIndex));
cmd = new InfoSubcommand();
CommandLine c = new CommandLine(cmd);
- c.parseArgs("1", "--replicas");
+ c.parseArgs("1");
cmd.execute(scmClient);
// Ensure we have a line for Replicas:
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
index be0e2c8..df8dd95 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestReportSubCommand.java
@@ -17,14 +17,18 @@
*/
package org.apache.hadoop.hdds.scm.cli.container;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import picocli.CommandLine;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -88,6 +92,26 @@
}
@Test
+ public void testValidJsonOutput() throws IOException {
+ // More complete testing of the Report JSON output is in
+ // TestReplicationManagerReport.
+ ScmClient scmClient = mock(ScmClient.class);
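+ // Stub the client to return a fresh, empty report; the test only checks
+ // that the expected top-level JSON fields are present.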
+ Mockito.when(scmClient.getReplicationManagerReport())
+ .thenAnswer(invocation -> new ReplicationManagerReport());
+
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("--json");
+ cmd.execute(scmClient);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode json = mapper.readTree(outContent.toString("UTF-8"));
+
+ Assert.assertNotNull(json.get("reportTimeStamp"));
+ Assert.assertNotNull(json.get("stats"));
+ Assert.assertNotNull(json.get("samples"));
+ }
+
+ @Test
public void testCorrectValuesAppearInReport() throws IOException {
ScmClient scmClient = mock(ScmClient.class);
Mockito.when(scmClient.getReplicationManagerReport())
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
index 9df7518..9ff1ca9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
@@ -19,156 +19,72 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.ozone.protocolPB.OzonePBHelper;
import org.apache.hadoop.security.token.Token;
-import java.util.Objects;
-
/**
* One key can be too huge to fit in one container. In which case it gets split
* into a number of subkeys. This class represents one such subkey instance.
*/
-public final class OmKeyLocationInfo {
- private final BlockID blockID;
- // the id of this subkey in all the subkeys.
- private long length;
- private final long offset;
- // Block token, required for client authentication when security is enabled.
- private Token<OzoneBlockTokenIdentifier> token;
- // the version number indicating when this block was added
- private long createVersion;
+public final class OmKeyLocationInfo extends BlockLocationInfo {
- private Pipeline pipeline;
-
- // PartNumber is set for Multipart upload Keys.
- private int partNumber = -1;
-
- private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
- long offset, int partNumber) {
- this.blockID = blockID;
- this.pipeline = pipeline;
- this.length = length;
- this.offset = offset;
- this.partNumber = partNumber;
- }
-
- private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
- long offset, Token<OzoneBlockTokenIdentifier> token, int partNumber) {
- this.blockID = blockID;
- this.pipeline = pipeline;
- this.length = length;
- this.offset = offset;
- this.token = token;
- this.partNumber = partNumber;
- }
-
- public void setCreateVersion(long version) {
- createVersion = version;
- }
-
- public long getCreateVersion() {
- return createVersion;
- }
-
- public BlockID getBlockID() {
- return blockID;
- }
-
- public long getContainerID() {
- return blockID.getContainerID();
- }
-
- public long getLocalID() {
- return blockID.getLocalID();
- }
-
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- public long getLength() {
- return length;
- }
-
- public void setLength(long length) {
- this.length = length;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public long getBlockCommitSequenceId() {
- return blockID.getBlockCommitSequenceId();
- }
-
- public Token<OzoneBlockTokenIdentifier> getToken() {
- return token;
- }
-
- public void setToken(Token<OzoneBlockTokenIdentifier> token) {
- this.token = token;
- }
-
- public void setPipeline(Pipeline pipeline) {
- this.pipeline = pipeline;
- }
-
- public void setPartNumber(int partNumber) {
- this.partNumber = partNumber;
- }
-
- public int getPartNumber() {
- return partNumber;
+ private OmKeyLocationInfo(Builder builder) {
+ super(builder);
}
/**
* Builder of OmKeyLocationInfo.
*/
- public static class Builder {
- private BlockID blockID;
- private long length;
- private long offset;
- private Token<OzoneBlockTokenIdentifier> token;
- private Pipeline pipeline;
- private int partNumber;
+ public static class Builder extends BlockLocationInfo.Builder {
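+ // The overrides below exist only to narrow the return type to this
+ // Builder, so fluent call chains on OmKeyLocationInfo.Builder keep
+ // compiling unchanged.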
- public Builder setBlockID(BlockID blockId) {
- this.blockID = blockId;
+ @Override
+ public Builder setBlockID(BlockID blockID) {
+ super.setBlockID(blockID);
return this;
}
- @SuppressWarnings("checkstyle:hiddenfield")
+ @Override
public Builder setPipeline(Pipeline pipeline) {
- this.pipeline = pipeline;
+ super.setPipeline(pipeline);
return this;
}
- public Builder setLength(long len) {
- this.length = len;
+ @Override
+ public Builder setLength(long length) {
+ super.setLength(length);
return this;
}
- public Builder setOffset(long off) {
- this.offset = off;
+ @Override
+ public Builder setOffset(long offset) {
+ super.setOffset(offset);
return this;
}
- public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
- this.token = bToken;
+ @Override
+ public Builder setToken(Token<OzoneBlockTokenIdentifier> token) {
+ super.setToken(token);
return this;
}
- public Builder setPartNumber(int partNum) {
- this.partNumber = partNum;
+ @Override
+ public Builder setPartNumber(int partNumber) {
+ super.setPartNumber(partNumber);
return this;
}
+ @Override
+ public Builder setCreateVersion(long version) {
+ super.setCreateVersion(version);
+ return this;
+ }
+
+ @Override
public OmKeyLocationInfo build() {
- return new OmKeyLocationInfo(blockID, pipeline, length, offset, token,
- partNumber);
+ return new OmKeyLocationInfo(this);
}
}
@@ -178,13 +94,15 @@
public KeyLocation getProtobuf(boolean ignorePipeline, int clientVersion) {
KeyLocation.Builder builder = KeyLocation.newBuilder()
- .setBlockID(blockID.getProtobuf())
- .setLength(length)
- .setOffset(offset)
- .setCreateVersion(createVersion).setPartNumber(partNumber);
+ .setBlockID(getBlockID().getProtobuf())
+ .setLength(getLength())
+ .setOffset(getOffset())
+ .setCreateVersion(getCreateVersion())
+ .setPartNumber(getPartNumber());
if (!ignorePipeline) {
try {
- if (this.token != null) {
+ Token<OzoneBlockTokenIdentifier> token = getToken();
+ if (token != null) {
builder.setToken(OzonePBHelper.protoFromToken(token));
}
@@ -200,7 +118,8 @@
// TODO: this needs to be revisited when bucket versioning
// implementation is handled.
- if (this.pipeline != null) {
+ Pipeline pipeline = getPipeline();
+ if (pipeline != null) {
builder.setPipeline(pipeline.getProtobufMessage(clientVersion));
}
} catch (UnknownPipelineStateException e) {
@@ -220,51 +139,18 @@
}
public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
- OmKeyLocationInfo info = new OmKeyLocationInfo(
- BlockID.getFromProtobuf(keyLocation.getBlockID()),
- getPipeline(keyLocation),
- keyLocation.getLength(),
- keyLocation.getOffset(), keyLocation.getPartNumber());
+ Builder builder = new Builder()
+ .setBlockID(BlockID.getFromProtobuf(keyLocation.getBlockID()))
+ .setLength(keyLocation.getLength())
+ .setOffset(keyLocation.getOffset())
+ .setPipeline(getPipeline(keyLocation))
+ .setCreateVersion(keyLocation.getCreateVersion())
+ .setPartNumber(keyLocation.getPartNumber());
if (keyLocation.hasToken()) {
- info.token = (Token<OzoneBlockTokenIdentifier>)
- OzonePBHelper.tokenFromProto(keyLocation.getToken());
+ builder.setToken((Token<OzoneBlockTokenIdentifier>)
+ OzonePBHelper.tokenFromProto(keyLocation.getToken()));
}
- info.setCreateVersion(keyLocation.getCreateVersion());
- return info;
+ return builder.build();
}
- @Override
- public String toString() {
- return "{blockID={containerID=" + blockID.getContainerID() +
- ", localID=" + blockID.getLocalID() + "}" +
- ", length=" + length +
- ", offset=" + offset +
- ", token=" + token +
- ", pipeline=" + pipeline +
- ", createVersion=" + createVersion + ", partNumber=" + partNumber
- + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- OmKeyLocationInfo that = (OmKeyLocationInfo) o;
- return length == that.length &&
- offset == that.offset &&
- createVersion == that.createVersion &&
- Objects.equals(blockID, that.blockID) &&
- Objects.equals(token, that.token) &&
- Objects.equals(pipeline, that.pipeline);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(blockID, length, offset, token, createVersion,
- pipeline);
- }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 11dc5b2..f597150 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -585,6 +585,10 @@
omLockMetrics.unRegister();
}
+ public OMLockMetrics getOMLockMetrics() {
+ return omLockMetrics;
+ }
+
/**
* Resource defined in Ozone.
*/
diff --git a/hadoop-ozone/dev-support/checks/build.sh b/hadoop-ozone/dev-support/checks/build.sh
index 26d66b8..42edeb5 100755
--- a/hadoop-ozone/dev-support/checks/build.sh
+++ b/hadoop-ozone/dev-support/checks/build.sh
@@ -17,5 +17,5 @@
cd "$DIR/../../.." || exit 1
export MAVEN_OPTS="-Xmx4096m $MAVEN_OPTS"
-mvn -V -B -Dmaven.javadoc.skip=true -DskipTests clean install "$@"
+mvn -V -B -Dmaven.javadoc.skip=true -DskipTests -DskipDocs clean install "$@"
exit $?
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 4c4bdc6..0bbf8c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -90,6 +90,7 @@
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ozone.test.TestClock;
+import org.apache.ozone.test.tag.Flaky;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -1297,6 +1298,7 @@
* since fs.rename(src,dst,options) is enabled.
*/
@Test
+ @Flaky("HDDS-6646")
public void testRenameToTrashEnabled() throws Exception {
// Create a file
String testKeyName = "testKey1";
@@ -1326,6 +1328,7 @@
* 2.Verify that the key gets deleted by the trash emptier.
*/
@Test
+ @Flaky("HDDS-6645")
public void testTrash() throws Exception {
String testKeyName = "testKey2";
Path path = new Path(OZONE_URI_DELIMITER, testKeyName);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 29f3326..695d22d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
+import org.apache.ozone.test.tag.Flaky;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -1260,6 +1261,7 @@
* fs.rename(src, dst, options).
*/
@Test
+ @Flaky({"HDDS-5819", "HDDS-6451"})
public void testRenameToTrashEnabled() throws IOException {
// Create a file
String testKeyName = "testKey2";
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index 0503dbe..f3649ea 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -26,10 +26,10 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
@@ -57,7 +57,7 @@
private ECStreamTestUtil() {
}
- public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ public static BlockLocationInfo createKeyInfo(ReplicationConfig repConf,
long blockLength, Map<DatanodeDetails, Integer> dnMap) {
Pipeline pipeline = Pipeline.newBuilder()
@@ -68,7 +68,7 @@
.setReplicationConfig(repConf)
.build();
- OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+ BlockLocationInfo keyInfo = new BlockLocationInfo.Builder()
.setBlockID(new BlockID(1, 1))
.setLength(blockLength)
.setOffset(0)
@@ -78,7 +78,7 @@
return keyInfo;
}
- public static OmKeyLocationInfo createKeyInfo(ReplicationConfig repConf,
+ public static BlockLocationInfo createKeyInfo(ReplicationConfig repConf,
int nodeCount, long blockLength) {
Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
for (int i = 0; i < nodeCount; i++) {
@@ -257,7 +257,7 @@
public synchronized BlockExtendedInputStream create(
ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 17a0a6f..a9f9407 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -25,12 +25,12 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
@@ -65,7 +65,7 @@
@Test
public void testSufficientLocations() {
// EC-3-2, 5MB block, so all 3 data locations are needed
- OmKeyLocationInfo keyInfo = ECStreamTestUtil
+ BlockLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
@@ -116,7 +116,7 @@
public void testCorrectBlockSizePassedToBlockStreamLessThanCell()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -133,7 +133,7 @@
public void testCorrectBlockSizePassedToBlockStreamTwoCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -149,7 +149,7 @@
public void testCorrectBlockSizePassedToBlockStreamThreeCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -166,7 +166,7 @@
public void testCorrectBlockSizePassedToBlockStreamThreeFullAndPartialStripe()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -183,7 +183,7 @@
public void testCorrectBlockSizePassedToBlockStreamSingleFullCell()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -198,7 +198,7 @@
public void testCorrectBlockSizePassedToBlockStreamSeveralFullCells()
throws IOException {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
@@ -213,7 +213,7 @@
@Test
public void testSimpleRead() throws IOException {
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -236,7 +236,7 @@
*/
@Test
public void testSimpleReadUnderOneChunk() throws IOException {
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 1, ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -255,7 +255,7 @@
@Test
public void testReadPastEOF() throws IOException {
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 50);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -274,7 +274,7 @@
// EC-3-2, 5MB block, so all 3 data locations are needed
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
100);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -309,7 +309,7 @@
public void testSeekPastBlockLength() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -321,7 +321,7 @@
public void testSeekToLength() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -333,7 +333,7 @@
public void testSeekToLengthZeroLengthBlock() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 0);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -347,7 +347,7 @@
public void testSeekToValidPosition() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -376,7 +376,7 @@
public void testErrorReadingBlockReportsBadLocation() throws IOException {
repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
ONEMB);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
@@ -417,7 +417,7 @@
}
public synchronized BlockExtendedInputStream create(
- ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+ ReplicationConfig repConfig, BlockLocationInfo blockInfo,
Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum, XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
index fd4e8ad..a45ab43 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
@@ -24,10 +24,10 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -93,7 +93,7 @@
public void testAvailableDataLocations() {
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
Assert.assertEquals(1, ECBlockInputStreamProxy.availableDataLocations(
blockInfo.getPipeline(), 1));
@@ -120,7 +120,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -135,7 +135,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -150,7 +150,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
dataGenerator = new SplittableRandom(randomSeed);
@@ -172,7 +172,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
@@ -203,7 +203,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -231,7 +231,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -259,7 +259,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -301,7 +301,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo blockInfo =
+ BlockLocationInfo blockInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
ByteBuffer readBuffer = ByteBuffer.allocate(100);
@@ -352,7 +352,7 @@
}
private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
- OmKeyLocationInfo blockInfo) {
+ BlockLocationInfo blockInfo) {
return new ECBlockInputStreamProxy(
rConfig, blockInfo, true, null, null, streamFactory);
}
@@ -382,7 +382,7 @@
@Override
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedDatanodes,
- ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+ ReplicationConfig repConfig, BlockLocationInfo blockInfo,
boolean verifyChecksum, XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
this.failedLocations = failedDatanodes;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index ee3f50a..823ddd5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -20,11 +20,11 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -72,7 +72,7 @@
private ECBlockReconstructedStripeInputStream createStripeInputStream(
Map<DatanodeDetails, Integer> dnMap, long blockLength) {
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 0eff8f9..b111b12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
@@ -26,7 +27,6 @@
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStreamFactory;
import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.After;
import org.junit.Assert;
@@ -82,7 +82,7 @@
@Test
public void testSufficientLocations() throws IOException {
// One chunk, only 1 location.
- OmKeyLocationInfo keyInfo = ECStreamTestUtil
+ BlockLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 1, ONEMB);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
@@ -168,7 +168,7 @@
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -223,7 +223,7 @@
// from the parity and padded blocks 2 and 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
@@ -265,7 +265,7 @@
// from the parity and padded blocks 2 and 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
@@ -322,7 +322,7 @@
addDataStreamsToFactory(dataBufs, parity);
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
@@ -365,7 +365,7 @@
// from the parity and padded blocks 2 and 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
- OmKeyLocationInfo keyInfo =
+ BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
@@ -407,7 +407,7 @@
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -461,7 +461,7 @@
public void testSeekToPartialOffsetFails() {
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 4, 5);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -503,7 +503,7 @@
// Data block index 3 is missing and needs recovered initially.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -583,7 +583,7 @@
// when containers are reported by SCM.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -609,7 +609,7 @@
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
@@ -643,7 +643,7 @@
}
private ECBlockReconstructedStripeInputStream createInputStream(
- OmKeyLocationInfo keyInfo) {
+ BlockLocationInfo keyInfo) {
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory, bufferPool, ecReconstructExecutor);
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java
new file mode 100644
index 0000000..a87b6a6
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOmBucketReadWriteFileOps.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.lock.OMLockMetrics;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Test for OmBucketReadWriteFileOps.
+ */
+public class TestOmBucketReadWriteFileOps {
+
+ private String path;
+ private OzoneConfiguration conf = null;
+ private MiniOzoneCluster cluster = null;
+ private ObjectStore store = null;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestOmBucketReadWriteFileOps.class);
+
+ @Before
+ public void setup() {
+ path = GenericTestUtils
+ .getTempPath(TestOmBucketReadWriteFileOps.class.getSimpleName());
+ GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+ File baseDir = new File(path);
+ baseDir.mkdirs();
+ }
+
+ /**
+ * Shutdown the MiniOzoneCluster and remove the test directory.
+ */
+ private void shutdown() throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ FileUtils.deleteDirectory(new File(path));
+ }
+ }
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws Exception if the cluster fails to start
+ */
+ private void startCluster() throws Exception {
+ conf = getOzoneConfiguration();
+ conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
+ BucketLayout.LEGACY.name());
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitTobeOutOfSafeMode();
+
+ store = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+ }
+
+ protected OzoneConfiguration getOzoneConfiguration() {
+ return new OzoneConfiguration();
+ }
+
+ @Test
+ public void testOmBucketReadWriteFileOps() throws Exception {
+ try {
+ startCluster();
+ try (FileOutputStream out =
+ FileUtils.openOutputStream(new File(path, "conf"))) {
+ cluster.getConf().writeXml(out);
+ out.getFD().sync();
+ }
+
+ verifyFreonCommand(new ParameterBuilder().setTotalThreadCount(10)
+ .setNumOfReadOperations(10).setNumOfWriteOperations(5)
+ .setFileCountForRead(10).setFileCountForWrite(5));
+ verifyFreonCommand(
+ new ParameterBuilder().setVolumeName("vol2").setBucketName("bucket1")
+ .setPrefixFilePath("/dir1/dir2/dir3").setTotalThreadCount(10)
+ .setNumOfReadOperations(10).setNumOfWriteOperations(5)
+ .setFileCountForRead(10).setFileCountForWrite(5));
+ verifyFreonCommand(
+ new ParameterBuilder().setVolumeName("vol3").setBucketName("bucket1")
+ .setPrefixFilePath("/").setTotalThreadCount(15)
+ .setNumOfReadOperations(5).setNumOfWriteOperations(3)
+ .setFileCountForRead(5).setFileCountForWrite(3));
+ verifyFreonCommand(
+ new ParameterBuilder().setVolumeName("vol4").setBucketName("bucket1")
+ .setPrefixFilePath("/dir1/").setTotalThreadCount(10)
+ .setNumOfReadOperations(5).setNumOfWriteOperations(3)
+ .setFileCountForRead(5).setFileCountForWrite(3).
+ setFileSizeInBytes(64).setBufferSize(16));
+ verifyFreonCommand(
+ new ParameterBuilder().setVolumeName("vol5").setBucketName("bucket1")
+ .setPrefixFilePath("/dir1/dir2/dir3").setTotalThreadCount(10)
+ .setNumOfReadOperations(5).setNumOfWriteOperations(0)
+ .setFileCountForRead(5));
+ verifyFreonCommand(
+ new ParameterBuilder().setVolumeName("vol6").setBucketName("bucket1")
+ .setPrefixFilePath("/dir1/dir2/dir3/dir4").setTotalThreadCount(20)
+ .setNumOfReadOperations(0).setNumOfWriteOperations(5)
+ .setFileCountForRead(0).setFileCountForWrite(5));
+ } finally {
+ shutdown();
+ }
+ }
+
+ private void verifyFreonCommand(ParameterBuilder parameterBuilder)
+ throws IOException {
+ store.createVolume(parameterBuilder.volumeName);
+ OzoneVolume volume = store.getVolume(parameterBuilder.volumeName);
+ volume.createBucket(parameterBuilder.bucketName);
+ String rootPath = "o3fs://" + parameterBuilder.bucketName + "." +
+ parameterBuilder.volumeName + parameterBuilder.prefixFilePath;
+ String confPath = new File(path, "conf").getAbsolutePath();
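+ // Invoke the freon "obrwf" command; each short flag mirrors one
+ // ParameterBuilder field (e.g. -r = fileCountForRead, -w =
+ // fileCountForWrite).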
+ new Freon().execute(
+ new String[]{"-conf", confPath, "obrwf", "-P", rootPath,
+ "-r", String.valueOf(parameterBuilder.fileCountForRead),
+ "-w", String.valueOf(parameterBuilder.fileCountForWrite),
+ "-g", String.valueOf(parameterBuilder.fileSizeInBytes),
+ "-b", String.valueOf(parameterBuilder.bufferSize),
+ "-l", String.valueOf(parameterBuilder.length),
+ "-c", String.valueOf(parameterBuilder.totalThreadCount),
+ "-T", String.valueOf(parameterBuilder.readThreadPercentage),
+ "-R", String.valueOf(parameterBuilder.numOfReadOperations),
+ "-W", String.valueOf(parameterBuilder.numOfWriteOperations),
+ "-n", String.valueOf(1)});
+
+ LOG.info("Started verifying OM bucket read/write ops file generation...");
+ FileSystem fileSystem = FileSystem.get(URI.create(rootPath),
+ conf);
+ Path rootDir = new Path(rootPath.concat(OzoneConsts.OM_KEY_PREFIX));
+ FileStatus[] fileStatuses = fileSystem.listStatus(rootDir);
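+ // The root should contain exactly the two generated directories,
+ // readPath and writePath.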
+ verifyFileCreation(2, fileStatuses, true);
+
+ Path readDir = new Path(rootPath.concat("/readPath"));
+ FileStatus[] readFileStatuses = fileSystem.listStatus(readDir);
+ verifyFileCreation(parameterBuilder.fileCountForRead, readFileStatuses,
+ false);
+
+ int readThreadCount = (parameterBuilder.readThreadPercentage *
+ parameterBuilder.totalThreadCount) / 100;
+ int writeThreadCount = parameterBuilder.totalThreadCount - readThreadCount;
+
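+ // The remaining (non-read) threads perform writes, so the expected file
+ // count is writeThreadCount * fileCountForWrite * numOfWriteOperations.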
+ Path writeDir = new Path(rootPath.concat("/writePath"));
+ FileStatus[] writeFileStatuses = fileSystem.listStatus(writeDir);
+ verifyFileCreation(writeThreadCount * parameterBuilder.fileCountForWrite *
+ parameterBuilder.numOfWriteOperations, writeFileStatuses, false);
+
+ verifyOMLockMetrics(cluster.getOzoneManager().getMetadataManager().getLock()
+ .getOMLockMetrics());
+ }
+
+ private void verifyFileCreation(int expectedCount, FileStatus[] fileStatuses,
+ boolean checkDirectoryCount) {
+ int actual = 0;
+ if (checkDirectoryCount) {
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ ++actual;
+ }
+ }
+ } else {
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isFile()) {
+ ++actual;
+ }
+ }
+ }
+ Assert.assertEquals("Mismatch Count!", expectedCount, actual);
+ }
+
+ private void verifyOMLockMetrics(OMLockMetrics omLockMetrics) {
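+ // Each *TimeMsStat string is space-separated with the sample count in
+ // the third token; a positive count shows the corresponding lock metric
+ // was exercised during the run.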
+ String readLockWaitingTimeMsStat =
+ omLockMetrics.getReadLockWaitingTimeMsStat();
+ LOG.info("Read Lock Waiting Time Stat: " + readLockWaitingTimeMsStat);
+ LOG.info("Longest Read Lock Waiting Time (ms): " +
+ omLockMetrics.getLongestReadLockWaitingTimeMs());
+ int readWaitingSamples =
+ Integer.parseInt(readLockWaitingTimeMsStat.split(" ")[2]);
+ Assert.assertTrue("Read Lock Waiting Samples should be positive",
+ readWaitingSamples > 0);
+
+ String readLockHeldTimeMsStat = omLockMetrics.getReadLockHeldTimeMsStat();
+ LOG.info("Read Lock Held Time Stat: " + readLockHeldTimeMsStat);
+ LOG.info("Longest Read Lock Held Time (ms): " +
+ omLockMetrics.getLongestReadLockHeldTimeMs());
+ int readHeldSamples =
+ Integer.parseInt(readLockHeldTimeMsStat.split(" ")[2]);
+ Assert.assertTrue("Read Lock Held Samples should be positive",
+ readHeldSamples > 0);
+
+ String writeLockWaitingTimeMsStat =
+ omLockMetrics.getWriteLockWaitingTimeMsStat();
+ LOG.info("Write Lock Waiting Time Stat: " + writeLockWaitingTimeMsStat);
+ LOG.info("Longest Write Lock Waiting Time (ms): " +
+ omLockMetrics.getLongestWriteLockWaitingTimeMs());
+ int writeWaitingSamples =
+ Integer.parseInt(writeLockWaitingTimeMsStat.split(" ")[2]);
+ Assert.assertTrue("Write Lock Waiting Samples should be positive",
+ writeWaitingSamples > 0);
+
+ String writeLockHeldTimeMsStat = omLockMetrics.getWriteLockHeldTimeMsStat();
+ LOG.info("Write Lock Held Time Stat: " + writeLockHeldTimeMsStat);
+ LOG.info("Longest Write Lock Held Time (ms): " +
+ omLockMetrics.getLongestWriteLockHeldTimeMs());
+ int writeHeldSamples =
+ Integer.parseInt(writeLockHeldTimeMsStat.split(" ")[2]);
+ Assert.assertTrue("Write Lock Held Samples should be positive",
+ writeHeldSamples > 0);
+ }
+
+ private static class ParameterBuilder {
+
+ private String volumeName = "vol1";
+ private String bucketName = "bucket1";
+ private String prefixFilePath = "/dir1/dir2";
+ private int fileCountForRead = 100;
+ private int fileCountForWrite = 10;
+ private long fileSizeInBytes = 256;
+ private int bufferSize = 64;
+ private int length = 10;
+ private int totalThreadCount = 100;
+ private int readThreadPercentage = 90;
+ private int numOfReadOperations = 50;
+ private int numOfWriteOperations = 10;
+
+ private ParameterBuilder setVolumeName(String volumeNameParam) {
+ volumeName = volumeNameParam;
+ return this;
+ }
+
+ private ParameterBuilder setBucketName(String bucketNameParam) {
+ bucketName = bucketNameParam;
+ return this;
+ }
+
+ private ParameterBuilder setPrefixFilePath(String prefixFilePathParam) {
+ prefixFilePath = prefixFilePathParam;
+ return this;
+ }
+
+ private ParameterBuilder setFileCountForRead(int fileCountForReadParam) {
+ fileCountForRead = fileCountForReadParam;
+ return this;
+ }
+
+ private ParameterBuilder setFileCountForWrite(int fileCountForWriteParam) {
+ fileCountForWrite = fileCountForWriteParam;
+ return this;
+ }
+
+ private ParameterBuilder setFileSizeInBytes(long fileSizeInBytesParam) {
+ fileSizeInBytes = fileSizeInBytesParam;
+ return this;
+ }
+
+ private ParameterBuilder setBufferSize(int bufferSizeParam) {
+ bufferSize = bufferSizeParam;
+ return this;
+ }
+
+ private ParameterBuilder setLength(int lengthParam) {
+ length = lengthParam;
+ return this;
+ }
+
+ private ParameterBuilder setTotalThreadCount(int totalThreadCountParam) {
+ totalThreadCount = totalThreadCountParam;
+ return this;
+ }
+
+ private ParameterBuilder setReadThreadPercentage(
+ int readThreadPercentageParam) {
+ readThreadPercentage = readThreadPercentageParam;
+ return this;
+ }
+
+ private ParameterBuilder setNumOfReadOperations(
+ int numOfReadOperationsParam) {
+ numOfReadOperations = numOfReadOperationsParam;
+ return this;
+ }
+
+ private ParameterBuilder setNumOfWriteOperations(
+ int numOfWriteOperationsParam) {
+ numOfWriteOperations = numOfWriteOperationsParam;
+ return this;
+ }
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index 0bb5011..e1874c3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.log4j.Logger;
+import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -281,6 +282,7 @@
}
@Test
+ @Flaky("HDDS-6644")
public void testReadRequest() throws Exception {
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
ObjectStore objectStore = getObjectStore();
@@ -318,6 +320,7 @@
}
@Test
+ @Flaky("HDDS-6642")
public void testListVolumes() throws Exception {
String userName = UserGroupInformation.getCurrentUser().getUserName();
ObjectStore objectStore = getObjectStore();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
index 8862b0f..82dd430 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestParentAcl.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.tag.Flaky;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -129,6 +130,7 @@
// LIST LIST READ (V1 LIST=>READ)
// READ_ACL READ_ACL READ (V1 READ_ACL=>READ)
@Test
+ @Flaky("HDDS-6335")
public void testKeyAcl()
throws IOException {
OzoneObj keyObj;
diff --git a/hadoop-ozone/ozonefs-shaded/pom.xml b/hadoop-ozone/ozonefs-shaded/pom.xml
index 46419e1..9d26a77 100644
--- a/hadoop-ozone/ozonefs-shaded/pom.xml
+++ b/hadoop-ozone/ozonefs-shaded/pom.xml
@@ -53,6 +53,10 @@
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.apache.hadoop.thirdparty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index c5b9a3a..914b709 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -63,7 +63,8 @@
GeneratorDatanode.class,
ClosedContainerReplicator.class,
StreamingGenerator.class,
- SCMThroughputBenchmark.class},
+ SCMThroughputBenchmark.class,
+ OmBucketReadWriteFileOps.class},
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true)
public class Freon extends GenericCli {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java
new file mode 100644
index 0000000..401e71b1
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmBucketReadWriteFileOps.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+
+/**
+ * Synthetic read/write file operations workload generator tool.
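+ *
+ * Example invocation (a sketch assuming the standard {@code ozone freon}
+ * launcher; the flags are the options declared below):
+ * <pre>
+ * ozone freon obrwf -P o3fs://bucket1.vol1/dir1/dir2 -r 100 -w 10
+ * </pre>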
+ */
+@Command(name = "obrwf",
+ aliases = "om-bucket-read-write-file-ops",
+ description = "Creates files, performs respective read/write " +
+ "operations to measure lock performance.",
+ versionProvider = HddsVersionProvider.class,
+ mixinStandardHelpOptions = true,
+ showDefaultValues = true)
+public class OmBucketReadWriteFileOps extends BaseFreonGenerator
+ implements Callable<Void> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OmBucketReadWriteFileOps.class);
+
+ @Option(names = {"-P", "--root-path"},
+ description = "Root path",
+ defaultValue = "o3fs://bucket1.vol1/dir1/dir2")
+ private String rootPath;
+
+ @Option(names = {"-r", "--file-count-for-read"},
+ description = "Number of files to be written in the read directory.",
+ defaultValue = "100")
+ private int fileCountForRead;
+
+ @Option(names = {"-w", "--file-count-for-write"},
+ description = "Number of files to be written in the write directory.",
+ defaultValue = "10")
+ private int fileCountForWrite;
+
+ @Option(names = {"-g", "--file-size"},
+ description = "Generated data size (in bytes) of each file to be " +
+ "written in each directory.",
+ defaultValue = "256")
+ private long fileSizeInBytes;
+
+ @Option(names = {"-b", "--buffer"},
+ description = "Size of buffer used to generated the file content.",
+ defaultValue = "64")
+ private int bufferSize;
+
+ @Option(names = {"-l", "--name-len"},
+ description = "Length of the random name of directory you want to " +
+ "create.",
+ defaultValue = "10")
+ private int length;
+
+ @Option(names = {"-c", "--total-thread-count"},
+ description = "Total number of threads to be executed.",
+ defaultValue = "100")
+ private int totalThreadCount;
+
+ @Option(names = {"-T", "--read-thread-percentage"},
+ description = "Percentage of the total number of threads to be " +
+ "allocated for read operations. The remaining percentage of " +
+ "threads will be allocated for write operations.",
+ defaultValue = "90")
+ private int readThreadPercentage;
+
+ @Option(names = {"-R", "--num-of-read-operations"},
+ description = "Number of read operations to be performed by each thread.",
+ defaultValue = "50")
+ private int numOfReadOperations;
+
+ @Option(names = {"-W", "--num-of-write-operations"},
+ description = "Number of write operations to be performed by each " +
+ "thread.",
+ defaultValue = "10")
+ private int numOfWriteOperations;
+
+ private Timer timer;
+
+ private ContentGenerator contentGenerator;
+
+ private FileSystem fileSystem;
+
+ private int readThreadCount;
+ private int writeThreadCount;
+
+ @Override
+ public Void call() throws Exception {
+ init();
+
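+ // Split the thread pool between readers and writers according to
+ // readThreadPercentage; integer division rounds the read share down.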
+ readThreadCount = (readThreadPercentage * totalThreadCount) / 100;
+ writeThreadCount = totalThreadCount - readThreadCount;
+
+ print("rootPath: " + rootPath);
+ print("fileCountForRead: " + fileCountForRead);
+ print("fileCountForWrite: " + fileCountForWrite);
+ print("fileSizeInBytes: " + fileSizeInBytes);
+ print("bufferSize: " + bufferSize);
+ print("totalThreadCount: " + totalThreadCount);
+ print("readThreadPercentage: " + readThreadPercentage);
+ print("writeThreadPercentage: " + (100 - readThreadPercentage));
+ print("readThreadCount: " + readThreadCount);
+ print("writeThreadCount: " + writeThreadCount);
+ print("numOfReadOperations: " + numOfReadOperations);
+ print("numOfWriteOperations: " + numOfWriteOperations);
+
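+ // The FileSystem implementation is resolved from the rootPath scheme
+ // (o3fs by default), so the same workload can also target other
+ // Hadoop-compatible filesystems.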
+ OzoneConfiguration configuration = createOzoneConfiguration();
+ fileSystem = FileSystem.get(URI.create(rootPath), configuration);
+
+ contentGenerator = new ContentGenerator(fileSizeInBytes, bufferSize);
+ timer = getMetrics().timer("file-create");
+
+ runTests(this::mainMethod);
+ return null;
+ }
+
+ private void mainMethod(long counter) throws Exception {
+
+ int readResult = readOperations();
+ int writeResult = writeOperations();
+
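+ // readResult counts the listStatus entries seen across all read
+ // iterations; each write iteration creates fileCountForWrite files.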
+ print("Total Files Read: " + readResult);
+ print("Total Files Written: " + writeResult * fileCountForWrite);
+
+ // TODO: print read/write lock metrics (HDDS-6435, HDDS-6436).
+ }
+
+ private int readOperations() throws Exception {
+
+ // Create fileCountForRead (defaultValue = 100) files under the
+ // rootPath/readPath directory.
+ String readPath =
+ rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("readPath");
+ fileSystem.mkdirs(new Path(readPath));
+ createFiles(readPath, fileCountForRead);
+
+ // Start readThreadCount (defaultValue = 90) concurrent read threads
+ // performing numOfReadOperations (defaultValue = 50) iterations
+ // of read operations (fileSystem.listStatus(rootPath/readPath))
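+ // An ExecutorCompletionService is used so results can be collected in
+ // completion order rather than submission order.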
+ ExecutorService readService = Executors.newFixedThreadPool(readThreadCount);
+ CompletionService<Integer> readExecutorCompletionService =
+ new ExecutorCompletionService<>(readService);
+ List<Future<Integer>> readFutures = new ArrayList<>();
+ for (int i = 0; i < readThreadCount; i++) {
+ readFutures.add(readExecutorCompletionService.submit(() -> {
+ int readCount = 0;
+ try {
+ for (int j = 0; j < numOfReadOperations; j++) {
+ FileStatus[] status =
+ fileSystem.listStatus(new Path(readPath));
+ readCount += status.length;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while listing status ", e);
+ }
+ return readCount;
+ }));
+ }
+
+ int readResult = 0;
+ for (int i = 0; i < readFutures.size(); i++) {
+ try {
+ readResult += readExecutorCompletionService.take().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Read task was interrupted", e);
+ } catch (ExecutionException e) {
+ LOG.warn("Read task failed", e);
+ }
+ }
+ readService.shutdown();
+
+ return readResult;
+ }
+
+ private int writeOperations() throws Exception {
+
+ // Start writeThreadCount (defaultValue = 10) concurrent write threads
+ // performing numOfWriteOperations (defaultValue = 10) iterations
+ // of write operations (createFiles(rootPath/writePath))
+ String writePath =
+ rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("writePath");
+ fileSystem.mkdirs(new Path(writePath));
+
+ ExecutorService writeService =
+ Executors.newFixedThreadPool(writeThreadCount);
+ CompletionService<Integer> writeExecutorCompletionService =
+ new ExecutorCompletionService<>(writeService);
+ List<Future<Integer>> writeFutures = new ArrayList<>();
+ for (int i = 0; i < writeThreadCount; i++) {
+ writeFutures.add(writeExecutorCompletionService.submit(() -> {
+ int writeCount = 0;
+ try {
+ for (int j = 0; j < numOfWriteOperations; j++) {
+ createFiles(writePath, fileCountForWrite);
+ writeCount++;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while creating file ", e);
+ }
+ return writeCount;
+ }));
+ }
+
+ int writeResult = 0;
+ for (int i = 0; i < writeFutures.size(); i++) {
+ try {
+ writeResult += writeExecutorCompletionService.take().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Write task was interrupted", e);
+ } catch (ExecutionException e) {
+ LOG.warn("Write task failed", e);
+ }
+ }
+ writeService.shutdown();
+
+ return writeResult;
+ }
+
+ private void createFile(String dir) throws Exception {
+ String fileName = dir.concat(OzoneConsts.OM_KEY_PREFIX)
+ .concat(RandomStringUtils.randomAlphanumeric(length));
+ Path file = new Path(fileName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("FilePath:{}", file);
+ }
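+ // Time the full create/write/close cycle under the "file-create" metric.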
+ timer.time(() -> {
+ try (FSDataOutputStream output = fileSystem.create(file)) {
+ contentGenerator.write(output);
+ }
+ return null;
+ });
+ }
+
+ private void createFiles(String dir, int fileCount) throws Exception {
+ for (int i = 0; i < fileCount; i++) {
+ createFile(dir);
+ }
+ }
+}