blob: df6c128247a325d899a7a555f2cde4d1db4247cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Test;
/**
 * Timing-based tests for {@link ScheduledThreadPoolExecutorWithKeepAlive}: plain submission,
 * delayed and fixed-rate scheduling, pool growth/shrinkage and the shutdown variants.
 *
 * <p>
 * Every test creates its own executor and stores it in {@link #ex}; {@link #tearDown()} stops it.
 * The tests rely on wall-clock sleeps, so the literal durations below are part of what is being
 * verified and should not be shortened casually.
 */
public class ScheduledThreadPoolExecutorWithKeepAliveJUnitTest {

  /**
   * time, in nanoseconds, that we should tolerate as slop (evidently needed for windows)
   */
  private static final long SLOP = TimeUnit.MILLISECONDS.toNanos(20);

  /** Executor under test; assigned at the start of each test method. */
  ScheduledThreadPoolExecutorWithKeepAlive ex;

  /**
   * Stops the executor created by the test and asserts that it terminates within 10 seconds.
   *
   * <p>
   * Both after-shutdown policies are switched off first so that no delayed or periodic task keeps
   * the executor alive. If the test never managed to create an executor there is nothing to clean
   * up, and returning early keeps a NullPointerException from hiding the original failure.
   */
  @After
  public void tearDown() throws Exception {
    if (ex == null) {
      return;
    }
    ex.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.shutdownNow();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
  }

  /**
   * Sleeps the calling thread for {@code millis} milliseconds and turns an interrupt into a JUnit
   * failure.
   *
   * <p>
   * NOTE(review): when this runs on a pool thread the AssertionError is raised on that thread,
   * not on the test thread; presumably it only reaches the test if the test calls {@code get()}
   * on the task's future - confirm against the executor implementation.
   *
   * @param millis how long to sleep, in milliseconds
   */
  private static void sleepOrFail(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      fail("interrupted");
    }
  }

  /**
   * Creates a task that does nothing but {@link #sleepOrFail(long) sleep}.
   *
   * @param millis how long the task sleeps, in milliseconds
   * @return a runnable that sleeps for {@code millis} and fails if interrupted
   */
  private static Runnable sleepingTask(final long millis) {
    return () -> sleepOrFail(millis);
  }

  /**
   * Submits a Runnable and then a Callable, waits for each through its Future, and checks the
   * results and the largest pool size.
   */
  @Test
  public void testFuture() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(5, 60, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    final AtomicBoolean done = new AtomicBoolean();
    Runnable sleepThenMarkDone = () -> {
      sleepOrFail(1000);
      done.set(true);
    };
    Future<?> f = ex.submit(sleepThenMarkDone);
    f.get();
    assertTrue("Task did not complete", done.get());

    Thread.sleep(2000); // let the thread finish with the task

    Callable<Boolean> sleepThenTrue = () -> {
      sleepOrFail(1000);
      return Boolean.TRUE;
    };
    f = ex.submit(sleepThenTrue);
    assertTrue("Task did not complete", ((Boolean) f.get()).booleanValue());
    // The two tasks ran one after the other, so the pool should not have grown past 2.
    // NOTE(review): presumably one of the two is a scheduling thread - confirm.
    assertEquals(2, ex.getLargestPoolSize());
  }

  /**
   * Submits 50 one-second tasks at once, checks that they all finish in under 50 seconds and that
   * the pool grew to 50, then checks that the pool shrinks back to 1 after sitting idle.
   */
  @Test
  public void testConcurrentExecutionAndExpiration()
      throws InterruptedException, ExecutionException {
    final int poolSize = 50;
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(poolSize, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    Runnable waitForABit = sleepingTask(1000);

    Future<?>[] futures = new Future<?>[poolSize];
    for (int i = 0; i < poolSize; i++) {
      futures[i] = ex.submit(waitForABit);
    }

    long start = System.nanoTime();
    for (Future<?> future : futures) {
      future.get();
    }
    long end = System.nanoTime();

    // Fully serial execution would need 50 seconds; anything below that shows some overlap.
    assertTrue("Tasks did not execute in parallel",
        TimeUnit.NANOSECONDS.toSeconds(end - start) < 50);
    assertEquals(poolSize, ex.getLargestPoolSize());

    // now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules 50 fixed-rate tasks (period 1 second, each run sleeping 500 ms), lets them run for
   * 10 seconds, cancels them, and checks that more than 300 runs happened, that the pool grew to
   * 50, and that it shrinks back to 1 afterwards.
   */
  @Test
  public void testConcurrentRepeatedTasks() throws InterruptedException, ExecutionException {
    final int poolSize = 50;
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(poolSize, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    final AtomicInteger counter = new AtomicInteger();
    Runnable countThenWait = () -> {
      counter.incrementAndGet();
      sleepOrFail(500);
    };

    Future<?>[] futures = new Future<?>[poolSize];
    for (int i = 0; i < poolSize; i++) {
      futures[i] = ex.scheduleAtFixedRate(countThenWait, 0, 1, TimeUnit.SECONDS);
    }

    Thread.sleep(10000);

    for (Future<?> future : futures) {
      future.cancel(true);
    }

    System.err.println("Counter = " + counter);
    assertTrue("Tasks did not execute in parallel. Expected more than 300 executions, got "
        + counter.get(), counter.get() > 300);
    assertEquals(poolSize, ex.getLargestPoolSize());

    // now make sure we expire back down.
    Thread.sleep(5000);
    assertEquals(1, ex.getPoolSize());
  }

  /**
   * Schedules a no-op with a 10 second delay and checks that waiting on its Future took at least
   * 10 seconds, allowing {@link #SLOP} of tolerance.
   */
  @Test
  public void testDelayedExcecution() throws InterruptedException, ExecutionException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    Runnable noOp = () -> {
    };

    long start = System.nanoTime();
    Future<?> f = ex.schedule(noOp, 10, TimeUnit.SECONDS);
    f.get();
    long end = System.nanoTime();

    assertTrue("Execution was not delayed 10 seconds, only " + (end - start),
        TimeUnit.SECONDS.toNanos(10) <= end - start + SLOP);
  }

  /**
   * Schedules a counting task at a fixed 1 second rate, waits until it has run more than once,
   * cancels it, and checks that the counter stays unchanged for the following 5 seconds.
   */
  @Test
  public void testRepeatedExecution() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    final AtomicInteger counter = new AtomicInteger();
    Runnable run = () -> counter.incrementAndGet();

    ScheduledFuture<?> f = ex.scheduleAtFixedRate(run, 0, 1, TimeUnit.SECONDS);

    await()
        .untilAsserted(
            () -> assertTrue("Task was not executed repeatedly", counter.get() > 1));
    await()
        .untilAsserted(() -> assertTrue("The task could not be cancelled", f.cancel(true)));
    await()
        .untilAsserted(
            () -> assertTrue("Task was not cancelled within 30 sec", f.isCancelled()));

    int oldValue = counter.get();
    Thread.sleep(5000);
    assertEquals("Task was not cancelled", oldValue, counter.get());
  }

  /**
   * Schedules a task with a 2 second delay that then sleeps 3 seconds, calls {@code shutdown()}
   * and checks that termination took more than 4 seconds, i.e. that the pending delayed task was
   * still run to completion.
   *
   * <p>
   * NOTE(review): the test does not set the delayed-task policy itself, so it depends on the
   * executor's default - confirm.
   */
  @Test
  public void testShutdown() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    Runnable sleepThenReport = () -> {
      // Sleeps 3 seconds (as in testShutdown2) so that delay + run time leaves a comfortable
      // margin over the 4 second threshold asserted below, rather than depending on SLOP.
      sleepOrFail(3000);
      System.out.println("Finished scheduled task");
    };
    ex.schedule(sleepThenReport, 2, TimeUnit.SECONDS);

    long start = System.nanoTime();
    ex.shutdown();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown did not wait for task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(4) < elapsed + SLOP);
  }

  /**
   * Submits a 3 second task, gives it 500 ms to start, calls {@code shutdown()} and checks that
   * awaiting termination took more than 2 seconds, i.e. that the running task was allowed to
   * finish.
   */
  @Test
  public void testShutdown2() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    ex.submit(sleepingTask(3000));
    // give it a chance to get in the worker pool
    Thread.sleep(500);

    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(10, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown did not wait for task to complete. Only waited "
        + TimeUnit.NANOSECONDS.toMillis(elapsed), TimeUnit.SECONDS.toNanos(2) < elapsed);
  }

  /**
   * Schedules a task with a 2 second delay, calls {@code shutdownNow()} straight away and checks
   * that the executor terminates within 1 second instead of waiting for the pending task.
   */
  @Test
  public void testShutdownNow() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    ex.schedule(sleepingTask(2000), 2, TimeUnit.SECONDS);

    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue(
        "ShutdownNow should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Submits a 2 second task, gives it 500 ms to start, calls {@code shutdownNow()} and checks
   * that the executor terminates within 1 second instead of waiting for the running task.
   */
  @Test
  public void testShutdownNow2() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    ex.submit(sleepingTask(2000));
    // give it a chance to get in the worker pool.
    Thread.sleep(500);

    ex.shutdownNow();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(1, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue(
        "ShutdownNow should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * With the execute-delayed-tasks-after-shutdown policy switched off, schedules a task 5 seconds
   * out, calls {@code shutdown()} and checks that termination takes less than 2 seconds, i.e.
   * that the pending delayed task was dropped rather than waited for.
   */
  @Test
  public void testShutdownDelayedTasks() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(50, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    ex.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    ex.schedule(sleepingTask(2000), 5000, TimeUnit.MILLISECONDS);

    ex.shutdown();
    long start = System.nanoTime();
    assertTrue(ex.awaitTermination(30, TimeUnit.SECONDS));
    long elapsed = System.nanoTime() - start;

    assertTrue("Shutdown should not have waited. Waited " + TimeUnit.NANOSECONDS.toMillis(elapsed),
        TimeUnit.SECONDS.toNanos(2) > elapsed);
  }

  /**
   * Submits 100 half-second tasks to an executor created with size 6 and checks that the submit
   * loop itself returns in under 1 second (submitting must not block the caller), that all 100
   * tasks have run after the wait below, and that the maximum pool size is reported as 6.
   */
  @Test
  public void testAllWorkersActive() throws InterruptedException {
    ex = new ScheduledThreadPoolExecutorWithKeepAlive(6, 1, TimeUnit.SECONDS,
        Executors.defaultThreadFactory(), null);
    final AtomicInteger counter = new AtomicInteger();
    Runnable waitThenCount = () -> {
      sleepOrFail(500);
      counter.incrementAndGet();
    };

    long start = System.nanoTime();
    for (int i = 0; i < 100; i++) {
      ex.submit(waitThenCount);
    }
    long elapsed = System.nanoTime() - start;
    assertTrue("calling ex.submit blocked the caller", TimeUnit.SECONDS.toNanos(1) > elapsed);

    // Budget of 20 half-second rounds plus one second of headroom for the 100 tasks to drain.
    Thread.sleep(20 * 500 + 1000);
    assertEquals(100, counter.get());
    assertEquals(6, ex.getMaximumPoolSize());
  }
}