blob: 320d999f50d1ba9252ce56c10c4007ebf49ac127 [file] [log] [blame]
package brooklyn.util.task;
import static org.testng.Assert.*
import java.util.Map
import java.util.concurrent.Callable
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testng.annotations.BeforeMethod
import org.testng.annotations.Test
import com.google.common.base.Throwables;
import brooklyn.management.ExecutionManager
import brooklyn.management.Task
import brooklyn.test.TestUtils
import brooklyn.management.ExpirationPolicy
/**
* Test the operation of the {@link BasicTask} class.
*
* TODO clarify test purpose
*/
public class BasicTaskExecutionTest {
private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionTest.class)
private static final int TIMEOUT_MS = 10*1000
private ExecutionManager em
private Map data
@BeforeMethod
public void setUp() {
em = new BasicExecutionManager()
// assertTrue em.allTasks.isEmpty()
data = Collections.synchronizedMap(new HashMap())
data.clear()
}
@Test
public void runSimpleBasicTask() {
data.clear()
BasicTask t = [ { data.put(1, "b") } ]
data.put(1, "a")
BasicTask t2 = em.submit tag:"A", t
assertEquals("a", t.get())
assertEquals("b", data.get(1))
}
@Test
public void runSimpleRunnable() {
data.clear()
data.put(1, "a")
BasicTask t = em.submit tag:"A", new Runnable() { public void run() { data.put(1, "b") } }
assertEquals(null, t.get())
assertEquals("b", data.get(1))
}
@Test
public void runSimpleCallable() {
data.clear()
data.put(1, "a")
BasicTask t = em.submit tag:"A", new Callable() { public Object call() { data.put(1, "b") } }
assertEquals("a", t.get())
assertEquals("b", data.get(1))
}
@Test
public void runBasicTaskWithWaits() {
CountDownLatch signalStarted = new CountDownLatch(1);
CountDownLatch allowCompletion = new CountDownLatch(1);
data.clear()
BasicTask t = [ {
def result = data.put(1, "b")
signalStarted.countDown();
assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
result
} ]
data.put(1, "a")
BasicTask t2 = em.submit tag:"A", t
assertEquals(t, t2)
assertFalse(t.isDone())
assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertEquals("b", data.get(1))
assertFalse(t.isDone())
log.debug "runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false)
TestUtils.executeUntilSucceeds { t.getStatusDetail(false).toLowerCase().contains("waiting") }
// "details="+t.getStatusDetail(false))
allowCompletion.countDown();
assertEquals("a", t.get())
}
@Test
public void runMultipleBasicTasks() {
data.clear()
data.put(1, 1)
BasicExecutionManager em = []
2.times { em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"A", new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } }) }
2.times { em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"B", new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } }) }
int total = 0;
em.getTaskTags().each {
log.debug "tag {}", it
em.getTasksWithTag(it).each {
log.debug "BasicTask {}, has {}", it, it.get()
total += it.get()
}
}
assertEquals(10, total)
//now that all have completed:
assertEquals(5, data.get(1))
}
@Test
public void runMultipleBasicTasksMultipleTags() {
data.clear()
data.put(1, 1)
Collection<Task> tasks = []
tasks += em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"A", new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } })
tasks += em.submit expirationPolicy: ExpirationPolicy.NEVER, tags:["A","B"], new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } })
tasks += em.submit expirationPolicy: ExpirationPolicy.NEVER, tags:["B","C"], new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } })
tasks += em.submit expirationPolicy: ExpirationPolicy.NEVER, tags:["D"], new BasicTask({ synchronized(data) { data.put(1, data.get(1)+1) } })
int total = 0;
tasks.each { Task t ->
log.debug "BasicTask {}, has {}", t, t.get()
total += t.get()
}
assertEquals(10, total)
//now that all have completed:
assertEquals data.get(1), 5
assertEquals em.getTasksWithTag("A").size(), 2
assertEquals em.getTasksWithAnyTag(["A"]).size(), 2
assertEquals em.getTasksWithAllTags(["A"]).size(), 2
assertEquals em.getTasksWithAnyTag(["A", "B"]).size(), 3
assertEquals em.getTasksWithAllTags(["A", "B"]).size(), 1
assertEquals em.getTasksWithAllTags(["B", "C"]).size(), 1
assertEquals em.getTasksWithAnyTag(["A", "D"]).size(), 3
}
@Test
public void testRetrievingTasksWithTagsReturnsExpectedTask() {
Task t = new BasicTask({ /*no-op*/ })
em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"A",t
t.get();
assertEquals(em.getTasksWithTag("A"), [t]);
assertEquals(em.getTasksWithAnyTag(["A"]), [t]);
assertEquals(em.getTasksWithAnyTag(["A","B"]), [t]);
assertEquals(em.getTasksWithAllTags(["A"]), [t]);
}
@Test
public void testRetrievingTasksWithTagsExcludesNonMatchingTasks() {
Task t = new BasicTask({ /*no-op*/ })
em.submit tag:"A",t
t.get();
assertEquals(em.getTasksWithTag("B"), []);
assertEquals(em.getTasksWithAnyTag(["B"]), []);
assertEquals(em.getTasksWithAllTags(["A","B"]), []);
}
@Test
public void testRetrievingTasksWithMultipleTags() {
Task t = new BasicTask({ /*no-op*/ })
em.submit expirationPolicy: ExpirationPolicy.NEVER, tags:["A","B"], t
t.get();
assertEquals(em.getTasksWithTag("A"), [t]);
assertEquals(em.getTasksWithTag("B"), [t]);
assertEquals(em.getTasksWithAnyTag(["A"]), [t]);
assertEquals(em.getTasksWithAnyTag(["B"]), [t]);
assertEquals(em.getTasksWithAnyTag(["A","B"]), [t]);
assertEquals(em.getTasksWithAllTags(["A","B"]), [t]);
assertEquals(em.getTasksWithAllTags(["A"]), [t]);
assertEquals(em.getTasksWithAllTags(["B"]), [t]);
}
// ENGR-1796: if nothing matched first tag, then returned whatever matched second tag!
@Test
public void testRetrievingTasksWithAllTagsWhenFirstNotMatched() {
Task t = new BasicTask({ /*no-op*/ })
em.submit tags:["A"], t
t.get();
assertEquals(em.getTasksWithAllTags(["not_there","A"]), []);
}
@Test
public void testRetrievedTasksIncludesTasksInProgress() {
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(1);
Task t = new BasicTask({ runningLatch.countDown(); finishLatch.await() })
em.submit tags:["A"], t
try {
runningLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(em.getTasksWithTag("A"), [t]);
} finally {
finishLatch.countDown();
}
}
@Test
public void cancelBeforeRun() {
CountDownLatch blockForever = new CountDownLatch(1);
BasicTask t = [ { blockForever.await(); return 42 } ]
t.cancel true
assertTrue(t.isCancelled())
assertTrue(t.isDone())
assertTrue(t.isError())
em.submit tag:"A", t
try { t.get(); fail("get should have failed due to cancel"); } catch (CancellationException e) {}
assertTrue(t.isCancelled())
assertTrue(t.isDone())
assertTrue(t.isError())
log.debug "cancelBeforeRun status: {}", t.getStatusDetail(false)
assertTrue(t.getStatusDetail(false).toLowerCase().contains("cancel"))
}
@Test
public void cancelDuringRun() {
CountDownLatch signalStarted = new CountDownLatch(1);
CountDownLatch blockForever = new CountDownLatch(1);
BasicTask t = [ { synchronized (data) { signalStarted.countDown(); blockForever.await() }; return 42 } ]
em.submit tag:"A", t
assertFalse(t.isCancelled())
assertFalse(t.isDone())
assertFalse(t.isError())
assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
t.cancel true
assertTrue(t.isCancelled())
assertTrue(t.isError())
try { t.get(); fail("get should have failed due to cancel"); } catch (CancellationException e) {}
assertTrue(t.isCancelled())
assertTrue(t.isDone())
assertTrue(t.isError())
}
@Test
public void cancelAfterRun() {
BasicTask t = [ { return 42 } ]
em.submit tag:"A", t
assertEquals(42, t.get());
t.cancel true
assertFalse(t.isCancelled())
assertFalse(t.isError())
assertTrue(t.isDone())
}
@Test
public void errorDuringRun() {
BasicTask t = [ { throw new IllegalStateException("Aaargh"); } ]
em.submit tag:"A", t
try { t.get(); fail("get should have failed due to error"); } catch (Exception eo) { Exception e = Throwables.getRootCause(eo); assertEquals("Aaargh", e.getMessage()) }
assertFalse(t.isCancelled())
assertTrue(t.isError())
assertTrue(t.isDone())
log.debug "errorDuringRun status: {}", t.getStatusDetail(false)
assertTrue(t.getStatusDetail(false).contains("Aaargh"), "details="+t.getStatusDetail(false))
}
@Test
public void fieldsSetForSimpleBasicTask() {
CountDownLatch signalStarted = new CountDownLatch(1);
CountDownLatch allowCompletion = new CountDownLatch(1);
BasicTask t = [ { signalStarted.countDown(); allowCompletion.await(); return 42 } ]
assertEquals(null, t.submittedByTask)
assertEquals(-1, t.submitTimeUtc)
assertNull(t.getResult())
em.submit tag:"A", t
assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(t.submitTimeUtc > 0)
assertTrue(t.startTimeUtc >= t.submitTimeUtc)
assertNotNull(t.getResult())
assertEquals(-1, t.endTimeUtc)
assertEquals(false, t.isCancelled())
allowCompletion.countDown()
assertEquals(42, t.get())
assertTrue(t.endTimeUtc >= t.startTimeUtc)
log.debug "BasicTask duration (millis): {}", (t.endTimeUtc - t.submitTimeUtc)
}
@Test
public void fieldsSetForBasicTaskSubmittedBasicTask() {
//submitted BasicTask B is started by A, and waits for A to complete
BasicTask t = new BasicTask( displayName: "sample", description: "some descr", {
em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"B", {
assertEquals(45, em.getTasksWithTag("A").iterator().next().get());
46 };
45 } )
em.submit expirationPolicy: ExpirationPolicy.NEVER, tag:"A", t
t.blockUntilEnded()
// assertEquals em.getAllTasks().size(), 2
BasicTask tb = em.getTasksWithTag("B").iterator().next();
assertEquals( 46, tb.get() )
assertEquals( t, em.getTasksWithTag("A").iterator().next() )
assertNull( t.submittedByTask )
BasicTask submitter = tb.submittedByTask;
assertNotNull(submitter)
assertEquals("sample", submitter.displayName)
assertEquals("some descr", submitter.description)
assertEquals(t, submitter)
assertTrue(submitter.submitTimeUtc <= tb.submitTimeUtc)
assertTrue(submitter.endTimeUtc <= tb.endTimeUtc)
log.debug "BasicTask {} was submitted by {}", tb, submitter
}
// Previously, when we used a CopyOnWriteArraySet, performance for submitting new tasks was
// terrible, and it degraded significantly as the number of previously executed tasks increased
// (e.g. 9s for first 1000; 26s for next 1000; 42s for next 1000).
@Test
public void testExecutionManagerPerformance() {
final int NUM_TASKS = 1000
final int NUM_TIMES = 10
final int MAX_ACCEPTABLE_TIME = 2500
long tWarmup = execTasksAndWaitForDone(NUM_TASKS, ["A"])
List<Long> times = []
for (i in 1..NUM_TIMES) {
times << execTasksAndWaitForDone(NUM_TASKS, ["A"])
}
assertNull( times.find({ it > MAX_ACCEPTABLE_TIME}), "warmup=$tWarmup; times=$times")
}
private long execTasksAndWaitForDone(int numTasks, List tags) {
List<Task> tasks = []
long startTimestamp = System.currentTimeMillis()
for (i in 1..numTasks) {
Task t = new BasicTask({ /*no-op*/ })
em.submit tags:tags, t
tasks << t
}
long submittedTimestamp = System.currentTimeMillis()
for (Task t in tasks) {
t.get();
}
long endTimestamp = System.currentTimeMillis()
long submitTime = submittedTimestamp - startTimestamp
long totalTime = endTimestamp - startTimestamp
println "Executed $numTasks tasks; ${totalTime}ms total; ${submitTime}ms to submit"
return totalTime
}
}