blob: c2bbd12a0551e9b7474f20fd9c0d5eb00e215178 [file] [log] [blame]
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.runtime.common.RuntimeIdManager;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageSender;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.tang.Injector;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
 * Tests fault tolerance of the scheduler.
 *
 * <p>Each test schedules a physical plan in {@link #setUp()} and then, until the plan is reported as done,
 * repeatedly injects randomly chosen events (executors added/removed, tasks completed, task output write
 * failures). The plan is expected to end up in {@link PlanState.State#COMPLETE} regardless of the failures.
 * A fixed random seed is used so that a run can be reproduced.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlockManagerMaster.class, TaskDispatcher.class, SchedulingConstraintRegistry.class,
    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class})
public final class TaskRetryTest {
  @Rule public TestName testName = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TaskRetryTest.class.getName());
  // Shared by executor ids and node names, so every generated name is unique across tests.
  private static final AtomicInteger ID_OFFSET = new AtomicInteger(1);
  private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;

  // Every ExecutorService handed to an ExecutorRepresenter by this test, kept so that they can be shut down.
  private final List<ExecutorService> serializationExecutorServices = new ArrayList<>();

  private Random random;
  private Scheduler scheduler;
  private ExecutorRegistry executorRegistry;
  private PlanStateManager planStateManager;

  /**
   * Prepares a fresh injector, a seeded random generator and the executor registry,
   * then schedules the plan that the test methods drive to completion.
   *
   * @throws Exception if the injection or the plan generation fails.
   */
  @Before
  public void setUp() throws Exception {
    // To understand which part of the log belongs to which test
    LOG.info("===== Testing {} =====", testName.getMethodName());

    final Injector injector = LocalMessageEnvironment.forkInjector(LocalMessageDispatcher.getInjector(),
        MessageEnvironment.MASTER_COMMUNICATION_ID);

    // Fixed seed for reproducing test results.
    random = new Random(0);

    executorRegistry = injector.getInstance(ExecutorRegistry.class);

    // Creates the PlanStateManager and the Scheduler, and schedules the plan.
    runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, injector);
  }

  /**
   * Randomly removes and adds executors while tasks complete, and checks that the plan still completes.
   *
   * @throws Exception if the test thread is interrupted while sleeping.
   */
  @Test(timeout=7000)
  public void testExecutorRemoved() throws Exception {
    try {
      // Until the plan finishes, events happen
      while (!planStateManager.isPlanDone()) {
        // 50% chance remove, 50% chance add, 80% chance task completed
        executorRemoved(0.5);
        executorAdded(0.5);
        taskCompleted(0.8);

        // 10ms sleep
        Thread.sleep(10);
      }

      // Plan should COMPLETE
      assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
      assertTrue(planStateManager.isPlanDone());
    } finally {
      shutDownExecutorServices();
    }
  }

  /**
   * Randomly fails the output write of executing tasks while other tasks complete,
   * and checks that the plan still completes.
   *
   * @throws Exception if the test thread is interrupted while sleeping.
   */
  @Test(timeout=7000)
  public void testTaskOutputWriteFailure() throws Exception {
    try {
      // Three executors are used
      executorAdded(1.0);
      executorAdded(1.0);
      executorAdded(1.0);

      // Until the plan finishes, events happen
      while (!planStateManager.isPlanDone()) {
        // 50% chance task completed
        // 50% chance task output write failed
        taskCompleted(0.5);
        taskOutputWriteFailed(0.5);

        // 10ms sleep
        Thread.sleep(10);
      }

      // Plan should COMPLETE
      assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
      assertTrue(planStateManager.isPlanDone());
    } finally {
      shutDownExecutorServices();
    }
  }

  ////////////////////////////////////////////////////////////////// Events

  /**
   * With the given probability, registers a new mocked executor to the scheduler.
   *
   * @param chance probability in [0, 1] that the event happens.
   */
  private void executorAdded(final double chance) {
    if (isSkipped(chance)) {
      return;
    }

    // mock() can only be given the raw class, hence the unchecked assignment.
    @SuppressWarnings("unchecked")
    final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);

    // The mocked context throws on close().
    // NOTE(review): presumably to check that a failing context close does not break the scheduler - confirm.
    final ActiveContext activeContext = mock(ActiveContext.class);
    Mockito.doThrow(new RuntimeException()).when(activeContext).close();

    final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
    serializationExecutorServices.add(serExecutorService);

    final ResourceSpecification computeSpec = new ResourceSpecification(ResourcePriorityProperty.COMPUTE, 2, 0);
    // The executor id and the node name draw from the same counter, so their numeric suffixes differ.
    final ExecutorRepresenter executor = new ExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
        computeSpec, mockMsgSender, activeContext, serExecutorService, "NODE" + ID_OFFSET.getAndIncrement());
    scheduler.onExecutorAdded(executor);
  }

  /**
   * With the given probability, removes a randomly chosen executor, if any is registered.
   *
   * @param chance probability in [0, 1] that the event happens.
   */
  private void executorRemoved(final double chance) {
    if (isSkipped(chance)) {
      return;
    }

    executorRegistry.viewExecutors(executors -> {
      if (executors.isEmpty()) {
        return;
      }

      final List<ExecutorRepresenter> executorList = new ArrayList<>(executors);
      final int randomIndex = random.nextInt(executorList.size());

      // Because synchronized blocks are reentrant and there's no additional operation after this point,
      // we can scheduler.onExecutorRemoved() while being inside executorRegistry.viewExecutors()
      scheduler.onExecutorRemoved(executorList.get(randomIndex).getExecutorId());
    });
  }

  /**
   * With the given probability, reports a randomly chosen EXECUTING task as COMPLETE.
   *
   * @param chance probability in [0, 1] that the event happens.
   */
  private void taskCompleted(final double chance) {
    if (isSkipped(chance)) {
      return;
    }

    pickRandomExecutingTask().ifPresent(selectedTask ->
        SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
            TaskState.State.COMPLETE, RuntimeIdManager.getAttemptFromTaskId(selectedTask)));
  }

  /**
   * With the given probability, reports a randomly chosen EXECUTING task as SHOULD_RETRY
   * because of an output write failure.
   *
   * @param chance probability in [0, 1] that the event happens.
   */
  private void taskOutputWriteFailed(final double chance) {
    if (isSkipped(chance)) {
      return;
    }

    pickRandomExecutingTask().ifPresent(selectedTask ->
        SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, selectedTask,
            TaskState.State.SHOULD_RETRY, RuntimeIdManager.getAttemptFromTaskId(selectedTask),
            TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE));
  }

  ////////////////////////////////////////////////////////////////// Helper methods

  /**
   * Draws one random number and decides whether an event with the given probability is skipped.
   * Random#nextDouble() is in [0.0, 1.0), so a chance of 1.0 never skips and a chance of 0.0 always skips.
   *
   * @param chance probability in [0, 1] that the event happens.
   * @return {@code true} if the event must not happen this time.
   */
  private boolean isSkipped(final double chance) {
    return random.nextDouble() >= chance;
  }

  /**
   * Chooses one of the task attempts that are currently EXECUTING.
   * A random index is drawn only when at least one such task exists.
   *
   * @return the id of the chosen task attempt, or empty if no task is EXECUTING.
   */
  private Optional<String> pickRandomExecutingTask() {
    final List<String> executingTasks = getTasksInState(planStateManager, TaskState.State.EXECUTING);
    if (executingTasks.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(executingTasks.get(random.nextInt(executingTasks.size())));
  }

  /**
   * Collects the ids of the task attempts that are in the given state.
   *
   * @param stateManager the plan state manager to query.
   * @param state the state to look for.
   * @return the ids of the task attempts whose state equals {@code state}.
   */
  private static List<String> getTasksInState(final PlanStateManager stateManager, final TaskState.State state) {
    return stateManager.getAllTaskAttemptIdsToItsState()
        .entrySet()
        .stream()
        .filter(entry -> entry.getValue().equals(state))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  /**
   * Shuts down every executor service created by {@link #executorAdded(double)},
   * so that their worker threads do not outlive the test.
   */
  private void shutDownExecutorServices() {
    serializationExecutorServices.forEach(ExecutorService::shutdown);
    serializationExecutorServices.clear();
  }

  /**
   * Generates a physical plan of the given type, binds mocks for the scheduler's collaborators,
   * and hands the plan over to a newly obtained scheduler.
   *
   * @param planType the type of the plan to generate.
   * @param injector the injector used to obtain the PlanStateManager and the Scheduler.
   * @throws Exception if the plan generation or the injection fails.
   */
  private void runPhysicalPlan(final TestPlanGenerator.PlanType planType,
                               final Injector injector) throws Exception {
    final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
    final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType, false);

    // Bind mocks before the PlanStateManager and the Scheduler are instantiated
    injector.bindVolatileInstance(MetricMessageHandler.class, metricMessageHandler);
    injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, mock(PubSubEventHandlerWrapper.class));
    injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
    injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));

    planStateManager = injector.getInstance(PlanStateManager.class);
    scheduler = injector.getInstance(Scheduler.class);
    scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);
  }
}