blob: 9ebdf9c9db112d20c11d82cadcea32160865affa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.tools.ant.taskdefs;
import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Vector;
import java.util.List;
import java.util.ArrayList;
import org.apache.tools.ant.BuildException;
import org.apache.tools.ant.Location;
import org.apache.tools.ant.Task;
import org.apache.tools.ant.TaskContainer;
import org.apache.tools.ant.util.StringUtils;
/**
* Executes the contained tasks in separate threads, continuing
* once all are completed.
* <p>
* New behavior allows for the ant script to specify a maximum number of
* threads that will be executed in parallel. One should be very careful about
* using the <code>waitFor</code> task when specifying <code>threadCount</code>
* as it can cause deadlocks if the number of threads is too small or if one of
* the nested tasks fails to execute completely. The task selection algorithm
* will insure that the tasks listed before a task have started before that
* task is started, but it will not insure a successful completion of those
* tasks or that those tasks will finish first (i.e. it's a classic race
* condition).
* </p>
* @since Ant 1.4
*
* @ant.task category="control"
*/
public class Parallel extends Task
implements TaskContainer {
private static final int NUMBER_TRIES = 100;
/** Class which holds a list of tasks to execute */
public static class TaskList implements TaskContainer {
/** Collection holding the nested tasks */
private List tasks = new ArrayList();
/**
* Add a nested task to execute parallel (asynchron).
* <p>
* @param nestedTask Nested task to be executed in parallel.
* must not be null.
*/
public void addTask(Task nestedTask) {
tasks.add(nestedTask);
}
}
/** Collection holding the nested tasks */
private Vector nestedTasks = new Vector();
/** Semaphore to notify of completed threads */
private final Object semaphore = new Object();
/** Total number of threads to run */
private int numThreads = 0;
/** Total number of threads per processor to run. */
private int numThreadsPerProcessor = 0;
/** The timeout period in milliseconds */
private long timeout;
/** Indicates threads are still running and new threads can be issued */
private volatile boolean stillRunning;
/** Indicates that the execution timedout */
private boolean timedOut;
/**
* Indicates whether failure of any of the nested tasks should end
* execution
*/
private boolean failOnAny;
/** The dameon task list if any */
private TaskList daemonTasks;
/** Accumulation of exceptions messages from all nested tasks */
private StringBuffer exceptionMessage;
/** Number of exceptions from nested tasks */
private int numExceptions = 0;
/** The first exception encountered */
private Throwable firstException;
/** The location of the first exception */
private Location firstLocation;
/**
* Add a group of daemon threads
* @param daemonTasks The tasks to be executed as daemon.
*/
public void addDaemons(TaskList daemonTasks) {
if (this.daemonTasks != null) {
throw new BuildException("Only one daemon group is supported");
}
this.daemonTasks = daemonTasks;
}
/**
* Interval to poll for completed threads when threadCount or
* threadsPerProcessor is specified. Integer in milliseconds.; optional
*
* @param pollInterval New value of property pollInterval.
*/
public void setPollInterval(int pollInterval) {
}
/**
* Control whether a failure in a nested task halts execution. Note that
* the task will complete but existing threads will continue to run - they
* are not stopped
*
* @param failOnAny if true any nested task failure causes parallel to
* complete.
*/
public void setFailOnAny(boolean failOnAny) {
this.failOnAny = failOnAny;
}
/**
* Add a nested task to execute in parallel.
* @param nestedTask Nested task to be executed in parallel
*/
public void addTask(Task nestedTask) {
nestedTasks.addElement(nestedTask);
}
/**
* Dynamically generates the number of threads to execute based on the
* number of available processors (via
* <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
* 1.4 VM, and it will overwrite the value set in threadCount.
* If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
* <code>threadCount</code>.; optional
* @param numThreadsPerProcessor Number of threads to create per available
* processor.
*
*/
public void setThreadsPerProcessor(int numThreadsPerProcessor) {
this.numThreadsPerProcessor = numThreadsPerProcessor;
}
/**
* Statically determine the maximum number of tasks to execute
* simultaneously. If there are less tasks than threads then all will be
* executed at once, if there are more then only <code>threadCount</code>
* tasks will be executed at one time. If <code>threadsPerProcessor</code>
* is set and the JVM is at least a 1.4 VM then this value is
* ignored.; optional
*
* @param numThreads total number of threads.
*
*/
public void setThreadCount(int numThreads) {
this.numThreads = numThreads;
}
/**
* Sets the timeout on this set of tasks. If the timeout is reached
* before the other threads complete, the execution of this
* task completes with an exception.
*
* Note that existing threads continue to run.
*
* @param timeout timeout in milliseconds.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Execute the parallel tasks
*
* @exception BuildException if any of the threads failed.
*/
public void execute() throws BuildException {
updateThreadCounts();
if (numThreads == 0) {
numThreads = nestedTasks.size();
}
spinThreads();
}
/**
* Determine the number of threads based on the number of processors
*/
private void updateThreadCounts() {
if (numThreadsPerProcessor != 0) {
int numProcessors = getNumProcessors();
if (numProcessors != 0) {
numThreads = numProcessors * numThreadsPerProcessor;
}
}
}
private void processExceptions(TaskRunnable[] runnables) {
if (runnables == null) {
return;
}
for (int i = 0; i < runnables.length; ++i) {
Throwable t = runnables[i].getException();
if (t != null) {
numExceptions++;
if (firstException == null) {
firstException = t;
}
if (t instanceof BuildException
&& firstLocation == Location.UNKNOWN_LOCATION) {
firstLocation = ((BuildException) t).getLocation();
}
exceptionMessage.append(StringUtils.LINE_SEP);
exceptionMessage.append(t.getMessage());
}
}
}
/**
* Spin up required threads with a maximum number active at any given time.
*
* @exception BuildException if any of the threads failed.
*/
private void spinThreads() throws BuildException {
final int numTasks = nestedTasks.size();
TaskRunnable[] runnables = new TaskRunnable[numTasks];
stillRunning = true;
timedOut = false;
boolean interrupted = false;
int threadNumber = 0;
for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
threadNumber++) {
Task nestedTask = (Task) e.nextElement();
runnables[threadNumber]
= new TaskRunnable(nestedTask);
}
final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
TaskRunnable[] running = new TaskRunnable[maxRunning];
threadNumber = 0;
ThreadGroup group = new ThreadGroup("parallel");
TaskRunnable[] daemons = null;
if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
daemons = new TaskRunnable[daemonTasks.tasks.size()];
}
synchronized (semaphore) {
// When we leave this block we can be sure all data is really
// stored in main memory before the new threads start, the new
// threads will for sure load the data from main memory.
//
// This probably is slightly paranoid.
}
synchronized (semaphore) {
// start any daemon threads
if (daemons != null) {
for (int i = 0; i < daemons.length; ++i) {
daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
Thread daemonThread = new Thread(group, daemons[i]);
daemonThread.setDaemon(true);
daemonThread.start();
}
}
// now run main threads in limited numbers...
// start initial batch of threads
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
}
if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
public synchronized void run() {
try {
wait(timeout);
synchronized (semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
}
} catch (InterruptedException e) {
// ignore
}
}
};
timeoutThread.start();
}
try {
// now find available running slots for the remaining threads
outer: while (threadNumber < numTasks && stillRunning) {
for (int i = 0; i < maxRunning; i++) {
if (running[i] == null || running[i].isFinished()) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
// continue on outer while loop to get another
// available slot
continue outer;
}
}
// if we got here all slots in use, so sleep until
// something happens
semaphore.wait();
}
// are all threads finished
outer2: while (stillRunning) {
for (int i = 0; i < maxRunning; ++i) {
if (running[i] != null && !running[i].isFinished()) {
// System.out.println("Thread " + i + " is still
// alive ");
// still running - wait for it
semaphore.wait();
continue outer2;
}
}
stillRunning = false;
}
} catch (InterruptedException ie) {
interrupted = true;
}
killAll(running);
}
if (interrupted) {
throw new BuildException("Parallel execution interrupted.");
}
if (timedOut) {
throw new BuildException("Parallel execution timed out");
}
// now did any of the threads throw an exception
exceptionMessage = new StringBuffer();
numExceptions = 0;
firstException = null;
firstLocation = Location.UNKNOWN_LOCATION;
processExceptions(daemons);
processExceptions(runnables);
if (numExceptions == 1) {
if (firstException instanceof BuildException) {
throw (BuildException) firstException;
} else {
throw new BuildException(firstException);
}
} else if (numExceptions > 1) {
throw new BuildException(exceptionMessage.toString(),
firstLocation);
}
}
/**
* Doesn't do anything if all threads where already gone,
* else it tries to interrupt the threads 100 times.
* @param running The list of tasks that may currently be running.
*/
private void killAll(TaskRunnable[] running) {
boolean oneAlive;
int tries = 0;
do {
oneAlive = false;
for (int i = 0; i < running.length; i++) {
if (running[i] != null && !running[i].isFinished()) {
running[i].interrupt();
Thread.yield();
oneAlive = true;
}
}
if (oneAlive) {
tries++;
Thread.yield();
}
} while (oneAlive && tries < NUMBER_TRIES);
}
/**
* Determine the number of processors. Only effective on Java 1.4+
*
* @return the number of processors available or 0 if not determinable.
*/
private int getNumProcessors() {
try {
Class[] paramTypes = {};
Method availableProcessors =
Runtime.class.getMethod("availableProcessors", paramTypes);
Object[] args = {};
Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
return ret.intValue();
} catch (Exception e) {
// return a bogus number
return 0;
}
}
/**
* thread that execs a task
*/
private class TaskRunnable implements Runnable {
private Throwable exception;
private Task task;
private boolean finished;
private volatile Thread thread;
/**
* Construct a new TaskRunnable.<p>
*
* @param task the Task to be executed in a separate thread
*/
TaskRunnable(Task task) {
this.task = task;
}
/**
* Executes the task within a thread and takes care about
* Exceptions raised within the task.
*/
public void run() {
try {
thread = Thread.currentThread();
task.perform();
} catch (Throwable t) {
exception = t;
if (failOnAny) {
stillRunning = false;
}
} finally {
synchronized (semaphore) {
finished = true;
semaphore.notifyAll();
}
}
}
/**
* get any exception that got thrown during execution;
* @return an exception or null for no exception/not yet finished
*/
public Throwable getException() {
return exception;
}
/**
* Provides the indicator that the task has been finished.
* @return Returns true when the task is finished.
*/
boolean isFinished() {
return finished;
}
void interrupt() {
thread.interrupt();
}
}
}