package brooklyn.util.task;

import static org.testng.Assert.*

import java.util.Map
import java.util.concurrent.Callable
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testng.annotations.BeforeMethod
import org.testng.annotations.Test

import com.google.common.base.Throwables;

import brooklyn.management.ExecutionManager
import brooklyn.management.Task
import brooklyn.test.TestUtils
import brooklyn.management.ExpirationPolicy

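/**
 * Tests the operation of the {@link BasicTask} class when tasks are run through an
 * {@link ExecutionManager}.
 *
 * The tests cover: submitting tasks, runnables and callables; retrieving tasks by tag;
 * cancelling tasks before, during and after execution; error reporting; the bookkeeping
 * fields set on a task; and the time taken to submit and run many tasks.
 */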
public class BasicTaskExecutionTest {
    private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionTest.class)
 
    private static final int TIMEOUT_MS = 10*1000
    
    private ExecutionManager em
    private Map data

    /**
     * Gives every test method its own execution manager and its own empty,
     * synchronized data map.
     */
    @BeforeMethod
    public void setUp() {
        em = new BasicExecutionManager()
        // NOTE: a check that the new manager holds no tasks (em.allTasks.isEmpty()) is disabled here.
        // A newly created map is already empty, so no explicit clear() is needed.
        data = Collections.synchronizedMap(new HashMap())
    }
    
    /**
     * Submits a closure-backed {@link BasicTask} and checks both its result and its side effect.
     * The closure's result is whatever {@code data.put} returns, i.e. the value previously
     * stored under the key ("a").
     */
    @Test
    public void runSimpleBasicTask() {
        data.put(1, "a")
        BasicTask t = new BasicTask({ data.put(1, "b") })
        em.submit(tag: "A", t)

        // TestNG's assertEquals takes (actual, expected)
        assertEquals(t.get(), "a")
        assertEquals(data.get(1), "b")
    }
    
    /**
     * Submits a plain {@link Runnable}; its side effect must be visible once the task has
     * completed, and the task's result is expected to be null.
     */
    @Test
    public void runSimpleRunnable() {
        data.put(1, "a")
        BasicTask t = em.submit(tag: "A", new Runnable() {
            public void run() { data.put(1, "b") }
        })

        assertNull(t.get())
        // TestNG's assertEquals takes (actual, expected)
        assertEquals(data.get(1), "b")
    }

    /**
     * Submits a {@link Callable}; the task result is the callable's return value, which here
     * is the value {@code data.put} replaced ("a").
     */
    @Test
    public void runSimpleCallable() {
        data.put(1, "a")
        BasicTask t = em.submit(tag: "A", new Callable() {
            public Object call() { data.put(1, "b") }
        })

        // TestNG's assertEquals takes (actual, expected)
        assertEquals(t.get(), "a")
        assertEquals(data.get(1), "b")
    }

    /**
     * Runs a task that signals when it has started and then blocks on a latch, so the test can
     * inspect the task while it is still in progress before letting it complete.
     */
    @Test
    public void runBasicTaskWithWaits() {
        CountDownLatch signalStarted = new CountDownLatch(1)
        CountDownLatch allowCompletion = new CountDownLatch(1)
        data.put(1, "a")
        BasicTask t = new BasicTask({
            def previous = data.put(1, "b")
            signalStarted.countDown()
            assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS))
            previous
        })

        BasicTask t2 = em.submit(tag: "A", t)
        // TestNG's assertEquals takes (actual, expected)
        assertEquals(t2, t)
        assertFalse(t.isDone())

        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS))
        assertEquals(data.get(1), "b")
        assertFalse(t.isDone())

        log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false))

        // The task is blocked on allowCompletion; poll until its status detail mentions "waiting".
        TestUtils.executeUntilSucceeds { t.getStatusDetail(false).toLowerCase().contains("waiting") }

        allowCompletion.countDown()
        assertEquals(t.get(), "a")
    }

    /**
     * Submits two tasks under tag "A" and two under tag "B"; each increments the shared counter
     * and returns the counter's previous value. The results (1+2+3+4) must sum to 10 and the
     * counter must end at 5.
     */
    @Test
    public void runMultipleBasicTasks() {
        data.put(1, 1)
        // A local manager (shadowing the field) is used, as in the original test.
        BasicExecutionManager em = new BasicExecutionManager()
        ["A", "B"].each { String tagName ->
            2.times {
                em.submit(expirationPolicy: ExpirationPolicy.NEVER, tag: tagName,
                        new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } }))
            }
        }

        int total = 0
        em.getTaskTags().each { taskTag ->
            log.debug("tag {}", taskTag)
            em.getTasksWithTag(taskTag).each { task ->
                def result = task.get()
                log.debug("BasicTask {}, has {}", task, result)
                total += result
            }
        }

        // TestNG's assertEquals takes (actual, expected)
        assertEquals(total, 10)
        //now that all have completed:
        assertEquals(data.get(1), 5)
    }

    /**
     * Submits four counter-incrementing tasks with overlapping tag sets (A; A,B; B,C; D) and
     * checks the results as well as the tag-based lookups (single tag, any-of, all-of).
     */
    @Test
    public void runMultipleBasicTasksMultipleTags() {
        data.put(1, 1)
        // Each call builds a new task that bumps the counter and returns its previous value.
        def newIncrementTask = { -> new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } }) }

        Collection<Task> tasks = []
        tasks << em.submit(expirationPolicy: ExpirationPolicy.NEVER, tag: "A", newIncrementTask())
        tasks << em.submit(expirationPolicy: ExpirationPolicy.NEVER, tags: ["A", "B"], newIncrementTask())
        tasks << em.submit(expirationPolicy: ExpirationPolicy.NEVER, tags: ["B", "C"], newIncrementTask())
        tasks << em.submit(expirationPolicy: ExpirationPolicy.NEVER, tags: ["D"], newIncrementTask())

        int total = 0
        tasks.each { Task t ->
            log.debug("BasicTask {}, has {}", t, t.get())
            total += t.get()
        }
        // TestNG's assertEquals takes (actual, expected), matching the assertions below
        assertEquals(total, 10)

        //now that all have completed:
        assertEquals(data.get(1), 5)
        assertEquals(em.getTasksWithTag("A").size(), 2)
        assertEquals(em.getTasksWithAnyTag(["A"]).size(), 2)
        assertEquals(em.getTasksWithAllTags(["A"]).size(), 2)

        assertEquals(em.getTasksWithAnyTag(["A", "B"]).size(), 3)
        assertEquals(em.getTasksWithAllTags(["A", "B"]).size(), 1)
        assertEquals(em.getTasksWithAllTags(["B", "C"]).size(), 1)
        assertEquals(em.getTasksWithAnyTag(["A", "D"]).size(), 3)
    }

    /**
     * A completed task submitted under tag "A" must be returned by every lookup that
     * includes "A".
     */
    @Test
    public void testRetrievingTasksWithTagsReturnsExpectedTask() {
        Task t = new BasicTask({ /*no-op*/ })
        em.submit(expirationPolicy: ExpirationPolicy.NEVER, tag: "A", t)
        t.get()

        assertEquals(em.getTasksWithTag("A"), [t])
        assertEquals(em.getTasksWithAnyTag(["A"]), [t])
        assertEquals(em.getTasksWithAnyTag(["A", "B"]), [t])
        assertEquals(em.getTasksWithAllTags(["A"]), [t])
    }

    /**
     * A task tagged only with "A" must not be returned by lookups that require tag "B".
     */
    @Test
    public void testRetrievingTasksWithTagsExcludesNonMatchingTasks() {
        Task t = new BasicTask({ /*no-op*/ })
        em.submit(tag: "A", t)
        t.get()

        assertEquals(em.getTasksWithTag("B"), [])
        assertEquals(em.getTasksWithAnyTag(["B"]), [])
        assertEquals(em.getTasksWithAllTags(["A", "B"]), [])
    }
    
    /**
     * A task carrying both tags "A" and "B" must be found through either tag, through any-of
     * lookups and through all-of lookups.
     */
    @Test
    public void testRetrievingTasksWithMultipleTags() {
        Task t = new BasicTask({ /*no-op*/ })
        em.submit(expirationPolicy: ExpirationPolicy.NEVER, tags: ["A", "B"], t)
        t.get()

        // single-tag lookups
        for (String tagName in ["A", "B"]) {
            assertEquals(em.getTasksWithTag(tagName), [t])
        }
        // any-of lookups
        for (List tagNames in [["A"], ["B"], ["A", "B"]]) {
            assertEquals(em.getTasksWithAnyTag(tagNames), [t])
        }
        // all-of lookups
        for (List tagNames in [["A", "B"], ["A"], ["B"]]) {
            assertEquals(em.getTasksWithAllTags(tagNames), [t])
        }
    }

    // Regression test for ENGR-1796: when no task matched the first tag, getTasksWithAllTags
    // returned whatever matched the second tag.
    /**
     * An all-of lookup whose first tag matches nothing must return no tasks, even though the
     * second tag does match a task.
     */
    @Test
    public void testRetrievingTasksWithAllTagsWhenFirstNotMatched() {
        Task t = new BasicTask({ /*no-op*/ })
        em.submit(tags: ["A"], t)
        t.get()

        def matches = em.getTasksWithAllTags(["not_there", "A"])
        assertEquals(matches, [])
    }
    
    /**
     * A task that has started but not yet finished must still be returned by tag lookups.
     * The task signals that it is running and then blocks until the test releases it.
     */
    @Test
    public void testRetrievedTasksIncludesTasksInProgress() {
        CountDownLatch runningLatch = new CountDownLatch(1)
        CountDownLatch finishLatch = new CountDownLatch(1)
        Task t = new BasicTask({ runningLatch.countDown(); finishLatch.await() })
        em.submit(tags: ["A"], t)

        try {
            // Fail fast if the task never started, rather than silently continuing after a timeout.
            assertTrue(runningLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS),
                    "task did not start within " + TIMEOUT_MS + "ms")

            assertEquals(em.getTasksWithTag("A"), [t])
        } finally {
            // Always release the task so its thread is not left blocked.
            finishLatch.countDown()
        }
    }
    
    /**
     * Cancels a task before it is submitted: it must report cancelled/done/error both before
     * and after submission, {@code get()} must throw {@link CancellationException}, and the
     * status detail must mention the cancellation.
     */
    @Test
    public void cancelBeforeRun() {
        CountDownLatch neverReleased = new CountDownLatch(1)
        BasicTask t = new BasicTask({ neverReleased.await(); return 42 })

        t.cancel(true)
        assertTrue(t.isCancelled())
        assertTrue(t.isDone())
        assertTrue(t.isError())

        em.submit(tag: "A", t)
        try {
            t.get()
            fail("get should have failed due to cancel")
        } catch (CancellationException expected) {
            // this is the outcome the test is looking for
        }
        assertTrue(t.isCancelled())
        assertTrue(t.isDone())
        assertTrue(t.isError())

        log.debug("cancelBeforeRun status: {}", t.getStatusDetail(false))
        assertTrue(t.getStatusDetail(false).toLowerCase().contains("cancel"))
    }

    /**
     * Cancels a task while it is blocked mid-execution: it must report cancelled/error straight
     * away, {@code get()} must throw {@link CancellationException}, and afterwards the task
     * must also report done.
     */
    @Test
    public void cancelDuringRun() {
        CountDownLatch started = new CountDownLatch(1)
        CountDownLatch neverReleased = new CountDownLatch(1)

        BasicTask t = new BasicTask({
            synchronized (data) {
                started.countDown()
                neverReleased.await()
            }
            return 42
        })
        em.submit(tag: "A", t)
        assertFalse(t.isCancelled())
        assertFalse(t.isDone())
        assertFalse(t.isError())

        assertTrue(started.await(TIMEOUT_MS, TimeUnit.MILLISECONDS))
        t.cancel(true)

        assertTrue(t.isCancelled())
        assertTrue(t.isError())
        try {
            t.get()
            fail("get should have failed due to cancel")
        } catch (CancellationException expected) {
            // this is the outcome the test is looking for
        }
        assertTrue(t.isCancelled())
        assertTrue(t.isDone())
        assertTrue(t.isError())
    }
    
    /**
     * Cancelling a task that has already completed must have no effect: it stays done,
     * and is neither cancelled nor in error.
     */
    @Test
    public void cancelAfterRun() {
        BasicTask t = new BasicTask({ return 42 })
        em.submit(tag: "A", t)

        // TestNG's assertEquals takes (actual, expected)
        assertEquals(t.get(), 42)
        t.cancel(true)
        assertFalse(t.isCancelled())
        assertFalse(t.isError())
        assertTrue(t.isDone())
    }
    
    /**
     * A task whose body throws must surface that failure from {@code get()}, report
     * error/done (but not cancelled), and include the exception message in its status detail.
     */
    @Test
    public void errorDuringRun() {
        BasicTask t = new BasicTask({ throw new IllegalStateException("Aaargh") })

        em.submit(tag: "A", t)

        try {
            t.get()
            fail("get should have failed due to error")
        } catch (Exception eo) {
            // getRootCause returns a Throwable; keep that type rather than narrowing it to Exception.
            Throwable rootCause = Throwables.getRootCause(eo)
            // TestNG's assertEquals takes (actual, expected)
            assertEquals(rootCause.getMessage(), "Aaargh")
        }

        assertFalse(t.isCancelled())
        assertTrue(t.isError())
        assertTrue(t.isDone())

        log.debug("errorDuringRun status: {}", t.getStatusDetail(false))
        assertTrue(t.getStatusDetail(false).contains("Aaargh"), "details="+t.getStatusDetail(false))
    }

    /**
     * Checks the bookkeeping fields of a task (submitter, submit/start/end times, result
     * handle) before submission, while the task is running, and after it has completed.
     */
    @Test
    public void fieldsSetForSimpleBasicTask() {
        CountDownLatch signalStarted = new CountDownLatch(1)
        CountDownLatch allowCompletion = new CountDownLatch(1)

        BasicTask t = new BasicTask({ signalStarted.countDown(); allowCompletion.await(); return 42 })
        // TestNG's assertEquals takes (actual, expected)
        assertNull(t.submittedByTask)
        assertEquals(t.submitTimeUtc, -1)
        assertNull(t.getResult())

        em.submit(tag: "A", t)
        try {
            assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS))

            assertTrue(t.submitTimeUtc > 0)
            assertTrue(t.startTimeUtc >= t.submitTimeUtc)
            assertNotNull(t.getResult())
            assertEquals(t.endTimeUtc, -1)
            assertFalse(t.isCancelled())
        } finally {
            // Release the task even if an assertion above fails; its await() has no timeout,
            // so otherwise the task's thread would stay blocked.
            allowCompletion.countDown()
        }

        assertEquals(t.get(), 42)
        assertTrue(t.endTimeUtc >= t.startTimeUtc)

        log.debug("BasicTask duration (millis): {}", (t.endTimeUtc - t.submitTimeUtc))
    }

    /**
     * Checks the submitter link between tasks: task A (tag "A") submits task B (tag "B") from
     * inside its own body, so B's {@code submittedByTask} must be A, while A itself has no
     * submitter.
     */
    @Test
    public void fieldsSetForBasicTaskSubmittedBasicTask() {
        //submitted BasicTask B is started by A, and waits for A to complete
        BasicTask t = new BasicTask(displayName: "sample", description: "some descr", {
            em.submit(expirationPolicy: ExpirationPolicy.NEVER, tag: "B", {
                // TestNG's assertEquals takes (actual, expected)
                assertEquals(em.getTasksWithTag("A").iterator().next().get(), 45)
                46
            })
            45
        })
        em.submit(expirationPolicy: ExpirationPolicy.NEVER, tag: "A", t)

        t.blockUntilEnded()

//        assertEquals em.getAllTasks().size(), 2

        BasicTask tb = em.getTasksWithTag("B").iterator().next()
        assertEquals(tb.get(), 46)
        assertEquals(em.getTasksWithTag("A").iterator().next(), t)
        assertNull(t.submittedByTask)

        BasicTask submitter = tb.submittedByTask
        assertNotNull(submitter)
        assertEquals(submitter.displayName, "sample")
        assertEquals(submitter.description, "some descr")
        assertEquals(submitter, t)

        assertTrue(submitter.submitTimeUtc <= tb.submitTimeUtc)
        assertTrue(submitter.endTimeUtc <= tb.endTimeUtc)

        log.debug("BasicTask {} was submitted by {}", tb, submitter)
    }
    
    // Previously, when we used a CopyOnWriteArraySet, performance for submitting new tasks was
    // terrible, and it degraded significantly as the number of previously executed tasks increased
    // (e.g. 9s for first 1000; 26s for next 1000; 42s for next 1000).
    /**
     * Runs one warm-up batch and then NUM_TIMES further batches of NUM_TASKS no-op tasks, and
     * fails if any of the measured batches took longer than MAX_ACCEPTABLE_TIME milliseconds.
     */
    @Test
    public void testExecutionManagerPerformance() {
        final int NUM_TASKS = 1000
        final int NUM_TIMES = 10
        final int MAX_ACCEPTABLE_TIME = 2500

        long warmup = execTasksAndWaitForDone(NUM_TASKS, ["A"])

        List<Long> durations = []
        for (int run = 1; run <= NUM_TIMES; run++) {
            durations.add(execTasksAndWaitForDone(NUM_TASKS, ["A"]))
        }

        // The first batch that exceeded the limit, or null if every batch was fast enough.
        def tooSlow = durations.find { it > MAX_ACCEPTABLE_TIME }
        assertNull(tooSlow, "warmup=$warmup; times=$durations")
    }
    
    /**
     * Submits a batch of no-op tasks to the execution manager, waits for all of them to
     * finish, and prints a timing summary.
     *
     * @param numTasks number of tasks to submit
     * @param tags tags to attach to every submitted task
     * @return elapsed milliseconds from just before the first submission until the last task
     *         has completed
     */
    private long execTasksAndWaitForDone(int numTasks, List tags) {
        List<Task> submitted = []
        long startedAt = System.currentTimeMillis()
        (1..numTasks).each {
            Task task = new BasicTask({ /*no-op*/ })
            em.submit(tags: tags, task)
            submitted.add(task)
        }
        long submittedAt = System.currentTimeMillis()

        submitted.each { Task task -> task.get() }
        long finishedAt = System.currentTimeMillis()

        long elapsedSubmit = submittedAt - startedAt
        long elapsedTotal = finishedAt - startedAt

        println "Executed ${numTasks} tasks; ${elapsedTotal}ms total; ${elapsedSubmit}ms to submit"

        return elapsedTotal
    }
}
