blob: 086c9d08bd83bf120b6a4088b63701337ebf3f19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master.scheduler;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.exception.UnrecoverableFailureException;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.*;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.PlanAppender;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.*;
import java.util.stream.Collectors;
/**
 * (CONCURRENCY) Only a single dedicated thread (i.e., runtimeMasterThread in RuntimeMaster)
 * should use the public methods of this class.
 * <p>
 * BatchScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
 *
 * Note: When modifying this class, take a look at {@link SimulationScheduler}.
 */
@DriverSide
@NotThreadSafe
public final class BatchScheduler implements Scheduler {
  private static final Logger LOG = LoggerFactory.getLogger(BatchScheduler.class.getName());

  /**
   * Run-time optimizations.
   */
  private final PlanRewriter planRewriter;

  /**
   * Components related to scheduling the given plan.
   */
  private final TaskDispatcher taskDispatcher; // Class for dispatching tasks.
  private final PendingTaskCollectionPointer pendingTaskCollectionPointer; // A 'pointer' to the list of pending tasks.
  private final ExecutorRegistry executorRegistry; // A registry for executors available for the job.
  private final PlanStateManager planStateManager; // A component that manages the state of the plan.

  /**
   * Other necessary components of this {@link org.apache.nemo.runtime.master.RuntimeMaster}.
   */
  private final BlockManagerMaster blockManagerMaster; // A component that manages data blocks.

  /**
   * The below variables depend on the submitted plan to execute.
   * Stages grouped by their schedule group, with the groups sorted in ascending schedule-group order.
   * This stays {@code null} until a plan is set through {@link #updatePlan(PhysicalPlan, int)}.
   */
  private List<List<Stage>> sortedScheduleGroups;

  @Inject
  private BatchScheduler(final PlanRewriter planRewriter,
                         final TaskDispatcher taskDispatcher,
                         final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                         final BlockManagerMaster blockManagerMaster,
                         final ExecutorRegistry executorRegistry,
                         final PlanStateManager planStateManager) {
    this.planRewriter = planRewriter;
    this.taskDispatcher = taskDispatcher;
    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
    this.blockManagerMaster = blockManagerMaster;
    this.executorRegistry = executorRegistry;
    this.planStateManager = planStateManager;
  }

  ////////////////////////////////////////////////////////////////////// Methods for plan rewriting.

  /**
   * Updates the physical plan in the scheduler.
   * The current maximum schedule attempt of the {@link PlanStateManager} is reused.
   * NOTE: what's already been executed is not modified in the new physical plan.
   * TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
   *
   * @param newPhysicalPlan the new physical plan to update.
   */
  @Override
  public void updatePlan(final PhysicalPlan newPhysicalPlan) {
    updatePlan(newPhysicalPlan, planStateManager.getMaxScheduleAttempt());
  }

  /**
   * Update the physical plan in the scheduler.
   * Besides updating the {@link PlanStateManager}, this recomputes {@link #sortedScheduleGroups}.
   * It groups the plan's stages by schedule group and sorts the groups by ascending schedule-group key.
   *
   * @param newPhysicalPlan    the new physical plan to update.
   * @param maxScheduleAttempt the maximum number of task scheduling attempt.
   */
  private void updatePlan(final PhysicalPlan newPhysicalPlan,
                          final int maxScheduleAttempt) {
    planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
    this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
      .collect(Collectors.groupingBy(Stage::getScheduleGroup))
      .entrySet().stream()
      .sorted(Map.Entry.comparingByKey())
      .map(Map.Entry::getValue)
      .collect(Collectors.toList());
  }

  /**
   * Process the RuntimePassMessage.
   * The message is delegated to {@link BatchSchedulerUtils} together with the plan state and the rewriter.
   *
   * @param taskId that generated the message.
   * @param data   of the message.
   */
  public void onRunTimePassMessage(final String taskId, final Object data) {
    BatchSchedulerUtils.onRunTimePassMessage(planStateManager, planRewriter, taskId, data);
  }

  ////////////////////////////////////////////////////////////////////// Methods for scheduling.

  /**
   * Schedules a given plan.
   * If multiple physical plans are submitted, they will be appended and handled as a single plan.
   * On the first submission, this also starts the {@link TaskDispatcher}.
   * TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
   *
   * @param submittedPhysicalPlan the physical plan to schedule.
   * @param maxScheduleAttempt    the max number of times this plan/sub-part of the plan should be attempted.
   */
  @Override
  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
                           final int maxScheduleAttempt) {
    LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());

    if (!planStateManager.isInitialized()) {
      // First scheduling.
      taskDispatcher.run();
      updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
      planStateManager.storeJSON("submitted");
    } else {
      // Append the submitted plan to the original plan.
      final PhysicalPlan appendedPlan =
        PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
      updatePlan(appendedPlan, maxScheduleAttempt);
      planStateManager.storeJSON("appended");
    }

    doSchedule();
  }

  /**
   * Handles task state transition notifications sent from executors.
   * <p>
   * This method forwards the new state of {@code taskId} straight to the {@link PlanStateManager}.
   * It does not itself compare {@code taskAttemptIndex} against a current attempt.
   * NOTE(review): notifications from previous task attempts can arrive late because events are asynchronous.
   * Presumably those are told apart via the attempt encoded in {@code taskId} or filtered by the
   * PlanStateManager. TODO confirm.
   * <p>
   * Afterwards, depending on the new state, it may trigger a new scheduling round.
   * For COMPLETE, ON_HOLD and SHOULD_RETRY it also notifies the dispatcher that an executor slot became available.
   *
   * @param executorId       the id of the executor where the message was sent from.
   * @param taskId           whose state has changed
   * @param taskAttemptIndex of the task whose state has changed (not inspected by this method)
   * @param newState         the state to change to
   * @param vertexPutOnHold  the ID of vertex that is put on hold. It is null otherwise (not used by this method).
   * @param failureCause     the cause of a recoverable failure, forwarded when {@code newState} is SHOULD_RETRY.
   * @throws UnrecoverableFailureException  if {@code newState} is FAILED.
   * @throws RuntimeException               if {@code newState} is READY or EXECUTING.
   * @throws UnknownExecutionStateException if {@code newState} is not one of the handled states.
   */
  @Override
  public void onTaskStateReportFromExecutor(final String executorId,
                                            final String taskId,
                                            final int taskAttemptIndex,
                                            final TaskState.State newState,
                                            @Nullable final String vertexPutOnHold,
                                            final TaskState.RecoverableTaskFailureCause failureCause) {
    // Record the state change for this task.
    planStateManager.onTaskStateChanged(taskId, newState);

    switch (newState) {
      case COMPLETE:
        BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
        break;
      case SHOULD_RETRY:
        // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
        BatchSchedulerUtils.onTaskExecutionFailedRecoverable(planStateManager, blockManagerMaster, executorRegistry,
          executorId, taskId, failureCause);
        break;
      case ON_HOLD:
        final Optional<PhysicalPlan> optionalPhysicalPlan =
          BatchSchedulerUtils
            .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId);
        optionalPhysicalPlan.ifPresent(this::updatePlan);
        break;
      case FAILED:
        throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
          taskId, executorId)));
      case READY:
      case EXECUTING:
        throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
      default:
        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
    }

    // Invoke doSchedule()
    switch (newState) {
      case COMPLETE:
      case ON_HOLD:
        // Only move on once the whole stage of this task has completed (and the plan is not yet done).
        final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
        if (planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)
          && !planStateManager.isPlanDone()) {
          doSchedule();
        }
        break;
      case SHOULD_RETRY:
        // Do retry
        doSchedule();
        break;
      default:
        break;
    }

    // Invoke taskDispatcher.onExecutorSlotAvailable()
    switch (newState) {
      // These three states mean that a slot is made available.
      case COMPLETE:
      case ON_HOLD:
      case SHOULD_RETRY:
        taskDispatcher.onExecutorSlotAvailable();
        break;
      default:
        break;
    }
  }

  /**
   * Checks whether straggling tasks should be speculatively cloned.
   * <p>
   * Only stages in the earliest schedulable schedule group are considered.
   * Each such stage must have a {@link ClonedSchedulingProperty} that is not configured for up-front cloning.
   * If a new clone was created for ANY of those stages, a new scheduling round is triggered.
   * The call is a no-op while no plan has been set yet.
   */
  @Override
  public void onSpeculativeExecutionCheck() {
    if (sortedScheduleGroups == null) {
      // No plan has been set via updatePlan yet, so there is nothing to clone.
      return;
    }

    final MutableBoolean isNewCloneCreated = new MutableBoolean(false);
    BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager)
      .ifPresent(scheduleGroup ->
        scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
          final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);

          // Only if the ClonedSchedulingProperty is set...
          stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
            // Upfront cloning is already handled, so only delayed (speculative) cloning is done here.
            // Accumulate with OR: a later stage that creates no clone must not reset the flag of an earlier one.
            if (!cloneConf.isUpFrontCloning() && doSpeculativeExecution(stage, cloneConf)) {
              isNewCloneCreated.setValue(true);
            }
          });
        }));

    if (isNewCloneCreated.booleanValue()) {
      doSchedule(); // Do schedule the new clone.
    }
  }

  /**
   * Registers a newly added executor and notifies the dispatcher that a slot became available.
   *
   * @param executorRepresenter the added executor.
   */
  @Override
  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
    LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
    executorRegistry.registerExecutor(executorRepresenter);
    taskDispatcher.onExecutorSlotAvailable();
  }

  /**
   * Handles the removal of an executor.
   * <p>
   * The executor is marked FAILED and the tasks that were running on it are collected.
   * The blocks those tasks were producing are marked failed.
   * The tasks, and any parents they require, are set up for retry, and then a scheduling round is triggered.
   *
   * @param executorId the id of the removed executor.
   */
  @Override
  public void onExecutorRemoved(final String executorId) {
    LOG.info("{} removed", executorId);
    blockManagerMaster.removeWorker(executorId);

    // These are tasks that were running at the time of executor removal.
    final Set<String> interruptedTasks = new HashSet<>();
    executorRegistry.updateExecutor(executorId, (executor, state) -> {
      interruptedTasks.addAll(executor.onExecutorFailed());
      return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
    });

    // Blocks of the interrupted tasks are failed.
    interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);

    // Retry the interrupted tasks (and required parents)
    BatchSchedulerUtils.retryTasksAndRequiredParents(planStateManager, blockManagerMaster, interruptedTasks);

    // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
    doSchedule();
  }

  /**
   * Terminates the task dispatcher and the executor registry.
   */
  @Override
  public void terminate() {
    this.taskDispatcher.terminate();
    this.executorRegistry.terminate();
  }

  ////////////////////////////////////////////////////////////////////// Task launch methods.

  /**
   * The main entry point for task scheduling.
   * <p>
   * This can be invoked at any point during job execution.
   * Repeated invocations do not accumulate duplicate work, for these reasons:
   * - The {@link PendingTaskCollectionPointer} is overwritten ('reset') with the current schedulable tasks,
   * not appended to.
   * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
   * <p>
   * Only the earliest schedulable schedule group is considered. If it has no schedulable tasks, nothing happens.
   */
  private void doSchedule() {
    final Optional<List<Stage>> earliest =
      BatchSchedulerUtils.selectEarliestSchedulableGroup(sortedScheduleGroups, planStateManager);

    if (earliest.isPresent()) {
      final List<Task> tasksToSchedule = earliest.get().stream()
        .flatMap(stage ->
          BatchSchedulerUtils.selectSchedulableTasks(planStateManager, blockManagerMaster, stage).stream())
        .collect(Collectors.toList());
      if (!tasksToSchedule.isEmpty()) {
        LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
          .map(Task::getTaskId)
          .map(RuntimeIdManager::getStageIdFromTaskId)
          .collect(Collectors.toSet()));

        // Set the pointer to the schedulable tasks.
        pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);

        // Notify the dispatcher that a new collection is available.
        taskDispatcher.onNewPendingTaskCollectionAvailable();
      }
    } else {
      LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
    }
  }

  ////////////////////////////////////////////////////////////////////// Task cloning methods.

  /**
   * Decides whether a straggling task of the given stage should get a clone.
   * <p>
   * A decision is only attempted after at least one task has completed.
   * The number of completed tasks must also reach {@code round(numTasks * fractionToWaitFor)}.
   * The median of the completed task times is the upper median when the count is even.
   *
   * @param stage     the stage to inspect.
   * @param cloneConf the cloning configuration of the stage.
   * @return true if the number of clones of some task of the stage was modified (i.e., a new clone is created),
   * false otherwise.
   */
  private boolean doSpeculativeExecution(final Stage stage, final ClonedSchedulingProperty.CloneConf cloneConf) {
    final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
    final long[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stage.getId()).stream()
      .mapToLong(Long::longValue)
      .toArray();

    // Only after the fraction of the tasks are done...
    // Delayed cloning (aggressive)
    if (completedTaskTimes.length > 0
      && completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
      // Only if the running task is considered a 'straggler'....
      Arrays.sort(completedTaskTimes);
      final long medianTime = completedTaskTimes[completedTaskTimes.length / 2];
      final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
      final Map<String, Long> executingTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stage.getId());

      return modifyStageNumCloneUsingMedianTime(
        stage.getId(), completedTaskTimes.length, medianTime, medianTimeMultiplier, executingTaskToTime);
    } else {
      return false;
    }
  }

  /**
   * Requests 2 clones for the first executing task whose running time exceeds
   * {@code round(medianTime * medianTimeMultiplier)} and whose clone count actually changes.
   * At most one task is cloned per call.
   *
   * @param stageId              the stage of the executing tasks.
   * @param numCompletedTasks    number of completed tasks of the stage (used for logging only).
   * @param medianTime           median running time (ms) of the completed tasks.
   * @param medianTimeMultiplier multiplier applied to the median to define a straggler.
   * @param executingTaskToTime  executing task id to its running time so far (ms).
   * @return true if the number of clones for the stage is modified,
   * false otherwise.
   */
  private boolean modifyStageNumCloneUsingMedianTime(final String stageId,
                                                     final long numCompletedTasks,
                                                     final long medianTime,
                                                     final double medianTimeMultiplier,
                                                     final Map<String, Long> executingTaskToTime) {
    for (final Map.Entry<String, Long> entry : executingTaskToTime.entrySet()) {
      final long runningTime = entry.getValue();
      if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
        final String taskId = entry.getKey();
        final boolean isNumCloneModified = planStateManager
          .setNumOfClones(stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
        if (isNumCloneModified) {
          LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
            + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, numCompletedTasks,
            medianTime, medianTimeMultiplier);
          return true;
        }
      }
    }
    return false;
  }
}