blob: 2bf79c5c78248d27c9146956414c1c793f7b812e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.freon;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;
import java.util.concurrent.LinkedBlockingQueue;
import java.security.MessageDigest;
/**
* Data generator tool to generate as much keys as possible.
*/
@Command(name = "randomkeys",
aliases = "rk",
description = "Generate volumes/buckets and put generated keys.",
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
public final class RandomKeyGenerator implements Callable<Void> {
@ParentCommand
private Freon freon;
enum FreonOps {
VOLUME_CREATE,
BUCKET_CREATE,
KEY_CREATE,
KEY_WRITE
}
  // Format used when pretty-printing elapsed durations in the stats output.
  private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
  // Number of quantile buckets reported per operation in the JSON output.
  private static final int QUANTILES = 10;
  // How often the main thread polls for completion / worker failure.
  private static final int CHECK_INTERVAL_MILLIS = 5000;
  // Random payload chunk written repeatedly for every key; filled in call().
  private byte[] keyValueBuffer = null;
  private static final String DIGEST_ALGORITHM = "MD5";
  // A common initial MessageDigest for each key without its UUID
  private MessageDigest commonInitialMD = null;
  private static final Logger LOG =
      LoggerFactory.getLogger(RandomKeyGenerator.class);
  // Set once all keys are written; tells the Validator to drain and exit.
  private volatile boolean completed = false;
  // First exception seen by any worker thread; a non-null value aborts the run.
  private volatile Exception exception = null;
  @Option(names = "--numOfThreads",
      description = "number of threads to be launched for the run",
      defaultValue = "10")
  private int numOfThreads = 10;
  @Option(names = "--numOfVolumes",
      description = "specifies number of Volumes to be created in offline mode",
      defaultValue = "10")
  private int numOfVolumes = 10;
  @Option(names = "--numOfBuckets",
      description = "specifies number of Buckets to be created per Volume",
      defaultValue = "1000")
  private int numOfBuckets = 1000;
  @Option(
      names = "--numOfKeys",
      description = "specifies number of Keys to be created per Bucket",
      defaultValue = "500000"
  )
  private int numOfKeys = 500000;
  @Option(
      names = "--keySize",
      description = "Specifies the size of Key in bytes to be created",
      defaultValue = "10240"
  )
  private long keySize = 10240;
  @Option(
      names = "--validateWrites",
      description = "Specifies whether to validate keys after writing"
  )
  private boolean validateWrites = false;
  @Option(
      names = "--bufferSize",
      description = "Specifies the buffer size while writing",
      defaultValue = "4096"
  )
  private int bufferSize = 4096;
  @Option(
      names = "--json",
      description = "directory where json is created."
  )
  private String jsonDir;
  @Option(
      names = "--replicationType",
      description = "Replication type (STAND_ALONE, RATIS)",
      defaultValue = "STAND_ALONE"
  )
  private ReplicationType type = ReplicationType.STAND_ALONE;
  @Option(
      names = "--factor",
      description = "Replication factor (ONE, THREE)",
      defaultValue = "ONE"
  )
  private ReplicationFactor factor = ReplicationFactor.ONE;
  private int threadPoolSize;
  private OzoneClient ozoneClient;
  private ObjectStore objectStore;
  private ExecutorService executor;
  private long startTime;
  private long jobStartTime;
  // Cumulative per-operation times (nanoseconds), summed across all threads.
  private AtomicLong volumeCreationTime;
  private AtomicLong bucketCreationTime;
  private AtomicLong keyCreationTime;
  private AtomicLong keyWriteTime;
  private AtomicLong totalBytesWritten;
  private int totalBucketCount;
  private long totalKeyCount;
  // Shared work-distribution counters: each worker claims the next index.
  private AtomicInteger volumeCounter;
  private AtomicInteger bucketCounter;
  private AtomicLong keyCounter;
  // Caches of created handles, keyed by their global index.
  private Map<Integer, OzoneVolume> volumes;
  private Map<Integer, OzoneBucket> buckets;
  private AtomicInteger numberOfVolumesCreated;
  private AtomicInteger numberOfBucketsCreated;
  private AtomicLong numberOfKeysAdded;
  // Validation counters; only initialized when --validateWrites is set,
  // and only mutated by the single Validator thread.
  private Long totalWritesValidated;
  private Long writeValidationSuccessCount;
  private Long writeValidationFailureCount;
  private BlockingQueue<KeyValidate> validationQueue;
  private ArrayList<Histogram> histograms = new ArrayList<>();
  private OzoneConfiguration ozoneConfiguration;
  private ProgressBar progressbar;
  /** No-arg constructor used by picocli; configuration comes from Freon. */
  RandomKeyGenerator() {
  }

  /** Constructor for tests that supply an explicit configuration. */
  @VisibleForTesting
  RandomKeyGenerator(OzoneConfiguration ozoneConfiguration) {
    this.ozoneConfiguration = ozoneConfiguration;
  }
  /**
   * Initializes timers, counters, caches, the Ozone client and the
   * per-operation histograms, and starts the embedded HTTP server when this
   * command runs under Freon.
   *
   * @param configuration cluster configuration used to build the client
   * @throws IOException if the Ozone client cannot be created
   */
  public void init(OzoneConfiguration configuration) throws IOException {
    startTime = System.nanoTime();
    jobStartTime = System.currentTimeMillis();
    volumeCreationTime = new AtomicLong();
    bucketCreationTime = new AtomicLong();
    keyCreationTime = new AtomicLong();
    keyWriteTime = new AtomicLong();
    totalBytesWritten = new AtomicLong();
    numberOfVolumesCreated = new AtomicInteger();
    numberOfBucketsCreated = new AtomicInteger();
    numberOfKeysAdded = new AtomicLong();
    volumeCounter = new AtomicInteger();
    bucketCounter = new AtomicInteger();
    keyCounter = new AtomicLong();
    volumes = new ConcurrentHashMap<>();
    buckets = new ConcurrentHashMap<>();
    ozoneClient = OzoneClientFactory.getClient(configuration);
    objectStore = ozoneClient.getObjectStore();
    // One histogram per FreonOps value, indexed by ordinal().
    for (FreonOps ops : FreonOps.values()) {
      histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));
    }
    if (freon != null) {
      freon.startHttpServer();
    }
  }
@Override
public Void call() throws Exception {
if (ozoneConfiguration != null) {
if (!ozoneConfiguration.getBoolean(
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA,
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT)) {
LOG.info("Override validateWrites to false, because "
+ HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
validateWrites = false;
}
init(ozoneConfiguration);
} else {
init(freon.createOzoneConfiguration());
}
keyValueBuffer = DFSUtil.string2Bytes(
RandomStringUtils.randomAscii(bufferSize));
// Compute the common initial digest for all keys without their UUID
if (validateWrites) {
commonInitialMD = DigestUtils.getDigest(DIGEST_ALGORITHM);
for (long nrRemaining = keySize; nrRemaining > 0;
nrRemaining -= bufferSize) {
int curSize = (int)Math.min(bufferSize, nrRemaining);
commonInitialMD.update(keyValueBuffer, 0, curSize);
}
}
totalBucketCount = numOfVolumes * numOfBuckets;
totalKeyCount = totalBucketCount * numOfKeys;
LOG.info("Number of Threads: " + numOfThreads);
threadPoolSize = numOfThreads;
executor = Executors.newFixedThreadPool(threadPoolSize);
addShutdownHook();
LOG.info("Number of Volumes: {}.", numOfVolumes);
LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
LOG.info("Key size: {} bytes", keySize);
LOG.info("Buffer size: {} bytes", bufferSize);
LOG.info("validateWrites : {}", validateWrites);
for (int i = 0; i < numOfThreads; i++) {
executor.submit(new ObjectCreator());
}
Thread validator = null;
if (validateWrites) {
totalWritesValidated = 0L;
writeValidationSuccessCount = 0L;
writeValidationFailureCount = 0L;
validationQueue = new LinkedBlockingQueue<>();
validator = new Thread(new Validator());
validator.start();
LOG.info("Data validation is enabled.");
}
Supplier<Long> currentValue = numberOfKeysAdded::get;
progressbar = new ProgressBar(System.out, totalKeyCount, currentValue);
LOG.info("Starting progress bar Thread.");
progressbar.start();
// wait until all keys are added or exception occurred.
while ((numberOfKeysAdded.get() != totalKeyCount)
&& exception == null) {
try {
Thread.sleep(CHECK_INTERVAL_MILLIS);
} catch (InterruptedException e) {
throw e;
}
}
executor.shutdown();
executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
completed = true;
if (exception != null) {
progressbar.terminate();
} else {
progressbar.shutdown();
}
if (validator != null) {
validator.join();
}
ozoneClient.close();
if (exception != null) {
throw exception;
}
return null;
}
/**
* Adds ShutdownHook to print statistics.
*/
private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
printStats(System.out);
if (freon != null) {
freon.stopHttpServer();
}
}));
}
/**
* Prints stats of {@link Freon} run to the PrintStream.
*
* @param out PrintStream
*/
private void printStats(PrintStream out) {
long endTime = System.nanoTime() - startTime;
String execTime = DurationFormatUtils
.formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime),
DURATION_FORMAT);
long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get())
/ threadPoolSize;
String prettyAverageVolumeTime =
DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT);
long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get())
/ threadPoolSize;
String prettyAverageBucketTime =
DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT);
long averageKeyCreationTime =
TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get())
/ threadPoolSize;
String prettyAverageKeyCreationTime = DurationFormatUtils
.formatDuration(averageKeyCreationTime, DURATION_FORMAT);
long averageKeyWriteTime =
TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize;
String prettyAverageKeyWriteTime = DurationFormatUtils
.formatDuration(averageKeyWriteTime, DURATION_FORMAT);
out.println();
out.println("***************************************************");
out.println("Status: " + (exception != null ? "Failed" : "Success"));
out.println("Git Base Revision: " + VersionInfo.getRevision());
out.println("Number of Volumes created: " + numberOfVolumesCreated);
out.println("Number of Buckets created: " + numberOfBucketsCreated);
out.println("Number of Keys added: " + numberOfKeysAdded);
out.println("Ratis replication factor: " + factor.name());
out.println("Ratis replication type: " + type.name());
out.println(
"Average Time spent in volume creation: " + prettyAverageVolumeTime);
out.println(
"Average Time spent in bucket creation: " + prettyAverageBucketTime);
out.println(
"Average Time spent in key creation: " + prettyAverageKeyCreationTime);
out.println(
"Average Time spent in key write: " + prettyAverageKeyWriteTime);
out.println("Total bytes written: " + totalBytesWritten);
if (validateWrites) {
out.println("Total number of writes validated: " +
totalWritesValidated);
out.println("Writes validated: " +
(100.0 * totalWritesValidated / numberOfKeysAdded.get())
+ " %");
out.println("Successful validation: " +
writeValidationSuccessCount);
out.println("Unsuccessful validation: " +
writeValidationFailureCount);
}
out.println("Total Execution time: " + execTime);
out.println("***************************************************");
if (jsonDir != null) {
String[][] quantileTime =
new String[FreonOps.values().length][QUANTILES + 1];
String[] deviations = new String[FreonOps.values().length];
String[] means = new String[FreonOps.values().length];
for (FreonOps ops : FreonOps.values()) {
Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot();
for (int i = 0; i <= QUANTILES; i++) {
quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS
.toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)),
DURATION_FORMAT);
}
deviations[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()),
DURATION_FORMAT);
means[ops.ordinal()] = DurationFormatUtils.formatDuration(
TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()),
DURATION_FORMAT);
}
FreonJobInfo jobInfo = new FreonJobInfo().setExecTime(execTime)
.setGitBaseRevision(VersionInfo.getRevision())
.setMeanVolumeCreateTime(means[FreonOps.VOLUME_CREATE.ordinal()])
.setDeviationVolumeCreateTime(
deviations[FreonOps.VOLUME_CREATE.ordinal()])
.setTenQuantileVolumeCreateTime(
quantileTime[FreonOps.VOLUME_CREATE.ordinal()])
.setMeanBucketCreateTime(means[FreonOps.BUCKET_CREATE.ordinal()])
.setDeviationBucketCreateTime(
deviations[FreonOps.BUCKET_CREATE.ordinal()])
.setTenQuantileBucketCreateTime(
quantileTime[FreonOps.BUCKET_CREATE.ordinal()])
.setMeanKeyCreateTime(means[FreonOps.KEY_CREATE.ordinal()])
.setDeviationKeyCreateTime(deviations[FreonOps.KEY_CREATE.ordinal()])
.setTenQuantileKeyCreateTime(
quantileTime[FreonOps.KEY_CREATE.ordinal()])
.setMeanKeyWriteTime(means[FreonOps.KEY_WRITE.ordinal()])
.setDeviationKeyWriteTime(deviations[FreonOps.KEY_WRITE.ordinal()])
.setTenQuantileKeyWriteTime(
quantileTime[FreonOps.KEY_WRITE.ordinal()]);
String jsonName =
new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json";
String jsonPath = jsonDir + "/" + jsonName;
FileOutputStream os = null;
try {
os = new FileOutputStream(jsonPath);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.FIELD,
JsonAutoDetect.Visibility.ANY);
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
writer.writeValue(os, jobInfo);
} catch (FileNotFoundException e) {
out.println("Json File could not be created for the path: " + jsonPath);
out.println(e);
} catch (IOException e) {
out.println("Json object could not be created");
out.println(e);
} finally {
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
LOG.warn("Could not close the output stream for json", e);
}
}
}
}
  /**
   * Returns the number of volumes created so far.
   *
   * @return volume count.
   */
  @VisibleForTesting
  int getNumberOfVolumesCreated() {
    return numberOfVolumesCreated.get();
  }

  /**
   * Returns the number of buckets created so far.
   *
   * @return bucket count.
   */
  @VisibleForTesting
  int getNumberOfBucketsCreated() {
    return numberOfBucketsCreated.get();
  }

  /**
   * Returns the number of keys added so far.
   *
   * @return keys count.
   */
  @VisibleForTesting
  long getNumberOfKeysAdded() {
    return numberOfKeysAdded.get();
  }

  /**
   * Returns true if validation of writes is enabled.
   *
   * @return validateWrites
   */
  @VisibleForTesting
  boolean getValidateWrites() {
    return validateWrites;
  }
/**
* Returns the number of keys validated.
*
* @return validated key count.
*/
@VisibleForTesting
long getTotalKeysValidated() {
return totalWritesValidated;
}
/**
* Returns the number of successful validation.
*
* @return successful validation count.
*/
@VisibleForTesting
long getSuccessfulValidationCount() {
return writeValidationSuccessCount;
}
/**
* Returns the number of unsuccessful validation.
*
* @return unsuccessful validation count.
*/
@VisibleForTesting
long getUnsuccessfulValidationCount() {
return validateWrites ? writeValidationFailureCount : 0;
}
  /**
   * Wrapper to hold one ozone keyValidate entry queued for the Validator.
   */
  private static class KeyValidate {
    /**
     * Bucket the key was written to (the handle, not just its name).
     */
    private OzoneBucket bucket;
    /**
     * Key name.
     */
    private String keyName;
    /**
     * Digest of this key's full value, computed at write time.
     */
    private byte[] digest;
    /**
     * Constructs a new ozone keyValidate.
     *
     * @param bucket bucket part
     * @param keyName key part
     * @param digest digest of this key's full value
     */
    KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) {
      this.bucket = bucket;
      this.keyName = keyName;
      this.digest = digest;
    }
  }
private class ObjectCreator implements Runnable {
@Override
public void run() {
int v;
while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) {
if (!createVolume(v)) {
return;
}
}
int b;
while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) {
if (!createBucket(b)) {
return;
}
}
long k;
while ((k = keyCounter.getAndIncrement()) < totalKeyCount) {
if (!createKey(k)) {
return;
}
}
}
}
private boolean createVolume(int volumeNumber) {
String volumeName = "vol-" + volumeNumber + "-"
+ RandomStringUtils.randomNumeric(5);
LOG.trace("Creating volume: {}", volumeName);
try (Scope ignored = GlobalTracer.get().buildSpan("createVolume")
.startActive(true)) {
long start = System.nanoTime();
objectStore.createVolume(volumeName);
long volumeCreationDuration = System.nanoTime() - start;
volumeCreationTime.getAndAdd(volumeCreationDuration);
histograms.get(FreonOps.VOLUME_CREATE.ordinal())
.update(volumeCreationDuration);
numberOfVolumesCreated.getAndIncrement();
OzoneVolume volume = objectStore.getVolume(volumeName);
volumes.put(volumeNumber, volume);
return true;
} catch (IOException e) {
exception = e;
LOG.error("Could not create volume", e);
return false;
}
}
private boolean createBucket(int globalBucketNumber) {
int volumeNumber = globalBucketNumber % numOfVolumes;
int bucketNumber = globalBucketNumber / numOfVolumes;
OzoneVolume volume = getVolume(volumeNumber);
if (volume == null) {
return false;
}
String bucketName = "bucket-" + bucketNumber + "-" +
RandomStringUtils.randomNumeric(5);
LOG.trace("Creating bucket: {} in volume: {}",
bucketName, volume.getName());
try (Scope ignored = GlobalTracer.get().buildSpan("createBucket")
.startActive(true)) {
long start = System.nanoTime();
volume.createBucket(bucketName);
long bucketCreationDuration = System.nanoTime() - start;
histograms.get(FreonOps.BUCKET_CREATE.ordinal())
.update(bucketCreationDuration);
bucketCreationTime.getAndAdd(bucketCreationDuration);
numberOfBucketsCreated.getAndIncrement();
OzoneBucket bucket = volume.getBucket(bucketName);
buckets.put(globalBucketNumber, bucket);
return true;
} catch (IOException e) {
exception = e;
LOG.error("Could not create bucket ", e);
return false;
}
}
  /**
   * Creates and writes a single key, recording creation and write timings
   * separately. When validation is enabled, the expected digest is queued
   * for the Validator thread.
   *
   * @param globalKeyNumber key index across all buckets
   * @return true on success; false on any failure (recorded in
   *         {@code exception})
   */
  @SuppressFBWarnings("REC_CATCH_EXCEPTION")
  private boolean createKey(long globalKeyNumber) {
    // Keys are distributed round-robin across all buckets.
    int globalBucketNumber = (int) (globalKeyNumber % totalBucketCount);
    long keyNumber = globalKeyNumber / totalBucketCount;
    OzoneBucket bucket = getBucket(globalBucketNumber);
    if (bucket == null) {
      // Another thread failed, or we were interrupted; give up.
      return false;
    }
    String bucketName = bucket.getName();
    String volumeName = bucket.getVolumeName();
    String keyName = "key-" + keyNumber + "-"
        + RandomStringUtils.randomNumeric(5);
    LOG.trace("Adding key: {} in bucket: {} of volume: {}",
        keyName, bucketName, volumeName);
    try {
      try (Scope scope = GlobalTracer.get().buildSpan("createKey")
          .startActive(true)) {
        long keyCreateStart = System.nanoTime();
        // NOTE(review): os is not closed via finally/try-with-resources, so
        // a failed write below leaks the stream; close() is kept inside the
        // timed section because its duration counts as key-write time.
        OzoneOutputStream os = bucket.createKey(keyName, keySize, type,
            factor, new HashMap<>());
        long keyCreationDuration = System.nanoTime() - keyCreateStart;
        histograms.get(FreonOps.KEY_CREATE.ordinal())
            .update(keyCreationDuration);
        keyCreationTime.getAndAdd(keyCreationDuration);
        try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData")
            .startActive(true)) {
          long keyWriteStart = System.nanoTime();
          // Write keySize bytes in bufferSize chunks of the shared buffer.
          for (long nrRemaining = keySize;
               nrRemaining > 0; nrRemaining -= bufferSize) {
            int curSize = (int) Math.min(bufferSize, nrRemaining);
            os.write(keyValueBuffer, 0, curSize);
          }
          os.close();
          long keyWriteDuration = System.nanoTime() - keyWriteStart;
          histograms.get(FreonOps.KEY_WRITE.ordinal())
              .update(keyWriteDuration);
          keyWriteTime.getAndAdd(keyWriteDuration);
          totalBytesWritten.getAndAdd(keySize);
          numberOfKeysAdded.getAndIncrement();
        }
      }
      if (validateWrites) {
        // Every key carries the same payload, so cloning the precomputed
        // digest is cheaper than re-hashing keySize bytes per key.
        MessageDigest tmpMD = (MessageDigest) commonInitialMD.clone();
        boolean validate = validationQueue.offer(
            new KeyValidate(bucket, keyName, tmpMD.digest()));
        if (validate) {
          LOG.trace("Key {} is queued for validation.", keyName);
        }
      }
      return true;
    } catch (Exception e) {
      exception = e;
      LOG.error("Exception while adding key: {} in bucket: {}" +
          " of volume: {}.", keyName, bucketName, volumeName, e);
      return false;
    }
  }
  /** Looks up (waiting if necessary) the volume created for this index. */
  private OzoneVolume getVolume(Integer volumeNumber) {
    return waitUntilAddedToMap(volumes, volumeNumber);
  }

  /** Looks up (waiting if necessary) the bucket created for this index. */
  private OzoneBucket getBucket(Integer bucketNumber) {
    return waitUntilAddedToMap(buckets, bucketNumber);
  }
/**
* Looks up volume or bucket from the cache. Waits for it to be created if
* needed (can happen for the last few items depending on the number of
* threads).
*
* @return may return null if this thread is interrupted, or if any other
* thread encounters an exception (and stores it to {@code exception})
*/
private <T> T waitUntilAddedToMap(Map<Integer, T> map, Integer i) {
while (exception == null && !map.containsKey(i)) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return map.get(i);
}
private final class FreonJobInfo {
private String status;
private String gitBaseRevision;
private String jobStartTime;
private int numOfVolumes;
private int numOfBuckets;
private int numOfKeys;
private int numOfThreads;
private String dataWritten;
private String execTime;
private String replicationFactor;
private String replicationType;
private long keySize;
private int bufferSize;
private String totalThroughputPerSecond;
private String meanVolumeCreateTime;
private String deviationVolumeCreateTime;
private String[] tenQuantileVolumeCreateTime;
private String meanBucketCreateTime;
private String deviationBucketCreateTime;
private String[] tenQuantileBucketCreateTime;
private String meanKeyCreateTime;
private String deviationKeyCreateTime;
private String[] tenQuantileKeyCreateTime;
private String meanKeyWriteTime;
private String deviationKeyWriteTime;
private String[] tenQuantileKeyWriteTime;
private FreonJobInfo() {
this.status = exception != null ? "Failed" : "Success";
this.numOfVolumes = RandomKeyGenerator.this.numOfVolumes;
this.numOfBuckets = RandomKeyGenerator.this.numOfBuckets;
this.numOfKeys = RandomKeyGenerator.this.numOfKeys;
this.numOfThreads = RandomKeyGenerator.this.numOfThreads;
this.keySize = RandomKeyGenerator.this.keySize;
this.bufferSize = RandomKeyGenerator.this.bufferSize;
this.jobStartTime = Time.formatTime(RandomKeyGenerator.this.jobStartTime);
this.replicationFactor = RandomKeyGenerator.this.factor.name();
this.replicationType = RandomKeyGenerator.this.type.name();
long totalBytes =
(long) numOfVolumes * numOfBuckets * numOfKeys * keySize;
this.dataWritten = getInStorageUnits((double) totalBytes);
this.totalThroughputPerSecond = getInStorageUnits(
(totalBytes * 1.0) / TimeUnit.NANOSECONDS
.toSeconds(
RandomKeyGenerator.this.keyWriteTime.get() / threadPoolSize));
}
private String getInStorageUnits(Double value) {
double size;
OzoneQuota.Units unit;
if ((long) (value / OzoneConsts.TB) != 0) {
size = value / OzoneConsts.TB;
unit = OzoneQuota.Units.TB;
} else if ((long) (value / OzoneConsts.GB) != 0) {
size = value / OzoneConsts.GB;
unit = OzoneQuota.Units.GB;
} else if ((long) (value / OzoneConsts.MB) != 0) {
size = value / OzoneConsts.MB;
unit = OzoneQuota.Units.MB;
} else if ((long) (value / OzoneConsts.KB) != 0) {
size = value / OzoneConsts.KB;
unit = OzoneQuota.Units.KB;
} else {
size = value;
unit = OzoneQuota.Units.BYTES;
}
return size + " " + unit;
}
public FreonJobInfo setGitBaseRevision(String gitBaseRevisionVal) {
gitBaseRevision = gitBaseRevisionVal;
return this;
}
public FreonJobInfo setExecTime(String execTimeVal) {
execTime = execTimeVal;
return this;
}
public FreonJobInfo setMeanKeyWriteTime(String deviationKeyWriteTimeVal) {
this.meanKeyWriteTime = deviationKeyWriteTimeVal;
return this;
}
public FreonJobInfo setDeviationKeyWriteTime(
String deviationKeyWriteTimeVal) {
this.deviationKeyWriteTime = deviationKeyWriteTimeVal;
return this;
}
public FreonJobInfo setTenQuantileKeyWriteTime(
String[] tenQuantileKeyWriteTimeVal) {
this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal;
return this;
}
public FreonJobInfo setMeanKeyCreateTime(String deviationKeyWriteTimeVal) {
this.meanKeyCreateTime = deviationKeyWriteTimeVal;
return this;
}
public FreonJobInfo setDeviationKeyCreateTime(
String deviationKeyCreateTimeVal) {
this.deviationKeyCreateTime = deviationKeyCreateTimeVal;
return this;
}
public FreonJobInfo setTenQuantileKeyCreateTime(
String[] tenQuantileKeyCreateTimeVal) {
this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal;
return this;
}
public FreonJobInfo setMeanBucketCreateTime(
String deviationKeyWriteTimeVal) {
this.meanBucketCreateTime = deviationKeyWriteTimeVal;
return this;
}
public FreonJobInfo setDeviationBucketCreateTime(
String deviationBucketCreateTimeVal) {
this.deviationBucketCreateTime = deviationBucketCreateTimeVal;
return this;
}
public FreonJobInfo setTenQuantileBucketCreateTime(
String[] tenQuantileBucketCreateTimeVal) {
this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal;
return this;
}
public FreonJobInfo setMeanVolumeCreateTime(
String deviationKeyWriteTimeVal) {
this.meanVolumeCreateTime = deviationKeyWriteTimeVal;
return this;
}
public FreonJobInfo setDeviationVolumeCreateTime(
String deviationVolumeCreateTimeVal) {
this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal;
return this;
}
public FreonJobInfo setTenQuantileVolumeCreateTime(
String[] tenQuantileVolumeCreateTimeVal) {
this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal;
return this;
}
public String getJobStartTime() {
return jobStartTime;
}
public int getNumOfVolumes() {
return numOfVolumes;
}
public int getNumOfBuckets() {
return numOfBuckets;
}
public int getNumOfKeys() {
return numOfKeys;
}
public int getNumOfThreads() {
return numOfThreads;
}
public String getExecTime() {
return execTime;
}
public String getReplicationFactor() {
return replicationFactor;
}
public String getReplicationType() {
return replicationType;
}
public String getStatus() {
return status;
}
public long getKeySize() {
return keySize;
}
public int getBufferSize() {
return bufferSize;
}
public String getGitBaseRevision() {
return gitBaseRevision;
}
public String getDataWritten() {
return dataWritten;
}
public String getTotalThroughputPerSecond() {
return totalThroughputPerSecond;
}
public String getMeanVolumeCreateTime() {
return meanVolumeCreateTime;
}
public String getDeviationVolumeCreateTime() {
return deviationVolumeCreateTime;
}
public String[] getTenQuantileVolumeCreateTime() {
return tenQuantileVolumeCreateTime;
}
public String getMeanBucketCreateTime() {
return meanBucketCreateTime;
}
public String getDeviationBucketCreateTime() {
return deviationBucketCreateTime;
}
public String[] getTenQuantileBucketCreateTime() {
return tenQuantileBucketCreateTime;
}
public String getMeanKeyCreateTime() {
return meanKeyCreateTime;
}
public String getDeviationKeyCreateTime() {
return deviationKeyCreateTime;
}
public String[] getTenQuantileKeyCreateTime() {
return tenQuantileKeyCreateTime;
}
public String getMeanKeyWriteTime() {
return meanKeyWriteTime;
}
public String getDeviationKeyWriteTime() {
return deviationKeyWriteTime;
}
public String[] getTenQuantileKeyWriteTime() {
return tenQuantileKeyWriteTime;
}
}
/**
* Validates the write done in ozone cluster.
*/
private class Validator implements Runnable {
@Override
public void run() {
DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM);
while (true) {
if (completed && validationQueue.isEmpty()) {
return;
}
try {
KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
OzoneInputStream is = kv.bucket.readKey(kv.keyName);
dig.getMessageDigest().reset();
byte[] curDigest = dig.digest(is);
totalWritesValidated++;
if (MessageDigest.isEqual(kv.digest, curDigest)) {
writeValidationSuccessCount++;
} else {
writeValidationFailureCount++;
LOG.warn("Data validation error for key {}/{}/{}",
kv.bucket.getVolumeName(), kv.bucket, kv.keyName);
LOG.warn("Expected checksum: {}, Actual checksum: {}",
kv.digest, curDigest);
}
is.close();
}
} catch (IOException | InterruptedException ex) {
LOG.error("Exception while validating write: " + ex.getMessage());
}
}
}
}
  // Test-only mutators for the picocli-bound parameters, plus the pool size
  // accessor; these bypass command-line parsing in unit tests.
  @VisibleForTesting
  public void setNumOfVolumes(int numOfVolumes) {
    this.numOfVolumes = numOfVolumes;
  }

  @VisibleForTesting
  public void setNumOfBuckets(int numOfBuckets) {
    this.numOfBuckets = numOfBuckets;
  }

  @VisibleForTesting
  public void setNumOfKeys(int numOfKeys) {
    this.numOfKeys = numOfKeys;
  }

  @VisibleForTesting
  public void setNumOfThreads(int numOfThreads) {
    this.numOfThreads = numOfThreads;
  }

  @VisibleForTesting
  public void setKeySize(long keySize) {
    this.keySize = keySize;
  }

  @VisibleForTesting
  public void setType(ReplicationType type) {
    this.type = type;
  }

  @VisibleForTesting
  public void setFactor(ReplicationFactor factor) {
    this.factor = factor;
  }

  @VisibleForTesting
  public void setValidateWrites(boolean validateWrites) {
    this.validateWrites = validateWrites;
  }

  @VisibleForTesting
  public int getThreadPoolSize() {
    return threadPoolSize;
  }
}