blob: 6ced6d64fe343ca405346b298d77bc5b311de12b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.chaos.TestProbability;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A Simple Load generator for testing.
*/
public class MiniOzoneLoadGenerator {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
private static String keyNameDelimiter = "_";
private ThreadPoolExecutor writeExecutor;
private int numWriteThreads;
// number of buffer to be allocated, each is allocated with length which
// is multiple of 2, each buffer is populated with random data.
private int numBuffers;
private List<ByteBuffer> buffers;
private AtomicBoolean isWriteThreadRunning;
private final List<OzoneBucket> ozoneBuckets;
private final AtomicInteger agedFileWrittenIndex;
private final ExecutorService agedFileExecutor;
private final OzoneBucket agedLoadBucket;
private final TestProbability agedWriteProbability;
MiniOzoneLoadGenerator(List<OzoneBucket> bucket,
OzoneBucket agedLoadBucket, int numThreads,
int numBuffers) {
this.ozoneBuckets = bucket;
this.numWriteThreads = numThreads;
this.numBuffers = numBuffers;
this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
this.writeExecutor.prestartAllCoreThreads();
this.agedFileWrittenIndex = new AtomicInteger(0);
this.agedFileExecutor = Executors.newSingleThreadExecutor();
this.agedLoadBucket = agedLoadBucket;
this.agedWriteProbability = TestProbability.valueOf(10);
this.isWriteThreadRunning = new AtomicBoolean(false);
// allocate buffers and populate random data.
buffers = new ArrayList<>();
for (int i = 0; i < numBuffers; i++) {
int size = (int) StorageUnit.KB.toBytes(1 << i);
ByteBuffer buffer = ByteBuffer.allocate(size);
buffer.put(RandomUtils.nextBytes(size));
buffers.add(buffer);
}
}
// Start IO load on an Ozone bucket.
private void load(long runTimeMillis) {
long threadID = Thread.currentThread().getId();
LOG.info("Started IO Thread:{}.", threadID);
String threadName = Thread.currentThread().getName();
long startTime = Time.monotonicNow();
while (isWriteThreadRunning.get() &&
(Time.monotonicNow() < startTime + runTimeMillis)) {
OzoneBucket bucket =
ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
try {
int index = RandomUtils.nextInt();
String keyName = writeData(index, bucket, threadName);
readData(bucket, keyName, index);
deleteKey(bucket, keyName);
} catch (Exception e) {
LOG.error("LOADGEN: Exiting due to exception", e);
break;
}
}
// This will terminate other threads too.
isWriteThreadRunning.set(false);
LOG.info("Terminating IO thread:{}.", threadID);
}
private String writeData(int keyIndex, OzoneBucket bucket, String threadName)
throws Exception {
// choose a random buffer.
ByteBuffer buffer = buffers.get(keyIndex % numBuffers);
int bufferCapacity = buffer.capacity();
String keyName = getKeyName(keyIndex, threadName);
LOG.trace("LOADGEN: Writing key {}", keyName);
try (OzoneOutputStream stream = bucket.createKey(keyName,
bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
new HashMap<>())) {
stream.write(buffer.array());
LOG.trace("LOADGEN: Written key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
keyName, t);
throw t;
}
return keyName;
}
private void readData(OzoneBucket bucket, String keyName, int index)
throws Exception {
LOG.trace("LOADGEN: Reading key {}", keyName);
ByteBuffer buffer = buffers.get(index % numBuffers);
int bufferCapacity = buffer.capacity();
try (OzoneInputStream stream = bucket.readKey(keyName)) {
byte[] readBuffer = new byte[bufferCapacity];
int readLen = stream.read(readBuffer);
if (readLen < bufferCapacity) {
throw new IOException("Read mismatch, key:" + keyName +
" read data length:" + readLen +
" is smaller than excepted:" + bufferCapacity);
}
if (!Arrays.equals(readBuffer, buffer.array())) {
throw new IOException("Read mismatch, key:" + keyName +
" read data does not match the written data");
}
LOG.trace("LOADGEN: Read key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Read key:{} failed with exception", keyName, t);
throw t;
}
}
private void deleteKey(OzoneBucket bucket, String keyName) throws Exception {
LOG.trace("LOADGEN: Deleting key {}", keyName);
try {
bucket.deleteKey(keyName);
LOG.trace("LOADGEN: Deleted key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Unable to delete key:{}", keyName, t);
throw t;
}
}
private Optional<Integer> randomKeyToRead() {
int currentIndex = agedFileWrittenIndex.get();
return currentIndex != 0
? Optional.of(RandomUtils.nextInt(0, currentIndex))
: Optional.empty();
}
private void startAgedFilesLoad(long runTimeMillis) {
long threadID = Thread.currentThread().getId();
LOG.info("AGED LOADGEN: Started Aged IO Thread:{}.", threadID);
String threadName = Thread.currentThread().getName();
long startTime = Time.monotonicNow();
while (isWriteThreadRunning.get() &&
(Time.monotonicNow() < startTime + runTimeMillis)) {
String keyName = null;
try {
if (agedWriteProbability.isTrue()) {
keyName = writeData(agedFileWrittenIndex.getAndIncrement(),
agedLoadBucket, threadName);
} else {
Optional<Integer> index = randomKeyToRead();
if (index.isPresent()) {
keyName = getKeyName(index.get(), threadName);
readData(agedLoadBucket, keyName, index.get());
}
}
} catch (Throwable t) {
LOG.error("AGED LOADGEN: {} Exiting due to exception", keyName, t);
break;
}
}
// This will terminate other threads too.
isWriteThreadRunning.set(false);
LOG.info("Terminating IO thread:{}.", threadID);
}
void startIO(long time, TimeUnit timeUnit) {
List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
LOG.info("Starting MiniOzoneLoadGenerator for time {}:{} with {} buffers " +
"and {} threads", time, timeUnit, numBuffers, numWriteThreads);
if (isWriteThreadRunning.compareAndSet(false, true)) {
// Start the IO thread
for (int i = 0; i < numWriteThreads; i++) {
writeFutures.add(
CompletableFuture.runAsync(() -> load(timeUnit.toMillis(time)),
writeExecutor));
}
writeFutures.add(CompletableFuture.runAsync(() ->
startAgedFilesLoad(timeUnit.toMillis(time)), agedFileExecutor));
// Wait for IO to complete
for (CompletableFuture<Void> f : writeFutures) {
try {
f.get();
} catch (Throwable t) {
LOG.error("startIO failed with exception", t);
}
}
}
}
public void shutdownLoadGenerator() {
try {
writeExecutor.shutdown();
writeExecutor.awaitTermination(1, TimeUnit.DAYS);
} catch (Exception e) {
LOG.error("error while closing ", e);
}
}
private static String getKeyName(int keyIndex, String threadName) {
return threadName + keyNameDelimiter + keyIndex;
}
}