blob: 401e71b174835a89cbe6b0aa42b2ccc97b279f13 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.freon;
import com.codahale.metrics.Timer;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
/**
* Synthetic read/write file operations workload generator tool.
*/
@Command(name = "obrwf",
aliases = "om-bucket-read-write-file-ops",
description = "Creates files, performs respective read/write " +
"operations to measure lock performance.",
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
public class OmBucketReadWriteFileOps extends BaseFreonGenerator
implements Callable<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(OmBucketReadWriteFileOps.class);
@Option(names = {"-P", "--root-path"},
description = "Root path",
defaultValue = "o3fs://bucket1.vol1/dir1/dir2")
private String rootPath;
@Option(names = {"-r", "--file-count-for-read"},
description = "Number of files to be written in the read directory.",
defaultValue = "100")
private int fileCountForRead;
@Option(names = {"-w", "--file-count-for-write"},
description = "Number of files to be written in the write directory.",
defaultValue = "10")
private int fileCountForWrite;
@Option(names = {"-g", "--file-size"},
description = "Generated data size (in bytes) of each file to be " +
"written in each directory.",
defaultValue = "256")
private long fileSizeInBytes;
@Option(names = {"-b", "--buffer"},
description = "Size of buffer used to generated the file content.",
defaultValue = "64")
private int bufferSize;
@Option(names = {"-l", "--name-len"},
description = "Length of the random name of directory you want to " +
"create.",
defaultValue = "10")
private int length;
@Option(names = {"-c", "--total-thread-count"},
description = "Total number of threads to be executed.",
defaultValue = "100")
private int totalThreadCount;
@Option(names = {"-T", "--read-thread-percentage"},
description = "Percentage of the total number of threads to be " +
"allocated for read operations. The remaining percentage of " +
"threads will be allocated for write operations.",
defaultValue = "90")
private int readThreadPercentage;
@Option(names = {"-R", "--num-of-read-operations"},
description = "Number of read operations to be performed by each thread.",
defaultValue = "50")
private int numOfReadOperations;
@Option(names = {"-W", "--num-of-write-operations"},
description = "Number of write operations to be performed by each " +
"thread.",
defaultValue = "10")
private int numOfWriteOperations;
private Timer timer;
private ContentGenerator contentGenerator;
private FileSystem fileSystem;
private int readThreadCount;
private int writeThreadCount;
@Override
public Void call() throws Exception {
init();
readThreadCount = (readThreadPercentage * totalThreadCount) / 100;
writeThreadCount = totalThreadCount - readThreadCount;
print("rootPath: " + rootPath);
print("fileCountForRead: " + fileCountForRead);
print("fileCountForWrite: " + fileCountForWrite);
print("fileSizeInBytes: " + fileSizeInBytes);
print("bufferSize: " + bufferSize);
print("totalThreadCount: " + totalThreadCount);
print("readThreadPercentage: " + readThreadPercentage);
print("writeThreadPercentage: " + (100 - readThreadPercentage));
print("readThreadCount: " + readThreadCount);
print("writeThreadCount: " + writeThreadCount);
print("numOfReadOperations: " + numOfReadOperations);
print("numOfWriteOperations: " + numOfWriteOperations);
OzoneConfiguration configuration = createOzoneConfiguration();
fileSystem = FileSystem.get(URI.create(rootPath), configuration);
contentGenerator = new ContentGenerator(fileSizeInBytes, bufferSize);
timer = getMetrics().timer("file-create");
runTests(this::mainMethod);
return null;
}
private void mainMethod(long counter) throws Exception {
int readResult = readOperations();
int writeResult = writeOperations();
print("Total Files Read: " + readResult);
print("Total Files Written: " + writeResult * fileCountForWrite);
// TODO: print read/write lock metrics (HDDS-6435, HDDS-6436).
}
private int readOperations() throws Exception {
// Create fileCountForRead (defaultValue = 1000) files under
// rootPath/readPath directory
String readPath =
rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("readPath");
fileSystem.mkdirs(new Path(readPath));
createFiles(readPath, fileCountForRead);
// Start readThreadCount (defaultValue = 90) concurrent read threads
// performing numOfReadOperations (defaultValue = 50) iterations
// of read operations (fileSystem.listStatus(rootPath/readPath))
ExecutorService readService = Executors.newFixedThreadPool(readThreadCount);
CompletionService<Integer> readExecutorCompletionService =
new ExecutorCompletionService<>(readService);
List<Future<Integer>> readFutures = new ArrayList<>();
for (int i = 0; i < readThreadCount; i++) {
readFutures.add(readExecutorCompletionService.submit(() -> {
int readCount = 0;
try {
for (int j = 0; j < numOfReadOperations; j++) {
FileStatus[] status =
fileSystem.listStatus(new Path(readPath));
readCount += status.length;
}
} catch (IOException e) {
LOG.warn("Exception while listing status ", e);
}
return readCount;
}));
}
int readResult = 0;
for (int i = 0; i < readFutures.size(); i++) {
try {
readResult += readExecutorCompletionService.take().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
readService.shutdown();
return readResult;
}
private int writeOperations() throws Exception {
// Start writeThreadCount (defaultValue = 10) concurrent write threads
// performing numOfWriteOperations (defaultValue = 10) iterations
// of write operations (createFiles(rootPath/writePath))
String writePath =
rootPath.concat(OzoneConsts.OM_KEY_PREFIX).concat("writePath");
fileSystem.mkdirs(new Path(writePath));
ExecutorService writeService =
Executors.newFixedThreadPool(writeThreadCount);
CompletionService<Integer> writeExecutorCompletionService =
new ExecutorCompletionService<>(writeService);
List<Future<Integer>> writeFutures = new ArrayList<>();
for (int i = 0; i < writeThreadCount; i++) {
writeFutures.add(writeExecutorCompletionService.submit(() -> {
int writeCount = 0;
try {
for (int j = 0; j < numOfWriteOperations; j++) {
createFiles(writePath, fileCountForWrite);
writeCount++;
}
} catch (IOException e) {
LOG.warn("Exception while creating file ", e);
}
return writeCount;
}));
}
int writeResult = 0;
for (int i = 0; i < writeFutures.size(); i++) {
try {
writeResult += writeExecutorCompletionService.take().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
writeService.shutdown();
return writeResult;
}
private void createFile(String dir, long counter) throws Exception {
String fileName = dir.concat(OzoneConsts.OM_KEY_PREFIX)
.concat(RandomStringUtils.randomAlphanumeric(length));
Path file = new Path(fileName);
if (LOG.isDebugEnabled()) {
LOG.debug("FilePath:{}", file);
}
timer.time(() -> {
try (FSDataOutputStream output = fileSystem.create(file)) {
contentGenerator.write(output);
}
return null;
});
}
private void createFiles(String dir, int fileCount) throws Exception {
for (int i = 0; i < fileCount; i++) {
createFile(dir, i);
}
}
}