blob: 53cab57810b531e0e1f32c79f6c876d8d0f27458 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master;
import com.google.common.annotations.VisibleForTesting;
import org.apache.nemo.common.StateMachine;
import org.apache.nemo.common.exception.IllegalStateTransitionException;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.metric.StageMetric;
import org.apache.nemo.runtime.common.metric.TaskMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.state.PlanState;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
/**
* Maintains three levels of state machines (PlanState, StageState, and TaskState) of a physical plan.
* The main API this class provides is onTaskStateReportFromExecutor(), which directly changes a TaskState.
* PlanState and StageState are updated internally in the class, and can only be read from the outside.
* <p>
* (CONCURRENCY) The public methods of this class are synchronized.
*/
@DriverSide
@ThreadSafe
public final class PlanStateManager {
private static final Logger LOG = LoggerFactory.getLogger(PlanStateManager.class.getName());
private String planId;
private int maxScheduleAttempt;
private boolean initialized;
private int dagLogFileIndex = 0;
/**
* The data structures below track the execution states of this plan.
*/
private PlanState planState;
private final Map<String, StageState> stageIdToState;
// list of attempt states sorted by attempt idx
private final Map<String, Map<Integer, List<TaskState>>> stageIdToTaskIdxToAttemptStates;
/**
* Used for speculative cloning. (in the unit of milliseconds - ms)
*/
private final Map<String, Long> taskIdToStartTimeMs = new HashMap<>();
private final Map<String, List<Long>> stageIdToCompletedTaskTimeMsList = new HashMap<>();
private final Map<String, Map<Integer, Integer>> stageIdToTaskIndexToNumOfClones = new HashMap<>();
/**
* Represents the plan to manage.
*/
private PhysicalPlan physicalPlan;
/**
* A lock and condition to check whether the plan is finished or not.
*/
private final Lock finishLock;
private final Condition planFinishedCondition;
/**
* For metrics.
*/
private final String dagDirectory;
private MetricStore metricStore;
/**
* Constructor.
*/
@Inject
private PlanStateManager(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
this.planState = new PlanState();
this.stageIdToState = new HashMap<>();
this.stageIdToTaskIdxToAttemptStates = new HashMap<>();
this.finishLock = new ReentrantLock();
this.planFinishedCondition = finishLock.newCondition();
this.dagDirectory = dagDirectory;
this.metricStore = MetricStore.getStore();
this.initialized = false;
}
/**
* Static constructor for manual usage.
* @param dagDirectory the DAG directory to store the JSON to.
* @return a new PlanStateManager instance.
*/
public static PlanStateManager newInstance(final String dagDirectory) {
return new PlanStateManager(dagDirectory);
}
/**
* @param metricStore set the metric store of the paln state manager.
*/
public void setMetricStore(final MetricStore metricStore) {
this.metricStore = metricStore;
}
/**
* Update the physical plan and maximum attempt.
*
* @param physicalPlanToUpdate the physical plan to manage.
* @param maxScheduleAttemptToSet the maximum number of times this plan/sub-part of the plan should be attempted.
*/
public synchronized void updatePlan(final PhysicalPlan physicalPlanToUpdate,
final int maxScheduleAttemptToSet) {
if (!initialized) {
// First scheduling.
this.initialized = true;
} else {
LOG.info("Update Plan from {} to {}", physicalPlan.getPlanId(), physicalPlanToUpdate.getPlanId());
}
this.planState = new PlanState();
this.physicalPlan = physicalPlanToUpdate;
this.planId = physicalPlanToUpdate.getPlanId();
this.maxScheduleAttempt = maxScheduleAttemptToSet;
this.metricStore.getOrCreateMetric(JobMetric.class, planId).setStageDAG(physicalPlanToUpdate.getStageDAG());
this.metricStore.triggerBroadcast(JobMetric.class, planId);
initializeStates();
}
/**
* Initializes the states for the plan/stages/tasks for this plan.
* TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
*/
private void initializeStates() {
onPlanStateChanged(PlanState.State.EXECUTING);
physicalPlan.getStageDAG().topologicalDo(stage -> {
stageIdToState.putIfAbsent(stage.getId(), new StageState());
stageIdToTaskIdxToAttemptStates.putIfAbsent(stage.getId(), new HashMap<>());
// for each task idx of this stage
stage.getTaskIndices().forEach(taskIndex ->
stageIdToTaskIdxToAttemptStates.get(stage.getId()).putIfAbsent(taskIndex, new ArrayList<>()));
// task states will be initialized lazily in getTaskAttemptsToSchedule()
});
}
/////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// Core scheduling methods
/**
* Get task attempts that are "READY".
*
* @param stageId to run
* @return executable task attempts
*/
public synchronized List<String> getTaskAttemptsToSchedule(final String stageId) {
if (getStageState(stageId).equals(StageState.State.COMPLETE)) {
// This stage is done
return new ArrayList<>(0);
}
// For each task index....
final List<String> taskAttemptsToSchedule = new ArrayList<>();
final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
for (final int taskIndex : stage.getTaskIndices()) {
final List<TaskState> attemptStatesForThisTaskIndex =
stageIdToTaskIdxToAttemptStates.get(stageId).get(taskIndex);
// If one of the attempts is COMPLETE, do not schedule
if (attemptStatesForThisTaskIndex
.stream()
.noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {
// (Step 1) Create new READY attempts, as many as
// # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
stage.getPropertyValue(ClonedSchedulingProperty.class);
final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
// For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
? 2
// If the property is not set, then we do not clone (= 1 concurrent)
: stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(stageId, 1);
final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
for (int i = 0; i < numOfConcurrentAttempts - numOfNotDoneAttempts; i++) {
attemptStatesForThisTaskIndex.add(new TaskState());
}
// (Step 2) Check max attempt
if (attemptStatesForThisTaskIndex.size() > maxScheduleAttempt) {
throw new RuntimeException(
attemptStatesForThisTaskIndex.size() + " exceeds max attempt " + maxScheduleAttempt);
}
// (Step 3) Return all READY attempts
for (int attempt = 0; attempt < attemptStatesForThisTaskIndex.size(); attempt++) {
if (attemptStatesForThisTaskIndex.get(attempt).getStateMachine().getCurrentState()
.equals(TaskState.State.READY)) {
taskAttemptsToSchedule.add(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt));
}
}
}
}
return taskAttemptsToSchedule;
}
/**
* @param stageId to query.
* @return all task attempt ids of the stage.
*/
public synchronized Set<String> getAllTaskAttemptsOfStage(final String stageId) {
return getTaskAttemptIdsToItsState(stageId).keySet();
}
/////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// Speculative execution
/**
* @param stageId to query.
* @return a map from an EXECUTING task to its running time so far.
*/
public synchronized Map<String, Long> getExecutingTaskToRunningTimeMs(final String stageId) {
final long curTime = System.currentTimeMillis();
final Map<String, Long> result = new HashMap<>();
final Map<Integer, List<TaskState>> taskIdToState = stageIdToTaskIdxToAttemptStates.get(stageId);
for (final int taskIndex : taskIdToState.keySet()) {
final List<TaskState> attemptStates = taskIdToState.get(taskIndex);
for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
if (TaskState.State.EXECUTING.equals(attemptStates.get(attempt).getStateMachine().getCurrentState())) {
final String taskId = RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt);
result.put(taskId, curTime - taskIdToStartTimeMs.get(taskId));
}
}
}
return result;
}
/**
* List of task times so far for this stage.
*
* @param stageId of the stage.
* @return a copy of the list, empty if none completed.
*/
public synchronized List<Long> getCompletedTaskTimeListMs(final String stageId) {
// Return a copy
return new ArrayList<>(stageIdToCompletedTaskTimeMsList.getOrDefault(stageId, new ArrayList<>(0)));
}
/**
* @param stageId of the clone.
* @param taskIndex of the clone.
* @param numOfClones of the clone.
* @return true if the numOfClones has been modified, false otherwise
*/
public synchronized boolean setNumOfClones(final String stageId, final int taskIndex, final int numOfClones) {
stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
// overwrite the previous value.
final Integer previousNumOfClones = stageIdToTaskIndexToNumOfClones.get(stageId).put(taskIndex, numOfClones);
return (previousNumOfClones == null) || (previousNumOfClones != numOfClones);
}
/////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// State transitions
/**
* Updates the state of a task.
* Task state changes can occur both in master and executor.
* State changes that occur in master are
* initiated in {@link org.apache.nemo.runtime.master.scheduler.BatchScheduler}.
* State changes that occur in executors are sent to master as a control message,
* and the call to this method is initiated in {@link org.apache.nemo.runtime.master.scheduler.BatchScheduler}
* when the message/event is received.
*
* @param taskId the ID of the task.
* @param newTaskState the new state of the task.
*/
public synchronized void onTaskStateChanged(final String taskId, final TaskState.State newTaskState) {
// Change task state
final StateMachine taskState = getTaskStateHelper(taskId).getStateMachine();
LOG.debug("Task State Transition: id {}, from {} to {}",
new Object[]{taskId, taskState.getCurrentState(), newTaskState});
metricStore.getOrCreateMetric(TaskMetric.class, taskId)
.addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
metricStore.triggerBroadcast(TaskMetric.class, taskId);
try {
taskState.setState(newTaskState);
} catch (IllegalStateTransitionException e) {
throw new RuntimeException(taskId + " - Illegal task state transition ", e);
}
// Log not-yet-completed tasks for us humans to track progress
final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
final Map<Integer, List<TaskState>> taskStatesOfThisStage = stageIdToTaskIdxToAttemptStates.get(stageId);
final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.values().stream()
.filter(attempts -> {
final List<TaskState.State> states = attempts
.stream()
.map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
.collect(Collectors.toList());
return states.stream().anyMatch(curState -> curState.equals(TaskState.State.ON_HOLD)) // one of them is ON_HOLD
|| states.stream().anyMatch(curState -> curState.equals(TaskState.State.COMPLETE)); // one of them is COMPLETE
})
.count();
if (newTaskState.equals(TaskState.State.COMPLETE)) {
LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
}
// Maintain info for speculative execution
if (newTaskState.equals(TaskState.State.EXECUTING)) {
taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
} else if (newTaskState.equals(TaskState.State.COMPLETE)) {
stageIdToCompletedTaskTimeMsList.putIfAbsent(stageId, new ArrayList<>());
stageIdToCompletedTaskTimeMsList.get(stageId).add(System.currentTimeMillis() - taskIdToStartTimeMs.get(taskId));
}
// Change stage state, if needed
switch (newTaskState) {
// INCOMPLETE stage
case SHOULD_RETRY:
final boolean isAPeerAttemptCompleted = getPeerAttemptsForTheSameTaskIndex(taskId).stream()
.anyMatch(state -> state.equals(TaskState.State.COMPLETE));
if (!isAPeerAttemptCompleted) {
// None of the peers has completed, hence this stage is incomplete
onStageStateChanged(stageId, StageState.State.INCOMPLETE);
}
break;
// COMPLETE stage
case COMPLETE:
case ON_HOLD:
if (numOfCompletedTaskIndicesInThisStage
== physicalPlan.getStageDAG().getVertexById(stageId).getTaskIndices().size()) {
onStageStateChanged(stageId, StageState.State.COMPLETE);
}
break;
// Doesn't affect StageState
case READY:
case EXECUTING:
case FAILED:
break;
default:
throw new UnknownExecutionStateException(new Throwable("This task state is unknown"));
}
}
/**
* (PRIVATE METHOD)
* Updates the state of a stage.
*
* @param stageId of the stage.
* @param newStageState of the stage.
*/
private void onStageStateChanged(final String stageId, final StageState.State newStageState) {
// Change stage state
final StateMachine stageStateMachine = stageIdToState.get(stageId).getStateMachine();
metricStore.getOrCreateMetric(StageMetric.class, stageId)
.addEvent(getStageState(stageId), newStageState);
metricStore.triggerBroadcast(StageMetric.class, stageId);
LOG.debug("Stage State Transition: id {} from {} to {}",
new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
try {
stageStateMachine.setState(newStageState);
} catch (IllegalStateTransitionException e) {
throw new RuntimeException(stageId + " - Illegal stage state transition ", e);
}
// Change plan state if needed
final boolean allStagesCompleted = stageIdToState.values().stream().allMatch(state ->
state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
// avoid duplicate plan COMPLETE caused by cloning
if (allStagesCompleted && !PlanState.State.COMPLETE.equals(getPlanState())) {
onPlanStateChanged(PlanState.State.COMPLETE);
}
}
/**
* (PRIVATE METHOD)
* Updates the state of the plan.
*
* @param newState of the plan.
*/
private void onPlanStateChanged(final PlanState.State newState) {
metricStore.getOrCreateMetric(JobMetric.class, planId)
.addEvent((PlanState.State) planState.getStateMachine().getCurrentState(), newState);
metricStore.triggerBroadcast(JobMetric.class, planId);
try {
planState.getStateMachine().setState(newState);
} catch (IllegalStateTransitionException e) {
throw new RuntimeException(planId + " - Illegal plan state transition ", e);
}
if (newState == PlanState.State.EXECUTING) {
LOG.debug("Executing Plan ID {}...", this.planId);
} else if (newState == PlanState.State.COMPLETE || newState == PlanState.State.FAILED) {
LOG.debug("Plan ID {} {}!", planId, newState);
// Awake all threads waiting the finish of this plan.
finishLock.lock();
try {
planFinishedCondition.signalAll();
} finally {
finishLock.unlock();
}
} else {
throw new RuntimeException("Illegal Plan State Transition");
}
}
/////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////// Helper Methods
/**
* Wait for this plan to be finished and return the final state.
*
* @return the final state of this plan.
*/
public PlanState.State waitUntilFinish() {
return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
* Wait for this plan to be finished and return the final state.
* It wait for at most the given time.
*
* @param timeout of waiting.
* @param unit of the timeout.
* @return the final state of this plan.
*/
public PlanState.State waitUntilFinish(final long timeout, final TimeUnit unit) {
finishLock.lock();
try {
if (!isPlanDone()) {
while (!planFinishedCondition.await(timeout, unit)) {
LOG.warn("Timeout during waiting the finish of Plan ID {}", planId);
Thread.currentThread().interrupt();
}
}
} catch (final InterruptedException e) {
LOG.warn("Interrupted while waiting for the finish of Plan ID {}", planId);
Thread.currentThread().interrupt();
} finally {
finishLock.unlock();
}
return getPlanState();
}
/**
* @return a map from task attempt id to its current state.
*/
@VisibleForTesting
public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
return physicalPlan.getStageDAG().getVertices()
.stream()
.map(Stage::getId)
.flatMap(stageId -> getTaskAttemptIdsToItsState(stageId).entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
* @return whether the execution for the plan is done or not.
*/
public synchronized boolean isPlanDone() {
return (getPlanState() == PlanState.State.COMPLETE || getPlanState() == PlanState.State.FAILED);
}
/**
* @return the ID of the plan.
*/
public synchronized String getPlanId() {
return planId;
}
/**
* @return the state of the plan.
*/
public synchronized PlanState.State getPlanState() {
return (PlanState.State) planState.getStateMachine().getCurrentState();
}
/**
* @param stageId the stage ID to query.
* @return the state of the stage.
*/
public synchronized StageState.State getStageState(final String stageId) {
return (StageState.State) stageIdToState.get(stageId).getStateMachine().getCurrentState();
}
/**
* @param taskId the ID of the task to query.
* @return the state of the task.
*/
public synchronized TaskState.State getTaskState(final String taskId) {
return (TaskState.State) getTaskStateHelper(taskId).getStateMachine().getCurrentState();
}
private Map<String, TaskState.State> getTaskAttemptIdsToItsState(final String stageId) {
final Map<String, TaskState.State> result = new HashMap<>();
final Map<Integer, List<TaskState>> taskIdToState = stageIdToTaskIdxToAttemptStates.get(stageId);
for (final int taskIndex : taskIdToState.keySet()) {
final List<TaskState> attemptStates = taskIdToState.get(taskIndex);
for (int attempt = 0; attempt < attemptStates.size(); attempt++) {
result.put(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt),
(TaskState.State) attemptStates.get(attempt).getStateMachine().getCurrentState());
}
}
return result;
}
private TaskState getTaskStateHelper(final String taskId) {
return stageIdToTaskIdxToAttemptStates
.get(RuntimeIdManager.getStageIdFromTaskId(taskId))
.get(RuntimeIdManager.getIndexFromTaskId(taskId))
.get(RuntimeIdManager.getAttemptFromTaskId(taskId));
}
private boolean isTaskNotDone(final TaskState taskState) {
final TaskState.State state = (TaskState.State) taskState.getStateMachine().getCurrentState();
return state.equals(TaskState.State.READY)
|| state.equals(TaskState.State.EXECUTING)
|| state.equals(TaskState.State.ON_HOLD);
}
private List<TaskState.State> getPeerAttemptsForTheSameTaskIndex(final String taskId) {
final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
final int taskIndex = RuntimeIdManager.getIndexFromTaskId(taskId);
final int attempt = RuntimeIdManager.getAttemptFromTaskId(taskId);
final List<TaskState> otherAttemptsforTheSameTaskIndex =
new ArrayList<>(stageIdToTaskIdxToAttemptStates.get(stageId).get(taskIndex));
otherAttemptsforTheSameTaskIndex.remove(attempt);
return otherAttemptsforTheSameTaskIndex.stream()
.map(state -> (TaskState.State) state.getStateMachine().getCurrentState())
.collect(Collectors.toList());
}
/**
* @return the physical plan.
*/
public synchronized PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
/**
* @return the maximum number of task scheduling.
*/
public int getMaxScheduleAttempt() {
return maxScheduleAttempt;
}
/**
* @return whether any plan has been submitted and initialized.
*/
public synchronized boolean isInitialized() {
return initialized;
}
/**
* Stores JSON representation of plan state into a file.
*
* @param suffix suffix for file name
*/
public void storeJSON(final String suffix) {
if (dagDirectory.equals(EMPTY_DAG_DIRECTORY)) {
return;
}
final File file = new File(dagDirectory, planId + "-" + dagLogFileIndex + "-" + suffix + ".json");
file.getParentFile().mkdirs();
try (PrintWriter printWriter = new PrintWriter(file)) {
printWriter.println(toStringWithPhysicalPlan());
LOG.debug(String.format("JSON representation of plan state for %s(%s) was saved to %s",
planId, dagLogFileIndex + "-" + suffix, file.getPath()));
} catch (final IOException e) {
LOG.warn(String.format("Cannot store JSON representation of plan state for %s(%s) to %s: %s",
planId, dagLogFileIndex + "-" + suffix, file.getPath(), e.toString()));
} finally {
dagLogFileIndex++;
}
}
private String toStringWithPhysicalPlan() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"dag\": ").append(physicalPlan.getStageDAG().toString()).append(", ");
sb.append("\"planState\": ").append(toString()).append("}");
return sb.toString();
}
@Override
public synchronized String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"planId\": \"").append(planId).append("\", ");
sb.append("\"stages\": [");
boolean isFirstStage = true;
for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
if (!isFirstStage) {
sb.append(", ");
}
isFirstStage = false;
final StageState stageState = stageIdToState.get(stage.getId());
sb.append("{\"id\": \"").append(stage.getId()).append("\", ");
sb.append("\"state\": \"").append(stageState.toString()).append("\", ");
sb.append("\"tasks\": [");
boolean isFirstTask = true;
for (final Map.Entry<String, TaskState.State> entry : getTaskAttemptIdsToItsState(stage.getId()).entrySet()) {
if (!isFirstTask) {
sb.append(", ");
}
isFirstTask = false;
sb.append("{\"id\": \"").append(entry.getKey()).append("\", ");
sb.append("\"state\": \"").append(entry.getValue().toString()).append("\"}");
}
sb.append("]}");
}
sb.append("]}");
return sb.toString();
}
}