package org.gparallelizer

import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.lang.Thread.UncaughtExceptionHandler

/**
 * This class forms the core of the DSL initialized by <i>Asynchronizer</i>. The static methods of <i>AsyncInvokerUtil</i>
 * get attached to the types of their first arguments (the Groovy Category mechanism) and can then be invoked as if they
 * were instance methods of those argument classes.
 * @see org.gparallelizer.Asynchronizer
 *
 * @author Vaclav Pech
 * Date: Oct 23, 2008
 */
public class AsyncInvokerUtil {
    /**
     * Schedules the supplied closure for processing in the pool returned by <i>Asynchronizer.retrieveCurrentPool()</i>.
     *
     * @param task The closure to submit; it is coerced to a Callable before submission
     * @return The Future returned by the pool's <i>submit()</i> method
     * @throws IllegalStateException If no pool is available for the current thread
     */
    private static Future callAsync(Closure task) {
        final ExecutorService currentPool = Asynchronizer.retrieveCurrentPool()
        if (currentPool) {
            return currentPool.submit(task as Callable)
        }
        throw new IllegalStateException("No ExecutorService available for the current thread.")
    }

    /**
     * Calls a closure in a separate thread, supplying the given arguments.
     *
     * @param cl The closure to invoke asynchronously
     * @param args The arguments handed over to the closure when it is invoked
     * @return A Future for the closure's potential return value
     */
    public static Future callAsync(Closure cl, Object ... args) {
        // Bind the arguments now so that the submitted task needs no parameters of its own.
        final Closure boundCall = {cl(args)}
        return callAsync(boundCall)
    }

    /**
     * Submits the task for asynchronous processing, returning the Future received from the executor service.
     * Allows for the following syntax:
     * <pre>
     * executorService << {println 'Inside parallel task'}
     * </pre>
     *
     * @param executorService The executor service to submit the task to
     * @param task The closure to run; it is coerced to a Callable before submission
     * @return The Future returned by <i>executorService.submit()</i>
     */
    public static Future leftShift(ExecutorService executorService, Closure task) {
        final Callable job = task as Callable
        return executorService.submit(job)
    }

    /**
     * Creates an asynchronous variant of the supplied closure. When the returned closure is invoked, it passes its
     * arguments to <i>callAsync()</i> and returns a Future for the potential return value of the original closure.
     *
     * @param cl The closure to wrap
     * @return A closure accepting any number of arguments and returning a Future
     */
    public static Closure async(Closure cl) {
        final Closure asyncVariant = {Object ... args -> callAsync(cl, args)}
        return asyncVariant
    }

    /**
     * Starts multiple closures in separate threads and collects their return values.
     * The closures are submitted through <i>executeInParallel()</i> and the calling thread then waits on each
     * returned Future in turn.
     *
     * @param closures The closures to run
     * @return The values returned by the closures, in the order of the supplied closures
     * @throws AsyncException If retrieving the result of any of the closures fails. The collected exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static List<Object> doInParallel(Closure ... closures) {
        final List<Future<Object>> futures = executeInParallel(closures)
        return processResult(futures)
    }

    /**
     * Starts multiple closures in separate threads, collecting Futures for their return values.
     * The closures are submitted to the executor service supplied by <i>Asynchronizer.withAsynchronizer()</i>,
     * which is asked for as many threads as there are closures.
     * If an exception is thrown from a closure, it will be rethrown in the calling thread
     * when it calls the Future.get() method.
     *
     * @param closures The closures to run
     * @return One Future per closure, in the order of the supplied closures
     */
    public static List<Future<Object>> executeInParallel(Closure ... closures) {
        final int requestedThreads = closures.size()
        return Asynchronizer.withAsynchronizer(requestedThreads) {ExecutorService executorService ->
            closures.collect {task ->
                final Callable<Object> job = {
                    return task.call()
                } as Callable<Object>
                return executorService.submit(job)
            }
        }
    }

    /**
     * Starts multiple closures in separate threads, using a new thread for the startup.
     * Delegates to the variant accepting an UncaughtExceptionHandler, supplying a default handler that reports
     * failures using System.err.
     *
     * @param closures The closures to run
     */
    public static void startInParallel(Closure ... closures) {
        final UncaughtExceptionHandler defaultHandler = createDefaultUncaughtExceptionHandler()
        startInParallel(defaultHandler, closures)
    }

    /**
     * Starts multiple closures in separate threads, using a new non-daemon thread for the startup.
     * The new thread runs <i>doInParallel()</i>, so an AsyncException raised there is reported to the supplied
     * instance of UncaughtExceptionHandler.
     *
     * @param uncaughtExceptionHandler The handler installed on the startup thread
     * @param closures The closures to run
     * @return The thread that submits the closures to the executor service so that the caller can take ownership of it and e.g. call <i>join()</i> on it to wait for all the closures to finish processing.
     */
    public static Thread startInParallel(java.lang.Thread.UncaughtExceptionHandler uncaughtExceptionHandler, Closure ... closures) {
        final Runnable launcher = {
            doInParallel(closures)
        } as Runnable
        final Thread thread = new Thread(launcher)
        thread.setDaemon(false)
        thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
        thread.start()
        return thread
    }

    /**
     * Iterates over a collection/object with the <i>each()</i> method using an asynchronous variant of the supplied closure
     * to evaluate each element. A CountDownLatch sized by <i>collection.size()</i> makes the calling thread wait
     * until every invocation has finished, whether it succeeded or failed.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * Example:
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = Collections.synchronizedSet(new HashSet())
     *     service.eachAsync([1, 2, 3, 4, 5]) {Number number -> result.add(number * 10)}
     *     assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
     * }
     * </pre>
     * Note that the <i>result</i> variable is synchronized to prevent race conditions between multiple threads.
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>eachAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = Collections.synchronizedSet(new HashSet())
     *     [1, 2, 3, 4, 5].eachAsync {Number number -> result.add(number * 10)}
     *     assertEquals(new HashSet([10, 20, 30, 40, 50]), result)
     * }
     * </pre>
     *
     * @param collection The collection/object to iterate over; it must respond to <i>size()</i> and <i>each()</i>
     * @param cl The closure to invoke asynchronously for each element
     * @return Whatever the <i>each()</i> call on the collection returned
     * @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static def eachAsync(Object collection, Closure cl) {
        final CountDownLatch pending = new CountDownLatch(collection.size());
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>())

        // Each task records its own failure and always counts down, so the await() below cannot wait on a failed task.
        final Closure guardedTask = {Object ... args ->
            try {
                cl(args)
            } catch (Throwable e) {
                failures.add(e)
            } finally {
                pending.countDown()
            }
        }

        def result = collection.each(async(guardedTask))
        pending.await()
        if (failures.empty) return result
        throw new AsyncException("Some asynchronous operations failed. ${failures}", failures)
    }

    /**
     * Iterates over a collection/object with the <i>collect()</i> method using an asynchronous variant of the supplied closure
     * to evaluate each element, then waits for all the resulting Futures.
     * After this method returns, all the closures have been finished and the caller can safely use the result.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = service.collectAsync([1, 2, 3, 4, 5]){Number number -> number * 10}
     *     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result))
     * }
     * </pre>
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>collectAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = [1, 2, 3, 4, 5].collectAsync{Number number -> number * 10}
     *     assertEquals(new HashSet([10, 20, 30, 40, 50]), new HashSet((Collection)result))
     * }
     * </pre>
     *
     * @param collection The collection/object to iterate over
     * @param cl The closure to invoke asynchronously for each element
     * @return The list of values returned by the closure for the individual elements
     * @throws AsyncException If retrieving the result for any of the collection's elements fails. The collected exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static def collectAsync(Object collection, Closure cl) {
        final Closure asyncVariant = async(cl)
        def futures = collection.collect(asyncVariant)
        return processResult(futures)
    }

    /**
     * Performs the <i>findAll()</i> operation using an asynchronous variant of the supplied closure
     * to evaluate each collection's/object's element.
     * After this method returns, all the closures have been finished and the caller can safely use the result.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = service.findAllAsync([1, 2, 3, 4, 5]){Number number -> number > 2}
     *     assertEquals(new HashSet([3, 4, 5]), new HashSet((Collection)result))
     * }
     * </pre>
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>findAllAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = [1, 2, 3, 4, 5].findAllAsync{Number number -> number > 2}
     *     assertEquals(new HashSet([3, 4, 5]), new HashSet((Collection)result))
     * }
     * </pre>
     *
     * @param collection The collection/object whose elements are to be filtered
     * @param cl The predicate closure, evaluated asynchronously for each element using Groovy truth
     * @return The list of elements for which the closure evaluated to true, including matching <i>null</i> elements
     * @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The collected exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static def findAllAsync(Object collection, Closure cl) {
        // Pair every element with its verdict instead of using null as the "no match" marker,
        // so that null elements satisfying the predicate are not silently dropped from the result.
        final List verdicts = collectAsync(collection, {[cl(it) ? true : false, it]})
        return verdicts.findAll {it[0]}.collect {it[1]}
    }

    /**
     * Performs the <i>find()</i> operation using an asynchronous variant of the supplied closure
     * to evaluate each collection's/object's element. All elements are evaluated; the first non-null match
     * in the collected results is returned.
     * After this method returns, all the closures have been finished and the caller can safely use the result.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = service.findAsync([1, 2, 3, 4, 5]){Number number -> number > 2}
     *     assert result in [3, 4, 5]
     * }
     * </pre>
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>findAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     def result = [1, 2, 3, 4, 5].findAsync{Number number -> number > 2}
     *     assert result in [3, 4, 5]
     * }
     * </pre>
     *
     * @param collection The collection/object to search
     * @param cl The predicate closure, evaluated asynchronously for each element
     * @return A matching element, or <i>null</i> if none matched
     * @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The collected exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static def findAsync(Object collection, Closure cl) {
        // Map each element to itself when it matches and to null otherwise.
        final Closure matchOrNull = {cl(it) ? it : null}
        def candidates = collectAsync(collection, matchOrNull)
        return candidates.find {it != null}
    }

    /**
     * Checks whether the supplied closure holds for all elements, using an asynchronous variant of the closure
     * to evaluate each collection's/object's element.
     * After this method returns, all the closures have been finished and the caller can safely use the result.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     assert service.allAsync([1, 2, 3, 4, 5]){Number number -> number > 0}
     *     assert !service.allAsync([1, 2, 3, 4, 5]){Number number -> number > 2}
     * }
     * </pre>
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>allAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     assert [1, 2, 3, 4, 5].allAsync{Number number -> number > 0}
     *     assert ![1, 2, 3, 4, 5].allAsync{Number number -> number > 2}
     * }
     * </pre>
     *
     * @param collection The collection/object whose elements are to be checked
     * @param cl The predicate closure, evaluated asynchronously for each element
     * @return true if the closure evaluated to true for every element, false otherwise
     * @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static boolean allAsync(Object collection, Closure cl) {
        final AtomicBoolean everyMatched = new AtomicBoolean(true)
        // Any single failing element flips the shared flag; eachAsync returns only after all elements were checked.
        final Closure check = {value ->
            if (!cl(value)) {
                everyMatched.set(false)
            }
        }
        eachAsync(collection, check)
        return everyMatched.get()
    }


    /**
     * Performs the <i>any()</i> operation using an asynchronous variant of the supplied closure
     * to evaluate each collection's/object's element.
     * After this method returns, all the closures have been finished and the caller can safely use the result.
     * It's important to protect any shared resources used by the supplied closure from race conditions caused by multi-threaded access.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     assert service.anyAsync([1, 2, 3, 4, 5]){Number number -> number > 2}
     *     assert !service.anyAsync([1, 2, 3, 4, 5]){Number number -> number > 6}
     * }
     * </pre>
     * Alternatively a DSL can be used to simplify the code. All collections/objects within the <i>withAsynchronizer</i> block
     * have a new <i>anyAsync(Closure cl)</i> method, which delegates to the <i>AsyncInvokerUtil</i> class.
     * <pre>
     * Asynchronizer.withAsynchronizer(5) {ExecutorService service ->
     *     assert [1, 2, 3, 4, 5].anyAsync{Number number -> number > 2}
     *     assert ![1, 2, 3, 4, 5].anyAsync{Number number -> number > 6}
     * }
     * </pre>
     *
     * @param collection The collection/object whose elements are to be checked
     * @param cl The predicate closure, evaluated asynchronously for each element
     * @return true if the closure evaluated to true for at least one element, false otherwise
     * @throws AsyncException If any of the collection's elements causes the closure to throw an exception. The original exceptions will be stored in the AsyncException's concurrentExceptions field.
     */
    public static boolean anyAsync(Object collection, Closure cl) {
        final AtomicBoolean someMatched = new AtomicBoolean(false)
        // Any single matching element raises the shared flag; eachAsync returns only after all elements were checked.
        final Closure check = {
            if (cl(it)) {
                someMatched.set(true)
            }
        }
        eachAsync(collection, check)
        return someMatched.get()
    }

    /**
     * Waits for all the supplied futures and gathers their values.
     * Every future is queried even after an earlier one has failed; anything thrown by <i>get()</i> is recorded
     * and reported collectively once all futures have been visited.
     *
     * @param futures The futures to retrieve values from
     * @return The values obtained from the futures, in the order of the supplied futures
     * @throws AsyncException If calling <i>get()</i> on any of the futures throws; the thrown objects are passed to the AsyncException
     */
    private static List<Object> processResult(List<Future<Object>> futures) {
        final List<Throwable> failures = Collections.synchronizedList(new ArrayList<Throwable>())

        final List<Object> values = futures.collect {future ->
            try {
                return future.get()
            } catch (Throwable failure) {
                failures.add(failure)
                // The placeholder keeps the positions aligned; it is never returned since a failure leads to a throw below.
                return failure
            }
        }

        if (failures.empty) return values
        throw new AsyncException("Some asynchronous operations failed. ${failures}", failures)
    }

    /**
     * Creates the handler used by <i>startInParallel()</i> when the caller supplies none.
     * The handler prints the name of the failed thread together with the exception's message
     * and stack trace to System.err.
     *
     * @return A new UncaughtExceptionHandler reporting to System.err
     */
    private static UncaughtExceptionHandler createDefaultUncaughtExceptionHandler() {
        final Closure reporter = {Thread failedThread, Throwable throwable ->
            System.err.println("Error processing background thread ${failedThread.name}: ${throwable.message}")
            throwable.printStackTrace(System.err)
        }
        return reporter as UncaughtExceptionHandler
    }
}
