blob: 0770f77516d9ed887b2ca2866d9b860184aa6bea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.vortex.driver;
import org.apache.reef.io.serialization.SerializableCodec;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.FutureCallback;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletReport;
import org.apache.reef.vortex.common.TaskletResultReport;
import org.apache.reef.vortex.common.WorkerReport;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
/**
* Test whether DefaultVortexMaster correctly handles (simulated) events.
*/
public class DefaultVortexMasterTest {
/** Zero-length result payload, reported for tasklets whose output the tests do not inspect. */
private static final byte[] EMPTY_RESULT = new byte[0];
/** Result payload holding the integer 1 as encoded by {@link SerializableCodec}. */
private static final byte[] INTEGER_RESULT = new SerializableCodec<Integer>().encode(1);
/** Source of the functions, workers and repositories used by the tests; never reassigned. */
private final TestUtil testUtil = new TestUtil();
/**
 * Verifies the success path for a single tasklet: after the worker reports a result,
 * the future is done and the success callback has been invoked.
 */
@Test(timeout = 10000)
public void testSingleTaskletNoFailure() throws Exception {
  final VortexFunction integerFunction = testUtil.newIntegerFunction();
  final VortexWorkerManager worker = testUtil.newWorker();
  final RunningWorkers workers = new RunningWorkers(
      new RandomSchedulingPolicy(), testUtil.newAggregateFunctionRepository());
  final PendingTasklets pending = new PendingTasklets();
  final DefaultVortexMaster master = new DefaultVortexMaster(
      workers, pending, testUtil.newAggregateFunctionRepository(), 5);

  // Flag and latch are both set by onSuccess; the latch lets the test wait for it.
  final AtomicBoolean successSeen = new AtomicBoolean(false);
  final CountDownLatch successLatch = new CountDownLatch(1);

  master.workerAllocated(worker);

  final FutureCallback<Integer> callback = new FutureCallback<Integer>() {
    @Override
    public void onSuccess(final Integer result) {
      successSeen.set(true);
      successLatch.countDown();
    }

    @Override
    public void onFailure(final Throwable cause) {
      throw new RuntimeException("Did not expect exception in test.", cause);
    }
  };

  final VortexFuture future = master.enqueueTasklet(integerFunction, null, Optional.of(callback));

  // Play the worker's part: report an encoded integer result for each launched tasklet.
  final ArrayList<Integer> launchedIds = launchTasklets(workers, pending, 1);
  for (final int launchedId : launchedIds) {
    final TaskletReport resultReport = new TaskletResultReport(launchedId, INTEGER_RESULT);
    master.workerReported(
        worker.getId(), new WorkerReport(Collections.singletonList(resultReport)));
  }

  assertTrue("The VortexFuture should be done", future.isDone());
  successLatch.await();
  assertTrue("Callback should have been received", successSeen.get());
}
/**
 * Verifies recovery from a worker preemption for a single tasklet: the tasklet that was
 * launched on the preempted worker is launched again under the same id on a newly
 * allocated worker, and its future is done once that worker reports a result.
 */
@Test(timeout = 10000)
public void testSingleTaskletFailure() throws Exception {
  final VortexFunction function = testUtil.newFunction();
  final VortexWorkerManager preemptedWorker = testUtil.newWorker();
  final VortexWorkerManager replacementWorker = testUtil.newWorker();
  final RunningWorkers workers = new RunningWorkers(
      new RandomSchedulingPolicy(), testUtil.newAggregateFunctionRepository());
  final PendingTasklets pending = new PendingTasklets();
  final DefaultVortexMaster master = new DefaultVortexMaster(
      workers, pending, testUtil.newAggregateFunctionRepository(), 5);

  // First attempt: one worker, one tasklet, launched.
  master.workerAllocated(preemptedWorker);
  final VortexFuture future = master.enqueueTasklet(
      function, null, Optional.<FutureCallback<Integer>>empty());
  final ArrayList<Integer> firstLaunchIds = launchTasklets(workers, pending, 1);

  // The worker goes away before reporting anything.
  master.workerPreempted(preemptedWorker.getId());
  assertFalse("The VortexFuture should not be done", future.isDone());

  // Second attempt on a fresh worker must pick up the very same tasklet.
  master.workerAllocated(replacementWorker);
  final ArrayList<Integer> secondLaunchIds = launchTasklets(workers, pending, 1);
  assertEquals("Both lists need to contain the same single tasklet id", firstLaunchIds, secondLaunchIds);

  // The replacement worker reports a result for everything it was given.
  for (final int relaunchedId : secondLaunchIds) {
    final TaskletReport resultReport = new TaskletResultReport(relaunchedId, EMPTY_RESULT);
    master.workerReported(
        replacementWorker.getId(), new WorkerReport(Collections.singletonList(resultReport)));
  }
  assertTrue("The VortexFuture should be done", future.isDone());
}
/**
 * Verifies recovery from worker preemption with many tasklets: 100 tasklets are launched
 * on 100 workers, every one of those workers is preempted, 100 new workers are allocated,
 * and the same set of tasklet ids is launched again. All futures must be done once a
 * result has been reported for every relaunched tasklet.
 */
@Test(timeout = 10000)
public void testMultipleTaskletsFailure() throws Exception {
  final int numOfWorkers = 100;
  final int numOfTasklets = 100;

  final RunningWorkers workers = new RunningWorkers(
      new RandomSchedulingPolicy(), testUtil.newAggregateFunctionRepository());
  final PendingTasklets pending = new PendingTasklets();
  final DefaultVortexMaster master = new DefaultVortexMaster(
      workers, pending, testUtil.newAggregateFunctionRepository(), 5);

  // Initial workers; each of them is preempted further down.
  final List<VortexWorkerManager> initialWorkers = new ArrayList<>();
  for (int i = 0; i < numOfWorkers; i++) {
    final VortexWorkerManager initialWorker = testUtil.newWorker();
    initialWorkers.add(initialWorker);
    master.workerAllocated(initialWorker);
  }

  // Enqueue the tasklets, keeping their futures, and launch them all.
  final ArrayList<VortexFuture> futures = new ArrayList<>();
  for (int i = 0; i < numOfTasklets; i++) {
    futures.add(master.enqueueTasklet(
        testUtil.newFunction(), null, Optional.<FutureCallback<Void>>empty()));
  }
  final ArrayList<Integer> firstLaunchIds = launchTasklets(workers, pending, numOfTasklets);

  // Take every initial worker away.
  for (final VortexWorkerManager initialWorker : initialWorkers) {
    master.workerPreempted(initialWorker.getId());
  }

  // Bring in replacements and launch again.
  for (int i = 0; i < numOfWorkers; i++) {
    master.workerAllocated(testUtil.newWorker());
  }
  final ArrayList<Integer> secondLaunchIds = launchTasklets(workers, pending, numOfTasklets);
  assertEquals("Must contain same tasklet ids", new HashSet<>(firstLaunchIds), new HashSet<>(secondLaunchIds));

  // Report a result for each relaunched tasklet from the worker it was scheduled to.
  for (final int relaunchedId : secondLaunchIds) {
    final String hostWorkerId = workers.getWhereTaskletWasScheduledTo(relaunchedId);
    assertNotNull("The tasklet must have been scheduled", hostWorkerId);
    final TaskletReport resultReport = new TaskletResultReport(relaunchedId, EMPTY_RESULT);
    master.workerReported(
        hostWorkerId, new WorkerReport(Collections.singletonList(resultReport)));
  }

  for (final VortexFuture future : futures) {
    assertTrue("The VortexFuture should be done", future.isDone());
  }
}
/**
 * Verifies the failure-report path for a single tasklet: after the worker reports a
 * {@link TaskletFailureReport} carrying an exception, the future is done and the
 * failure callback (not the success callback) has been invoked.
 */
@Test(timeout = 10000)
public void testTaskletThrowException() throws Exception {
  final VortexFunction integerFunction = testUtil.newIntegerFunction();
  final VortexWorkerManager worker = testUtil.newWorker();
  final RunningWorkers workers = new RunningWorkers(
      new RandomSchedulingPolicy(), testUtil.newAggregateFunctionRepository());
  final PendingTasklets pending = new PendingTasklets();
  final DefaultVortexMaster master = new DefaultVortexMaster(
      workers, pending, testUtil.newAggregateFunctionRepository(), 5);

  // Flag and latch are both set by onFailure; the latch lets the test wait for it.
  final AtomicBoolean failureSeen = new AtomicBoolean(false);
  final CountDownLatch failureLatch = new CountDownLatch(1);

  master.workerAllocated(worker);

  final FutureCallback<Integer> callback = new FutureCallback<Integer>() {
    @Override
    public void onSuccess(final Integer result) {
      throw new RuntimeException("Did not expect success in test.");
    }

    @Override
    public void onFailure(final Throwable cause) {
      failureSeen.set(true);
      failureLatch.countDown();
    }
  };

  final VortexFuture future = master.enqueueTasklet(integerFunction, null, Optional.of(callback));

  // Play the worker's part: report a failure for each launched tasklet.
  final ArrayList<Integer> launchedIds = launchTasklets(workers, pending, 1);
  for (final int launchedId : launchedIds) {
    final TaskletReport failureReport =
        new TaskletFailureReport(launchedId, new RuntimeException("Test exception."));
    master.workerReported(
        worker.getId(), new WorkerReport(Collections.singletonList(failureReport)));
  }

  assertTrue("The VortexFuture should be done", future.isDone());
  failureLatch.await();
  assertTrue("Callback should have been received", failureSeen.get());
}
/**
 * Verifies that a tasklet cancelled after it has been launched ends up with a future
 * that reports itself as both cancelled and done.
 */
@Test(timeout = 10000)
public void testSingleTaskletCancellation() throws Exception {
  final RunningWorkers workers = new RunningWorkers(
      new RandomSchedulingPolicy(), testUtil.newAggregateFunctionRepository());
  final PendingTasklets pending = new PendingTasklets();
  final VortexFuture cancellable = createTaskletCancellationFuture(workers, pending);

  // Launch first so that the cancellation targets a running tasklet.
  launchTasklets(workers, pending, 1);

  assertTrue(cancellable.cancel(true));
  assertTrue("The VortexFuture should be cancelled.", cancellable.isCancelled());
  assertTrue("The VortexFuture should be done", cancellable.isDone());
}
/**
 * Verifies cancellation of a tasklet that has not been launched yet: a cancel bounded by
 * a 100 ms timeout is expected to end in a {@link TimeoutException}, and once the tasklet
 * has been launched a second cancel succeeds and leaves the future cancelled and done.
 */
@Test(timeout = 10000)
public void testSingleTaskletCancellationBeforeLaunch() throws Exception {
  final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(),
      testUtil.newAggregateFunctionRepository());
  final PendingTasklets pendingTasklets = new PendingTasklets();
  final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets);

  // NOTE(review): presumably nothing can acknowledge the cancel while the tasklet is
  // still pending, hence the timeout - confirm against VortexFuture.
  try {
    future.cancel(true, 100, TimeUnit.MILLISECONDS);
    // Carry a message so that a missing timeout is identifiable from the failure alone.
    fail("Cancelling a tasklet before launch should have timed out.");
  } catch (final TimeoutException expected) {
    // TimeoutException is expected.
  }

  launchTasklets(runningWorkers, pendingTasklets, 1);
  assertTrue("Cancelling a launched tasklet should succeed.", future.cancel(true));
  assertTrue("The VortexFuture should be cancelled.", future.isCancelled());
  assertTrue("The VortexFuture should be done", future.isDone());
}
/**
 * Sets up the fixture shared by the cancellation tests: builds a master over the given
 * worker and pending-tasklet collections, allocates one worker that was created with a
 * reference to that master, and enqueues an infinite-loop function without a callback.
 *
 * @param runningWorkers the running-worker collection the master is built on
 * @param pendingTasklets the pending-tasklet queue the master is built on
 * @return the future of the enqueued tasklet (enqueued only, not yet launched)
 * @throws InjectionException declared by the signature; not raised directly in this method
 */
private VortexFuture createTaskletCancellationFuture(
    final RunningWorkers runningWorkers, final PendingTasklets pendingTasklets) throws InjectionException {
  final VortexFunction endlessFunction = testUtil.newInfiniteLoopFunction();
  final DefaultVortexMaster master = new DefaultVortexMaster(
      runningWorkers, pendingTasklets, testUtil.newAggregateFunctionRepository(), 5);

  // The worker is created from the master here, unlike in the non-cancellation tests.
  final VortexWorkerManager worker = testUtil.newWorker(master);
  master.workerAllocated(worker);

  return master.enqueueTasklet(endlessFunction, null, Optional.<FutureCallback<Integer>>empty());
}
/**
 * Stands in for PendingTaskletLauncher: takes tasklets off the pending queue one at a
 * time and launches each on the running workers.
 *
 * @param runningWorkers workers on which the tasklets are launched
 * @param pendingTasklets queue the tasklets are taken from
 * @param numOfTasklets number of tasklets to take and launch
 * @return ids of launched tasklets, in the order they were launched
 * @throws InterruptedException presumably when interrupted during one of the blocking calls
 */
private ArrayList<Integer> launchTasklets(final RunningWorkers runningWorkers,
                                          final PendingTasklets pendingTasklets,
                                          final int numOfTasklets) throws InterruptedException {
  final ArrayList<Integer> launchedIds = new ArrayList<>();
  // One id is recorded per launch, so the list size doubles as the launch counter.
  while (launchedIds.size() < numOfTasklets) {
    final Tasklet next = pendingTasklets.takeFirst(); // blocks when no tasklet exists
    assertNotNull("Tasklet should exist in the pending queue", next);
    runningWorkers.launchTasklet(next); // blocks when no resource exists
    launchedIds.add(next.getId());
  }
  return launchedIds;
}
}