blob: 61c70dc9254a5b9922d597f9eec878698fd32756 [file] [log] [blame]
package org.gparallelizer
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
/**
* Enables a ExecutorService-based DSL on closures, objects and collections.
* E.g.
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* Collection<Future> result = [1, 2, 3, 4, 5].collect({it * 10}.async())
* assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result*.get()))
* }
*
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* def result = [1, 2, 3, 4, 5].findAsync{Number number -> number > 2}
* assert result in [3, 4, 5]
* }
*
* @author Vaclav Pech
* Date: Oct 23, 2008
*/
class Asynchronizer {
private static ThreadLocal<ExecutorService> currentInvoker=new ThreadLocal<ExecutorService>()
protected static ExecutorService retrieveCurrentPool() {
currentInvoker.get()
}
private static createPool() {
return createPool(Runtime.getRuntime().availableProcessors() + 1)
}
private static createPool(int poolSize) {
return createPool(poolSize, createDefaultThreadFactory())
}
private static createPool(int poolSize, ThreadFactory threadFactory) {
if (!(poolSize in 1..Integer.MAX_VALUE)) throw new IllegalArgumentException("Invalid value $poolSize for the pool size has been specified. Please supply a positive int number.")
if (!threadFactory) throw new IllegalArgumentException("No value specified for threadFactory.")
return Executors.newFixedThreadPool(poolSize, threadFactory)
}
private static ThreadFactory createDefaultThreadFactory() {
return {Runnable runnable ->
final Thread thread = new Thread(runnable)
thread.daemon = false
return thread
} as ThreadFactory
}
/**
* Creates a new instance of <i>ExecutorService</i>, binds it to the current thread, enables the ExecutorService DSL
* and runs the supplied closure.
* Within the supplied code block the <i>ExecutorService</i> is available as the only parameter, objects have been
* enhanced with the <i>eachAsync()</i>, <i>collectAsync()</i> and other methods from the <i>AsyncInvokerUtil</i>
* category class as well as closures can be turned into asynchronous ones by calling the <i>async()</i> method on them.
* E.g. <i>closure,async</i> returns a new closure, which, when run will schedule the original closure
* for processing in the pool.
* Calling <i>images.eachAsync{processImage(it}}</i> will call the potentially long-lasting <i>processImage()</i>
* operation on each image in the <i>images</i> collection in parallel.
* <pre>
* def result = new ConcurrentSkipListSet()
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* [1, 2, 3, 4, 5].eachAsync{Number number -> result.add(number * 10)}
* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
* }
* </pre>
* @param cl The block of code to invoke with the DSL enabled
*/
public static withAsynchronizer(Closure cl) {
return withAsynchronizer(3, cl)
}
/**
* Creates a new instance of <i>ExecutorService</i>, binds it to the current thread, enables the ExecutorService DSL
* and runs the supplied closure.
* Within the supplied code block the <i>ExecutorService</i> is available as the only parameter, objects have been
* enhanced with the <i>eachAsync()</i>, <i>collectAsync()</i> and other methods from the <i>AsyncInvokerUtil</i>
* category class as well as closures can be turned into asynchronous ones by calling the <i>async()</i> method on them.
* E.g. <i>closure,async</i> returns a new closure, which, when run will schedule the original closure
* for processing in the pool.
* Calling <i>images.eachAsync{processImage(it}}</i> will call the potentially long-lasting <i>processImage()</i>
* operation on each image in the <i>images</i> collection in parallel.
* <pre>
* def result = new ConcurrentSkipListSet()
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* [1, 2, 3, 4, 5].eachAsync{Number number -> result.add(number * 10)}
* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
* }
* </pre>
* @param numberOfThreads Number of threads in the newly created thread pool
* @param cl The block of code to invoke with the DSL enabled
*/
public static withAsynchronizer(int numberOfThreads, Closure cl) {
return withAsynchronizer(numberOfThreads, createDefaultThreadFactory(), cl)
}
/**
* Creates a new instance of <i>ExecutorService</i>, binds it to the current thread, enables the ExecutorService DSL
* and runs the supplied closure.
* Within the supplied code block the <i>ExecutorService</i> is available as the only parameter, objects have been
* enhanced with the <i>eachAsync()</i>, <i>collectAsync()</i> and other methods from the <i>AsyncInvokerUtil</i>
* category class as well as closures can be turned into asynchronous ones by calling the <i>async()</i> method on them.
* E.g. <i>closure,async</i> returns a new closure, which, when run will schedule the original closure
* for processing in the pool.
* Calling <i>images.eachAsync{processImage(it}}</i> will call the potentially long-lasting <i>processImage()</i>
* operation on each image in the <i>images</i> collection in parallel.
* <pre>
* def result = new ConcurrentSkipListSet()
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* [1, 2, 3, 4, 5].eachAsync{Number number -> result.add(number * 10)}
* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
* }
* </pre>
* @param numberOfThreads Number of threads in the newly created thread pool
* @param threadFactory Factory for threads in the pool
* @param cl The block of code to invoke with the DSL enabled
*/
public static withAsynchronizer(int numberOfThreads, ThreadFactory threadFactory, Closure cl) {
final ExecutorService pool = createPool(numberOfThreads, threadFactory)
try {
return withExistingAsynchronizer(pool, cl)
} finally {
pool.shutdown()
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
}
}
/**
* Creates a new instance of <i>ExecutorService</i>, binds it to the current thread, enables the ExecutorService DSL
* and runs the supplied closure.
* Within the supplied code block the <i>ExecutorService</i> is available as the only parameter, objects have been
* enhanced with the <i>eachAsync()</i>, <i>collectAsync()</i> and other methods from the <i>AsyncInvokerUtil</i>
* category class as well as closures can be turned into asynchronous ones by calling the <i>async()</i> method on them.
* E.g. <i>closure,async</i> returns a new closure, which, when run will schedule the original closure
* for processing in the pool.
* Calling <i>images.eachAsync{processImage(it}}</i> will call the potentially long-lasting <i>processImage()</i>
* operation on each image in the <i>images</i> collection in parallel.
* <pre>
* def result = new ConcurrentSkipListSet()
* Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
* [1, 2, 3, 4, 5].eachAsync{Number number -> result.add(number * 10)}
* assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
* }
* </pre>
* @param pool The <i>ExecutorService</i> to use, the service will not be shutdown after this method returns
*/
public static withExistingAsynchronizer(ExecutorService pool, Closure cl) {
currentInvoker.set(pool)
def result=null
try {
use(AsyncInvokerUtil) {
result = cl(pool)
}
} finally {
currentInvoker.remove()
}
return result
}
/**
* Enhanced the Object's meta-class with the <i>withAsynchronizer</i> and <i>withExistingAsynchronizer</i> methods,
* which call apropriate static methods on the <I>Asynchronizer</i> class. So the DSL can be then wrapped like this:
* <pre>
* Asynchronizer.initializeDSL()
* ...
* withAsynchronizer {* assert ([2, 4, 6, 8, 10] == [1, 2, 3, 4, 5].collectAsync {it * 2})
* assert [1, 2, 3, 4, 5].allAsync {it > 0}* assert [1, 2, 3, 4, 5].findAsync{Number number -> number > 2} in [3, 4, 5]
* ...
*}* </pre>
*/
public static void initializeDSL () {
Object.metaClass {
withAsynchronizer = {Closure cl ->
Asynchronizer.withAsynchronizer cl
}
withAsynchronizer = {int numberOfThreads, Closure cl ->
Asynchronizer.withAsynchronizer numberOfThreads, cl
}
withAsynchronizer = {int numberOfThreads, ThreadFactory threadFactory, Closure cl ->
Asynchronizer.withAsynchronizer numberOfThreads, threadFactory, cl
}
withExistingAsynchronizer = {ExecutorService pool, Closure cl ->
Asynchronizer.withExistingAsynchronizer pool, cl
}
}
}
//todo allow for destroying the DSL
}