blob: 110143fd5275d2f00624405e22ba410c7a32106f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.vortex.driver;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.vortex.common.AggregateFunctionRepository;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test Possible Race Conditions.
*/
public class RunningWorkersTest {
private final TestUtil testUtil = new TestUtil();
private final TestUtil.TestSchedulingPolicy schedulingPolicy = testUtil.newSchedulingPolicy();
private final RunningWorkers runningWorkers;
public RunningWorkersTest() throws InjectionException {
runningWorkers = new RunningWorkers(
schedulingPolicy, Tang.Factory.getTang().newInjector().getInstance(AggregateFunctionRepository.class));
}
/**
* Test executor preemption -> executor allocation.
* Possible scenario: RM preempts Evaluator before the Evaluator sends its first heartbeat to Driver.
*/
@Test
public void removeExecutorAndAddExecutor() throws Exception {
final VortexWorkerManager vortexWorkerManager = testUtil.newWorker();
assertFalse("Must be no running tasklets", runningWorkers.removeWorker(vortexWorkerManager.getId()).isPresent());
runningWorkers.addWorker(vortexWorkerManager);
assertFalse("Executor should not be running", runningWorkers.isWorkerRunning(vortexWorkerManager.getId()));
}
/**
* Test executor allocation -> tasklet launch -> executor preemption -> tasklet complete
* Possible scenario: Tasklet completion message from Evaluator comes in slowly, after preemption message from RM.
*/
@Test(timeout = 10000)
public void removeExecutorAndCompleteTasklet() throws Exception {
final VortexWorkerManager vortexWorkerManager = testUtil.newWorker();
final Tasklet tasklet = testUtil.newTasklet();
runningWorkers.addWorker(vortexWorkerManager);
runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()).get();
assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size());
assertTrue("This Tasklet must have been running", tasklets.contains(tasklet));
final List<Integer> taskletIds = new ArrayList<>();
for (final Tasklet taskletIter : tasklets) {
taskletIds.add(taskletIter.getId());
}
runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds);
assertFalse("Tasklet must not have been completed", schedulingPolicy.taskletIsDone(tasklet.getId()));
}
}