// blob: 321bd0eae2ceccfe58751d86c8c656cab6e8ed65 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
/**
 * Timing-based integration tests for {@link ScheduledThreadPoolExecutorWithKeepAlive}.
 *
 * <p>The tests cover completion of submitted tasks, parallel execution,
 * shrinking of the pool after idle time, delayed and fixed-rate scheduling,
 * and the behaviour of {@code shutdown()} / {@code shutdownNow()}. They rely on
 * real {@code Thread.sleep} calls and wall-clock measurements, so they are slow
 * and sensitive to heavily loaded machines.
 *
 * <p>Every test builds its own executor in {@link #ex}; {@link #tearDown()}
 * shuts it down afterwards.
 *
 * @author dsmith
 */
@Category(IntegrationTest.class)
public class ScheduledThreadPoolExecutorWithKeepAliveJUnitTest {

  /**
   * time, in nanoseconds, that we should tolerate as slop
   * (evidently needed for windows)
   */
  private static final long SLOP = TimeUnit.MILLISECONDS.toNanos(20);

  /** Executor under test; assigned at the start of each test method. */
  ScheduledThreadPoolExecutorWithKeepAlive ex;

  /**
   * Shuts the executor down and verifies that it terminates within 10 seconds.
   * Both "run after shutdown" policies are switched off first so that pending
   * periodic or delayed tasks cannot keep the executor alive.
   *
   * @throws Exception if waiting for termination is interrupted
   */
  @After
  public void tearDown() throws Exception {
    if (ex == null) {
      // The test failed before it created an executor. Throwing a
      // NullPointerException here would only obscure the original failure.
      return;
    }
    ex.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.shutdownNow();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
  }

  /**
   * Builds a task that sleeps for the given time and calls
   * {@code fail("interrupted")} if the sleep is interrupted.
   *
   * @param millis how long the task sleeps, in milliseconds
   * @return a new sleeping task
   */
  private static Runnable sleepingTask(final long millis) {
    return new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
      }
    };
  }

  /**
   * Submits a {@code Runnable} and then a {@code Callable} and checks that the
   * returned futures block until the task is done and deliver its result.
   */
  @Test
  public void testFuture() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        5, 60, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    final AtomicBoolean done = new AtomicBoolean();

    Future<?> f = ex.submit(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
          fail("interrupted");
        }
        done.set(true);
      }
    });
    f.get();
    assertTrue("Task did not complete", done.get());

    Thread.sleep(2000); // let the thread finish with the task

    f = ex.submit(new Callable<Object>() {
      @Override
      public Object call() {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
        return Boolean.TRUE;
      }
    });
    assertTrue("Task did not complete", ((Boolean) f.get()).booleanValue());

    // NOTE(review): the expected value 2 depends on how the executor counts its
    // threads internally - confirm against the implementation.
    assertEquals(2, ex.getLargestPoolSize());
  }

  /**
   * Submits 50 one-second tasks to an executor created with a size argument of
   * 50, checks that they overlap, and then checks that the pool shrinks back
   * to a single thread after being idle.
   */
  @Test
  public void testConcurrentExecutionAndExpiration() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());

    // One task instance is shared by all 50 submissions.
    Runnable waitForABit = sleepingTask(1000);

    Future<?>[] futures = new Future<?>[50];
    for (int i = 0; i < futures.length; i++) {
      futures[i] = ex.submit(waitForABit);
    }

    long start = System.nanoTime();
    for (Future<?> future : futures) {
      future.get();
    }
    long end = System.nanoTime();

    // Running the 50 one-second tasks one after another would take 50 seconds.
    assertTrue("Tasks did not execute in parallel",
        TimeUnit.NANOSECONDS.toSeconds(end - start) < 50);
    assertEquals(50, ex.getLargestPoolSize());

    //now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules 50 fixed-rate tasks (period one second, 500 ms of work each),
   * lets them run for ten seconds and checks that enough executions happened
   * for the tasks to have been running in parallel.
   */
  @Test
  public void testConcurrentRepeatedTasks() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    final AtomicInteger counter = new AtomicInteger();

    Runnable waitForABit = new Runnable() {
      @Override
      public void run() {
        try {
          counter.incrementAndGet();
          Thread.sleep(500);
        } catch (InterruptedException e) {
          fail("interrupted");
        }
      }
    };

    Future<?>[] futures = new Future<?>[50];
    for (int i = 0; i < futures.length; i++) {
      futures[i] = ex.scheduleAtFixedRate(waitForABit, 0, 1, TimeUnit.SECONDS);
    }

    Thread.sleep(10000);

    for (Future<?> future : futures) {
      future.cancel(true);
    }

    System.err.println("Counter = " + counter);
    assertTrue("Tasks did not execute in parallel. Expected more than 300 executions, got " + counter.get(), counter.get() > 300);
    assertEquals(50, ex.getLargestPoolSize());

    //now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules an empty task with a ten second delay and checks that its future
   * does not complete earlier than that (allowing for {@link #SLOP}).
   */
  @Test
  public void testDelayedExcecution() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());

    long start = System.nanoTime();
    Future<?> f = ex.schedule(new Runnable() {
      @Override
      public void run() {
        // intentionally empty: only the delay is being measured
      }
    }, 10, TimeUnit.SECONDS);
    f.get();
    long end = System.nanoTime();

    // The value appended to the message is the elapsed time in nanoseconds.
    assertTrue("Execution was not delayed 10 seconds, only " + (end - start),
        TimeUnit.SECONDS.toNanos(10) <= end - start + SLOP);
  }

  /**
   * Schedules a fixed-rate counter task, checks that it ran more than once in
   * five seconds, cancels it and checks that the counter stays unchanged for
   * another five seconds.
   */
  @Test
  public void testRepeatedExecution() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    final AtomicInteger counter = new AtomicInteger();

    Runnable run = new Runnable() {
      @Override
      public void run() {
        counter.incrementAndGet();
      }
    };

    ScheduledFuture<?> f = ex.scheduleAtFixedRate(run, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(5000);
    f.cancel(true);
    assertTrue("Task was not executed repeatedly", counter.get() > 1);

    int oldValue = counter.get();
    Thread.sleep(5000);
    assertEquals("Task was not cancelled", oldValue, counter.get());
  }

  /**
   * Schedules a two-second task with a two-second delay, calls
   * {@code shutdown()} and checks that termination takes at least the combined
   * four seconds (allowing for {@link #SLOP}).
   */
  @Test
  public void testShutdown() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    ex.schedule(sleepingTask(2000), 2, TimeUnit.SECONDS);

    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown did not wait to task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(4) < elapsed + SLOP);
  }

  /**
   * Submits a three-second task, calls {@code shutdown()} half a second later
   * and checks that termination takes more than two seconds.
   */
  @Test
  public void testShutdown2() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    ex.submit(sleepingTask(3000));

    //give it a chance to get in the worker pool
    Thread.sleep(500);

    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown did not wait to task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) < elapsed);
  }

  /**
   * Schedules a delayed task, calls {@code shutdownNow()} straight away and
   * checks that the executor terminates within a second.
   */
  @Test
  public void testShutdownNow() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    ex.schedule(sleepingTask(2000), 2, TimeUnit.SECONDS);

    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("ShutdownNow should not have waited. Waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Submits a two-second task, calls {@code shutdownNow()} half a second later
   * and checks that the executor terminates within a second.
   */
  @Test
  public void testShutdownNow2() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    ex.submit(sleepingTask(2000));

    //give it a chance to get in the worker pool.
    Thread.sleep(500);

    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("ShutdownNow should not have waited. Waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * With the "execute existing delayed tasks after shutdown" policy disabled,
   * schedules a task five seconds out, calls {@code shutdown()} and checks
   * that termination takes less than two seconds.
   */
  @Test
  public void testShutdownDelayedTasks() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        50, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.schedule(sleepingTask(2000), 5000, TimeUnit.MILLISECONDS);

    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(30, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown should not have waited. Waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Submits 100 half-second tasks to an executor created with a size argument
   * of 6 and checks that submitting does not block the caller, that all tasks
   * eventually run, and that the maximum pool size is reported as 6.
   */
  @Test
  public void testAllWorkersActive() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(
        6, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory());
    final AtomicInteger counter = new AtomicInteger();

    long start = System.nanoTime();
    for (int i = 0; i < 100; i++) {
      ex.submit(new Runnable() {
        @Override
        public void run() {
          try {
            Thread.sleep(500);
            counter.incrementAndGet();
          } catch (InterruptedException e) {
            fail("interrupted");
          }
        }
      });
    }
    long elapsed = System.nanoTime() - start;
    assertTrue("calling ex.submit blocked the caller", TimeUnit.SECONDS.toNanos(1) > elapsed);

    // Wait 11 seconds in total for the queued tasks to drain.
    Thread.sleep(20 * 500 + 1000);
    assertEquals(100, counter.get());
    assertEquals(6, ex.getMaximumPoolSize());
  }
}