blob: bfeb061f534da819923ceab8136c4066c98ae0a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.util;
import org.apache.reef.util.exception.InvalidIdentifierException;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Performs an asynchronous increment of an Integer.
*/
final class AsynchronousIncrementer implements Callable<Integer> {
private static final Logger LOG = Logger.getLogger(AsynchronousIncrementer.class.getName());
private final int sleepTimeMillis;
private final int input;
private final long identifier;
private final MultiAsyncToSync blocker;
/**
* Instantiate an incrementer with specific job parameters.
* @param input The input parameter for the work.
* @param identifier The identifier of the caller to wake on completion.
* @param sleepTimeMillis How long to work.
* @param blocker The MultiAsyncToSync object which is holding the blocked client.
*/
AsynchronousIncrementer(final int input, final long identifier,
final int sleepTimeMillis, final MultiAsyncToSync blocker) {
this.sleepTimeMillis = sleepTimeMillis;
this.input = input;
this.identifier = identifier;
this.blocker = blocker;
}
/**
* Sleep and then increment the input value by one.
* @return The input value of the operation incremented by one.
*/
@Override
public Integer call() throws InterruptedException, InvalidIdentifierException {
LOG.log(Level.INFO, "Sleeping...");
Thread.sleep(sleepTimeMillis);
LOG.log(Level.INFO, "Releasing caller on identifier [{0}]...", identifier);
blocker.release(identifier);
return input + 1;
}
}
/**
* Use the MultiAsyncToSync class to implement a synchronous API
* that uses asynchronous processing internally.
*/
final class SynchronousApi implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(SynchronousApi.class.getName());
private final int incrementerSleepTimeMillis;
private final MultiAsyncToSync blocker;
private final ExecutorService executor;
private final ConcurrentLinkedQueue<FutureTask<Integer>> taskQueue = new ConcurrentLinkedQueue<>();
private final AtomicLong idCounter = new AtomicLong(0);
/**
* Parameterize the object as to length of processing time and call timeout.
* @param incrementerSleepTimeSeconds Length of time the incrementer sleeps before
* performing the increment and returning.
* @param timeoutPeriodSeconds The length of time before the call will timeout.
*/
SynchronousApi(final int incrementerSleepTimeSeconds,
final long timeoutPeriodSeconds, final int numberOfThreads) {
this.incrementerSleepTimeMillis = 1000 * incrementerSleepTimeSeconds;
this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS);
this.executor = Executors.newFixedThreadPool(numberOfThreads);
}
/**
* Initiates asynchronous processing inside the condition lock.
*/
private class AsyncInitiator implements Callable<Boolean> {
private final FutureTask<Integer> task;
private final ExecutorService executor;
AsyncInitiator(final FutureTask<Integer> task, final ExecutorService executor) {
this.task = task;
this.executor = executor;
}
@Override
public Boolean call() {
executor.execute(task);
return true;
}
}
/**
* Asynchronously increment the input parameter.
* @param input An integer object whose value is to be incremented by one.
* @return The input parameter incremented by one or zero for a timeout.
* @throws InterruptedException Thread was interrupted by another thread.
* @throws ExecutionException An exception was thrown an internal processing function.
* @throws InvalidIdentifierException The call identifier is invalid.
*/
public int apiCall(final Integer input) throws InterruptedException, InvalidIdentifierException, ExecutionException {
// Create a future to run the asynchronous processing.
final long identifier = idCounter.getAndIncrement();
final FutureTask<Integer> task =
new FutureTask<>(new AsynchronousIncrementer(input, identifier, incrementerSleepTimeMillis, blocker));
taskQueue.add(task);
LOG.log(Level.INFO, "Running the incrementer on identifier [{0}]...", identifier);
if (blocker.block(identifier, new FutureTask<>(new AsyncInitiator(task, executor)))) {
LOG.log(Level.INFO, "Call timed out...");
// Timeout occurred before the asynchronous processing completed.
return 0;
}
LOG.log(Level.INFO, "Call getting task result...");
return task.get();
}
/**
* Ensure all test tasks have completed.
*/
public void close() throws InterruptedException {
for (final FutureTask<Integer> task : taskQueue) {
try {
task.get();
} catch (final ExecutionException e) {
LOG.log(Level.INFO, "Caught exception waiting for completion...", e);
}
}
executor.shutdownNow();
}
}
/**
* Verify proper operation of the MultiAsyncToSync class.
*/
public final class MultiAsyncToSyncTest {
private static final Logger LOG = Logger.getLogger(MultiAsyncToSyncTest.class.getName());
/**
* Verify calculations successfully complete when no timeout occurs.
*/
@Test
public void testNoTimeout() throws InterruptedException, InvalidIdentifierException, ExecutionException {
LOG.log(Level.INFO, "Starting...");
// Parameters that do not force a timeout.
final int incrementerSleepTimeSeconds = 2;
final long timeoutPeriodSeconds = 4;
final int input = 1;
try (SynchronousApi apiObject = new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
final int result = apiObject.apiCall(input);
Assert.assertEquals("Value incremented by one", input + 1, result);
}
}
/**
* Verify an error is returned when a timeout occurs.
*/
@Test
public void testTimeout() throws InterruptedException, InvalidIdentifierException, ExecutionException {
LOG.log(Level.INFO, "Starting...");
// Parameters that force a timeout.
final int incrementerSleepTimeSeconds = 4;
final long timeoutPeriodSeconds = 2;
final int input = 1;
try (SynchronousApi apiObject = new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
final int result = apiObject.apiCall(input);
Assert.assertEquals("Timeout occurred", result, 0);
}
}
/**
* Verify no interaction occurs when multiple calls are in flight.
*/
@Test
public void testMultipleCalls()
throws InterruptedException, InvalidIdentifierException, ExecutionException, NoSuchMethodException {
LOG.log(Level.INFO, "Starting...");
// Parameters that do not force a timeout.
final int incrementerSleepTimeSeconds = 2;
final long timeoutPeriodSeconds = 4;
try (SynchronousApi apiObject = new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
final String function = "apiCall";
final int input = 1;
final FutureTask<Integer> task1 =
new FutureTask<>(new MethodCallable<Integer>(apiObject, function, input));
final FutureTask<Integer> task2
= new FutureTask<>(new MethodCallable<Integer>(apiObject, function, input + 1));
// Execute API calls concurrently.
final ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(task1);
executor.execute(task2);
final int result1 = task1.get();
final int result2 = task2.get();
Assert.assertEquals("Input must be incremented by one", input + 1, result1);
Assert.assertEquals("Input must be incremented by one", input + 2, result2);
executor.shutdownNow();
}
}
/**
* Verify no race conditions occurs when multiple calls are in flight.
*/
@Test
public void testRaceConditions()
throws InterruptedException, InvalidIdentifierException, ExecutionException, NoSuchMethodException {
LOG.log(Level.INFO, "Starting...");
// Parameters that do not force a timeout.
final int incrementerSleepTimeSeconds = 1;
final long timeoutPeriodSeconds = 10;
final String function = "apiCall";
final int nTasks = 100;
final FutureTask[] tasks = new FutureTask[nTasks];
final ExecutorService executor = Executors.newFixedThreadPool(10);
try (SynchronousApi apiObject = new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 10)) {
for (int idx = 0; idx < nTasks; ++idx) {
tasks[idx] = new FutureTask<>(new MethodCallable<Integer>(apiObject, function, idx));
executor.execute(tasks[idx]);
}
for (int idx = 0; idx < nTasks; ++idx) {
final int result = (int)tasks[idx].get();
Assert.assertEquals("Input must be incremented by one", idx + 1, result);
}
}
executor.shutdownNow();
}
/**
* Verify calling block and release on same thread generates an exception.
*/
@Test(expected = ExecutionException.class)
public void testCallOnSameThread() throws InterruptedException, InvalidIdentifierException, ExecutionException {
LOG.log(Level.INFO, "Starting...");
final long identifier = 78;
final MultiAsyncToSync asyncToSync = new MultiAsyncToSync(2, TimeUnit.SECONDS);
final FutureTask<Object> syncProc = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws InterruptedException, InvalidIdentifierException {
asyncToSync.release(identifier);
return null;
}
});
asyncToSync.block(identifier, syncProc);
syncProc.get(); // must throw ExecutionException
Assert.fail("syncProc.get() must throw");
}
}