blob: 3ea9ab8fce5ab05247457901f36eef27db7a6855 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.Time;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
/**
* Class containing methods and associated classes to make the most of Lambda
* expressions in Hadoop tests.
*
* The code has been designed from the outset to be Java-8 friendly, but
* to still be usable in Java 7.
*
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
* but also lifts concepts from Scalatest's {@code awaitResult} and
* its notion of pluggable retry logic (simple, backoff, maybe even things
* with jitter: test author gets to choose).
* The {@link #intercept(Class, Callable)} method is also all credit due
* Scalatest, though it's been extended to also support a string message
* check; useful when checking the contents of the exception.
*/
public final class LambdaTestUtils {
public static final Logger LOG =
LoggerFactory.getLogger(LambdaTestUtils.class);
private LambdaTestUtils() {
}
/**
* This is the string included in the assertion text in
* {@link #intercept(Class, Callable)} if
* the closure returned a null value.
*/
public static final String NULL_RESULT = "(null)";
/**
* Interface to implement for converting a timeout into some form
* of exception to raise.
*/
public interface TimeoutHandler {
/**
* Create an exception (or throw one, if desired).
* @param timeoutMillis timeout which has arisen
* @param caught any exception which was caught; may be null
* @return an exception which will then be thrown
* @throws Exception if the handler wishes to raise an exception
* that way.
*/
Throwable evaluate(int timeoutMillis, Throwable caught) throws Throwable;
}
/**
* Wait for a condition to be met, with a retry policy returning the
* sleep time before the next attempt is made. If, at the end
* of the timeout period, the condition is still false (or failing with
* an exception), the timeout handler is invoked, passing in the timeout
* and any exception raised in the last invocation. The exception returned
* by this timeout handler is then rethrown.
* <p>
* Example: Wait 30s for a condition to be met, with a sleep of 30s
* between each probe.
* If the operation is failing, then, after 30s, the timeout handler
* is called. This returns the exception passed in (if any),
* or generates a new one.
* <pre>
* await(
* 30 * 1000,
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); },
* () -> 500),
* (timeout, ex) -> ex != null ? ex : new TimeoutException("timeout"));
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param check predicate to evaluate
* @param retry retry escalation logic
* @param timeoutHandler handler invoked on timeout;
* the returned exception will be thrown
* @return the number of iterations before the condition was satisfied
* @throws Exception the exception returned by {@code timeoutHandler} on
* timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int await(int timeoutMillis,
Callable<Boolean> check,
Callable<Integer> retry,
TimeoutHandler timeoutHandler)
throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be >= 0");
Preconditions.checkNotNull(timeoutHandler);
long endTime = Time.now() + timeoutMillis;
Throwable ex = null;
boolean running = true;
int iterations = 0;
while (running) {
iterations++;
try {
if (check.call()) {
return iterations;
}
// the probe failed but did not raise an exception. Reset any
// exception raised by a previous probe failure.
ex = null;
} catch (InterruptedException
| FailFastException
| VirtualMachineError e) {
throw e;
} catch (Throwable e) {
LOG.debug("eventually() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running) {
int sleeptime = retry.call();
if (sleeptime >= 0) {
Thread.sleep(sleeptime);
} else {
running = false;
}
}
}
// timeout
Throwable evaluate;
try {
evaluate = timeoutHandler.evaluate(timeoutMillis, ex);
if (evaluate == null) {
// bad timeout handler logic; fall back to GenerateTimeout so the
// underlying problem isn't lost.
LOG.error("timeout handler {} did not throw an exception ",
timeoutHandler);
evaluate = new GenerateTimeout().evaluate(timeoutMillis, ex);
}
} catch (Throwable throwable) {
evaluate = throwable;
}
return raise(evaluate);
}
/**
* Simplified {@link #await(int, Callable, Callable, TimeoutHandler)}
* operation with a fixed interval
* and {@link GenerateTimeout} handler to generate a {@code TimeoutException}.
* <p>
* Example: await for probe to succeed:
* <pre>
* await(
* 30 * 1000, 500,
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); });
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param intervalMillis interval in milliseconds between checks
* @param check predicate to evaluate
* @return the number of iterations before the condition was satisfied
* @throws Exception returned by {@code failure} on timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int await(int timeoutMillis,
int intervalMillis,
Callable<Boolean> check) throws Exception {
return await(timeoutMillis, check,
new FixedRetryInterval(intervalMillis),
new GenerateTimeout());
}
/**
* Repeatedly execute a closure until it returns a value rather than
* raise an exception.
* Exceptions are caught and, with one exception,
* trigger a sleep and retry. This is similar of ScalaTest's
* {@code eventually(timeout, closure)} operation, though that lacks
* the ability to fail fast if the inner closure has determined that
* a failure condition is non-recoverable.
* <p>
* Example: spin until an the number of files in a filesystem is non-zero,
* returning the files found.
* The sleep interval backs off by 500 ms each iteration to a maximum of 5s.
* <pre>
* FileStatus[] files = eventually( 30 * 1000,
* () -> {
* FileStatus[] f = filesystem.listFiles(new Path("/"));
* assertEquals(0, f.length);
* return f;
* },
* new ProportionalRetryInterval(500, 5000));
* </pre>
* This allows for a fast exit, yet reduces probe frequency over time.
*
* @param <T> return type
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param eval expression to evaluate
* @param retry retry interval generator
* @return result of the first successful eval call
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
* @throws OutOfMemoryError you've run out of memory.
*/
public static <T> T eventually(int timeoutMillis,
Callable<T> eval,
Callable<Integer> retry) throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be >= 0");
long endTime = Time.now() + timeoutMillis;
Throwable ex;
boolean running;
int sleeptime;
int iterations = 0;
do {
iterations++;
try {
return eval.call();
} catch (InterruptedException
| FailFastException
| VirtualMachineError e) {
// these two exceptions trigger an immediate exit
throw e;
} catch (Throwable e) {
LOG.debug("evaluate() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running && (sleeptime = retry.call()) >= 0) {
Thread.sleep(sleeptime);
}
} while (running);
// timeout. Throw the last exception raised
return raise(ex);
}
/**
* Take the throwable and raise it as an exception or an error, depending
* upon its type. This allows callers to declare that they only throw
* Exception (i.e. can be invoked by Callable) yet still rethrow a
* previously caught Throwable.
* @param throwable Throwable to rethrow
* @param <T> expected return type
* @return never
* @throws Exception if throwable is an Exception
* @throws Error if throwable is not an Exception
*/
private static <T> T raise(Throwable throwable) throws Exception {
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else {
throw (Error) throwable;
}
}
/**
* Variant of {@link #eventually(int, Callable, Callable)} method for
* void lambda expressions.
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param eval expression to evaluate
* @param retry retry interval generator
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static void eventually(int timeoutMillis,
VoidCallable eval,
Callable<Integer> retry) throws Exception {
eventually(timeoutMillis, new VoidCaller(eval), retry);
}
/**
* Simplified {@link #eventually(int, Callable, Callable)} method
* with a fixed interval.
* <p>
* Example: wait 30s until an assertion holds, sleeping 1s between each
* check.
* <pre>
* eventually( 30 * 1000, 1000,
* () -> { assertEquals(0, filesystem.listFiles(new Path("/")).length); }
* );
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param intervalMillis interval in milliseconds
* @param eval expression to evaluate
* @return result of the first successful invocation of {@code eval()}
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static <T> T eventually(int timeoutMillis,
int intervalMillis,
Callable<T> eval) throws Exception {
return eventually(timeoutMillis, eval,
new FixedRetryInterval(intervalMillis));
}
/**
/**
* Variant of {@link #eventually(int, int, Callable)} method for
* void lambda expressions.
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param intervalMillis interval in milliseconds
* @param eval expression to evaluate
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static void eventually(int timeoutMillis,
int intervalMillis,
VoidCallable eval) throws Exception {
eventually(timeoutMillis, eval,
new FixedRetryInterval(intervalMillis));
}
/**
* Intercept an exception; throw an {@code AssertionError} if one not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
* <p>
* Example: expect deleting a nonexistent file to raise a
* {@code FileNotFoundException}.
* <pre>
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
* () -> {
* filesystem.delete(new Path("/missing"), false);
* });
* </pre>
*
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
*/
@SuppressWarnings("unchecked")
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
Callable<T> eval)
throws Exception {
try {
T result = eval.call();
throw new AssertionError("Expected an exception, got "
+ robustToString(result));
} catch (Throwable e) {
if (clazz.isAssignableFrom(e.getClass())) {
return (E)e;
}
throw e;
}
}
/**
* Variant of {@link #intercept(Class, Callable)} to simplify void
* invocations.
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param eval expression to eval
* @param <E> exception class
* @return the caught exception if it was of the expected type
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
*/
@SuppressWarnings("unchecked")
public static <E extends Throwable> E intercept(
Class<E> clazz,
VoidCallable eval)
throws Exception {
try {
eval.call();
throw new AssertionError("Expected an exception");
} catch (Throwable e) {
if (clazz.isAssignableFrom(e.getClass())) {
return (E)e;
}
throw e;
}
}
/**
* Intercept an exception; throw an {@code AssertionError} if one not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
* <p>
* Example: expect deleting a nonexistent file to raise a
* {@code FileNotFoundException} with the {@code toString()} value
* containing the text {@code "missing"}.
* <pre>
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
* "missing",
* () -> {
* filesystem.delete(new Path("/missing"), false);
* });
* </pre>
*
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param contained string which must be in the {@code toString()} value
* of the exception
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type and contents
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
* @see GenericTestUtils#assertExceptionContains(String, Throwable)
*/
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
String contained,
Callable<T> eval)
throws Exception {
E ex = intercept(clazz, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
}
/**
* Variant of {@link #intercept(Class, Callable)} to simplify void
* invocations.
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param contained string which must be in the {@code toString()} value
* of the exception
* @param eval expression to eval
* @param <E> exception class
* @return the caught exception if it was of the expected type
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
*/
public static <E extends Throwable> E intercept(
Class<E> clazz,
String contained,
VoidCallable eval)
throws Exception {
E ex = intercept(clazz, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
}
/**
* Robust string converter for exception messages; if the {@code toString()}
* method throws an exception then that exception is caught and logged,
* then a simple string of the classname logged.
* This stops a {@code toString()} failure hiding underlying problems.
* @param o object to stringify
* @return a string for exception messages
*/
private static String robustToString(Object o) {
if (o == null) {
return NULL_RESULT;
} else {
try {
return o.toString();
} catch (Exception e) {
LOG.info("Exception calling toString()", e);
return o.getClass().toString();
}
}
}
/**
* Returns {@code TimeoutException} on a timeout. If
* there was a inner class passed in, includes it as the
* inner failure.
*/
public static class GenerateTimeout implements TimeoutHandler {
private final String message;
public GenerateTimeout(String message) {
this.message = message;
}
public GenerateTimeout() {
this("timeout");
}
/**
* Evaluate operation creates a new {@code TimeoutException}.
* @param timeoutMillis timeout in millis
* @param caught optional caught exception
* @return TimeoutException
*/
@Override
public Throwable evaluate(int timeoutMillis, Throwable caught)
throws Throwable {
String s = String.format("%s: after %d millis", message,
timeoutMillis);
String caughtText = caught != null
? ("; " + robustToString(caught)) : "";
return (new TimeoutException(s + caughtText)
.initCause(caught));
}
}
/**
* Retry at a fixed time period between calls.
*/
public static class FixedRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private int invocationCount = 0;
public FixedRetryInterval(int intervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
this.intervalMillis = intervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
return intervalMillis;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"FixedRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* Gradually increase the sleep time by the initial interval, until
* the limit set by {@code maxIntervalMillis} is reached.
*/
public static class ProportionalRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private final int maxIntervalMillis;
private int current;
private int invocationCount = 0;
public ProportionalRetryInterval(int intervalMillis,
int maxIntervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
Preconditions.checkArgument(maxIntervalMillis > 0);
this.intervalMillis = intervalMillis;
this.current = intervalMillis;
this.maxIntervalMillis = maxIntervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
int last = current;
if (last < maxIntervalMillis) {
current += intervalMillis;
}
return last;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"ProportionalRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", current=").append(current);
sb.append(", limit=").append(maxIntervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* An exception which triggers a fast exist from the
* {@link #eventually(int, Callable, Callable)} and
* {@link #await(int, Callable, Callable, TimeoutHandler)} loops.
*/
public static class FailFastException extends Exception {
public FailFastException(String detailMessage) {
super(detailMessage);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
/**
* Instantiate from a format string.
* @param format format string
* @param args arguments to format
* @return an instance with the message string constructed.
*/
public static FailFastException newInstance(String format, Object...args) {
return new FailFastException(String.format(format, args));
}
}
/**
* A simple interface for lambdas, which returns nothing; this exists
* to simplify lambda tests on operations with no return value.
*/
public interface VoidCallable {
void call() throws Exception;
}
/**
* Bridge class to make {@link VoidCallable} something to use in anything
* which takes an {@link Callable}.
*/
public static class VoidCaller implements Callable<Void> {
private final VoidCallable callback;
public VoidCaller(VoidCallable callback) {
this.callback = callback;
}
@Override
public Void call() throws Exception {
callback.call();
return null;
}
}
}