blob: 4751672867f31e774ffdb8aeb4404e64a428481c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
package org.apache.hadoop.ozone.freon;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
* Data generator tool to generate as much keys as possible.
@Command(name = "randomkeys",
aliases = "rk",
description = "Generate volumes/buckets and put generated keys.",
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
public final class RandomKeyGenerator implements Callable<Void> {
private Freon freon;
enum FreonOps {
private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
private static final int QUANTILES = 10;
private static final int CHECK_INTERVAL_MILLIS = 5000;
private byte[] keyValueBuffer = null;
private static final String DIGEST_ALGORITHM = "MD5";
// A common initial MesssageDigest for each key without its UUID
private MessageDigest commonInitialMD = null;
private static final Logger LOG =
private volatile boolean completed = false;
private volatile Throwable exception;
@Option(names = "--numOfThreads",
description = "number of threads to be launched for the run",
defaultValue = "10")
private int numOfThreads = 10;
@Option(names = "--numOfVolumes",
description = "specifies number of Volumes to be created in offline mode",
defaultValue = "10")
private int numOfVolumes = 10;
@Option(names = "--numOfBuckets",
description = "specifies number of Buckets to be created per Volume",
defaultValue = "1000")
private int numOfBuckets = 1000;
names = "--numOfKeys",
description = "specifies number of Keys to be created per Bucket",
defaultValue = "500000"
private int numOfKeys = 500000;
names = "--keySize",
description = "Specifies the size of Key in bytes to be created",
defaultValue = "10240"
private long keySize = 10240;
names = "--validateWrites",
description = "Specifies whether to validate keys after writing"
private boolean validateWrites = false;
names = "--bufferSize",
description = "Specifies the buffer size while writing",
defaultValue = "4096"
private int bufferSize = 4096;
names = "--json",
description = "directory where json is created."
private String jsonDir;
names = "--replicationType",
description = "Replication type (STAND_ALONE, RATIS)",
defaultValue = "STAND_ALONE"
private ReplicationType type = ReplicationType.STAND_ALONE;
names = "--factor",
description = "Replication factor (ONE, THREE)",
defaultValue = "ONE"
private ReplicationFactor factor = ReplicationFactor.ONE;
names = "--om-service-id",
description = "OM Service ID"
private String omServiceID = null;
private int threadPoolSize;
private OzoneClient ozoneClient;
private ObjectStore objectStore;
private ExecutorService executor;
private long startTime;
private long jobStartTime;
private AtomicLong volumeCreationTime;
private AtomicLong bucketCreationTime;
private AtomicLong keyCreationTime;
private AtomicLong keyWriteTime;
private AtomicLong totalBytesWritten;
private int totalBucketCount;
private long totalKeyCount;
private AtomicInteger volumeCounter;
private AtomicInteger bucketCounter;
private AtomicLong keyCounter;
private Map<Integer, OzoneVolume> volumes;
private Map<Integer, OzoneBucket> buckets;
private AtomicInteger numberOfVolumesCreated;
private AtomicInteger numberOfBucketsCreated;
private AtomicLong numberOfKeysAdded;
private Long totalWritesValidated;
private Long writeValidationSuccessCount;
private Long writeValidationFailureCount;
private BlockingQueue<KeyValidate> validationQueue;
private ArrayList<Histogram> histograms = new ArrayList<>();
private OzoneConfiguration ozoneConfiguration;
private ProgressBar progressbar;
RandomKeyGenerator() {
RandomKeyGenerator(OzoneConfiguration ozoneConfiguration) {
this.ozoneConfiguration = ozoneConfiguration;
public void init(OzoneConfiguration configuration) throws IOException {
startTime = System.nanoTime();
jobStartTime = System.currentTimeMillis();
volumeCreationTime = new AtomicLong();
bucketCreationTime = new AtomicLong();
keyCreationTime = new AtomicLong();
keyWriteTime = new AtomicLong();
totalBytesWritten = new AtomicLong();
numberOfVolumesCreated = new AtomicInteger();
numberOfBucketsCreated = new AtomicInteger();
numberOfKeysAdded = new AtomicLong();
volumeCounter = new AtomicInteger();
bucketCounter = new AtomicInteger();
keyCounter = new AtomicLong();
volumes = new ConcurrentHashMap<>();
buckets = new ConcurrentHashMap<>();
if (omServiceID != null) {
ozoneClient = OzoneClientFactory.getRpcClient(omServiceID, configuration);
} else {
ozoneClient = OzoneClientFactory.getRpcClient(configuration);
objectStore = ozoneClient.getObjectStore();
for (FreonOps ops : FreonOps.values()) {
histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
if (freon != null) {
public Void call() throws Exception {
if (ozoneConfiguration != null) {
if (!ozoneConfiguration.getBoolean(
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT)) {"Override validateWrites to false, because "
+ HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
validateWrites = false;
} else {
keyValueBuffer = DFSUtil.string2Bytes(
// Compute the common initial digest for all keys without their UUID
if (validateWrites) {
commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM);
for (long nrRemaining = keySize; nrRemaining > 0;
nrRemaining -= bufferSize) {
int curSize = (int)Math.min(bufferSize, nrRemaining);
commonInitialMD.update(keyValueBuffer, 0, curSize);
totalBucketCount = numOfVolumes * numOfBuckets;
totalKeyCount = totalBucketCount * numOfKeys;"Number of Threads: {}", numOfThreads);
threadPoolSize = numOfThreads;
executor = Executors.newFixedThreadPool(threadPoolSize);
addShutdownHook();"Number of Volumes: {}.", numOfVolumes);"Number of Buckets per Volume: {}.", numOfBuckets);"Number of Keys per Bucket: {}.", numOfKeys);"Key size: {} bytes", keySize);"Buffer size: {} bytes", bufferSize);"validateWrites : {}", validateWrites);
for (int i = 0; i < numOfThreads; i++) {
executor.execute(new ObjectCreator());
Thread validator = null;
if (validateWrites) {
totalWritesValidated = 0L;
writeValidationSuccessCount = 0L;
writeValidationFailureCount = 0L;
validationQueue = new LinkedBlockingQueue<>();
validator = new Thread(new Validator());
validator.start();"Data validation is enabled.");
LongSupplier currentValue = numberOfKeysAdded::get;
progressbar = new ProgressBar(System.out, totalKeyCount, currentValue);"Starting progress bar Thread.");
// wait until all keys are added or exception occurred.
while ((numberOfKeysAdded.get() != totalKeyCount)
&& exception == null) {
try {
} catch (InterruptedException e) {
throw e;
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
completed = true;
if (exception != null) {
} else {
if (validator != null) {
if (exception != null) {
throw new RuntimeException(exception);
return null;
* Adds ShutdownHook to print statistics.
private void addShutdownHook() {
new Thread(() -> {
if (freon != null) {
* Prints stats of {@link Freon} run to the PrintStream.
* @param out PrintStream
private void printStats(PrintStream out) {
long endTime = System.nanoTime() - startTime;
String execTime = DurationFormatUtils
long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get())
/ threadPoolSize;
String prettyAverageVolumeTime =
DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT);
long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get())
/ threadPoolSize;
String prettyAverageBucketTime =
DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT);
long averageKeyCreationTime =
/ threadPoolSize;
String prettyAverageKeyCreationTime = DurationFormatUtils
.formatDuration(averageKeyCreationTime, DURATION_FORMAT);
long averageKeyWriteTime =
TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize;
String prettyAverageKeyWriteTime = DurationFormatUtils
.formatDuration(averageKeyWriteTime, DURATION_FORMAT);
out.println("Status: " + (exception != null ? "Failed" : "Success"));
out.println("Git Base Revision: " + VersionInfo.getRevision());
out.println("Number of Volumes created: " + numberOfVolumesCreated);
out.println("Number of Buckets created: " + numberOfBucketsCreated);
out.println("Number of Keys added: " + numberOfKeysAdded);
out.println("Ratis replication factor: " +;
out.println("Ratis replication type: " +;
"Average Time spent in volume creation: " + prettyAverageVolumeTime);
"Average Time spent in bucket creation: " + prettyAverageBucketTime);
"Average Time spent in key creation: " + prettyAverageKeyCreationTime);
"Average Time spent in key write: " + prettyAverageKeyWriteTime);
out.println("Total bytes written: " + totalBytesWritten);
if (validateWrites) {
out.println("Total number of writes validated: " +
out.println("Writes validated: " +
(100.0 * totalWritesValidated / numberOfKeysAdded.get())
+ " %");
out.println("Successful validation: " +
out.println("Unsuccessful validation: " +
out.println("Total Execution time: " + execTime);
if (jsonDir != null) {
String[][] quantileTime =
new String[FreonOps.values().length][QUANTILES + 1];
String[] deviations = new String[FreonOps.values().length];
String[] means = new String[FreonOps.values().length];
for (FreonOps ops : FreonOps.values()) {
Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot();
for (int i = 0; i <= QUANTILES; i++) {
quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration(
.toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)),
deviations[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()),
means[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()),
FreonJobInfo jobInfo = new FreonJobInfo().setExecTime(execTime)
String jsonName =
new SimpleDateFormat("yyyyMMddHHmmss").format( + ".json";
String jsonPath = jsonDir + "/" + jsonName;
try (FileOutputStream os = new FileOutputStream(jsonPath)) {
ObjectMapper mapper = new ObjectMapper();
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
writer.writeValue(os, jobInfo);
} catch (FileNotFoundException e) {
out.println("Json File could not be created for the path: " + jsonPath);
} catch (IOException e) {
out.println("Json object could not be created");
* Returns the number of volumes created.
* @return volume count.
int getNumberOfVolumesCreated() {
return numberOfVolumesCreated.get();
* Returns the number of buckets created.
* @return bucket count.
int getNumberOfBucketsCreated() {
return numberOfBucketsCreated.get();
* Returns the number of keys added.
* @return keys count.
long getNumberOfKeysAdded() {
return numberOfKeysAdded.get();
* Returns true if random validation of write is enabled.
* @return validateWrites
boolean getValidateWrites() {
return validateWrites;
* Returns the number of keys validated.
* @return validated key count.
long getTotalKeysValidated() {
return totalWritesValidated;
* Returns the number of successful validation.
* @return successful validation count.
long getSuccessfulValidationCount() {
return writeValidationSuccessCount;
* Returns the number of unsuccessful validation.
* @return unsuccessful validation count.
long getUnsuccessfulValidationCount() {
return validateWrites ? writeValidationFailureCount : 0;
* Wrapper to hold ozone keyValidate entry.
private static class KeyValidate {
* Bucket name.
private OzoneBucket bucket;
* Key name.
private String keyName;
* Digest of this key's full value.
private byte[] digest;
* Constructs a new ozone keyValidate.
* @param bucket bucket part
* @param keyName key part
* @param digest digest of this key's full value
KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) {
this.bucket = bucket;
this.keyName = keyName;
this.digest = digest;
private class ObjectCreator implements Runnable {
public void run() {
int v;
while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) {
if (!createVolume(v)) {
int b;
while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) {
if (!createBucket(b)) {
long k;
while ((k = keyCounter.getAndIncrement()) < totalKeyCount) {
if (!createKey(k)) {
private boolean createVolume(int volumeNumber) {
String volumeName = "vol-" + volumeNumber + "-"
+ RandomStringUtils.randomNumeric(5);
LOG.trace("Creating volume: {}", volumeName);
try (AutoCloseable scope = TracingUtil
.createActivatedSpan("createVolume")) {
long start = System.nanoTime();
long volumeCreationDuration = System.nanoTime() - start;
OzoneVolume volume = objectStore.getVolume(volumeName);
volumes.put(volumeNumber, volume);
return true;
} catch (Throwable e) {
exception = e;
LOG.error("Could not create volume", e);
return false;
private boolean createBucket(int globalBucketNumber) {
int volumeNumber = globalBucketNumber % numOfVolumes;
int bucketNumber = globalBucketNumber / numOfVolumes;
OzoneVolume volume = getVolume(volumeNumber);
if (volume == null) {
LOG.error("Could not find volume {}", volumeNumber);
return false;
String bucketName = "bucket-" + bucketNumber + "-" +
LOG.trace("Creating bucket: {} in volume: {}",
bucketName, volume.getName());
try (AutoCloseable scope = TracingUtil
.createActivatedSpan("createBucket")) {
long start = System.nanoTime();
long bucketCreationDuration = System.nanoTime() - start;
OzoneBucket bucket = volume.getBucket(bucketName);
buckets.put(globalBucketNumber, bucket);
return true;
} catch (Throwable e) {
exception = e;
LOG.error("Could not create bucket ", e);
return false;
private boolean createKey(long globalKeyNumber) {
int globalBucketNumber = (int) (globalKeyNumber % totalBucketCount);
long keyNumber = globalKeyNumber / totalBucketCount;
OzoneBucket bucket = getBucket(globalBucketNumber);
if (bucket == null) {
LOG.error("Could not find bucket {}", globalBucketNumber);
return false;
String bucketName = bucket.getName();
String volumeName = bucket.getVolumeName();
String keyName = "key-" + keyNumber + "-"
+ RandomStringUtils.randomNumeric(5);
LOG.trace("Adding key: {} in bucket: {} of volume: {}",
keyName, bucketName, volumeName);
try {
try (AutoCloseable scope = TracingUtil.createActivatedSpan("createKey")) {
long keyCreateStart = System.nanoTime();
try (OzoneOutputStream os = bucket.createKey(keyName, keySize, type,
factor, new HashMap<>())) {
long keyCreationDuration = System.nanoTime() - keyCreateStart;
try (AutoCloseable writeScope = TracingUtil
.createActivatedSpan("writeKeyData")) {
long keyWriteStart = System.nanoTime();
for (long nrRemaining = keySize;
nrRemaining > 0; nrRemaining -= bufferSize) {
int curSize = (int) Math.min(bufferSize, nrRemaining);
os.write(keyValueBuffer, 0, curSize);
long keyWriteDuration = System.nanoTime() - keyWriteStart;
if (validateWrites) {
MessageDigest tmpMD = (MessageDigest) commonInitialMD.clone();
boolean validate = validationQueue.offer(
new KeyValidate(bucket, keyName, tmpMD.digest()));
if (validate) {
LOG.trace("Key {} is queued for validation.", keyName);
return true;
} catch (Throwable e) {
exception = e;
LOG.error("Exception while adding key: {} in bucket: {}" +
" of volume: {}.", keyName, bucketName, volumeName, e);
return false;
private OzoneVolume getVolume(Integer volumeNumber) {
return waitUntilAddedToMap(volumes, volumeNumber);
private OzoneBucket getBucket(Integer bucketNumber) {
return waitUntilAddedToMap(buckets, bucketNumber);
* Looks up volume or bucket from the cache. Waits for it to be created if
* needed (can happen for the last few items depending on the number of
* threads).
* @return may return null if this thread is interrupted, or if any other
* thread encounters an exception (and stores it to {@code exception})
private <T> T waitUntilAddedToMap(Map<Integer, T> map, Integer i) {
while (exception == null && !map.containsKey(i)) {
try {
} catch (InterruptedException e) {
return null;
return map.get(i);
private final class FreonJobInfo {
private String status;
private String gitBaseRevision;
private String jobStartTime;
private int numOfVolumes;
private int numOfBuckets;
private int numOfKeys;
private int numOfThreads;
private String dataWritten;
private String execTime;
private String replicationFactor;
private String replicationType;
private long keySize;
private int bufferSize;
private String totalThroughputPerSecond;
private String meanVolumeCreateTime;
private String deviationVolumeCreateTime;
private String[] tenQuantileVolumeCreateTime;
private String meanBucketCreateTime;
private String deviationBucketCreateTime;
private String[] tenQuantileBucketCreateTime;
private String meanKeyCreateTime;
private String deviationKeyCreateTime;
private String[] tenQuantileKeyCreateTime;
private String meanKeyWriteTime;
private String deviationKeyWriteTime;
private String[] tenQuantileKeyWriteTime;
private FreonJobInfo() {
this.status = exception != null ? "Failed" : "Success";
this.numOfVolumes = RandomKeyGenerator.this.numOfVolumes;
this.numOfBuckets = RandomKeyGenerator.this.numOfBuckets;
this.numOfKeys = RandomKeyGenerator.this.numOfKeys;
this.numOfThreads = RandomKeyGenerator.this.numOfThreads;
this.keySize = RandomKeyGenerator.this.keySize;
this.bufferSize = RandomKeyGenerator.this.bufferSize;
this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime);
this.replicationFactor =;
this.replicationType =;
long totalBytes =
(long) numOfVolumes * numOfBuckets * numOfKeys * keySize;
this.dataWritten = getInStorageUnits((double) totalBytes);
this.totalThroughputPerSecond = getInStorageUnits(
(totalBytes * 1.0) / TimeUnit.NANOSECONDS
RandomKeyGenerator.this.keyWriteTime.get() / threadPoolSize));
private String getInStorageUnits(Double value) {
double size;
OzoneQuota.Units unit;
if ((long) (value / OzoneConsts.TB) != 0) {
size = value / OzoneConsts.TB;
unit = OzoneQuota.Units.TB;
} else if ((long) (value / OzoneConsts.GB) != 0) {
size = value / OzoneConsts.GB;
unit = OzoneQuota.Units.GB;
} else if ((long) (value / OzoneConsts.MB) != 0) {
size = value / OzoneConsts.MB;
unit = OzoneQuota.Units.MB;
} else if ((long) (value / OzoneConsts.KB) != 0) {
size = value / OzoneConsts.KB;
unit = OzoneQuota.Units.KB;
} else {
size = value;
unit = OzoneQuota.Units.BYTES;
return size + " " + unit;
public FreonJobInfo setGitBaseRevision(String gitBaseRevisionVal) {
gitBaseRevision = gitBaseRevisionVal;
return this;
public FreonJobInfo setExecTime(String execTimeVal) {
execTime = execTimeVal;
return this;
public FreonJobInfo setMeanKeyWriteTime(String deviationKeyWriteTimeVal) {
this.meanKeyWriteTime = deviationKeyWriteTimeVal;
return this;
public FreonJobInfo setDeviationKeyWriteTime(
String deviationKeyWriteTimeVal) {
this.deviationKeyWriteTime = deviationKeyWriteTimeVal;
return this;
public FreonJobInfo setTenQuantileKeyWriteTime(
String[] tenQuantileKeyWriteTimeVal) {
this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal;
return this;
public FreonJobInfo setMeanKeyCreateTime(String deviationKeyWriteTimeVal) {
this.meanKeyCreateTime = deviationKeyWriteTimeVal;
return this;
public FreonJobInfo setDeviationKeyCreateTime(
String deviationKeyCreateTimeVal) {
this.deviationKeyCreateTime = deviationKeyCreateTimeVal;
return this;
public FreonJobInfo setTenQuantileKeyCreateTime(
String[] tenQuantileKeyCreateTimeVal) {
this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal;
return this;
public FreonJobInfo setMeanBucketCreateTime(
String deviationKeyWriteTimeVal) {
this.meanBucketCreateTime = deviationKeyWriteTimeVal;
return this;
public FreonJobInfo setDeviationBucketCreateTime(
String deviationBucketCreateTimeVal) {
this.deviationBucketCreateTime = deviationBucketCreateTimeVal;
return this;
public FreonJobInfo setTenQuantileBucketCreateTime(
String[] tenQuantileBucketCreateTimeVal) {
this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal;
return this;
public FreonJobInfo setMeanVolumeCreateTime(
String deviationKeyWriteTimeVal) {
this.meanVolumeCreateTime = deviationKeyWriteTimeVal;
return this;
public FreonJobInfo setDeviationVolumeCreateTime(
String deviationVolumeCreateTimeVal) {
this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal;
return this;
public FreonJobInfo setTenQuantileVolumeCreateTime(
String[] tenQuantileVolumeCreateTimeVal) {
this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal;
return this;
public String getJobStartTime() {
return jobStartTime;
public int getNumOfVolumes() {
return numOfVolumes;
public int getNumOfBuckets() {
return numOfBuckets;
public int getNumOfKeys() {
return numOfKeys;
public int getNumOfThreads() {
return numOfThreads;
public String getExecTime() {
return execTime;
public String getReplicationFactor() {
return replicationFactor;
public String getReplicationType() {
return replicationType;
public String getStatus() {
return status;
public long getKeySize() {
return keySize;
public int getBufferSize() {
return bufferSize;
public String getGitBaseRevision() {
return gitBaseRevision;
public String getDataWritten() {
return dataWritten;
public String getTotalThroughputPerSecond() {
return totalThroughputPerSecond;
public String getMeanVolumeCreateTime() {
return meanVolumeCreateTime;
public String getDeviationVolumeCreateTime() {
return deviationVolumeCreateTime;
public String[] getTenQuantileVolumeCreateTime() {
return tenQuantileVolumeCreateTime;
public String getMeanBucketCreateTime() {
return meanBucketCreateTime;
public String getDeviationBucketCreateTime() {
return deviationBucketCreateTime;
public String[] getTenQuantileBucketCreateTime() {
return tenQuantileBucketCreateTime;
public String getMeanKeyCreateTime() {
return meanKeyCreateTime;
public String getDeviationKeyCreateTime() {
return deviationKeyCreateTime;
public String[] getTenQuantileKeyCreateTime() {
return tenQuantileKeyCreateTime;
public String getMeanKeyWriteTime() {
return meanKeyWriteTime;
public String getDeviationKeyWriteTime() {
return deviationKeyWriteTime;
public String[] getTenQuantileKeyWriteTime() {
return tenQuantileKeyWriteTime;
* Validates the write done in ozone cluster.
private class Validator implements Runnable {
public void run() {
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
while (true) {
if (completed && validationQueue.isEmpty()) {
try {
KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
OzoneInputStream is = kv.bucket.readKey(kv.keyName);
byte[] curDigest = dig.digest(is);
if (MessageDigest.isEqual(kv.digest, curDigest)) {
} else {
LOG.warn("Data validation error for key {}/{}/{}",
kv.bucket.getVolumeName(), kv.bucket, kv.keyName);
LOG.warn("Expected checksum: {}, Actual checksum: {}",
kv.digest, curDigest);
} catch (IOException ex) {
LOG.error("Exception while validating write.", ex);
} catch (InterruptedException ex) {
public void setNumOfVolumes(int numOfVolumes) {
this.numOfVolumes = numOfVolumes;
public void setNumOfBuckets(int numOfBuckets) {
this.numOfBuckets = numOfBuckets;
public void setNumOfKeys(int numOfKeys) {
this.numOfKeys = numOfKeys;
public void setNumOfThreads(int numOfThreads) {
this.numOfThreads = numOfThreads;
public void setKeySize(long keySize) {
this.keySize = keySize;
public void setType(ReplicationType type) {
this.type = type;
public void setFactor(ReplicationFactor factor) {
this.factor = factor;
public void setValidateWrites(boolean validateWrites) {
this.validateWrites = validateWrites;
public int getThreadPoolSize() {
return threadPoolSize;