// blob: e54971e491c3209a2d04315f4afd6fb4fe2bc1e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.Enumeration;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Assert;
import org.junit.Assume;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
/**
* Provides generic helper methods and classes which can be used across tests.
*/
public abstract class GenericTestUtils {
/**
* Thread count used when configuring the shared {@code EXECUTOR}: {@value}
*/
public static final int EXECUTOR_THREAD_COUNT = 64;
/** Logger of this class. */
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(GenericTestUtils.class);
/**
* Prefix of the filenames generated by {@link #filenameOfIndex(int)}: {@value}
*/
public static final String PREFIX = "file-";
/** Counter behind {@link #uniqueSequenceId()}. */
private static final AtomicInteger sequence = new AtomicInteger();
/**
* System property naming the test data directory: {@value}
*/
public static final String SYSPROP_TEST_DATA_DIR = "test.build.data";
/**
* Default path for test data, built with the platform file separator.
*/
public static final String DEFAULT_TEST_DATA_DIR =
"target" + File.separator + "test" + File.separator + "data";
/**
* The default path for use in Hadoop path references: {@value}
*/
public static final String DEFAULT_TEST_DATA_PATH = "target/test/data/";
/**
* Error string used in
* {@link GenericTestUtils#waitFor(Supplier, long, long)}
* when the supplier to evaluate is null.
*/
public static final String ERROR_MISSING_ARGUMENT =
"Input supplier interface should be initailized";
/**
* Error string used in
* {@link GenericTestUtils#waitFor(Supplier, long, long)}
* when the total wait time is less than the check interval.
*/
public static final String ERROR_INVALID_ARGUMENT =
"Total wait time should be greater than check interval time";
/**
 * Look up the log4j logger which has the same name as an SLF4J logger.
 * @param logger SLF4J logger
 * @return the log4j logger of the same name
 */
@Deprecated
public static Logger toLog4j(org.slf4j.Logger logger) {
  final String loggerName = logger.getName();
  return LogManager.getLogger(loggerName);
}
/**
 * Silence a log4j logger by setting its level to {@code OFF}.
 * @param logger logger to silence
 * @deprecated use {@link #disableLog(org.slf4j.Logger)} instead
 */
@Deprecated
public static void disableLog(Logger logger) {
  final Level silenced = Level.OFF;
  logger.setLevel(silenced);
}
/**
 * Silence the log4j logger which backs an SLF4J logger.
 * @param logger SLF4J logger to silence
 */
public static void disableLog(org.slf4j.Logger logger) {
  final Logger log4jLogger = toLog4j(logger);
  disableLog(log4jLogger);
}
/**
* Set the level of a log4j logger.
* @param logger logger to update
* @param level new level
*/
public static void setLogLevel(Logger logger, Level level) {
logger.setLevel(level);
}
/**
 * Set a log4j level on the log4j logger which backs an SLF4J logger.
 * @param logger SLF4J logger
 * @param level log4j level to apply
 * @deprecated
 * use {@link #setLogLevel(org.slf4j.Logger, org.slf4j.event.Level)} instead
 */
@Deprecated
public static void setLogLevel(org.slf4j.Logger logger, Level level) {
  final Logger log4jLogger = toLog4j(logger);
  setLogLevel(log4jLogger, level);
}
/**
 * Set the level of an SLF4J logger, converting the SLF4J level to the
 * log4j level of the same name.
 * @param logger SLF4J logger
 * @param level SLF4J level to apply
 */
public static void setLogLevel(org.slf4j.Logger logger,
    org.slf4j.event.Level level) {
  final Logger log4jLogger = toLog4j(logger);
  final Level log4jLevel = Level.toLevel(level.toString());
  setLogLevel(log4jLogger, log4jLevel);
}
/**
 * Set the level of the log4j root logger.
 * @param level SLF4J level; converted to the log4j level of the same name
 */
public static void setRootLogLevel(org.slf4j.event.Level level) {
  final Logger rootLogger = LogManager.getRootLogger();
  final Level log4jLevel = Level.toLevel(level.toString());
  setLogLevel(rootLogger, log4jLevel);
}
/**
 * Apply a level to every logger returned by
 * {@code LogManager.getCurrentLoggers()}.
 * @param level SLF4J level; converted to the log4j level of the same name
 */
public static void setCurrentLoggersLogLevel(org.slf4j.event.Level level) {
  final Enumeration<?> current = LogManager.getCurrentLoggers();
  while (current.hasMoreElements()) {
    final Logger each = (Logger) current.nextElement();
    // converted per logger, exactly as often as a logger is visited
    each.setLevel(Level.toLevel(level.toString()));
  }
}
/**
 * Parse an SLF4J level name, falling back to {@code DEBUG} when the name
 * is not recognized.
 * @param level level name
 * @return the parsed level, or {@code DEBUG}
 */
public static org.slf4j.event.Level toLevel(String level) {
  final org.slf4j.event.Level fallback = org.slf4j.event.Level.DEBUG;
  return toLevel(level, fallback);
}
/**
 * Parse an SLF4J level name, falling back to a default when the name is
 * null or is not the exact name of a level constant.
 * @param level level name; may be null
 * @param defaultLevel level to return when {@code level} cannot be parsed
 * @return the parsed level, or {@code defaultLevel}
 */
public static org.slf4j.event.Level toLevel(
    String level, org.slf4j.event.Level defaultLevel) {
  if (level == null) {
    // valueOf(null) would raise a NullPointerException rather than the
    // IllegalArgumentException handled below
    return defaultLevel;
  }
  try {
    return org.slf4j.event.Level.valueOf(level);
  } catch (IllegalArgumentException e) {
    return defaultLevel;
  }
}
/**
 * Get the name of the method which invoked this one.
 * @return String name of the invoking method
 */
public static String getMethodName() {
  final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
  // frame 0 is getStackTrace(), frame 1 is this method, frame 2 the caller
  final StackTraceElement caller = frames[2];
  return caller.getMethodName();
}
/**
 * Generate a sequence number which is unique within this process.
 * @return a unique sequence number
 */
public static int uniqueSequenceId() {
  return sequence.addAndGet(1);
}
/**
 * Create an empty directory for the data/logs of a unit test.
 * Any existing directory of the same name is deleted first.
 *
 * @param testClass the unit test class; its simple name is the directory
 * name.
 * @return the root directory.
 */
public static File setupTestRootDir(Class<?> testClass) {
  final String dirName = testClass.getSimpleName();
  final File rootDir = getTestDir(dirName);
  if (rootDir.exists()) {
    FileUtil.fullyDelete(rootDir);
  }
  rootDir.mkdirs();
  return rootDir;
}
/**
 * Get the base directory for tests, creating it if needed.
 * The location comes from the system property
 * {@link #SYSPROP_TEST_DATA_DIR}; if that is unset or empty,
 * {@link #DEFAULT_TEST_DATA_DIR} is used.
 * @return the absolute directory
 */
public static File getTestDir() {
  final String configured =
      System.getProperty(SYSPROP_TEST_DATA_DIR, DEFAULT_TEST_DATA_DIR);
  // corner case: property is there but empty
  final String location =
      configured.isEmpty() ? DEFAULT_TEST_DATA_DIR : configured;
  final File baseDir = new File(location).getAbsoluteFile();
  baseDir.mkdirs();
  assertExists(baseDir);
  return baseDir;
}
/**
 * Get an uncreated directory under the base test directory.
 * @param subdir name of the subdirectory
 * @return the absolute directory for tests. Caller is expected to create it.
 */
public static File getTestDir(String subdir) {
  final File parent = getTestDir();
  final File child = new File(parent, subdir);
  return child.getAbsoluteFile();
}
/**
 * Get an uncreated directory for tests with a randomized alphanumeric
 * name. This is likely to provide a unique path for tests run in parallel.
 * @return the directory for tests, with the path returned by
 * {@link #getRandomizedTempPath()}, which may be relative.
 * Caller is expected to create it.
 */
public static File getRandomizedTestDir() {
  final String randomPath = getRandomizedTempPath();
  return new File(randomPath);
}
/**
 * Get a temp path. This may or may not be relative; it depends on what the
 * {@link #SYSPROP_TEST_DATA_DIR} is set to. If unset or empty, it returns a
 * path under the relative path {@link #DEFAULT_TEST_DATA_PATH}.
 * On Windows the system property is not consulted and the default path is
 * always used.
 * @param subpath sub path, with no leading "/" character
 * @return a string to use in paths
 */
public static String getTempPath(String subpath) {
  String base;
  if (Path.WINDOWS) {
    base = DEFAULT_TEST_DATA_PATH;
  } else {
    base = System.getProperty(SYSPROP_TEST_DATA_DIR, DEFAULT_TEST_DATA_PATH);
  }
  if (base.isEmpty()) {
    // corner case: property is there but empty
    base = DEFAULT_TEST_DATA_PATH;
  }
  // make sure exactly one "/" separates the base from the sub path
  return base.endsWith("/") ? base + subpath : base + "/" + subpath;
}
/**
 * Get a temp path ending in a random ten character alphanumeric name.
 * This may or may not be relative; see {@link #getTempPath(String)}.
 * @return a string to use in paths
 */
public static String getRandomizedTempPath() {
  final String randomName = RandomStringUtils.randomAlphanumeric(10);
  return getTempPath(randomName);
}
/**
 * Assert that a given file exists.
 * @param f file to probe
 */
public static void assertExists(File f) {
  final String failureText = "File " + f + " should exist";
  final boolean present = f.exists();
  Assert.assertTrue(failureText, present);
}
/**
 * List all of the files in 'dir' whose name matches the regex 'pattern',
 * then check that the sorted names are identical to the sorted
 * 'expectedMatches'.
 * @param dir directory to list
 * @param pattern regular expression which a whole filename must match
 * @param expectedMatches the filenames expected to match
 * @throws IOException if the dir is inaccessible
 */
public static void assertGlobEquals(File dir, String pattern,
    String ... expectedMatches) throws IOException {
  final Set<String> matched = new TreeSet<>();
  final File[] entries = FileUtil.listFiles(dir);
  for (File entry : entries) {
    final String name = entry.getName();
    if (name.matches(pattern)) {
      matched.add(name);
    }
  }
  final Set<String> wanted = new TreeSet<>(Arrays.asList(expectedMatches));
  final String wantedText = Joiner.on(",").join(wanted);
  final String matchedText = Joiner.on(",").join(matched);
  Assert.assertEquals("Bad files matching " + pattern + " in " + dir,
      wantedText, matchedText);
}
/** Assertion text of {@code assertExceptionContains} for a null throwable. */
static final String E_NULL_THROWABLE = "Null Throwable";
/**
* Assertion text of {@code assertExceptionContains} for a throwable
* whose {@code toString()} value is null.
*/
static final String E_NULL_THROWABLE_STRING =
"Null Throwable.toString() value";
/**
* Text included in the {@code assertExceptionContains} failure message
* when the expected text was not found.
*/
static final String E_UNEXPECTED_EXCEPTION = "but got unexpected exception";
/**
 * Assert that the <code>toString()</code> value of an exception
 * contains the expected text.
 * @param expectedText expected string
 * @param t thrown exception
 * @throws AssertionError if the expected string is not found
 */
public static void assertExceptionContains(String expectedText, Throwable t) {
  final String noExtraText = "";
  assertExceptionContains(expectedText, t, noExtraText);
}
/**
 * Assert that the <code>toString()</code> value of an exception
 * contains the expected text. A null {@code expectedText} matches
 * any exception.
 * @param expectedText expected string
 * @param t thrown exception
 * @param message any extra text for the string
 * @throws AssertionError if the exception is null, its string value is
 * null, or the expected string is not found. Where there is an exception,
 * it is the cause of the error.
 */
public static void assertExceptionContains(String expectedText,
    Throwable t,
    String message) {
  Assert.assertNotNull(E_NULL_THROWABLE, t);
  final String text = t.toString();
  if (text == null) {
    throw new AssertionError(E_NULL_THROWABLE_STRING, t);
  }
  if (expectedText == null || text.contains(expectedText)) {
    // nothing to look for, or it was found
    return;
  }
  final String prefix;
  if (org.apache.commons.lang3.StringUtils.isEmpty(message)) {
    prefix = "";
  } else {
    prefix = message + ": ";
  }
  final String details = String.format("%s Expected to find '%s' %s: %s",
      prefix, expectedText, E_UNEXPECTED_EXCEPTION,
      StringUtils.stringifyException(t));
  throw new AssertionError(details, t);
}
/**
 * Wait for the specified test to return true. The test will be performed
 * initially and then every {@code checkEveryMillis} until at least
 * {@code waitForMillis} time has expired.
 *
 * @param check the test to perform
 * @param checkEveryMillis how often to perform the test
 * @param waitForMillis the amount of time after which no more tests will be
 * performed
 * @throws NullPointerException if {@code check} is null
 * @throws IllegalArgumentException if {@code waitForMillis} is less than
 * {@code checkEveryMillis}
 * @throws TimeoutException if the test does not return true in the allotted
 * time
 * @throws InterruptedException if the method is interrupted while waiting
 */
public static void waitFor(final Supplier<Boolean> check,
    final long checkEveryMillis, final long waitForMillis)
    throws TimeoutException, InterruptedException {
  final String noErrorMessage = null;
  waitFor(check, checkEveryMillis, waitForMillis, noErrorMessage);
}
/**
 * Wait for the specified test to return true. The test will be performed
 * initially and then every {@code checkEveryMillis} until at least
 * {@code waitForMillis} time has expired.
 *
 * @param check the test to perform.
 * @param checkEveryMillis how often to perform the test.
 * @param waitForMillis the amount of time after which no more tests will be
 * performed.
 * @param errorMsg error message to provide in TimeoutException; ignored if
 * null or empty.
 * @throws NullPointerException if {@code check} is null.
 * @throws IllegalArgumentException if {@code waitForMillis} is less than
 * {@code checkEveryMillis}.
 * @throws TimeoutException if the test does not return true in the allotted
 * time; the message includes a dump of thread diagnostics.
 * @throws InterruptedException if the method is interrupted while waiting.
 */
public static void waitFor(final Supplier<Boolean> check,
    final long checkEveryMillis, final long waitForMillis,
    final String errorMsg) throws TimeoutException, InterruptedException {
  Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
  if (waitForMillis < checkEveryMillis) {
    throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
  }
  final long startedAt = Time.monotonicNow();
  boolean satisfied = check.get();
  while (!satisfied) {
    if (Time.monotonicNow() - startedAt >= waitForMillis) {
      // out of time: give up without another probe
      break;
    }
    Thread.sleep(checkEveryMillis);
    satisfied = check.get();
  }
  if (satisfied) {
    return;
  }
  final StringBuilder report =
      new StringBuilder("Timed out waiting for condition. ");
  if (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)) {
    report.append("Error Message: ").append(errorMsg);
  }
  report.append("\nThread diagnostics:\n");
  report.append(TimedOutTestsListener.buildThreadDiagnosticString());
  throw new TimeoutException(report.toString());
}
/**
 * Prints output to one {@link PrintStream} while copying to the other.
 * <p>
 * All three write paths of {@link PrintStream} are covered: single bytes,
 * byte ranges, and (through byte ranges) the print/println methods.
 * <p>
 * Closing the main {@link PrintStream} will NOT close the other.
 */
public static class TeePrintStream extends PrintStream {
  /** Stream which receives a copy of everything written. */
  private final PrintStream other;

  /**
   * @param main stream which this print stream wraps
   * @param other stream to copy all output to
   */
  public TeePrintStream(OutputStream main, PrintStream other) {
    super(main);
    this.other = other;
  }

  /** Flush the main stream, then the copy. */
  @Override
  public void flush() {
    super.flush();
    other.flush();
  }

  /**
   * Write a single byte to both streams. Without this override the byte
   * would only reach the main stream, as {@link PrintStream#write(int)}
   * does not go through {@link #write(byte[], int, int)}.
   */
  @Override
  public void write(int b) {
    super.write(b);
    other.write(b);
  }

  /** Write a range of bytes to both streams. */
  @Override
  public void write(byte[] buf, int off, int len) {
    super.write(buf, off, len);
    other.write(buf, off, len);
  }
}
/**
 * Capture output printed to {@link System#err}, while still forwarding it
 * to the original stream.
 * <p>
 * Usage:
 * <pre>
 * try (SystemErrCapturer capture = new SystemErrCapturer()) {
 * ...
 * // Call capture.getOutput() to get the output string
 * }
 * </pre>
 *
 * TODO: Add lambda support once Java 8 is common.
 * <pre>
 * SystemErrCapturer.withCapture(capture -> {
 * ...
 * })
 * </pre>
 */
public static class SystemErrCapturer implements AutoCloseable {
  /** Buffer holding the captured bytes. */
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  /** Print stream over the buffer; it receives the copy of the output. */
  private final PrintStream bytesPrintStream;
  /** The value of {@code System.err} at construction time. */
  private final PrintStream oldErr;

  /**
   * Start capturing: replace {@code System.err} with a tee which writes to
   * the previous stream and copies to the internal buffer.
   */
  public SystemErrCapturer() {
    bytesPrintStream = new PrintStream(bytes);
    oldErr = System.err;
    final PrintStream tee = new TeePrintStream(oldErr, bytesPrintStream);
    System.setErr(tee);
  }

  /**
   * @return everything captured so far, decoded with the platform
   * default charset.
   */
  public String getOutput() {
    return new String(bytes.toByteArray());
  }

  /** Close the capture stream and restore the original {@code System.err}. */
  @Override
  public void close() throws Exception {
    IOUtils.closeStream(bytesPrintStream);
    System.setErr(oldErr);
  }
}
/**
 * Captures the output of a log4j logger into a string, by attaching an
 * extra appender to it.
 */
public static class LogCapturer {
  /** Destination of the captured log output. */
  private final StringWriter sw = new StringWriter();
  /** The appender added to the logger. */
  private final WriterAppender appender;
  /** The logger being captured. */
  private final Logger logger;

  /**
   * Start capturing the log4j logger behind an SLF4J logger.
   * An SLF4J logger whose name is "root" is mapped to the log4j root logger.
   * @param logger SLF4J logger
   * @return the capturer
   */
  public static LogCapturer captureLogs(org.slf4j.Logger logger) {
    final Logger target = logger.getName().equals("root")
        ? org.apache.log4j.Logger.getRootLogger()
        : toLog4j(logger);
    return new LogCapturer(target);
  }

  /**
   * Start capturing a log4j logger.
   * @param logger log4j logger
   * @return the capturer
   */
  public static LogCapturer captureLogs(Logger logger) {
    return new LogCapturer(logger);
  }

  private LogCapturer(Logger logger) {
    this.logger = logger;
    this.appender = new WriterAppender(chooseLayout(), sw);
    logger.addAppender(this.appender);
  }

  /**
   * Pick the layout for captured output: that of the root logger's
   * "stdout" appender, else of its "console" appender, else a new
   * {@link PatternLayout}.
   */
  private static Layout chooseLayout() {
    final Logger root = Logger.getRootLogger();
    Appender source = root.getAppender("stdout");
    if (source == null) {
      source = root.getAppender("console");
    }
    if (source == null) {
      return new PatternLayout();
    }
    return source.getLayout();
  }

  /** @return all output captured since creation or the last clear. */
  public String getOutput() {
    return sw.toString();
  }

  /** Detach the capturing appender from the logger. */
  public void stopCapturing() {
    logger.removeAppender(appender);
  }

  /** Discard everything captured so far. */
  public void clearOutput() {
    sw.getBuffer().setLength(0);
  }
}
/**
 * Mockito answer helper that triggers one latch as soon as the
 * method is called, then waits on another before continuing
 * on to the real method.
 */
public static class DelayAnswer implements Answer<Object> {
  private final org.slf4j.Logger log;
  /** Released when the mocked method is first entered. */
  private final CountDownLatch fireLatch = new CountDownLatch(1);
  /** Released by {@link #proceed()} to let the call continue. */
  private final CountDownLatch waitLatch = new CountDownLatch(1);
  /** Released once the real method has returned or thrown. */
  private final CountDownLatch resultLatch = new CountDownLatch(1);
  private final AtomicInteger fireCounter = new AtomicInteger(0);
  private final AtomicInteger resultCounter = new AtomicInteger(0);

  // Outcome of the most recent call to complete.
  private volatile Throwable thrown;
  private volatile Object returnValue;

  /**
   * @param log logger which receives progress messages
   */
  public DelayAnswer(org.slf4j.Logger log) {
    this.log = log;
  }

  /**
   * Wait until the method is called.
   */
  public void waitForCall() throws InterruptedException {
    fireLatch.await();
  }

  /**
   * Tell the method to proceed.
   * This should only be called after waitForCall()
   */
  public void proceed() {
    waitLatch.countDown();
  }

  /**
   * Signal that the call has arrived, block until {@link #proceed()} is
   * invoked, then hand over to {@link #passThrough(InvocationOnMock)}.
   * @throws IOException if interrupted while blocked
   */
  @Override
  public Object answer(InvocationOnMock invocation) throws Throwable {
    log.info("DelayAnswer firing fireLatch");
    fireCounter.incrementAndGet();
    fireLatch.countDown();
    awaitProceed();
    return passThrough(invocation);
  }

  /** Block on the wait latch, converting an interrupt to an IOException. */
  private void awaitProceed() throws IOException {
    try {
      log.info("DelayAnswer waiting on waitLatch");
      waitLatch.await();
      log.info("DelayAnswer delay complete");
    } catch (InterruptedException ie) {
      throw new IOException("Interrupted waiting on latch", ie);
    }
  }

  /**
   * Call the real method, recording its return value or the throwable
   * it raised, then count and signal the result.
   */
  protected Object passThrough(InvocationOnMock invocation) throws Throwable {
    try {
      final Object result = invocation.callRealMethod();
      returnValue = result;
      return result;
    } catch (Throwable failure) {
      thrown = failure;
      throw failure;
    } finally {
      resultCounter.incrementAndGet();
      resultLatch.countDown();
    }
  }

  /**
   * After calling proceed(), this will wait until the call has
   * completed and a result has been returned to the caller.
   */
  public void waitForResult() throws InterruptedException {
    resultLatch.await();
  }

  /**
   * After the call has gone through, return any exception that
   * was thrown, or null if no exception was thrown.
   */
  public Throwable getThrown() {
    return thrown;
  }

  /**
   * After the call has gone through, return the call's return value,
   * or null in case it was void or an exception was thrown.
   */
  public Object getReturnValue() {
    return returnValue;
  }

  /** @return the number of times the answer has been entered. */
  public int getFireCount() {
    return fireCounter.get();
  }

  /** @return the number of calls which have completed. */
  public int getResultCount() {
    return resultCounter.get();
  }
}
/**
 * An Answer implementation that simply forwards all calls through
 * to a delegate.
 *
 * This is useful as the default Answer for a mock object, to create
 * something like a spy on an RPC proxy. For example:
 * <pre>
 * NamenodeProtocol origNNProxy = secondary.getNameNode();
 * NamenodeProtocol spyNNProxy = Mockito.mock(NamenodeProtocol.class,
 *     new DelegateAnswer(origNNProxy));
 * doThrow(...).when(spyNNProxy).getBlockLocations(...);
 * ...
 * </pre>
 */
public static class DelegateAnswer implements Answer<Object> {
  private final Object delegate;
  /** Optional logger; when non-null every call is logged with a trace. */
  private final org.slf4j.Logger log;

  /**
   * Forward calls without logging them.
   * @param delegate object to forward calls to
   */
  public DelegateAnswer(Object delegate) {
    this(null, delegate);
  }

  /**
   * @param log logger for the forwarded calls; may be null
   * @param delegate object to forward calls to
   */
  public DelegateAnswer(org.slf4j.Logger log, Object delegate) {
    this.log = log;
    this.delegate = delegate;
  }

  /**
   * Invoke the called method on the delegate with the same arguments.
   * An exception raised by the delegate's method is rethrown unwrapped.
   */
  @Override
  public Object answer(InvocationOnMock invocation) throws Throwable {
    if (log != null) {
      log.info("Call to " + invocation + " on " + delegate,
          new Exception("TRACE"));
    }
    final java.lang.reflect.Method target = invocation.getMethod();
    final Object[] arguments = invocation.getArguments();
    try {
      return target.invoke(delegate, arguments);
    } catch (InvocationTargetException ite) {
      throw ite.getCause();
    }
  }
}
/**
 * An Answer implementation which sleeps for a random number of milliseconds
 * between a configurable minimum (inclusive, default 0) and maximum
 * (exclusive) before delegating to the real implementation of the method.
 * This can be useful for drawing out race conditions.
 * <p>
 * If the minimum and maximum are equal, the sleep is always exactly that
 * long.
 */
public static class SleepAnswer implements Answer<Object> {
  private final int minSleepTime;
  private final int maxSleepTime;
  private static final Random r = new Random();

  /**
   * Sleep between 0 and {@code maxSleepTime} milliseconds.
   * @param maxSleepTime upper bound of the sleep time in milliseconds
   */
  public SleepAnswer(int maxSleepTime) {
    this(0, maxSleepTime);
  }

  /**
   * @param minSleepTime lower bound of the sleep time in milliseconds
   * @param maxSleepTime upper bound of the sleep time in milliseconds
   */
  public SleepAnswer(int minSleepTime, int maxSleepTime) {
    this.minSleepTime = minSleepTime;
    this.maxSleepTime = maxSleepTime;
  }

  /**
   * Sleep, then call the real method. An interrupt received while sleeping
   * cuts the sleep short; the thread's interrupt status is restored after
   * the real method has been called.
   */
  @Override
  public Object answer(InvocationOnMock invocation) throws Throwable {
    boolean interrupted = false;
    try {
      Thread.sleep(chooseSleepTime());
    } catch (InterruptedException ie) {
      interrupted = true;
    }
    try {
      return invocation.callRealMethod();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Pick the next sleep time.
   * @return a time in milliseconds
   */
  private int chooseSleepTime() {
    final int range = maxSleepTime - minSleepTime;
    if (range == 0) {
      // Random.nextInt(0) is rejected with an IllegalArgumentException,
      // yet a fixed sleep time is a legitimate configuration
      return minSleepTime;
    }
    return r.nextInt(range) + minSleepTime;
  }
}
/**
 * Assert that no part of the output matches a regular expression.
 * @param output text to scan
 * @param pattern regular expression which must not be found in the output
 * @throws AssertionError if a match is found
 */
public static void assertDoesNotMatch(String output, String pattern) {
  // the message states what was expected: the absence of a match
  Assert.assertFalse("Expected output to not match /" + pattern + "/" +
      " but got:\n" + output,
      Pattern.compile(pattern).matcher(output).find());
}
/**
 * Assert that some part of the output matches a regular expression.
 * @param output text to scan
 * @param pattern regular expression which must be found in the output
 * @throws AssertionError if no match is found
 */
public static void assertMatches(String output, String pattern) {
  final String failureText =
      "Expected output to match /" + pattern + "/" + " but got:\n" + output;
  final boolean found = Pattern.compile(pattern).matcher(output).find();
  Assert.assertTrue(failureText, found);
}
/**
 * Assert that a value is within {@code allowedError} of the expected value.
 * @param expected expected value
 * @param actual actual value
 * @param allowedError largest tolerated difference
 */
public static void assertValueNear(long expected, long actual, long allowedError) {
  final long lowest = expected - allowedError;
  final long highest = expected + allowedError;
  assertValueWithinRange(lowest, highest, actual);
}
/**
 * Assert that a value lies between two bounds, both inclusive.
 * @param expectedMin smallest accepted value
 * @param expectedMax largest accepted value
 * @param actual value to check
 */
public static void assertValueWithinRange(long expectedMin, long expectedMax,
    long actual) {
  final boolean inRange = actual >= expectedMin && actual <= expectedMax;
  final String failureText = "Expected " + actual + " to be in range ("
      + expectedMin + "," + expectedMax + ")";
  Assert.assertTrue(failureText, inRange);
}
/**
 * Determine if there are any threads whose whole name matches the regex.
 * @param pattern a Pattern object used to match thread names
 * @return true if there is any thread that matches the pattern
 */
public static boolean anyThreadMatching(Pattern pattern) {
  final ThreadMXBean threads = ManagementFactory.getThreadMXBean();
  final ThreadInfo[] snapshot =
      threads.getThreadInfo(threads.getAllThreadIds(), 20);
  return Arrays.stream(snapshot)
      // entries are null for threads with no information available
      .filter(Objects::nonNull)
      .anyMatch(info -> pattern.matcher(info.getThreadName()).matches());
}
/**
 * Assert that there are no threads running whose name matches the
 * given regular expression.
 * @param regex the regex to match against
 * @throws AssertionError if any thread name matches
 */
public static void assertNoThreadsMatching(String regex) {
  final boolean leaked = anyThreadMatching(Pattern.compile(regex));
  if (leaked) {
    Assert.fail("Leaked thread matches " + regex);
  }
}
/**
 * Periodically check for threads whose name matches the given regular
 * expression, and wait until there are none.
 *
 * @param regex the regex to match against.
 * @param checkEveryMillis time (in milliseconds) between checks.
 * @param waitForMillis total time (in milliseconds) to wait before throwing
 * a time out exception.
 * @throws TimeoutException if a matching thread still exists after the wait.
 * @throws InterruptedException if interrupted while waiting.
 */
public static void waitForThreadTermination(String regex,
    int checkEveryMillis, final int waitForMillis) throws TimeoutException,
    InterruptedException {
  final Pattern threadNames = Pattern.compile(regex);
  waitFor(() -> !anyThreadMatching(threadNames),
      checkEveryMillis, waitForMillis);
}
/**
 * Skip test if native build profile of Maven is not activated.
 * Sub-project using this must set 'runningWithNative' property to true
 * in the definition of native profile in pom.xml.
 */
public static void assumeInNativeProfile() {
  final String flag = System.getProperty("runningWithNative", "false");
  final boolean nativeProfile = Boolean.parseBoolean(flag);
  Assume.assumeTrue(nativeProfile);
}
/**
 * Get the line-by-line diff between two files.
 * <p>
 * Lines are compared by position. A line which differs is reported as
 * {@code " - "} plus the line of {@code a}, then {@code " + "} plus the
 * line of {@code b}. If one file has more lines than the other, each extra
 * line is reported: with {@code " - "} when only in {@code a}, with
 * {@code " + "} when only in {@code b}.
 *
 * @param a first file
 * @param b second file
 * @return The empty string if there is no diff; the diff, otherwise.
 *
 * @throws IOException If there is an error reading either file.
 */
public static String getFilesDiff(File a, File b) throws IOException {
  StringBuilder bld = new StringBuilder();
  try (BufferedReader ra = new BufferedReader(
      new InputStreamReader(new FileInputStream(a)));
      BufferedReader rb = new BufferedReader(
          new InputStreamReader(new FileInputStream(b)))) {
    while (true) {
      String la = ra.readLine();
      String lb = rb.readLine();
      if (la == null) {
        if (lb != null) {
          // b is longer: report the line just read and the rest of b
          bld.append(" + ").append(lb).append("\n");
          addPlusses(bld, rb);
        }
        break;
      } else if (lb == null) {
        // a is longer: report the line just read and the rest of a
        bld.append(" - ").append(la).append("\n");
        addRemaining(bld, ra, " - ");
        break;
      }
      if (!la.equals(lb)) {
        bld.append(" - ").append(la).append("\n");
        bld.append(" + ").append(lb).append("\n");
      }
    }
  }
  return bld.toString();
}

/** Append every remaining line of the reader as an added (" + ") line. */
private static void addPlusses(StringBuilder bld, BufferedReader r)
    throws IOException {
  addRemaining(bld, r, " + ");
}

/** Append every remaining line of the reader, each behind the marker. */
private static void addRemaining(StringBuilder bld, BufferedReader r,
    String marker) throws IOException {
  String l;
  while ((l = r.readLine()) != null) {
    bld.append(marker).append(l).append("\n");
  }
}
/**
 * Fail with a message formatted via
 * {@link String#format(String, Object...)} in the English locale.
 * @param format format string
 * @param args argument list. If the last argument is a throwable, it
 * is used as the inner cause of the exception
 * @throws AssertionError with the formatted message
 */
public static void failf(String format, Object... args) {
  final String text = String.format(Locale.ENGLISH, format, args);
  final AssertionError failure = new AssertionError(text);
  if (args.length > 0) {
    final Object last = args[args.length - 1];
    if (last instanceof Throwable) {
      failure.initCause((Throwable) last);
    }
  }
  throw failure;
}
/**
 * Conditional formatted fail, via {@link String#format(String, Object...)}.
 * Does nothing when the condition is false.
 * @param condition condition: if true the method fails
 * @param format format string
 * @param args argument list. If the last argument is a throwable, it
 * is used as the inner cause of the exception
 * @throws AssertionError with the formatted message
 */
public static void failif(boolean condition,
    String format,
    Object... args) {
  if (!condition) {
    return;
  }
  failf(format, args);
}
/**
 * Retrieve the max number of parallel test threads when running under maven,
 * from the system property {@code testsThreadCount}.
 * <p>
 * The value is either an integer, or a number followed by "C", which is
 * multiplied by the number of available processors. If the property is
 * unset, empty or blank, the count is 1.
 * @return int number of threads
 * @throws NumberFormatException if the property is set to anything else
 */
public static int getTestsThreadCount() {
  final String trimProp = System.getProperty("testsThreadCount", "1").trim();
  if (trimProp.isEmpty()) {
    // corner case: property is there but empty
    return 1;
  }
  if (!trimProp.endsWith("C")) {
    return Integer.parseInt(trimProp);
  }
  final double multiplier = Double.parseDouble(
      trimProp.substring(0, trimProp.length() - 1));
  final double calculated =
      multiplier * Runtime.getRuntime().availableProcessors();
  // a positive multiplier always yields at least one thread
  return calculated > 0d ? Math.max((int) calculated, 1) : 0;
}
/**
 * Write the text to a file asynchronously, as UTF-8.
 * Logs the operation duration.
 * @param fs filesystem
 * @param path path
 * @param text text to write
 * @return future to the path created.
 */
private static CompletableFuture<Path> put(FileSystem fs,
    Path path, String text) {
  return submit(EXECUTOR, () -> {
    try (DurationInfo timer =
        new DurationInfo(LOG, false, "Creating %s", path)) {
      final byte[] contents = text.getBytes(StandardCharsets.UTF_8);
      createFile(fs, path, true, contents);
    }
    return path;
  });
}
/**
 * Build a set of files in a directory tree.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @return the list of files created.
 * @throws IOException if raised while creating the directories or files
 */
public static List<Path> createFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount) throws IOException {
  final List<Path> files = new ArrayList<>(fileCount);
  final List<Path> directories = new ArrayList<>(dirCount);
  return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
      files, directories);
}
/**
 * Build a set of directories and files in a directory tree.
 * All directories are created first, in parallel; once they are complete
 * the files are created, again in parallel. Each file contains its own
 * name.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @param paths [out] list of file paths created
 * @param dirs [out] list of directory paths created.
 * @return the list of files created.
 * @throws IOException if raised while creating the directories or files
 */
public static List<Path> createDirsAndFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount,
    final List<Path> paths,
    final List<Path> dirs) throws IOException {
  buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
  final int operations = paths.size() + dirs.size();
  final List<CompletableFuture<Path>> pending = new ArrayList<>(operations);

  // create directories. With dir marker retention, that adds more entries
  // to cause deletion issues
  try (DurationInfo dirTimer =
      new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
    dirs.forEach(dir -> pending.add(submit(EXECUTOR, () -> {
      fs.mkdirs(dir);
      return dir;
    })));
    waitForCompletion(pending);
  }

  // the same list is reused, so the second wait also covers the
  // (already complete) directory operations
  try (DurationInfo fileTimer =
      new DurationInfo(LOG, "Creating %d files", paths.size())) {
    paths.forEach(file -> pending.add(put(fs, file, file.getName())));
    waitForCompletion(pending);
  }
  return paths;
}
/**
 * Recursive method to build up lists of files and directories.
 * At each level the file paths are added first; then each directory is
 * added, followed directly by the entries beneath it. Nothing is added
 * when the depth is zero or less.
 * @param filePaths list of file paths to add entries to.
 * @param dirPaths list of directory paths to add entries to.
 * @param destDir destination directory.
 * @param depth depth of directories
 * @param fileCount number of files.
 * @param dirCount number of directories.
 */
public static void buildPaths(final List<Path> filePaths,
    final List<Path> dirPaths, final Path destDir, final int depth,
    final int fileCount, final int dirCount) {
  if (depth > 0) {
    // the files of this level
    int fileIndex = 0;
    while (fileIndex < fileCount) {
      filePaths.add(new Path(destDir, filenameOfIndex(fileIndex)));
      fileIndex++;
    }
    // the directories of this level, each followed by its subtree
    int dirIndex = 0;
    while (dirIndex < dirCount) {
      final Path subdir =
          new Path(destDir, String.format("dir-%03d", dirIndex));
      dirPaths.add(subdir);
      buildPaths(filePaths, dirPaths, subdir, depth - 1, fileCount, dirCount);
      dirIndex++;
    }
  }
}
/**
 * Given an index, return a string to use as the filename:
 * {@link #PREFIX} followed by the index, zero-padded to three digits.
 * @param i index
 * @return name
 */
public static String filenameOfIndex(final int i) {
  final String paddedIndex = String.format("%03d", i);
  return PREFIX + paddedIndex;
}
/**
* Shared executor for submitting work: it runs the asynchronous
* directory and file creation of this class.
* It is configured from {@link #EXECUTOR_THREAD_COUNT} (and twice that
* value), a duration of 30 seconds and the name "test-operations"; see
* {@code BlockingThreadPoolExecutorService.newInstance} for the meaning
* of each argument.
*/
private static final BlockingThreadPoolExecutorService EXECUTOR =
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations");
}