// blob: d3c0286f93c9c27ac52e276463522397030dccf6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.util.concurrent;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * Tests for {@link DefaultThreadPoolExecutor}: basic task execution and the
 * notification of a {@link FailedExecutionHandler} when a task cannot be accepted.
 *
 * <p>Checks that have to run on a pool thread (or inside the failed-execution
 * handler) never throw there. Their failures are recorded and rethrown on the
 * JUnit thread, because an error thrown on another thread is invisible to the
 * test runner and would only surface as a misleading timeout.</p>
 */
public class DefaultThreadPoolExecutorTest extends TestCase
{
    /** Seconds to wait for the trivial task in {@link #testSimpleExecution()}. */
    private static final long SIMPLE_EXECUTION_TIMEOUT_SECONDS = 2;

    /** Seconds to wait on each latch of the overflow scenario. */
    private static final long HANDSHAKE_TIMEOUT_SECONDS = 5 * 60;

    public DefaultThreadPoolExecutorTest(String name)
    {
        super(name);
    }

    public static Test suite()
    {
        return new TestSuite(DefaultThreadPoolExecutorTest.class);
    }

    /**
     * Submits one task to a single-threaded pool backed by a synchronous queue
     * and verifies that the task runs within the timeout.
     */
    public void testSimpleExecution()
    {
        // Create a small pool with a synchronous queue (no bounding or storage).
        DefaultThreadPoolExecutor pool = new DefaultThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try
        {
            // Submit through the Executor interface, as a regular client would.
            Executor executor = pool;
            final CountDownLatch continueSignal = new CountDownLatch(1);
            executor.execute(new Runnable(){
                public void run()
                {
                    // Indicate that we've run.
                    continueSignal.countDown();
                }
            });
            awaitOrFail(continueSignal, SIMPLE_EXECUTION_TIMEOUT_SECONDS);
        }
        finally
        {
            // We created the pool, so release it even when the test fails.
            pool.shutdown();
        }
    }

    /**
     * Overflows a single-threaded pool whose queue holds one task and verifies
     * that exactly the third task is reported to the failed-execution handler,
     * while the second task is the one left in the queue.
     */
    public void testFailedExecutionHandling()
    {
        // Create a small pool with a bounded queue that can only contain one queued task.
        final DefaultThreadPoolExecutor executor = new DefaultThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        try
        {
            final CountDownLatch taskPauseSignal = new CountDownLatch(1);
            final CountDownLatch executorPauseSignal = new CountDownLatch(1);
            final ArrayList<Runnable> failedTasks = new ArrayList<Runnable>();
            final FailureHolder handlerFailure = new FailureHolder();

            executor.setFailedExecutionHandler(new FailedExecutionHandler(){
                public void failedExecution(Runnable command, Executor ex, Exception exception)
                {
                    try
                    {
                        failedTasks.add(command);
                        assertEquals(executor, ex);
                        assertTrue(exception instanceof RejectedExecutionException);
                    }
                    catch (Throwable t)
                    {
                        // Reported on the test thread once the handshake completes.
                        handlerFailure.record(t);
                    }
                    finally
                    {
                        // Unpause the initial task processing, even if a check above failed,
                        // so the first task is never left parked until its timeout.
                        taskPauseSignal.countDown();
                    }
                }
            });

            // This first task will pause.
            Task first = new Task("first", this, taskPauseSignal, executorPauseSignal, executor);
            executor.execute(first);
            // Now queue two more tasks to overflow the executor's queue.
            executor.execute(new Task("second", this, null, null, null));
            executor.execute(new Task("third", this, null, null, null));

            // and wait.
            awaitOrFail(executorPauseSignal, HANDSHAKE_TIMEOUT_SECONDS);

            // Surface anything that went wrong away from the test thread.
            handlerFailure.rethrowIfAny();
            first.rethrowFailure();

            // Test failed execution handling
            assertEquals(1, failedTasks.size());
            assertEquals("third", ((Task)failedTasks.get(0)).name);
        }
        finally
        {
            // We created the pool, so release it even when the test fails.
            executor.shutdown();
        }
    }

    /**
     * Waits for the latch to reach zero and fails the calling test if the
     * timeout elapses or the waiting thread is interrupted.
     *
     * @param latch the latch to wait on
     * @param timeoutSeconds the maximum time to wait, in seconds
     */
    private static void awaitOrFail(CountDownLatch latch, long timeoutSeconds)
    {
        try
        {
            if (!latch.await(timeoutSeconds, TimeUnit.SECONDS))
            {
                fail("Test timed out waiting for execution to complete.");
            }
        }
        catch (InterruptedException e)
        {
            // Keep the interrupt visible to whoever runs after this test.
            Thread.currentThread().interrupt();
            fail("Test was interrupted: " + e);
        }
    }

    /**
     * Carries the first failure raised away from the test thread back to it.
     */
    private static final class FailureHolder
    {
        private volatile Throwable failure;

        /** Remembers the failure unless an earlier one was already recorded. */
        void record(Throwable t)
        {
            if (failure == null)
            {
                failure = t;
            }
        }

        /** Rethrows the recorded failure, if any, on the calling thread. */
        void rethrowIfAny()
        {
            Throwable t = failure;
            if (t instanceof Error)
            {
                throw (Error)t;
            }
            if (t instanceof RuntimeException)
            {
                throw (RuntimeException)t;
            }
            if (t != null)
            {
                fail("Unexpected failure outside the test thread: " + t);
            }
        }
    }

    /**
     * Task used by the overflow scenario. When constructed with a non-null
     * {@code taskPauseSignal} it blocks on that latch, then checks that the
     * executor's queue holds exactly the task named "second", and finally
     * releases {@code executorPauseSignal}. When {@code taskPauseSignal} is
     * null, running the task does nothing.
     */
    static class Task implements Runnable
    {
        /**
         * @param name label used by the test to identify this task
         * @param test the owning test case; accepted for compatibility, not used
         * @param taskPauseSignal latch this task blocks on, or null for a no-op task
         * @param executorPauseSignal latch released once this task has finished its checks
         * @param executor executor whose queue is inspected after the pause
         */
        public Task(String name, TestCase test, CountDownLatch taskPauseSignal, CountDownLatch executorPauseSignal, DefaultThreadPoolExecutor executor)
        {
            this.name = name;
            this.taskPauseSignal = taskPauseSignal;
            this.executorPauseSignal = executorPauseSignal;
            this.executor = executor;
        }

        public String name;
        private final CountDownLatch taskPauseSignal;
        private final CountDownLatch executorPauseSignal;
        private final DefaultThreadPoolExecutor executor;
        private final FailureHolder failure = new FailureHolder();

        public void run()
        {
            if (taskPauseSignal == null)
            {
                return;
            }
            try
            {
                pauseThenVerifyQueue();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                failure.record(e);
            }
            catch (Throwable t)
            {
                // Throwing here would be lost on the pool thread; keep it for the test thread.
                failure.record(t);
            }
            finally
            {
                // Unblock the executor test thread, whatever the outcome, so it
                // reports the real failure instead of waiting for its own timeout.
                if (executorPauseSignal != null)
                {
                    executorPauseSignal.countDown();
                }
            }
        }

        /**
         * Rethrows, on the calling thread, any failure recorded while this
         * task was running. Does nothing if the task ran cleanly.
         */
        void rethrowFailure()
        {
            failure.rethrowIfAny();
        }

        /** Blocks until released, then checks the state of the executor's queue. */
        private void pauseThenVerifyQueue() throws InterruptedException
        {
            boolean released = taskPauseSignal.await(HANDSHAKE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!released)
            {
                Assert.fail("Test timed out waiting for execution to complete.");
            }
            // At this point we should have the second task queued and the third task should have failed (overflow).
            assertEquals(1, executor.getQueue().size());
            assertEquals("second", ((Task)executor.getQueue().peek()).name);
        }
    }
}