blob: f2411cea96fe9ff66a1e972511c5606f728e24ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.openide.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.netbeans.junit.NbTestCase;
import org.netbeans.junit.RandomlyFails;
import org.openide.util.Cancellable;
import org.openide.util.Exceptions;
import org.openide.util.RequestProcessor;
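/**
* Tests the executor-style methods of {@link RequestProcessor}: submit,
* invokeAll/invokeAny, the schedule* family, shutdown/awaitTermination and
* cancellation through the returned futures.
*
* @author Tim Boudreau
*/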
public class RequestProcessor180386Test extends NbTestCase {
// Logger for this test class; used to report when a timing-sensitive test is skipped.
private static final Logger LOG = Logger.getLogger(RequestProcessor180386Test.class.getName());
/**
 * Creates the test case.
 *
 * @param testName name handed on to the {@link NbTestCase} constructor
 */
public RequestProcessor180386Test(String testName) {
    super(testName);
}
/**
 * Submits one {@link Callable} and one {@link Runnable} to the default
 * processor and checks that each one ran and that the returned future
 * yields the expected value.
 */
public void testSubmit() throws Exception {
    // Part 1: a Callable whose own return value becomes the future's result.
    final AtomicBoolean callableRan = new AtomicBoolean(false);
    Callable<String> greeter = new Callable<String>() {
        @Override
        public String call() throws Exception {
            String greeting = "Hello";
            callableRan.set(true);
            return greeting;
        }
    };
    Future<String> pending = RequestProcessor.getDefault().submit(greeter);
    assertEquals("Hello", pending.get());
    assertTrue(callableRan.get());

    // Part 2: a Runnable paired with a fixed result value.
    final AtomicBoolean runnableRan = new AtomicBoolean(false);
    Runnable marker = new Runnable() {
        @Override
        public void run() {
            runnableRan.set(true);
        }
    };
    pending = RequestProcessor.getDefault().submit(marker, "Goodbye");
    assertEquals("Goodbye", pending.get());
    assertTrue(runnableRan.get());
}
/**
 * Starts ten tasks that block on a shared monitor, wakes exactly one of
 * them, then calls {@code shutdownNow()} and checks that at least one but
 * not all tasks ran to completion. Finally verifies that the default
 * processor rejects both {@code shutdown()} and {@code shutdownNow()}.
 */
@RandomlyFails // NB-Core-Build #4352: notRun.empty
public void testSomeTasksNotRunIfShutDown() throws Exception {
    final Object lock = new Object();
    int count = 10;
    final CountDownLatch waitAllLaunched = new CountDownLatch(count);
    final CountDownLatch waitOneFinished = new CountDownLatch(1);
    final RequestProcessor notificationThread = new RequestProcessor("notifier", 1, true, true);
    RequestProcessor rp = new RequestProcessor("TestRP", count * 2);
    class R implements Runnable {
        volatile boolean hasStarted;
        volatile boolean hasFinished;

        @Override
        public void run() {
            hasStarted = true;
            synchronized (lock) {
                // Report "launched" only while holding the monitor. The main
                // thread must acquire the same monitor before notifying, which
                // it can only do once this thread has released it inside
                // wait(). That way the notify() below can never be lost.
                waitAllLaunched.countDown();
                try {
                    lock.wait();
                    if (Thread.interrupted()) {
                        return;
                    }
                } catch (InterruptedException ex) {
                    return;
                } finally {
                    // Signal asynchronously so this runnable has time to exit
                    // before the main thread proceeds.
                    new N(waitOneFinished).launch();
                }
                hasFinished = true;
            }
        }

        /** Counts down a latch from the notifier processor after a short delay. */
        class N implements Runnable {
            private final CountDownLatch l;

            N(CountDownLatch l) {
                this.l = l;
            }

            void launch() {
                notificationThread.create(this).schedule(20);
            }

            @Override
            public void run() {
                l.countDown();
            }
        }
    }
    Set<R> rs = new HashSet<R>();
    for (int i = 0; i < count; i++) {
        String currName = "Runnable " + i;
        R r = new R();
        rs.add(r);
        rp.submit(r, currName);
    }
    waitAllLaunched.await();
    synchronized (lock) {
        //Notify just one thread
        lock.notify();
    }
    waitOneFinished.await();
    List<Runnable> notRun = rp.shutdownNow();
    synchronized (lock) {
        lock.notifyAll();
    }
    boolean allFinished = true;
    int finishedCount = 0;
    for (R r : rs) {
        assertTrue(r.hasStarted);
        allFinished &= r.hasFinished;
        if (r.hasFinished) {
            finishedCount++;
        }
    }
    assertFalse("All tasks should not have completed", allFinished);
    assertTrue("At least one task shall complete", finishedCount >= 1);
    assertTrue(notRun.isEmpty());
    assertTrue(rp.isShutdown());
    //Technically not provable due to "spurious wakeups"
    // assertEquals (1, finishedCount);
    try {
        RequestProcessor.getDefault().shutdown();
        fail("Should not be able to shutdown() default RP");
    } catch (Exception e) {
        // expected: the default processor must refuse to shut down
    }
    try {
        RequestProcessor.getDefault().shutdownNow();
        fail("Should not be able to shutdownNow() default RP");
    } catch (Exception e) {
        // expected: the default processor must refuse to shut down
    }
}
/**
 * Starts twenty tasks that block on a shared monitor, wakes all of them,
 * shuts the processor down and checks that {@code awaitTermination}
 * reports termination.
 */
public void testAwaitTermination() throws Exception {
    int count = 20;
    final Object lock = new Object();
    final CountDownLatch waitAllLaunched = new CountDownLatch(count);
    final CountDownLatch waitAll = new CountDownLatch(count);
    final RequestProcessor rp = new RequestProcessor("TestRP", count);
    class R implements Runnable {
        volatile boolean hasStarted;
        volatile boolean hasFinished;

        @Override
        public void run() {
            hasStarted = true;
            synchronized (lock) {
                // Report "launched" only while holding the monitor, so the
                // main thread cannot call notifyAll() before this thread is
                // actually inside wait(). Otherwise the wakeup could be lost
                // and the test would block in awaitTermination.
                waitAllLaunched.countDown();
                try {
                    lock.wait();
                    if (Thread.interrupted()) {
                        return;
                    }
                } catch (InterruptedException ex) {
                    return;
                } finally {
                    hasFinished = true;
                    waitAll.countDown();
                }
            }
        }
    }
    for (int i = 0; i < count; i++) {
        String currName = "Runnable " + i;
        rp.submit(new R(), currName);
    }
    waitAllLaunched.await();
    synchronized (lock) {
        //Wake every waiting task
        lock.notifyAll();
    }
    rp.shutdown();
    boolean awaitTermination = rp.awaitTermination(1, TimeUnit.DAYS);
    assertTrue(awaitTermination);
    assertTrue(rp.isShutdown());
    assertTrue(rp.isTerminated());
}
/**
* Submits 30 tasks, releases them, and calls {@code awaitTermination} on the
* test thread while a helper thread calls {@code shutdown()} after sleeping
* for one second. The await must return {@code true} and the processor must
* report itself terminated afterwards.
*/
@RandomlyFails
public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception {
final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false);
int count = 30;
final CountDownLatch waitLock = new CountDownLatch(1);
class R implements Runnable {
// Written by run() but never read by this test.
boolean done;
@Override
public void run() {
try {
waitLock.await();
} catch (InterruptedException ex) {
done = true;
} finally {
done = true;
}
}
}
Set<R> rs = new HashSet<R>();
for (int i= 0; i < count; i++) {
R r = new R();
rs.add(r);
rp.submit(r);
}
// Counted down after shutdown() is called; nothing in this method awaits it.
final CountDownLatch shutdownBegun = new CountDownLatch(1);
Runnable shutdowner = new Runnable() {
@Override
public void run() {
try {
// Delay shutdown() so that awaitTermination below has (normally)
// already been entered when shutdown starts.
Thread.sleep(1000);
rp.shutdown();
shutdownBegun.countDown();
} catch (InterruptedException ex) {
Exceptions.printStackTrace(ex);
}
}
};
// Release all submitted tasks, then start the delayed shutdown.
waitLock.countDown();
new Thread(shutdowner).start();
assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
// NOTE(review): the reason for this extra pause is not evident from the test itself - confirm.
Thread.sleep (600);
assertTrue (rp.isTerminated());
}
/**
 * Runs twenty callables through {@code invokeAll} and checks that every
 * future is done, every callable ran, and the collected results equal the
 * set of expected names.
 */
public void testInvokeAll() throws Exception {
    final int taskCount = 20;
    final CountDownLatch allSignalled = new CountDownLatch(taskCount);
    final RequestProcessor notificationThread = new RequestProcessor("notifier", 1, true, true);
    final RequestProcessor processor = new RequestProcessor("TestRP", taskCount);
    try {
        class Job implements Callable<String>, Runnable {
            private final String value;
            volatile boolean executed;

            Job(String value) {
                this.value = value;
            }

            @Override
            public String call() throws Exception {
                executed = true;
                //#182637 - the waiting thread can be notified before
                //the Done flag on this runnable's future has been set,
                //so the latch is counted down from another processor
                //after a short delay, giving this call time to exit
                notificationThread.create(this).schedule(20);
                return value;
            }

            @Override
            public void run() {
                allSignalled.countDown();
            }
        }
        List<Job> jobs = new ArrayList<Job>(taskCount);
        Set<String> expectedNames = new HashSet<String>(taskCount);
        for (int index = 0; index < taskCount; index++) {
            String name = "R" + index;
            expectedNames.add(name);
            jobs.add(new Job(name));
        }
        List<Future<String>> futures = processor.invokeAll(jobs);
        assertNotNull(futures);
        allSignalled.await();
        assertEquals(0, allSignalled.getCount());
        for (Future<String> future : futures) {
            assertTrue(future.isDone());
        }
        for (Job job : jobs) {
            assertTrue(job.executed);
        }
        Set<String> actualNames = new HashSet<String>(taskCount);
        for (Future<String> future : futures) {
            actualNames.add(future.get());
        }
        assertEquals(expectedNames, actualNames);
    } finally {
        processor.stop();
    }
}
/**
 * Calls the timed {@code invokeAll} with one callable that returns
 * immediately and twenty that block on a latch which is never released.
 * After the one-second timeout every future must be done, and only the
 * non-blocking callable may have run to completion.
 */
public void testInvokeAllWithTimeout() throws Exception {
    final int taskCount = 20;
    final CountDownLatch neverReleased = new CountDownLatch(1);
    final RequestProcessor processor = new RequestProcessor("TestRP", taskCount);
    try {
        class Job implements Callable<String> {
            volatile boolean skipsBlocking;
            private final String value;
            volatile boolean completed;

            Job(String value) {
                this.value = value;
            }

            @Override
            public String call() throws Exception {
                //Only the non-blocking job is allowed to finish; the
                //others stay parked on the latch
                if (!skipsBlocking) {
                    neverReleased.await();
                }
                completed = true;
                return value;
            }
        }
        List<Job> jobs = new ArrayList<Job>(taskCount);
        Job special = new Job("Special");
        special.skipsBlocking = true;
        jobs.add(special);
        Set<String> names = new HashSet<String>(taskCount);
        for (int index = 0; index < taskCount; index++) {
            String name = "R" + index;
            names.add(name);
            jobs.add(new Job(name));
        }
        List<Future<String>> futures = processor.invokeAll(jobs, 1000, TimeUnit.MILLISECONDS);
        assertNotNull(futures);
        for (Future<String> future : futures) {
            assertTrue(future.isDone());
        }
        for (Job job : jobs) {
            if (job != special) {
                assertFalse(job.completed);
            } else {
                assertTrue(job.completed);
            }
        }
    } finally {
        processor.stop();
    }
}
/**
 * Runs twenty callables through {@code invokeAll} on a processor created
 * with a throughput of one and checks that the collected results equal
 * the set of expected names.
 */
public void testInvokeAllSingleThread() throws Exception {
    final int taskCount = 20;
    final CountDownLatch entered = new CountDownLatch(taskCount);
    final RequestProcessor processor = new RequestProcessor("TestRP", 1);
    class Job implements Callable<String> {
        private final String value;

        Job(String value) {
            this.value = value;
        }

        @Override
        public String call() throws Exception {
            entered.countDown();
            return value;
        }
    }
    List<Job> jobs = new ArrayList<Job>(taskCount);
    Set<String> expectedNames = new HashSet<String>(taskCount);
    for (int index = 0; index < taskCount; index++) {
        String name = "R" + index;
        expectedNames.add(name);
        jobs.add(new Job(name));
    }
    List<Future<String>> futures = processor.invokeAll(jobs);
    assertNotNull(futures);
    Set<String> actualNames = new HashSet<String>(taskCount);
    for (Future<String> future : futures) {
        actualNames.add(future.get());
    }
    assertEquals(expectedNames, actualNames);
}
/**
 * Passes twenty callables to {@code invokeAny} and checks that a non-null
 * result starting with "R" comes back.
 */
@RandomlyFails // NB-Core-Build #4165: res==null
public void testInvokeAny() throws Exception {
    final int taskCount = 20;
    final RequestProcessor processor = new RequestProcessor("TestRP", taskCount + 1);
    class Job implements Callable<String> {
        private final String value;

        Job(String value) {
            this.value = value;
        }

        @Override
        public String call() throws Exception {
            // A job that finds its thread interrupted yields null instead of its name.
            return Thread.interrupted() ? null : value;
        }
    }
    List<Job> jobs = new ArrayList<Job>(taskCount);
    Set<String> names = new HashSet<String>(taskCount);
    for (int index = 0; index < taskCount; index++) {
        String name = "R" + index;
        names.add(name);
        jobs.add(new Job(name));
    }
    String winner = processor.invokeAny(jobs);
    assertNotNull(winner);
    assertTrue(winner.startsWith("R"));
}
/**
 * Passes 1000 callables to {@code invokeAny} on a processor with a
 * throughput of 20. Only the callable named "R17" returns straight away;
 * all others block until the test releases them. Checks that a result is
 * returned without every callable having completed.
 */
public void testInvokeAnySingleThread() throws Exception {
    int count = 1000;
    final RequestProcessor rp = new RequestProcessor("TestRP", 20);
    // Holds back every callable except "R17". A latch is used instead of
    // Thread.suspend()/resume(): the suspended threads were never recorded,
    // so they were never resumed and stayed blocked after the test ended.
    final CountDownLatch release = new CountDownLatch(1);
    class C implements Callable<String> {
        volatile boolean hasRun;
        private final String name;

        C(String name) {
            this.name = name;
        }

        @Override
        public String call() throws Exception {
            if (!"R17".equals(name)) {
                //Block all but one callable until the test has checked the result
                release.await();
            }
            hasRun = true;
            return name;
        }
    }
    List<C> l = new ArrayList<C>(count);
    for (int i = 0; i < count; i++) {
        l.add(new C("R" + i));
    }
    try {
        String res = rp.invokeAny(l);
        assertNotNull(res);
        assertTrue(res.startsWith("R"));
        int runCount = 0;
        for (C c : l) {
            if (c.hasRun) {
                runCount++;
            }
        }
        assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count);
    } finally {
        // Always let any still-blocked workers go so no thread is left parked.
        release.countDown();
    }
}
/**
 * Passes twenty callables that all block on a never-released latch to the
 * timed {@code invokeAny} and checks that it returns {@code null} and that
 * no callable ran to completion.
 */
public void testInvokeAnyWithTimeout() throws Exception {
    final int taskCount = 20;
    final RequestProcessor processor = new RequestProcessor("TestRP", taskCount + 1);
    final CountDownLatch neverReleased = new CountDownLatch(1);
    class Job implements Callable<String> {
        volatile boolean completed;
        private final String value;

        Job(String value) {
            this.value = value;
        }

        @Override
        public String call() throws Exception {
            neverReleased.await();
            if (Thread.interrupted()) {
                return null;
            }
            completed = true;
            return value;
        }
    }
    List<Job> jobs = new ArrayList<Job>(taskCount);
    Set<String> names = new HashSet<String>(taskCount);
    for (int index = 0; index < taskCount; index++) {
        String name = "R" + index;
        names.add(name);
        jobs.add(new Job(name));
    }
    //Every job is parked on the latch, so the call is expected to time out
    String winner = processor.invokeAny(jobs, 400, TimeUnit.MILLISECONDS);
    assertNull(winner);
    for (Job job : jobs) {
        assertFalse(job.completed);
    }
}
/**
* Submits a callable that blocks on a latch, cancels its future with
* interruption and only then releases the latch. {@code get()} must throw
* {@link CancellationException} and the callable must not have completed.
*/
public void testCancellation() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
class C implements Callable<String> {
// Set only when call() reaches its normal end.
volatile boolean hasRun;
// Set when call() observes an interrupt, either as an exception or as a flag.
volatile boolean interrupted;
@Override
public String call() throws Exception {
try {
latch.await();
} catch (InterruptedException e) {
interrupted = true;
return null;
}
if (Thread.interrupted()) {
interrupted = true;
return null;
}
hasRun = true;
return "Hello";
}
}
C c = new C();
Future<String> f = RequestProcessor.getDefault().submit(c);
// Cancel before releasing the latch, so the callable is either not yet
// started or still blocked in await() when the cancel arrives.
f.cancel(true);
latch.countDown();
String s = null;
try {
s = f.get();
fail("CancellationException should have been thrown");
} catch (CancellationException e) {
// expected
}
assertNull(s);
assertTrue(c.interrupted || !c.hasRun);
assertFalse(c.hasRun);
}
/**
 * Submits a callable that also implements {@link Cancellable}, cancels its
 * future and checks that {@code cancel()} was invoked on the callable and
 * that {@code get()} throws {@link CancellationException}.
 */
public void testCancellablesGetCancelInvokedWithCallable() throws Exception {
    final CountDownLatch gate = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    class Job implements Callable<String>, Cancellable {
        volatile boolean completed;
        volatile boolean sawInterrupt;
        volatile boolean cancelInvoked;

        @Override
        public String call() throws Exception {
            try {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    sawInterrupt = true;
                    return null;
                }
                if (Thread.interrupted()) {
                    sawInterrupt = true;
                    return null;
                }
                if (cancelInvoked) {
                    return null;
                }
                completed = true;
                return "Hello";
            } finally {
                finished.countDown();
            }
        }

        @Override
        public boolean cancel() {
            cancelInvoked = true;
            // Also release the test thread in case call() never gets to run.
            finished.countDown();
            return true;
        }
    }
    Job job = new Job();
    Future<String> future = RequestProcessor.getDefault().submit(job);
    future.cancel(true);
    assertTrue(job.cancelInvoked);
    gate.countDown();
    finished.await();
    String value = null;
    try {
        value = future.get();
        fail ("Should have gotten cancellation exception");
    } catch (CancellationException expected) {
        // expected
    }
}
/**
 * Submits a runnable that also implements {@link Cancellable}, cancels its
 * future and checks that {@code cancel()} was invoked on the runnable, that
 * {@code get()} throws {@link CancellationException}, and that the
 * runnable did not complete.
 */
public void testCancellablesGetCancelInvokedWithRunnable() throws Exception {
    final CountDownLatch gate = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    class Job implements Runnable, Cancellable {
        volatile boolean completed;
        volatile boolean sawInterrupt;
        volatile boolean cancelInvoked;

        @Override
        public void run() {
            try {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    sawInterrupt = true;
                    return;
                }
                if (Thread.interrupted()) {
                    sawInterrupt = true;
                    return;
                }
                if (cancelInvoked) {
                    return;
                }
                completed = true;
            } finally {
                finished.countDown();
            }
        }

        @Override
        public boolean cancel() {
            cancelInvoked = true;
            // Also release the test thread in case run() never gets to run.
            finished.countDown();
            return true;
        }
    }
    Job job = new Job();
    Future<?> future = RequestProcessor.getDefault().submit(job);
    future.cancel(true);
    assertTrue(job.cancelInvoked);
    gate.countDown();
    finished.await();
    try {
        future.get();
        fail ("Should have gotten cancellation exception");
    } catch (CancellationException expected) {
        // expected
    }
    assertFalse(job.completed);
}
/**
* Submits a {@link Cancellable} runnable whose {@code cancel()} returns
* {@code false} and checks that cancelling its future does not mark the
* future cancelled, does not interrupt the runnable, and still lets it run
* to completion.
*/
public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch exit = new CountDownLatch(1);
class C implements Runnable, Cancellable {
volatile boolean hasRun;
volatile boolean interrupted;
volatile boolean cancelCalled;
@Override
public void run() {
try {
try {
latch.await();
} catch (InterruptedException e) {
// Any interrupt means the refused cancel was not honoured.
interrupted = true;
throw new AssertionError(e);
}
if (Thread.interrupted()) {
interrupted = true;
throw new AssertionError("Thread should not have been interrupted");
}
hasRun = true;
} finally {
exit.countDown();
}
}
@Override
public boolean cancel() {
// Record the call but refuse the cancellation.
cancelCalled = true;
return false;
}
}
C c = new C();
Future<?> f = RequestProcessor.getDefault().submit(c);
f.cancel(true);
assertFalse (f.isCancelled());
assertTrue (c.cancelCalled);
// Let the runnable proceed and wait until it has left run().
latch.countDown();
exit.await();
f.get();
assertFalse (f.isCancelled());
assertTrue (c.hasRun);
}
/**
 * Runs twenty {@link Cancellable} callables (whose {@code cancel()} returns
 * {@code false}) through {@code invokeAll} and checks that the collected
 * results equal the set of expected names.
 */
public void testInvokeAllCancellation() throws Exception {
    final int taskCount = 20;
    final CountDownLatch entered = new CountDownLatch(taskCount);
    final RequestProcessor processor = new RequestProcessor("TestRP", taskCount);
    class Job implements Callable<String>, Cancellable {
        private final String value;
        volatile boolean cancelInvoked;

        Job(String value) {
            this.value = value;
        }

        @Override
        public String call() throws Exception {
            entered.countDown();
            if (cancelInvoked) {
                return null;
            }
            return value;
        }

        @Override
        public boolean cancel() {
            cancelInvoked = true;
            return false;
        }
    }
    List<Job> jobs = new ArrayList<Job>(taskCount);
    Set<String> expectedNames = new HashSet<String>(taskCount);
    for (int index = 0; index < taskCount; index++) {
        String name = "R" + index;
        expectedNames.add(name);
        jobs.add(new Job(name));
    }
    List<Future<String>> futures = processor.invokeAll(jobs);
    assertNotNull(futures);
    Set<String> actualNames = new HashSet<String>(taskCount);
    for (Future<String> future : futures) {
        actualNames.add(future.get());
    }
    assertEquals(expectedNames, actualNames);
}
/**
* Tries to schedule a runnable with a delay of {@code Long.MAX_VALUE} days
* on the default processor. Both outcomes are tolerated: if a future is
* returned it is cancelled at once, and any exception is swallowed.
*/
public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception {
Runnable r = new Runnable() {
@Override
public void run() {
// NOTE(review): if this ran on a processor thread, the failure would be
// raised on that thread rather than on the test thread - confirm it is reported.
fail ("Should not have been run");
}
};
try {
Future<?> f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS);
f.cancel(true);
// NOTE(review): the test name suggests scheduling should be rejected, yet there is
// no fail() here and the catch is empty, so this test asserts nothing - confirm intent.
} catch (Exception e) {}
}
/**
 * Checks that {@code schedule}, {@code scheduleAtFixedRate} and
 * {@code scheduleWithFixedDelay} on the default processor each throw an
 * exception when given a negative initial delay or a negative period.
 */
public void testCannotScheduleNegativeDelay() throws Exception {
    final Runnable neverRun = new Runnable() {
        @Override
        public void run() {
            fail ("Should not have been run");
        }
    };
    final RequestProcessor processor = RequestProcessor.getDefault();

    // one-shot schedule with a negative delay
    try {
        processor.schedule(neverRun, -1L, TimeUnit.MILLISECONDS);
        fail ("Negative value accepetd");
    } catch (Exception expected) {
        // expected
    }
    // fixed rate, negative initial delay
    try {
        processor.scheduleAtFixedRate(neverRun, -1L, 22L, TimeUnit.MILLISECONDS);
        fail ("Negative value accepetd");
    } catch (Exception expected) {
        // expected
    }
    // fixed rate, negative period
    try {
        processor.scheduleAtFixedRate(neverRun, 200, -22L, TimeUnit.MILLISECONDS);
        fail ("Negative value accepetd");
    } catch (Exception expected) {
        // expected
    }
    // fixed delay, negative initial delay
    try {
        processor.scheduleWithFixedDelay(neverRun, -1L, 22L, TimeUnit.MILLISECONDS);
        fail ("Negative value accepetd");
    } catch (Exception expected) {
        // expected
    }
    // fixed delay, negative delay between runs
    try {
        processor.scheduleWithFixedDelay(neverRun, 1L, -22L, TimeUnit.MILLISECONDS);
        fail ("Negative value accepetd");
    } catch (Exception expected) {
        // expected
    }
}
/**
 * Creates a task whose runnable reschedules the task from inside its first
 * run, and checks that the runnable ends up being executed exactly twice.
 */
public void testTaskCanRescheduleItself() throws Exception {
    final CountDownLatch latch = new CountDownLatch(2);
    class R implements Runnable {
        volatile RequestProcessor.Task task;
        volatile int runCount;

        @Override
        public void run() {
            runCount++;
            if (runCount == 1) {
                // First run only: ask for one more execution.
                task.schedule(0);
            }
            latch.countDown();
        }
    }
    R r = new R();
    RequestProcessor.Task t = RequestProcessor.getDefault().create(r);
    r.task = t;
    t.schedule(0);
    latch.await ();
    // JUnit order is (expected, actual); it was reversed, which produced a
    // misleading message on failure.
    assertEquals (2, r.runCount);
}
/**
 * Sanity check for {@code scheduleAtFixedRate}: schedules a runnable every
 * 200 ms on the default processor and waits until it has run at least five
 * times. The repeating task is cancelled afterwards so it does not keep
 * running on the shared default processor.
 */
public void testScheduleRepeatingSanityFixedRate() throws Exception {
    final CountDownLatch latch = new CountDownLatch(5);
    class C implements Runnable {
        volatile int runCount;

        @Override
        public void run() {
            runCount++;
            latch.countDown();
            if (latch.getCount() <= 0) {
                waitABitToGiveMainThreadChanceToRun();
            }
        }

        private void waitABitToGiveMainThreadChanceToRun() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Exceptions.printStackTrace(ex);
            }
        }
    }
    C c = new C();
    // The test name says "fixed rate", so exercise scheduleAtFixedRate here
    // (the two sanity tests previously called each other's method).
    Future<?> f = RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 200, TimeUnit.MILLISECONDS);
    try {
        latch.await();
        assertAtLeast (5, c.runCount);
    } finally {
        // cancel(false): stop further runs without interrupting a run that
        // may currently be sleeping.
        f.cancel(false);
    }
}
/**
 * Sanity check for {@code scheduleWithFixedDelay}: schedules a runnable
 * with a 200 ms delay between runs on the default processor, waits up to
 * two seconds and checks that it has run at least five times. The repeating
 * task is cancelled afterwards so it does not keep running on the shared
 * default processor.
 */
public void testScheduleRepeatingSanityFixedDelay() throws Exception {
    final CountDownLatch latch = new CountDownLatch(5);
    class C implements Runnable {
        volatile int runCount;

        @Override
        public void run() {
            runCount++;
            latch.countDown();
            waitABitToGiveMainThreadChanceToRun();
        }

        private void waitABitToGiveMainThreadChanceToRun() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Exceptions.printStackTrace(ex);
            }
        }
    }
    C c = new C();
    // The test name says "fixed delay", so exercise scheduleWithFixedDelay here.
    Future<?> f = RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 200, TimeUnit.MILLISECONDS);
    try {
        latch.await(2000, TimeUnit.MILLISECONDS);
        assertAtLeast (5, c.runCount);
    } finally {
        // cancel(false): stop further runs without interrupting a run that
        // may currently be sleeping.
        f.cancel(false);
    }
}
/**
* Schedules a callable with a five-second delay (given in microseconds) and
* checks the delay reported by the future, the returned value, and that at
* least 4.6 seconds passed before the callable ran.
*/
public void testScheduleOneShot() throws Exception {
RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true);
try {
class C implements Callable<String> {
// Taken when the callable is constructed, i.e. just before scheduling.
volatile long start = System.currentTimeMillis();
private volatile long end;
@Override
public String call() throws Exception {
synchronized(this) {
end = System.currentTimeMillis();
}
return "Hello";
}
synchronized long elapsed() {
return end - start;
}
}
C c = new C();
long delay = 5000;
//Use a 5 second delay (passed in microseconds) to have a reasonable chance of accuracy
ScheduledFuture<String> f = rp.schedule(c, delay * 1000, TimeUnit.MICROSECONDS);
assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS));
assertNotNull(f.get());
//Allow a 400 ms fudge-factor
assertTrue (c.elapsed() > 4600);
assertTrue (f.isDone());
} finally {
rp.stop();
}
}
/**
* Schedules a runnable with {@code scheduleWithFixedDelay} (3 s initial
* delay, 2 s between runs), waits for five runs and checks that every
* measured interval is at least the requested value minus a 400 ms fudge
* factor. Then verifies that the cancelled future reports cancellation.
*/
@RandomlyFails // NB-Core-Build #8322: hung
public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception {
int runCount = 5;
final CountDownLatch latch = new CountDownLatch(runCount);
final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount));
// long initialDelay = 30000;
// long period = 20000;
// long fudgeFactor = 4000;
long initialDelay = 3000;
long period = 2000;
long fudgeFactor = 400;
// Lower bounds accepted for the first interval and for every later interval.
long expectedInitialDelay = initialDelay - fudgeFactor;
long expectedPeriod = period - fudgeFactor;
class C implements Runnable {
// Time of construction at first, then the time of the previous run.
volatile long start = System.currentTimeMillis();
// Incremented on every run but not read by this test.
private int runCount;
@Override
public void run() {
runCount++;
try {
synchronized(this) {
long end = System.currentTimeMillis();
intervals.add (end - start);
start = end;
}
} finally {
latch.countDown();
}
}
}
C c = new C();
RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true);
try {
Future<?> f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS);
// latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX
// Unbounded wait: if the task never reaches five runs, this test blocks here.
latch.await();
f.cancel(true);
for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect);
}
//Ensure we have really exited
try {
f.get();
fail ("CancellationException should have been thrown");
} catch (CancellationException e) {}
assertTrue(f.isCancelled());
assertTrue(f.isDone());
} finally {
rp.stop();
}
}
/**
 * Schedules a runnable with {@code scheduleAtFixedRate} (3 s initial delay,
 * 2 s period), waits for five runs and checks that every measured interval
 * is at least the requested value minus a 400 ms fudge factor. Then
 * verifies that the cancelled future reports cancellation. The test is
 * skipped when {@code TaskTest.canWait1s()} returns {@code false}.
 */
@RandomlyFails
public void testScheduleFixedRateAreRoughlyCorrect() throws Exception {
    if (!TaskTest.canWait1s()) {
        // The message previously named a different test (copy-paste slip).
        LOG.warning("Skipping testScheduleFixedRateAreRoughlyCorrect, as the computer is not able to wait 1s!");
        return;
    }
    int runCount = 5;
    final CountDownLatch latch = new CountDownLatch(runCount);
    final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount));
    // long initialDelay = 30000;
    // long period = 20000;
    // long fudgeFactor = 4000;
    long initialDelay = 3000;
    long period = 2000;
    long fudgeFactor = 400;
    // Lower bounds accepted for the first interval and for every later interval.
    long expectedInitialDelay = initialDelay - fudgeFactor;
    long expectedPeriod = period - fudgeFactor;
    class C implements Runnable {
        // Time of construction at first, then the time of the previous run.
        volatile long start = System.currentTimeMillis();
        private int runCount;

        @Override
        public void run() {
            runCount++;
            try {
                synchronized(this) {
                    long end = System.currentTimeMillis();
                    intervals.add (end - start);
                    start = end;
                }
            } finally {
                latch.countDown();
            }
        }
    }
    C c = new C();
    RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true);
    try {
        Future<?> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
        latch.await();
        f.cancel(true);
        // Collect every round in one report so a failure shows all timings.
        StringBuilder failures = new StringBuilder();
        failures.append("Expected at least ").append(expectedInitialDelay).
                append(" milliseconds before run:\n");
        // Renamed from "fail" so it no longer shadows the name of fail().
        boolean tooEarly = false;
        for (int i= 0; i < Math.min(runCount, intervals.size()); i++) {
            long expect = i == 0 ? expectedInitialDelay : expectedPeriod;
            failures.append("Round ").append(i).
                    append(" expected delay ").append(expect).
                    append(" but was ").append(intervals.get(i)).
                    append("\n");
            if (intervals.get(i) < expect) {
                tooEarly = true;
            }
        }
        if (tooEarly) {
            fail(failures.toString());
        }
        //Ensure we have really exited
        try {
            f.get();
            fail ("CancellationException should have been thrown");
        } catch (CancellationException e) {
            // expected
        }
        assertTrue(f.isCancelled());
        assertTrue(f.isDone());
    } finally {
        rp.stop();
    }
}
/**
 * Schedules at a fixed rate of 10 ms a runnable that sleeps for one
 * second, on a processor with a throughput of 10, and checks over ten runs
 * that {@code run()} is never entered by two threads at the same time.
 */
public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception {
    final AtomicInteger val = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(10);
    class C implements Runnable {
        // Written on processor threads and read on the test thread.
        volatile boolean failed;

        @Override
        public void run() {
            try {
                int now = val.incrementAndGet();
                if (now > 1) {
                    failed = true;
                    fail (now + " threads simultaneously in run()");
                }
                try {
                    //Intentionally sleep *longer* than the interval
                    //between executions. We *want* to pile up all of the
                    //RP threads entering run() - synchronization should
                    //serialize them. This test is to prove that this
                    //method will never be called concurrently from two threads
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    // ignored: an interrupted sleep only shortens this run
                }
            } finally {
                val.decrementAndGet();
                latch.countDown();
            }
        }
    }
    C c = new C();
    long initialDelay = 2000;
    long period = 10;
    RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true);
    try {
        rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS);
        latch.await();
        assertFalse(c.failed);
    } finally {
        // Stop the processor even when the assertion fails, so the repeating
        // task does not keep running.
        rp.stop();
    }
}
/**
 * Schedules at a fixed rate of 100 ms a runnable that sleeps for 500 ms,
 * waits for ten runs, and checks that the gap between the end of one run
 * and the start of the next never exceeds 150 ms.
 */
@RandomlyFails
public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception {
    final CountDownLatch remainingRuns = new CountDownLatch(10);
    final List<Long> gaps = new CopyOnWriteArrayList<Long>();
    class Job implements Runnable {
        // End time of the previous run; Long.MIN_VALUE until the first run is over.
        long previousEnd = Long.MIN_VALUE;

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            if (previousEnd != Long.MIN_VALUE) {
                gaps.add(now - previousEnd);
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {
            }
            previousEnd = System.currentTimeMillis();
            remainingRuns.countDown();
        }
    }
    Job job = new Job();
    long initialDelay = 100;
    long period = 100;
    RequestProcessor processor = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true);
    ScheduledFuture<?> future = processor.scheduleAtFixedRate(job, initialDelay, period, TimeUnit.MILLISECONDS);
    remainingRuns.await();
    future.cancel(true);
    processor.stop();
    int recorded = gaps.size();
    for (int index = 0; index < recorded; index++) {
        long gap = gaps.get(index);
        assertFalse ("Interval " + index + " should have been at least less than requested interval * 1.5 with fixed rate" + gap, gap > 150);
    }
}
/**
 * Submits a blocking runnable to a processor constructed with its third
 * argument set to {@code false}, cancels the future with
 * {@code cancel(true)} once the runnable has started, and checks that the
 * runnable observed an interrupt.
 */
public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception {
    RequestProcessor processor = new RequestProcessor ("X", 3, false, true);
    final CountDownLatch neverReleased = new CountDownLatch(1);
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    class Job implements Runnable {
        volatile boolean sawInterrupt;

        @Override
        public void run() {
            started.countDown();
            try {
                neverReleased.await();
            } catch (InterruptedException ex) {
                sawInterrupt = true;
            }
            // Also pick up an interrupt that arrived as a flag only.
            sawInterrupt |= Thread.interrupted();
            finished.countDown();
        }
    }
    Job job = new Job();
    Future<?> future = processor.submit(job);
    started.await();
    future.cancel(true);
    assertTrue (future.isCancelled());
    finished.await();
    assertTrue (job.sawInterrupt);
}
/**
 * Submits a blocking runnable, cancels its future with
 * {@code cancel(false)} once the runnable has started, and checks that the
 * future is cancelled but the runnable never observes an interrupt - both
 * while it is still blocked and after it has been released and finished.
 */
public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception {
    RequestProcessor rp = new RequestProcessor ("X", 3, false, true);
    final CountDownLatch releaseForRun = new CountDownLatch(1);
    final CountDownLatch enterLatch = new CountDownLatch(1);
    final CountDownLatch exitLatch = new CountDownLatch(1);
    class R implements Runnable {
        volatile boolean interrupted;

        @Override
        public void run() {
            enterLatch.countDown();
            try {
                releaseForRun.await();
            } catch (InterruptedException ex) {
                interrupted = true;
            }
            interrupted |= Thread.interrupted();
            exitLatch.countDown();
        }
    }
    R r = new R();
    Future<?> f = rp.submit(r);
    enterLatch.await();
    try {
        f.cancel(false);
        assertTrue (f.isCancelled());
        assertFalse (r.interrupted);
    } finally {
        // Always release the worker: it was previously left blocked forever.
        releaseForRun.countDown();
    }
    // Check the flag again once run() has finished. The check above happens
    // while the worker is still blocked, so on its own it cannot detect a
    // pending interrupt flag. The wait is bounded so a misbehaving processor
    // fails the test instead of hanging it.
    assertTrue ("Runnable did not finish after being released", exitLatch.await(5, TimeUnit.SECONDS));
    assertFalse (r.interrupted);
}
/**
 * Submits the same runnable twice to a processor with a throughput of one,
 * calls {@code shutdown()}, then releases the runnables and checks that
 * both executions finish within five seconds.
 */
public void testSubmittedTasksExecutedBeforeShutdown() throws InterruptedException {
    final CountDownLatch gate = new CountDownLatch(1);
    final CountDownLatch bothDone = new CountDownLatch(2);
    Runnable job = new Runnable() {
        @Override
        public void run() {
            try {
                gate.await();
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } finally {
                bothDone.countDown();
            }
        }
    };
    RequestProcessor processor = new RequestProcessor("X", 1);
    processor.submit(job);
    processor.submit(job);
    processor.shutdown();
    gate.countDown();
    boolean finishedInTime = bothDone.await(5, TimeUnit.SECONDS);
    assertTrue("Submitted tasks not executed", finishedInTime);
}
/**
 * Calls {@code shutdown()} while a submitted runnable is known to be
 * executing and checks that the runnable finishes without having been
 * interrupted.
 */
public void testExecutingTasksNotInterruptedOnShutdown() throws InterruptedException {
    // Two-party rendezvous between the runnable and the test thread.
    final CountDownLatch rendezvous = new CountDownLatch(2);
    final CountDownLatch gate = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    final AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    Runnable job = new Runnable() {
        @Override
        public void run() {
            try {
                rendezvous.countDown();
                rendezvous.await();
                gate.await();
            } catch (InterruptedException ex) {
                sawInterrupt.set(true);
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        }
    };
    RequestProcessor processor = new RequestProcessor("X", 1);
    processor.submit(job);
    rendezvous.countDown();
    try {
        rendezvous.await();
    } catch (InterruptedException ex) {
        Exceptions.printStackTrace(ex);
    }
    // The runnable is running now: shut down first, then let it proceed.
    processor.shutdown();
    gate.countDown();
    finished.await();
    assertFalse("Executing tasks interrupted", sawInterrupt.get());
}
/**
 * Occupies a processor of throughput one with a blocking runnable, submits
 * a second runnable behind it, and checks that {@code shutdownNow()}
 * returns the second runnable but not the one already running.
 */
public void testAwaitingTasksReturnedOnShutdownNow() throws InterruptedException {
    // Two-party rendezvous between the blocking runnable and the test thread.
    final CountDownLatch rendezvous = new CountDownLatch(2);
    final CountDownLatch neverReleased = new CountDownLatch(1);
    Runnable running = new Runnable() {
        @Override
        public void run() {
            try {
                rendezvous.countDown();
                rendezvous.await();
                neverReleased.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    };
    Runnable queued = new Runnable() {
        @Override
        public void run() {
            // noop
        }
    };
    RequestProcessor processor = new RequestProcessor("X", 1);
    processor.submit(running);
    rendezvous.countDown();
    rendezvous.await();
    processor.submit(queued);
    List<Runnable> returned = processor.shutdownNow();
    Set<Runnable> awaiting = new HashSet<Runnable>(returned);
    assertTrue("Awaiting task not returned on shutdownNow()", awaiting.contains(queued));
    assertFalse("Running task returned on shutdownNow()", awaiting.contains(running));
}
/**
 * Fails the test unless {@code real} is greater than or equal to {@code exp}.
 *
 * @param exp  the minimum acceptable value
 * @param real the value actually observed
 */
private static void assertAtLeast(int exp, int real) {
    if (real >= exp) {
        return;
    }
    fail("Expecting at least " + exp + " but was only " + real);
}
}