blob: 3cf8543656dabfb1aa83c81a50ccbf0273e7b6e6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.junit.concurrency;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* ThreadTestCoordinator provides an array of binary latches that allows threads to wait for other threads or to send
* them a signal that allows them to continue running or to wait for another thread to signal them. The binary latch
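* array is always a square array, allowing one latch from and to every thread. Upon accepting an allow signal from one
* sender only the latch for that sender is cleared; the latches for all senders are cleared only when a sender that is
* blocked elsewhere is accepted in place of an allow signal. This class is always used in conjunction with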
* {@link TestRunnable} for writing concurrent test code that coordinates multi-threaded activity in order to reproduce
* concurrency bugs.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Accept test threads to coordinate.
* <tr><td> Allow test threads to send 'allow to continue' signals.
* <tr><td> Allow test threads to wait on this coordinator for 'allow to continue' signals.
* <tr><td> Report error messages from test threads.
* <tr><td> Report exceptions from test threads.
* <tr><td> Provide method to wait until all test threads have completed.
* </table>
*
* @todo This code was hacked together as a bit of an experiment, because I wasn't sure if this idea would work. It has
* proved extremely useful. Some documentation for this needs to be written to explain it better.
*
* @todo Consider how deadlock detection will be handled. If all threads are blocking on the coordinator, waiting for
* each other, they are deadlocked and there is something wrong with the test code that put them in that
* situation. If they are all blocked elsewhere, they may be deadlocked, or could just be waiting on some
* external event. A timeout should be used. Timeout is already implemented, just need to sanity check how
* this is working and document it.
*
* @todo Consider how livelock detection could be implemented? LockFree data structures might cause live locks. I
* guess a longish timeout is the only thing that can be done for that.
*
* @todo Only coarse-grained synchronization at the method call level can be obtained. This is because test code can
* only insert synchronization points between method calls it makes. So this code will not be useful for
* checking sequences of events within methods, unless the code under test is explicitly instrumented for it.
* It might be possible to instrument code by using labels, and then use the debugger/profiler interface to
* put breakpoints on the labels and use them as synchronization points. Not perfect, but at least the unused labels
* can be left in the code, without altering its behaviour.
*
* @author Rupert Smith
*/
public class ThreadTestCoordinator
{
    /** Used for logging. */
    private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class);

    /** The deadlock timeout, in milliseconds, that applies until {@link #setDeadlockTimeout} is called. */
    private static final long DEFAULT_DEADLOCK_TIMEOUT_MILLIS = 1000L;

    /** How long, in milliseconds, a consumer waits on the coordinator lock before re-checking its latches. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /** Keeps track of the test threads by their ids. */
    private final TestRunnable[] testThreads;

    /** An explicit thread monitor for the coordinator. Threads wait on the coordinator whilst waiting for events. */
    private final Object coordinatorLock = new Object();

    /** A set of monitors, one for each test thread. Created by {@link #run}. */
    private Object[] locks;

    /**
     * The binary latch array, this is always a square array allowing one event from and to every thread. It is
     * indexed as <tt>allowEvents[receiverId][senderId]</tt> and is created by {@link #run}.
     */
    private boolean[][] allowEvents;

    /** Keeps track of the number of threads being coordinated. */
    private final int threadCount;

    /** Accumulates any exceptions resulting from the threads run methods. */
    private final Collection<Exception> exceptions = new ArrayList<Exception>();

    /**
     * Holds the deadlock timeout, in nanoseconds, after which threads are given a runtime exception to signal that
     * a potential deadlock may be happening.
     */
    private long deadlockTimeout = TimeUnit.MILLISECONDS.toNanos(DEFAULT_DEADLOCK_TIMEOUT_MILLIS);

    /** Holds the factory to create test thread with. */
    private final ThreadFactory threadFactory;

    /**
     * Creates a new test thread coordinator that uses a {@link DefaultThreadFactory}. The number of threads to run
     * must be specified here.
     *
     * @param numThreads The number of threads to run.
     */
    public ThreadTestCoordinator(int numThreads)
    {
        this(numThreads, new DefaultThreadFactory());
    }

    /**
     * Creates a new test thread coordinator with a specific thread factory. The number of threads to run must be
     * specified here.
     *
     * @param numThreads    The number of threads to run.
     * @param threadFactory The factory to use to create the test threads.
     */
    public ThreadTestCoordinator(int numThreads, ThreadFactory threadFactory)
    {
        this.threadCount = numThreads;

        // Create an array big enough to hold all the test threads.
        this.testThreads = new TestRunnable[numThreads];

        this.threadFactory = threadFactory;
    }

    /**
     * Adds a thread to this coordinator and assigns an id to it. The ids must be numbered sequentially from 0 and
     * it is up to the caller to do this.
     *
     * @param runnable The test thread.
     * @param id       The explicit id to assign to the test thread.
     */
    public void addTestThread(TestRunnable runnable, int id)
    {
        testThreads[id] = runnable;
        runnable.setCoordinator(this);
        runnable.setId(id);
    }

    /**
     * Starts all the coordinated threads running. Every thread slot must have been filled with
     * {@link #addTestThread} first.
     *
     * @throws IllegalStateException If any thread slot has not been filled; no thread is started in that case.
     */
    public void run()
    {
        // Check every slot up front, so that a missing test thread is reported before any thread has been started
        // rather than failing part way through start up.
        for (int i = 0; i < testThreads.length; i++)
        {
            if (testThreads[i] == null)
            {
                throw new IllegalStateException("No test thread has been added for id " + i + "; all " + threadCount
                    + " test threads must be added before calling run().");
            }
        }

        // Create the monitors for each thread.
        locks = new Object[threadCount];

        for (int i = 0; i < locks.length; i++)
        {
            locks[i] = new Object();
        }

        // Create an appropriately sized latch array to allow one event from and to each thread. A newly allocated
        // boolean array is already all false, so no explicit clearing is needed.
        allowEvents = new boolean[threadCount][threadCount];

        // Start all the threads running.
        for (TestRunnable nextRunnable : testThreads)
        {
            // Create a Java thread for the test thread.
            Thread newThread = threadFactory.newThread(nextRunnable);
            nextRunnable.setThread(newThread);

            // Start it running.
            newThread.start();
        }
    }

    /**
     * Waits until all the test threads have completed and returns any accumulated error messages from them. Any
     * exceptions thrown by their run methods are also kept at this point.
     *
     * <p/>If the calling thread is interrupted whilst joining a test thread, the join on that test thread is
     * abandoned, the remaining test threads are still joined, and the interrupt status of the calling thread is
     * restored before this method returns.
     *
     * @return The accumulated error messages from all the threads concatenated together.
     */
    public String joinAndRetrieveMessages()
    {
        StringBuilder errorMessage = new StringBuilder();
        boolean interrupted = false;

        // Join all the test threads.
        for (TestRunnable r : testThreads)
        {
            Thread t = r.getThread();

            try
            {
                t.join();
            }
            catch (InterruptedException e)
            {
                // Remember the interrupt rather than re-asserting it straight away, as that would make every
                // subsequent join fail immediately.
                interrupted = true;
            }

            // Add any accumulated error messages to the return value.
            errorMessage.append(r.getErrorMessage());

            // Keep any exceptions resulting from the threads run method.
            Exception e = r.getException();

            if (e != null)
            {
                exceptions.add(e);
            }
        }

        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }

        return errorMessage.toString();
    }

    /**
     * Reports any accumulated exceptions from the test threads run methods. This method must be called after
     * {@link #joinAndRetrieveMessages}, which is where the exceptions are collected.
     *
     * @return Any accumulated exceptions from the test threads run methods. This is the coordinator's own
     *         collection, not a copy.
     */
    public Collection<Exception> getExceptions()
    {
        return exceptions;
    }

    /**
     * Sets a timeout to break out of potential deadlocks. A thread that has been waiting to consume an 'allow to
     * continue' event for longer than this timeout is given a {@link PossibleDeadlockException}.
     *
     * <p/>The value is converted to nanoseconds; values too large to represent saturate rather than overflow.
     *
     * @param millis The minimum time to allow to pass before breaking out of any potential deadlocks.
     */
    public void setDeadlockTimeout(long millis)
    {
        deadlockTimeout = TimeUnit.MILLISECONDS.toNanos(millis);
    }

    /**
     * Creates a set of 'allow to continue' events on the event queues of the specified threads.
     *
     * @param threads  The set of threads to allow to continue.
     * @param callerId The explicit id of the calling test thread.
     * @param caller   The calling test thread.
     */
    void produceAllowEvents(int[] threads, int callerId, TestRunnable caller)
    {
        // Generate some debugging messages. Very useful to know how thread synchronization is progressing.
        if (log.isDebugEnabled())
        {
            log.debug("Thread " + callerId + " is allowing threads [ " + joinIds(threads) + " ] to continue.");
        }

        // For each allow event, synchronize on the threads lock then set the event flag to true.
        for (int id : threads)
        {
            // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
            // being blocked at this time.
            caller.setWaitingOnCoordinator(true);

            synchronized (locks[id])
            {
                // Release the waiting on coordinator flag now that this thread is running again.
                caller.setWaitingOnCoordinator(false);

                // Send the allow to continue event to the receiving thread.
                allowEvents[id][callerId] = true;
            }
        }

        // Wake up any threads waiting on the coordinator lock to recheck their event queues.
        // Set the waiting on coordinator flag to true in case the coordinator tries to test this thread for
        // being blocked at this time.
        caller.setWaitingOnCoordinator(true);

        synchronized (coordinatorLock)
        {
            // Release the waiting on coordinator flag now that this thread is running again.
            caller.setWaitingOnCoordinator(false);
            coordinatorLock.notifyAll();
        }
    }

    /**
     * Consumes an 'allow to continue' from one of the specified threads or waits until one is available or in some
     * cases if one of the specified threads is blocked elsewhere to accept that as an 'allow to continue' event.
     *
     * <p/>Only the event from the accepted sender is consumed; events pending from other senders are left alone.
     * When a sender being blocked elsewhere is accepted instead, all pending events for the caller are cleared.
     *
     * <p/>If the calling thread is interrupted whilst waiting, it carries on waiting and its interrupt status is
     * restored when this method exits.
     *
     * @param threads          The set of threads to accept an allow to continue event from.
     * @param otherWaitIsAllow Whether or not to accept threads being blocked elsewhere as permission to continue.
     * @param callerId         The explicit id of the calling test thread.
     * @param caller           The calling test thread.
     *
     * @return If the <tt>otherWaitIsAllow</tt> flag is set, then <tt>true</tt> is returned when the thread being
     *         waited on is found to be blocked outside of the thread test coordinator. <tt>false</tt> under all
     *         other conditions.
     *
     * @throws PossibleDeadlockException If no event could be consumed within the deadlock timeout.
     */
    boolean consumeAllowEvent(int[] threads, boolean otherWaitIsAllow, int callerId, TestRunnable caller)
    {
        // Record the time at which this method was called. Will be used for breaking out of potential deadlocks.
        long startTime = System.nanoTime();

        // Generate some debugging messages. Very useful to know how thread synchronization is progressing.
        if (log.isDebugEnabled())
        {
            log.debug("Thread " + callerId + " is requesting threads [ " + joinIds(threads)
                + " ] to allow it to continue.");
        }

        // Set if an interrupt arrives whilst waiting, so that it can be re-asserted on the way out.
        boolean interrupted = false;

        try
        {
            // Loop until an allow to continue event is received.
            while (true)
            {
                // Look at all the allowing thread to see if one has created an event for consumption.
                for (int allowerId : threads)
                {
                    // Get the threads lock for the event to consume.
                    // Set the waiting on coordinator flag to true in case the coordinator tries to test this
                    // thread for being blocked at this time.
                    caller.setWaitingOnCoordinator(true);

                    synchronized (locks[callerId])
                    {
                        // Release the waiting on coordinator flag now that this thread is running again.
                        caller.setWaitingOnCoordinator(false);

                        // Check if there is an event on the queue from the allowing thread to this one.
                        if (allowEvents[callerId][allowerId])
                        {
                            log.debug("Found an allow event, thread {}, is allowing thread {}, to continue.",
                                allowerId, callerId);

                            // Consume just the event from the allower to the consumer, leaving other pending
                            // allow events alone.
                            allowEvents[callerId][allowerId] = false;

                            return false;
                        }
                    }
                }

                // If waiting elsewhere is to be interpreted as an 'allow to continue' event, then look at the
                // thread status for the threads being waited on to see if any are blocked on other resources.
                if (otherWaitIsAllow)
                {
                    log.debug("Other wait is to be interpreted as an allow event.");

                    // Look at all the potential allower threads.
                    for (int allowerId : threads)
                    {
                        // Get the Java thread state for the allowing thread.
                        TestRunnable allowingRunnable = testThreads[allowerId];
                        Thread.State state = allowingRunnable.getThread().getState();

                        // Check if the thread is blocked and so a potential candidate for releasing this one.
                        boolean blockedOrWaiting =
                            (state == Thread.State.BLOCKED) || (state == Thread.State.WAITING)
                            || (state == Thread.State.TIMED_WAITING);

                        if (!blockedOrWaiting)
                        {
                            continue;
                        }

                        log.debug("Found an allower thread, id = {}, that is blocked or wating.", allowerId);

                        // Check that the allower thread is not waiting on the coordinator lock or any of the
                        // individual thread locks. It must be waiting or blocked on another monitor.
                        if (allowingRunnable.isWaitingOnCoordinator())
                        {
                            log.debug("The waiting allower thread, {}, is waiting on the coordinator so does not"
                                + " allow thread {} to continue.", allowerId, callerId);

                            continue;
                        }

                        log.debug("The allower thread, id = {}, is blocked or waiting other than on the coordinator.",
                            allowerId);

                        // Get the threads lock for the event to consume.
                        caller.setWaitingOnCoordinator(true);

                        synchronized (locks[callerId])
                        {
                            caller.setWaitingOnCoordinator(false);

                            // Consume all the allow events for this thread.
                            for (int i = 0; i < allowEvents[callerId].length; i++)
                            {
                                allowEvents[callerId][i] = false;
                            }

                            return true;
                        }
                    }
                }

                // Keep waiting until an 'allow to continue' event can be consumed.
                try
                {
                    // Set the waiting on coordinator flag to true in case the coordinator tries to test this
                    // thread for being blocked at this time.
                    caller.setWaitingOnCoordinator(true);

                    synchronized (coordinatorLock)
                    {
                        // Release the waiting on coordinator flag now that this thread is running again.
                        caller.setWaitingOnCoordinator(false);

                        log.debug("Thread {} is waiting on coordinator lock for more allow events.", callerId);

                        // Set the waiting on coordinator flag to true in case the coordinator tries to test this
                        // thread for being blocked at this time.
                        caller.setWaitingOnCoordinator(true);
                        coordinatorLock.wait(POLL_INTERVAL_MILLIS);
                    }
                }
                catch (InterruptedException e)
                {
                    // Do not re-assert the interrupt here, as that would make every subsequent wait return
                    // immediately and turn this loop into a busy spin. It is restored in the finally block.
                    interrupted = true;
                }

                // Release the waiting on coordinator flag now that this thread is running again.
                caller.setWaitingOnCoordinator(false);

                // Check if this thread has been waiting for longer than the deadlock timeout and raise a possible
                // deadlock exception if so.
                long waitTime = System.nanoTime() - startTime;
                log.debug("Thread {} has been waiting for {} milliseconds.", callerId, waitTime / 1000000);

                if (waitTime > deadlockTimeout)
                {
                    // Throw a possible deadlock exception.
                    throw new PossibleDeadlockException("Possible deadlock due to timeout with state:\n" + this);
                }

                log.debug("Thread {} has woken up, was waiting for more allow events to become available.",
                    callerId);
            }
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Pretty prints the state of the thread test coordinator, for debugging purposes. This is safe to call before
     * {@link #run}, in which case the latch array is printed as empty, and before all test threads have been added,
     * in which case the missing ones are printed as <tt>null</tt>.
     *
     * @return Pretty printed state of the thread test coordinator.
     */
    @Override
    public String toString()
    {
        StringBuilder result = new StringBuilder("[");

        // The latch array does not exist until run() has been called.
        if (allowEvents != null)
        {
            for (int i = 0; i < allowEvents.length; i++)
            {
                for (int j = 0; j < allowEvents[i].length; j++)
                {
                    if (j > 0)
                    {
                        result.append(", ");
                    }

                    result.append(allowEvents[i][j]);
                }

                if (i < (allowEvents.length - 1))
                {
                    result.append(",\n ");
                }
            }
        }

        result.append("]");

        for (int i = 0; i < testThreads.length; i++)
        {
            // Appending the reference itself prints "null" for an unfilled slot instead of failing.
            result.append("thread[").append(i).append("] = ").append(testThreads[i]);
        }

        return result.toString();
    }

    /**
     * Formats a set of thread ids as a comma separated list, for use in debugging messages.
     *
     * @param ids The thread ids to format.
     *
     * @return The ids separated by <tt>", "</tt>, or the empty string if there are none.
     */
    private static String joinIds(int[] ids)
    {
        StringBuilder joined = new StringBuilder();

        for (int i = 0; i < ids.length; i++)
        {
            if (i > 0)
            {
                joined.append(", ");
            }

            joined.append(ids[i]);
        }

        return joined.toString();
    }
}