blob: 11ebd1bd3c40734ac45db6b803a09b96cd20de7a [file] [log] [blame]
package org.gparallelizer
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.lang.Thread.UncaughtExceptionHandler
/**
* This class forms the core of the DSL initialized by <i>Asynchronizer</i>. The static methods of <i>AsyncInvokerUtil</i>
* get attached to their first arguments (the Groovy Category mechanism) and can be then invoked as if they were part of
* the argument classes.
* @see org.gparallelizer.Asynchronizer
*
* @author Vaclav Pech
* Date: Oct 23, 2008
*/
public class AsyncInvokerUtil {
/**
* Shedules the supplied closure for processing in the underlying thread pool.
*/
private static Future callAsync(Closure task) {
final ExecutorService pool = Asynchronizer.retrieveCurrentPool()
if (!pool) throw new IllegalStateException("No ExecutorService available for the current thread.")
return pool.submit(task as Callable)
}
/**
* Calls a closure in a separate thread supplying the given arguments, returning a future for the potential return value,
*/
public static Future callAsync(Closure cl, Object ... args) {
callAsync {cl(args)}
}
/**
* Submits the task for asynchronous processing returning the Future received from the executor service.
* Allows for the followitn syntax:
* <pre>
* executorService << {println 'Inside parallel task'}* </pre>
*/
public static Future leftShift(ExecutorService executorService, Closure task) {
return executorService.submit(task as Callable)
}
/**
* Creates an asynchronous variant of the supplied closure, which, when invoked returns a future for the potential return value
*/
public static Closure async(Closure cl) {
return {Object ... args -> callAsync(cl, args)}
}
/**
* Starts multiple closures in separate threads, collecting their return values
* If an exception is thrown from the closure when called on any of the collection's elements,
* it will be rethrown in the calling thread when it calls the Future.get() method.
* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static List<Object> doInParallel(Closure ... closures) {
return processResult(executeInParallel(closures))
}
/**
* Starts multiple closures in separate threads, collecting Futures for their return values
* If an exception is thrown from the closure when called on any of the collection's elements,
* it will be rethrown in the calling thread when it calls the Future.get() method.
*/
public static List<Future<Object>> executeInParallel(Closure ... closures) {
Asynchronizer.withAsynchronizer(closures.size()) {ExecutorService executorService ->
List<Future<Object>> result = closures.collect {cl ->
return executorService.submit({
return cl.call()
} as Callable<Object>)
}
return result
}
}
/**
* Starts multiple closures in separate threads, using a new thread for the startup.
* If any of the collection's elements causes the closure to throw an exception, an AsyncException is reported using System.err.
* The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static void startInParallel(Closure ... closures) {
startInParallel(createDefaultUncaughtExceptionHandler(), closures)
}
/**
* Starts multiple closures in separate threads, using a new thread for the startup.
* If any of the collection's elements causes the closure to throw an exception, an AsyncException is reported to the supplied instance of UncaughtExceptionHandler.
* The original exceptions will be stored in the AsyncException's concurrentExceptions field.
* @return The thread that submits the closures to the thread executor service so that the caller can take ownership of it and e.g. call <i>join()</i> on it to wait for all the closures to finish processing.
*/
public static Thread startInParallel(java.lang.Thread.UncaughtExceptionHandler uncaughtExceptionHandler, Closure ... closures) {
final Thread thread = new Thread({
doInParallel(closures)
} as Runnable)
thread.daemon = false
thread.uncaughtExceptionHandler = uncaughtExceptionHandler
thread.start()
return thread
}
/**
* Iterates over a collection/object with the <i>each()</i> method using an asynchronous variant of the supplied closure
* to evaluate each collection's element. A CountDownLatch is used to make the calling thread wait for all the results.
* After this method returns, all the closures have been finished and all the potential shared resources have been updated
* by the threads.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Example:
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = Collections.synchronizedSet(new HashSet())
* service.eachAsync([1, 2, 3, 4, 5]) {Number number -> result.add(number * 10)}* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
*}* Note that the <i>result</i> variable is synchronized to prevent race conditions between multiple threads.
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>eachAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = Collections.synchronizedSet(new HashSet())
* [1, 2, 3, 4, 5].eachAsync { Number number -> result.add(number * 10) }* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static def eachAsync(Object collection, Closure cl) {
final CountDownLatch latch = new CountDownLatch(collection.size());
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>())
def result = collection.each(async({Object ... args ->
try {
cl(args)
} catch (Throwable e) {
exceptions.add(e)
} finally {
latch.countDown()
}
}))
latch.await()
if (!exceptions.empty) throw new AsyncException("Some asynchronous operations failed. ${exceptions}", exceptions)
else return result
}
/**
* Iterates over a collection/object with the <i>collect()</i> method using an asynchronous variant of the supplied closure
* to evaluate each collection's element.
* After this method returns, all the closures have been finished and the caller can safely use the result.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = service.collectAsync([1, 2, 3, 4, 5]){Number number -> number * 10}* assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result))
*}*
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>collectAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = [1, 2, 3, 4, 5].collectAsync{Number number -> number * 10}* assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result))
*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static def collectAsync(Object collection, Closure cl) {
return processResult(collection.collect(async(cl)))
}
/**
* Performs the <i>findAll()</i> operation using an asynchronous variant of the supplied closure
* to evaluate each collection's/object's element.
* After this method returns, all the closures have been finished and the caller can safely use the result.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = service.findAllAsync([1, 2, 3, 4, 5]){Number number -> number > 2}* assertEquals(new HashSet([3, 4, 5]), new HashSet((Collection)result))
*}*
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>findAllAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = [1, 2, 3, 4, 5].findAllAsync{Number number -> number > 2}* assertEquals(new HashSet([3, 4, 5]), new HashSet((Collection)result))
*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static def findAllAsync(Object collection, Closure cl) {
collectAsync(collection, {if (cl(it)) return it else return null}).findAll {it != null}
}
/**
* Performs the <i>find()</i> operation using an asynchronous variant of the supplied closure
* to evaluate each collection's/object's element.
* After this method returns, all the closures have been finished and the caller can safely use the result.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = service.findAsync([1, 2, 3, 4, 5]){Number number -> number > 2}* assert result in [3, 4, 5]
*}*
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>findAllAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = [1, 2, 3, 4, 5].findAsync{Number number -> number > 2}* assert result in [3, 4, 5]
*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static def findAsync(Object collection, Closure cl) {
collectAsync(collection, {if (cl(it)) return it else return null}).find {it != null}
}
/**
* Performs the <i>all()</i> operation using an asynchronous variant of the supplied closure
* to evaluate each collection's/object's element.
* After this method returns, all the closures have been finished and the caller can safely use the result.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* assert service.allAsync([1, 2, 3, 4, 5]){Number number -> number > 0}* assert !service.allAsync([1, 2, 3, 4, 5]){Number number -> number > 2}*}*
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>findAllAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* assert [1, 2, 3, 4, 5].allAsync{Number number -> number > 0}* assert ![1, 2, 3, 4, 5].allAsync{Number number -> number > 2}*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static boolean allAsync(Object collection, Closure cl) {
final AtomicBoolean flag = new AtomicBoolean(true)
eachAsync(collection, {value -> if (!cl(value)) flag.set(false)})
return flag.get()
}
/**
* Performs the <i>any()</i> operation using an asynchronous variant of the supplied closure
* to evaluate each collection's/object's element.
* After this method returns, all the closures have been finished and the caller can safely use the result.
* It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* assert service.anyAsync([1, 2, 3, 4, 5]){Number number -> number > 2}* assert !service.anyAsync([1, 2, 3, 4, 5]){Number number -> number > 6}*}*
* Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
* have a new <i>findAllAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* assert [1, 2, 3, 4, 5].anyAsync{Number number -> number > 2}* assert ![1, 2, 3, 4, 5].anyAsync{Number number -> number > 6}*}* @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
*/
public static boolean anyAsync(Object collection, Closure cl) {
final AtomicBoolean flag = new AtomicBoolean(false)
eachAsync(collection, {if (cl(it)) flag.set(true)})
return flag.get()
}
private static List<Object> processResult(List<Future<Object>> futures) {
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>())
final List<Object> result = futures.collect {
try {
return it.get()
} catch (Throwable e) {
exceptions.add(e)
return e
}
}
if (!exceptions.empty) throw new AsyncException("Some asynchronous operations failed. ${exceptions}", exceptions)
else return result
}
private static UncaughtExceptionHandler createDefaultUncaughtExceptionHandler() {
return {Thread failedThread, Throwable throwable ->
System.err.println "Error processing background thread ${failedThread.name}: ${throwable.message}"
throwable.printStackTrace(System.err)
} as UncaughtExceptionHandler
}
}