| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.openide.util; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.logging.Logger; |
| import org.netbeans.junit.NbTestCase; |
| import org.netbeans.junit.RandomlyFails; |
| import org.openide.util.Cancellable; |
| import org.openide.util.Exceptions; |
| import org.openide.util.RequestProcessor; |
| |
| /** |
| * |
| * @author Tim Boudreau |
| */ |
| public class RequestProcessor180386Test extends NbTestCase { |
| private static final Logger LOG = Logger.getLogger(RequestProcessor180386Test.class.getName()); |
| |
| public RequestProcessor180386Test(java.lang.String testName) { |
| super(testName); |
| } |
| |
| public void testSubmit() throws Exception { |
| class C implements Callable<String> { |
| |
| volatile boolean hasRun; |
| |
| @Override |
| public String call() throws Exception { |
| String result = "Hello"; |
| hasRun = true; |
| return result; |
| } |
| } |
| C c = new C(); |
| Future<String> f = RequestProcessor.getDefault().submit(c); |
| assertEquals("Hello", f.get()); |
| assertTrue(c.hasRun); |
| |
| class R implements Runnable { |
| |
| volatile boolean hasRun; |
| |
| @Override |
| public void run() { |
| hasRun = true; |
| } |
| } |
| R r = new R(); |
| f = RequestProcessor.getDefault().submit(r, "Goodbye"); |
| assertEquals("Goodbye", f.get()); |
| assertTrue(r.hasRun); |
| } |
| |
| @RandomlyFails // NB-Core-Build #4352: notRun.empty |
| public void testSomeTasksNotRunIfShutDown() throws Exception { |
| final Object lock = new Object(); |
| int count = 10; |
| final CountDownLatch waitAllLaunched = new CountDownLatch(count); |
| final CountDownLatch waitOneFinished = new CountDownLatch(1); |
| final RequestProcessor notificationThread = new RequestProcessor("notifier", 1, true, true); |
| |
| RequestProcessor rp = new RequestProcessor("TestRP", count * 2); |
| class R implements Runnable { |
| |
| volatile boolean hasStarted; |
| volatile boolean hasFinished; |
| |
| @Override |
| public void run() { |
| hasStarted = true; |
| waitAllLaunched.countDown(); |
| synchronized (lock) { |
| try { |
| lock.wait(); |
| if (Thread.interrupted()) { |
| return; |
| } |
| } catch (InterruptedException ex) { |
| return; |
| } finally { |
| new N(waitOneFinished).launch(); |
| } |
| hasFinished = true; |
| } |
| } |
| |
| class N implements Runnable { |
| private final CountDownLatch l; |
| N (CountDownLatch l) { |
| this.l = l; |
| } |
| |
| void launch() { |
| notificationThread.create(this).schedule(20); |
| } |
| |
| @Override |
| public void run() { |
| l.countDown(); |
| } |
| |
| } |
| } |
| Set<Future<String>> s = new HashSet<Future<String>>(); |
| Set<R> rs = new HashSet<R>(); |
| for (int i = 0; i < count; i++) { |
| String currName = "Runnable " + i; |
| R r = new R(); |
| rs.add(r); |
| s.add(rp.submit(r, currName)); |
| } |
| waitAllLaunched.await(); |
| synchronized (lock) { |
| //Notify just one thread |
| lock.notify(); |
| } |
| waitOneFinished.await(); |
| List<Runnable> notRun = rp.shutdownNow(); |
| synchronized (lock) { |
| lock.notifyAll(); |
| } |
| boolean allFinished = true; |
| int finishedCount = 0; |
| for (R r : rs) { |
| assertTrue(r.hasStarted); |
| allFinished &= r.hasFinished; |
| if (r.hasFinished) { |
| finishedCount++; |
| } |
| } |
| assertFalse("All tasks should not have completed", allFinished); |
| assertTrue("At least one task shall complete", finishedCount >= 1); |
| assertTrue(notRun.isEmpty()); |
| assertTrue(rp.isShutdown()); |
| //Technically not provable due to "spurious wakeups" |
| // assertEquals (1, finishedCount); |
| |
| try { |
| RequestProcessor.getDefault().shutdown(); |
| fail("Should not be able to shutdown() default RP"); |
| } catch (Exception e) { |
| } |
| try { |
| RequestProcessor.getDefault().shutdownNow(); |
| fail("Should not be able to shutdownNow() default RP"); |
| } catch (Exception e) { |
| } |
| } |
| |
| public void testAwaitTermination() throws Exception { |
| int count = 20; |
| final Object lock = new Object(); |
| final CountDownLatch waitAllLaunched = new CountDownLatch(count); |
| final CountDownLatch waitAll = new CountDownLatch(count); |
| final RequestProcessor rp = new RequestProcessor("TestRP", count); |
| class R implements Runnable { |
| |
| volatile boolean hasStarted; |
| volatile boolean hasFinished; |
| |
| @Override |
| public void run() { |
| hasStarted = true; |
| waitAllLaunched.countDown(); |
| synchronized (lock) { |
| try { |
| lock.wait(); |
| if (Thread.interrupted()) { |
| return; |
| } |
| } catch (InterruptedException ex) { |
| return; |
| } finally { |
| hasFinished = true; |
| waitAll.countDown(); |
| } |
| } |
| } |
| } |
| Set<Future<String>> s = new HashSet<Future<String>>(); |
| Set<R> rs = new HashSet<R>(); |
| for (int i = 0; i < count; i++) { |
| String currName = "Runnable " + i; |
| R r = new R(); |
| rs.add(r); |
| s.add(rp.submit(r, currName)); |
| } |
| waitAllLaunched.await(); |
| synchronized (lock) { |
| //Notify just one thread |
| lock.notifyAll(); |
| } |
| rp.shutdown(); |
| boolean awaitTermination = rp.awaitTermination(1, TimeUnit.DAYS); |
| assertTrue(awaitTermination); |
| assertTrue(rp.isShutdown()); |
| assertTrue(rp.isTerminated()); |
| } |
| |
| @RandomlyFails |
| public void testAwaitTerminationWaitsForNewlyAddedThreads() throws Exception { |
| final RequestProcessor rp = new RequestProcessor("testAwaitTerminationWaitsForNewlyAddedThreads", 50, false); |
| int count = 30; |
| final CountDownLatch waitLock = new CountDownLatch(1); |
| class R implements Runnable { |
| boolean done; |
| @Override |
| public void run() { |
| try { |
| waitLock.await(); |
| } catch (InterruptedException ex) { |
| done = true; |
| } finally { |
| done = true; |
| } |
| } |
| } |
| Set<R> rs = new HashSet<R>(); |
| for (int i= 0; i < count; i++) { |
| R r = new R(); |
| rs.add(r); |
| rp.submit(r); |
| } |
| final CountDownLatch shutdownBegun = new CountDownLatch(1); |
| Runnable shutdowner = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(1000); |
| rp.shutdown(); |
| shutdownBegun.countDown(); |
| } catch (InterruptedException ex) { |
| Exceptions.printStackTrace(ex); |
| } |
| } |
| }; |
| waitLock.countDown(); |
| new Thread(shutdowner).start(); |
| assertTrue(rp.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); |
| Thread.sleep (600); |
| assertTrue (rp.isTerminated()); |
| } |
| |
| public void testInvokeAll() throws Exception { |
| int count = 20; |
| final CountDownLatch waitAll = new CountDownLatch(count); |
| final RequestProcessor notificationThread = new RequestProcessor("notifier", 1, true, true); |
| final RequestProcessor rp = new RequestProcessor("TestRP", count); |
| try { |
| class C implements Callable<String>, Runnable { |
| |
| private final String result; |
| volatile boolean ran; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| ran = true; |
| //#182637 - the waiting thread can be notified before |
| //the Done flag on this runnable's future has been set, |
| //so ensure this thread's runnable has time to exit |
| //before we do the notification |
| notificationThread.create(this).schedule (20); |
| return result; |
| } |
| |
| @Override |
| public void run() { |
| waitAll.countDown(); |
| } |
| } |
| List<C> callables = new ArrayList<C>(count); |
| List<Future<String>> fs; |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| callables.add(c); |
| } |
| fs = rp.invokeAll(callables); |
| |
| assertNotNull(fs); |
| waitAll.await(); |
| assertEquals(0, waitAll.getCount()); |
| for (Future<String> f : fs) { |
| assertTrue (f.isDone()); |
| } |
| for (C c : callables) { |
| assertTrue (c.ran); |
| } |
| Set<String> s = new HashSet<String>(count); |
| for (Future<String> f : fs) { |
| s.add(f.get()); |
| } |
| assertEquals(names, s); |
| } finally { |
| rp.stop(); |
| } |
| } |
| |
| public void testInvokeAllWithTimeout() throws Exception { |
| int count = 20; |
| final CountDownLatch blocker = new CountDownLatch(1); |
| final RequestProcessor rp = new RequestProcessor("TestRP", count); |
| try { |
| class C implements Callable<String> { |
| volatile boolean iAmSpecial; |
| |
| private final String result; |
| volatile boolean ran; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| //Only one will be allowed to run, the rest |
| //will be cancelled |
| if (!iAmSpecial) { |
| blocker.await(); |
| } |
| ran = true; |
| return result; |
| } |
| } |
| List<C> callables = new ArrayList<C>(count); |
| C special = new C("Special"); |
| special.iAmSpecial = true; |
| callables.add(special); |
| List<Future<String>> fs; |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| callables.add(c); |
| } |
| fs = rp.invokeAll(callables, 1000, TimeUnit.MILLISECONDS); |
| assertNotNull(fs); |
| for (Future<String> f : fs) { |
| assertTrue (f.isDone()); |
| } |
| for (C c : callables) { |
| if (c == special) { |
| assertTrue (c.ran); |
| } else { |
| assertFalse(c.ran); |
| } |
| } |
| } finally { |
| rp.stop(); |
| } |
| } |
| |
| public void testInvokeAllSingleThread() throws Exception { |
| int count = 20; |
| final CountDownLatch waitAll = new CountDownLatch(count); |
| final RequestProcessor rp = new RequestProcessor("TestRP", 1); |
| class C implements Callable<String> { |
| |
| private final String result; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| waitAll.countDown(); |
| return result; |
| } |
| } |
| List<C> l = new ArrayList<C>(count); |
| List<Future<String>> fs; |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| l.add(c); |
| } |
| fs = rp.invokeAll(l); |
| assertNotNull(fs); |
| Set<String> s = new HashSet<String>(count); |
| for (Future<String> f : fs) { |
| s.add(f.get()); |
| } |
| assertEquals(names, s); |
| } |
| |
| @RandomlyFails // NB-Core-Build #4165: res==null |
| public void testInvokeAny() throws Exception { |
| int count = 20; |
| final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); |
| class C implements Callable<String> { |
| |
| private final String result; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| if (Thread.interrupted()) { |
| return null; |
| } |
| return result; |
| } |
| } |
| List<C> l = new ArrayList<C>(count); |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| l.add(c); |
| } |
| String res = rp.invokeAny(l); |
| assertNotNull(res); |
| assertTrue(res.startsWith("R")); |
| } |
| |
| public void testInvokeAnySingleThread() throws Exception { |
| int count = 1000; |
| final RequestProcessor rp = new RequestProcessor("TestRP", 20); |
| final CountDownLatch latch = new CountDownLatch(count); |
| final Set<Thread> ts = Collections.synchronizedSet(new HashSet<Thread>()); |
| class C implements Callable<String> { |
| |
| volatile boolean hasRun; |
| private final String name; |
| |
| C(String name) { |
| this.name = name; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| latch.countDown(); |
| if (!"R17".equals(name)) { |
| //Block all but one thread until threads have entered |
| Thread.currentThread().suspend(); |
| } |
| hasRun = true; |
| return name; |
| } |
| } |
| List<C> l = new ArrayList<C>(count); |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| l.add(c); |
| } |
| String res = rp.invokeAny(l); |
| assertNotNull(res); |
| assertTrue(res.startsWith("R")); |
| int runCount = 0; |
| for (C c : l) { |
| if (c.hasRun) { |
| runCount++; |
| } |
| } |
| assertTrue("Not all " + count + " threads should have completed, but " + runCount + " did.", runCount < count); |
| for (Thread t : ts) { |
| t.resume(); |
| } |
| } |
| |
| public void testInvokeAnyWithTimeout() throws Exception { |
| int count = 20; |
| final RequestProcessor rp = new RequestProcessor("TestRP", count + 1); |
| final CountDownLatch latch = new CountDownLatch(1); |
| class C implements Callable<String> { |
| |
| volatile boolean hasRun; |
| private final String result; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| latch.await(); |
| if (Thread.interrupted()) { |
| return null; |
| } |
| hasRun = true; |
| return result; |
| } |
| } |
| List<C> l = new ArrayList<C>(count); |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| l.add(c); |
| } |
| //All threads are waiting on latch; we should time out |
| String res = rp.invokeAny(l, 400, TimeUnit.MILLISECONDS); |
| assertNull(res); |
| for (C c : l) { |
| assertFalse(c.hasRun); |
| } |
| } |
| |
| public void testCancellation() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(1); |
| class C implements Callable<String> { |
| |
| volatile boolean hasRun; |
| volatile boolean interrupted; |
| |
| @Override |
| public String call() throws Exception { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| return null; |
| } |
| if (Thread.interrupted()) { |
| interrupted = true; |
| return null; |
| } |
| hasRun = true; |
| return "Hello"; |
| } |
| } |
| C c = new C(); |
| Future<String> f = RequestProcessor.getDefault().submit(c); |
| f.cancel(true); |
| latch.countDown(); |
| String s = null; |
| try { |
| s = f.get(); |
| fail("CancellationException should have been thrown"); |
| } catch (CancellationException e) { |
| } |
| assertNull(s); |
| assertTrue(c.interrupted || !c.hasRun); |
| assertFalse(c.hasRun); |
| } |
| |
| public void testCancellablesGetCancelInvokedWithCallable() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(1); |
| final CountDownLatch exit = new CountDownLatch(1); |
| class C implements Callable<String>, Cancellable { |
| |
| volatile boolean hasRun; |
| volatile boolean interrupted; |
| volatile boolean cancelled; |
| |
| @Override |
| public String call() throws Exception { |
| try { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| return null; |
| } |
| if (Thread.interrupted()) { |
| interrupted = true; |
| return null; |
| } |
| if (cancelled) { |
| return null; |
| } |
| hasRun = true; |
| return "Hello"; |
| } finally { |
| exit.countDown(); |
| } |
| } |
| |
| @Override |
| public boolean cancel() { |
| cancelled = true; |
| exit.countDown(); |
| return true; |
| } |
| } |
| C c = new C(); |
| Future<String> f = RequestProcessor.getDefault().submit(c); |
| f.cancel(true); |
| assertTrue (c.cancelled); |
| latch.countDown(); |
| exit.await(); |
| String s = null; |
| try { |
| s = f.get(); |
| fail ("Should have gotten cancellation exception"); |
| } catch (CancellationException e) { |
| |
| } |
| } |
| |
| public void testCancellablesGetCancelInvokedWithRunnable() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(1); |
| final CountDownLatch exit = new CountDownLatch(1); |
| class C implements Runnable, Cancellable { |
| |
| volatile boolean hasRun; |
| volatile boolean interrupted; |
| volatile boolean cancelled; |
| |
| @Override |
| public void run() { |
| try { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| return; |
| } |
| if (Thread.interrupted()) { |
| interrupted = true; |
| return; |
| } |
| if (cancelled) { |
| return; |
| } |
| hasRun = true; |
| } finally { |
| exit.countDown(); |
| } |
| } |
| |
| @Override |
| public boolean cancel() { |
| cancelled = true; |
| exit.countDown(); |
| return true; |
| } |
| } |
| C c = new C(); |
| Future<?> f = RequestProcessor.getDefault().submit(c); |
| f.cancel(true); |
| assertTrue (c.cancelled); |
| latch.countDown(); |
| exit.await(); |
| try { |
| f.get(); |
| fail ("Should have gotten cancellation exception"); |
| } catch (CancellationException e) { |
| |
| } |
| assertFalse (c.hasRun); |
| } |
| |
| public void testCancellablesThatSayTheyCantBeCancelledAreNotCancelledViaFutureDotCancel() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(1); |
| final CountDownLatch exit = new CountDownLatch(1); |
| class C implements Runnable, Cancellable { |
| |
| volatile boolean hasRun; |
| volatile boolean interrupted; |
| volatile boolean cancelCalled; |
| |
| @Override |
| public void run() { |
| try { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| throw new AssertionError(e); |
| } |
| if (Thread.interrupted()) { |
| interrupted = true; |
| throw new AssertionError("Thread should not have been interrupted"); |
| } |
| hasRun = true; |
| } finally { |
| exit.countDown(); |
| } |
| } |
| |
| @Override |
| public boolean cancel() { |
| cancelCalled = true; |
| return false; |
| } |
| } |
| C c = new C(); |
| Future<?> f = RequestProcessor.getDefault().submit(c); |
| f.cancel(true); |
| assertFalse (f.isCancelled()); |
| assertTrue (c.cancelCalled); |
| latch.countDown(); |
| exit.await(); |
| f.get(); |
| assertFalse (f.isCancelled()); |
| assertTrue (c.hasRun); |
| } |
| |
| public void testInvokeAllCancellation() throws Exception { |
| int count = 20; |
| final CountDownLatch waitAll = new CountDownLatch(count); |
| final RequestProcessor rp = new RequestProcessor("TestRP", count); |
| class C implements Callable<String>, Cancellable { |
| |
| private final String result; |
| volatile boolean cancelCalled; |
| |
| C(String result) { |
| this.result = result; |
| } |
| |
| @Override |
| public String call() throws Exception { |
| waitAll.countDown(); |
| return cancelCalled ? null : result; |
| } |
| |
| @Override |
| public boolean cancel() { |
| cancelCalled = true; |
| return false; |
| } |
| } |
| List<C> l = new ArrayList<C>(count); |
| List<Future<String>> fs; |
| Set<String> names = new HashSet<String>(count); |
| for (int i = 0; i < count; i++) { |
| String name = "R" + i; |
| names.add(name); |
| C c = new C(name); |
| l.add(c); |
| } |
| fs = rp.invokeAll(l); |
| assertNotNull(fs); |
| Set<String> s = new HashSet<String>(count); |
| for (Future<String> f : fs) { |
| s.add(f.get()); |
| } |
| assertEquals(names, s); |
| } |
| |
| public void testCannotScheduleLongerThanIntegerMaxValue() throws Exception { |
| Runnable r = new Runnable() { |
| |
| @Override |
| public void run() { |
| fail ("Should not have been run"); |
| } |
| }; |
| try { |
| Future<?> f = RequestProcessor.getDefault().schedule(r, Long.MAX_VALUE, TimeUnit.DAYS); |
| f.cancel(true); |
| } catch (Exception e) {} |
| } |
| |
| public void testCannotScheduleNegativeDelay() throws Exception { |
| Runnable r = new Runnable() { |
| |
| @Override |
| public void run() { |
| fail ("Should not have been run"); |
| } |
| }; |
| try { |
| RequestProcessor.getDefault().schedule(r, -1L, TimeUnit.MILLISECONDS); |
| fail ("Negative value accepetd"); |
| } catch (Exception e) {} |
| try { |
| RequestProcessor.getDefault().scheduleAtFixedRate(r, -1L, 22L, TimeUnit.MILLISECONDS); |
| fail ("Negative value accepetd"); |
| } catch (Exception e) {} |
| try { |
| RequestProcessor.getDefault().scheduleAtFixedRate(r, 200, -22L, TimeUnit.MILLISECONDS); |
| fail ("Negative value accepetd"); |
| } catch (Exception e) {} |
| try { |
| RequestProcessor.getDefault().scheduleWithFixedDelay(r, -1L, 22L, TimeUnit.MILLISECONDS); |
| fail ("Negative value accepetd"); |
| } catch (Exception e) {} |
| try { |
| RequestProcessor.getDefault().scheduleWithFixedDelay(r, 1L, -22L, TimeUnit.MILLISECONDS); |
| fail ("Negative value accepetd"); |
| } catch (Exception e) {} |
| } |
| |
| public void testTaskCanRescheduleItself() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(2); |
| class R implements Runnable { |
| volatile RequestProcessor.Task task; |
| volatile int runCount; |
| @Override |
| public void run() { |
| runCount++; |
| if (runCount == 1) { |
| task.schedule(0); |
| } |
| latch.countDown(); |
| } |
| } |
| R r = new R(); |
| RequestProcessor.Task t = RequestProcessor.getDefault().create(r); |
| r.task = t; |
| t.schedule(0); |
| latch.await (); |
| assertEquals (r.runCount, 2); |
| } |
| |
| public void testScheduleRepeatingSanityFixedRate() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(5); |
| class C implements Runnable { |
| volatile int runCount; |
| @Override |
| public void run() { |
| runCount++; |
| latch.countDown(); |
| if (latch.getCount() <= 0) { |
| waitABitToGiveMainThreadChanceToRun(); |
| } |
| } |
| private void waitABitToGiveMainThreadChanceToRun() { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException ex) { |
| Exceptions.printStackTrace(ex); |
| } |
| } |
| } |
| C c = new C(); |
| RequestProcessor.getDefault().scheduleWithFixedDelay(c, 0, 200, TimeUnit.MILLISECONDS); |
| // latch.await(5000, TimeUnit.MILLISECONDS); |
| latch.await(); |
| assertAtLeast (5, c.runCount); |
| } |
| |
| public void testScheduleRepeatingSanityFixedDelay() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(5); |
| class C implements Runnable { |
| volatile int runCount; |
| @Override |
| public void run() { |
| runCount++; |
| latch.countDown(); |
| waitABitToGiveMainThreadChanceToRun(); |
| } |
| |
| private void waitABitToGiveMainThreadChanceToRun() { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException ex) { |
| Exceptions.printStackTrace(ex); |
| } |
| } |
| } |
| C c = new C(); |
| RequestProcessor.getDefault().scheduleAtFixedRate(c, 0, 200, TimeUnit.MILLISECONDS); |
| latch.await(2000, TimeUnit.MILLISECONDS); |
| |
| assertAtLeast (5, c.runCount); |
| } |
| |
| public void testScheduleOneShot() throws Exception { |
| RequestProcessor rp = new RequestProcessor ("testScheduleOneShot", 5, true, true); |
| try { |
| class C implements Callable<String> { |
| volatile long start = System.currentTimeMillis(); |
| private volatile long end; |
| |
| @Override |
| public String call() throws Exception { |
| synchronized(this) { |
| end = System.currentTimeMillis(); |
| } |
| return "Hello"; |
| } |
| |
| synchronized long elapsed() { |
| return end - start; |
| } |
| } |
| C c = new C(); |
| long delay = 5000; |
| //Use a 20 second timeout to have a reasonable chance of accuracy |
| ScheduledFuture<String> f = rp.schedule(c, delay * 1000, TimeUnit.MICROSECONDS); |
| assertEquals (5000, f.getDelay(TimeUnit.MILLISECONDS)); |
| assertNotNull(f.get()); |
| //Allow 4 seconds fudge-factor |
| assertTrue (c.elapsed() > 4600); |
| assertTrue (f.isDone()); |
| } finally { |
| rp.stop(); |
| } |
| } |
| |
| @RandomlyFails // NB-Core-Build #8322: hung |
| public void testScheduleRepeatingIntervalsAreRoughlyCorrect() throws Exception { |
| int runCount = 5; |
| final CountDownLatch latch = new CountDownLatch(runCount); |
| final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount)); |
| // long initialDelay = 30000; |
| // long period = 20000; |
| // long fudgeFactor = 4000; |
| long initialDelay = 3000; |
| long period = 2000; |
| long fudgeFactor = 400; |
| long expectedInitialDelay = initialDelay - fudgeFactor; |
| long expectedPeriod = period - fudgeFactor; |
| class C implements Runnable { |
| volatile long start = System.currentTimeMillis(); |
| private int runCount; |
| @Override |
| public void run() { |
| runCount++; |
| try { |
| synchronized(this) { |
| long end = System.currentTimeMillis(); |
| intervals.add (end - start); |
| start = end; |
| } |
| } finally { |
| latch.countDown(); |
| } |
| } |
| } |
| C c = new C(); |
| RequestProcessor rp = new RequestProcessor ("testScheduleRepeating", 5, true); |
| try { |
| Future<?> f = rp.scheduleWithFixedDelay(c, initialDelay, period, TimeUnit.MILLISECONDS); |
| // latch.await(initialDelay + fudgeFactor + ((runCount - 1) * (period + fudgeFactor)), TimeUnit.MILLISECONDS); //XXX |
| latch.await(); |
| f.cancel(true); |
| for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { |
| long expect = i == 0 ? expectedInitialDelay : expectedPeriod; |
| assertTrue ("Expected at least " + expect + " milliseconds before run " + i + " but was " + intervals.get(i), intervals.get(i) >= expect); |
| } |
| //Ensure we have really exited |
| try { |
| f.get(); |
| fail ("CancellationException should have been thrown"); |
| } catch (CancellationException e) {} |
| assertTrue(f.isCancelled()); |
| assertTrue(f.isDone()); |
| } finally { |
| rp.stop(); |
| } |
| } |
| |
| @RandomlyFails |
| public void testScheduleFixedRateAreRoughlyCorrect() throws Exception { |
| if (!TaskTest.canWait1s()) { |
| LOG.warning("Skipping testWaitWithTimeOutReturnsAfterTimeOutWhenTheTaskIsNotComputedAtAll, as the computer is not able to wait 1s!"); |
| return; |
| } |
| int runCount = 5; |
| final CountDownLatch latch = new CountDownLatch(runCount); |
| final List<Long> intervals = Collections.synchronizedList(new ArrayList<Long> (runCount)); |
| // long initialDelay = 30000; |
| // long period = 20000; |
| // long fudgeFactor = 4000; |
| long initialDelay = 3000; |
| long period = 2000; |
| long fudgeFactor = 400; |
| long expectedInitialDelay = initialDelay - fudgeFactor; |
| long expectedPeriod = period - fudgeFactor; |
| class C implements Runnable { |
| volatile long start = System.currentTimeMillis(); |
| private int runCount; |
| @Override |
| public void run() { |
| runCount++; |
| try { |
| synchronized(this) { |
| long end = System.currentTimeMillis(); |
| intervals.add (end - start); |
| start = end; |
| } |
| } finally { |
| latch.countDown(); |
| } |
| } |
| } |
| C c = new C(); |
| RequestProcessor rp = new RequestProcessor ("testScheduleFixedRateAreRoughlyCorrect", 5, true); |
| try { |
| Future<?> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); |
| latch.await(); |
| f.cancel(true); |
| StringBuilder failures = new StringBuilder(); |
| failures.append("Expected at least ").append(expectedInitialDelay). |
| append(" milliseconds before run:\n"); |
| boolean fail = false; |
| for (int i= 0; i < Math.min(runCount, intervals.size()); i++) { |
| long expect = i == 0 ? expectedInitialDelay : expectedPeriod; |
| failures.append("Round ").append(i). |
| append(" expected delay ").append(expect). |
| append(" but was ").append(intervals.get(i)). |
| append("\n"); |
| if (intervals.get(i) < expect) { |
| fail = true; |
| } |
| } |
| if (fail) { |
| fail(failures.toString()); |
| } |
| //Ensure we have really exited |
| try { |
| f.get(); |
| fail ("CancellationException should have been thrown"); |
| } catch (CancellationException e) {} |
| assertTrue(f.isCancelled()); |
| assertTrue(f.isDone()); |
| } finally { |
| rp.stop(); |
| } |
| } |
| |
| public void testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution() throws Exception { |
| final AtomicInteger val = new AtomicInteger(0); |
| final CountDownLatch latch = new CountDownLatch(10); |
| class C implements Runnable { |
| boolean failed; |
| @Override |
| public void run() { |
| try { |
| int now = val.incrementAndGet(); |
| if (now > 1) { |
| failed = true; |
| fail (now + " threads simultaneously in run()"); |
| } |
| try { |
| //Intentionally sleep *longer* than the interval |
| //between executions. We *want* to pile up all of the |
| //RP threads entering run() - synchronization should |
| //serialize them. This test is to prove that this |
| //method will never be called concurrently from two threads |
| Thread.sleep(1000); |
| } catch (InterruptedException ex) { |
| |
| } |
| } finally { |
| val.decrementAndGet(); |
| latch.countDown(); |
| } |
| } |
| } |
| C c = new C(); |
| long initialDelay = 2000; |
| long period = 10; |
| RequestProcessor rp = new RequestProcessor("testScheduleFixedRateOnMultiThreadPoolDoesNotCauseConcurrentExecution", 10, true); |
| rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); |
| latch.await(); |
| assertFalse(c.failed); |
| rp.stop(); |
| } |
| |
| @RandomlyFails |
| public void testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed() throws Exception { |
| final CountDownLatch latch = new CountDownLatch(10); |
| final List<Long> intervals = new CopyOnWriteArrayList<Long>(); |
| class C implements Runnable { |
| long start = Long.MIN_VALUE; |
| |
| @Override |
| public void run() { |
| long end = System.currentTimeMillis(); |
| if (start != Long.MIN_VALUE) { |
| intervals.add(end - start); |
| } |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException ex) { |
| |
| } |
| start = System.currentTimeMillis(); |
| latch.countDown(); |
| } |
| } |
| C c = new C(); |
| long initialDelay = 100; |
| long period = 100; |
| RequestProcessor rp = new RequestProcessor("testScheduleFixedRateWithShorterIntervalThanRunMethodTimeAreNotDelayed", 10, true); |
| ScheduledFuture<?> f = rp.scheduleAtFixedRate(c, initialDelay, period, TimeUnit.MILLISECONDS); |
| latch.await(); |
| f.cancel(true); |
| rp.stop(); |
| int max = intervals.size(); |
| for (int i= 0; i < max; i++) { |
| long iv = intervals.get(i); |
| assertFalse ("Interval " + i + " should have been at least less than requested interval * 1.5 with fixed rate" + iv, iv > 150); |
| } |
| } |
| |
| public void testCancelFutureInterruptsThreadEvenIfRequestProcessorForbidsIt() throws Exception { |
| RequestProcessor rp = new RequestProcessor ("X", 3, false, true); |
| final CountDownLatch releaseForRun = new CountDownLatch(1); |
| final CountDownLatch enterLatch = new CountDownLatch(1); |
| final CountDownLatch exitLatch = new CountDownLatch(1); |
| class R implements Runnable { |
| volatile boolean interrupted; |
| @Override |
| public void run() { |
| enterLatch.countDown(); |
| try { |
| releaseForRun.await(); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } |
| interrupted |= Thread.interrupted(); |
| exitLatch.countDown(); |
| } |
| } |
| R r = new R(); |
| Future<?> f = rp.submit(r); |
| enterLatch.await(); |
| f.cancel(true); |
| assertTrue (f.isCancelled()); |
| exitLatch.await(); |
| assertTrue (r.interrupted); |
| } |
| |
| public void testCancelDoesNotInterruptIfNotPassedToFutureDotCancel() throws Exception { |
| RequestProcessor rp = new RequestProcessor ("X", 3, false, true); |
| final CountDownLatch releaseForRun = new CountDownLatch(1); |
| final CountDownLatch enterLatch = new CountDownLatch(1); |
| final CountDownLatch exitLatch = new CountDownLatch(1); |
| class R implements Runnable { |
| volatile boolean interrupted; |
| @Override |
| public void run() { |
| enterLatch.countDown(); |
| try { |
| releaseForRun.await(); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } |
| interrupted |= Thread.interrupted(); |
| exitLatch.countDown(); |
| } |
| } |
| R r = new R(); |
| Future<?> f = rp.submit(r); |
| enterLatch.await(); |
| f.cancel(false); |
| assertTrue (f.isCancelled()); |
| assertFalse (r.interrupted); |
| } |
| |
| public void testSubmittedTasksExecutedBeforeShutdown() throws InterruptedException { |
| final CountDownLatch startLatch = new CountDownLatch(1); |
| final CountDownLatch executedLatch = new CountDownLatch(2); |
| Runnable dummyRunnable = new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| startLatch.await(); |
| Thread.sleep(100); |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| } finally { |
| executedLatch.countDown(); |
| } |
| } |
| }; |
| |
| RequestProcessor rp = new RequestProcessor("X", 1); |
| rp.submit(dummyRunnable); |
| rp.submit(dummyRunnable); |
| rp.shutdown(); |
| startLatch.countDown(); |
| |
| assertTrue("Submitted tasks not executed", executedLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
| public void testExecutingTasksNotInterruptedOnShutdown() throws InterruptedException { |
| final CountDownLatch startLatch = new CountDownLatch(2); |
| final CountDownLatch blockingLatch = new CountDownLatch(1); |
| final CountDownLatch executedLatch = new CountDownLatch(1); |
| final AtomicBoolean interrupted = new AtomicBoolean(false); |
| Runnable dummyRunnable = new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| startLatch.countDown(); |
| startLatch.await(); |
| blockingLatch.await(); |
| } catch (InterruptedException ex) { |
| interrupted.set(true); |
| Thread.currentThread().interrupt(); |
| } finally { |
| executedLatch.countDown(); |
| } |
| } |
| }; |
| |
| RequestProcessor rp = new RequestProcessor("X", 1); |
| rp.submit(dummyRunnable); |
| startLatch.countDown(); |
| try { |
| startLatch.await(); |
| } catch (InterruptedException ex) { |
| Exceptions.printStackTrace(ex); |
| } |
| rp.shutdown(); |
| blockingLatch.countDown(); |
| executedLatch.await(); |
| assertFalse("Executing tasks interrupted", interrupted.get()); |
| } |
| |
| public void testAwaitingTasksReturnedOnShutdownNow() throws InterruptedException { |
| final CountDownLatch startupLatch = new CountDownLatch(2); |
| final CountDownLatch blockingLatch = new CountDownLatch(1); |
| Runnable blockingRunnable = new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| startupLatch.countDown(); |
| startupLatch.await(); |
| blockingLatch.await(); |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }; |
| |
| Runnable awaitingRunnable = new Runnable() { |
| |
| @Override |
| public void run() { |
| // noop |
| } |
| }; |
| |
| RequestProcessor rp = new RequestProcessor("X", 1); |
| rp.submit(blockingRunnable); |
| startupLatch.countDown(); |
| startupLatch.await(); |
| rp.submit(awaitingRunnable); |
| Set<Runnable> awaiting = new HashSet<Runnable>(rp.shutdownNow()); |
| assertTrue("Awaiting task not returned on shutdownNow()", awaiting.contains(awaitingRunnable)); |
| assertFalse("Running task returned on shutdownNow()", awaiting.contains(blockingRunnable)); |
| } |
| |
| private static void assertAtLeast(int exp, int real) { |
| if (exp > real) { |
| fail("Expecting at least " + exp + " but was only " + real); |
| } |
| } |
| } |