blob: 24e30bec8786afddd36f66392b8510708dde015b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master.scheduler;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.PipeManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* A simple scheduler for streaming workloads.
* - Keeps track of new executors
* - Schedules all tasks in the plan at once.
* - Crashes the system upon any other events (should be fixed in the future)
* - Never stops running.
*/
@DriverSide
@NotThreadSafe
public final class StreamingScheduler implements Scheduler {
private static final Logger LOG = LoggerFactory.getLogger(StreamingScheduler.class.getName());
private final TaskDispatcher taskDispatcher;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorRegistry executorRegistry;
private final PlanStateManager planStateManager;
private final PipeManagerMaster pipeManagerMaster;
@Inject
StreamingScheduler(final TaskDispatcher taskDispatcher,
final PendingTaskCollectionPointer pendingTaskCollectionPointer,
final ExecutorRegistry executorRegistry,
final PlanStateManager planStateManager,
final PipeManagerMaster pipeManagerMaster) {
this.taskDispatcher = taskDispatcher;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.executorRegistry = executorRegistry;
this.planStateManager = planStateManager;
this.pipeManagerMaster = pipeManagerMaster;
}
@Override
public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
final int maxScheduleAttempt) {
// Housekeeping stuff
taskDispatcher.run();
planStateManager.updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
planStateManager.storeJSON("submitted");
// Prepare tasks
final List<Stage> allStages = submittedPhysicalPlan.getStageDAG().getTopologicalSort();
final List<Task> allTasks = allStages.stream().flatMap(stageToSchedule -> {
// Helper variables for this stage
final List<StageEdge> stageIncomingEdges =
submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
final List<StageEdge> stageOutgoingEdges =
submittedPhysicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
taskIdsToSchedule.forEach(taskId -> {
final int index = RuntimeIdManager.getIndexFromTaskId(taskId);
stageOutgoingEdges.forEach(outEdge -> pipeManagerMaster.onTaskScheduled(outEdge.getId(), index));
});
// Create tasks of this stage
return taskIdsToSchedule.stream().map(taskId -> new Task(
submittedPhysicalPlan.getPlanId(),
taskId,
stageToSchedule.getExecutionProperties(),
stageToSchedule.getSerializedIRDAG(),
stageIncomingEdges,
stageOutgoingEdges,
vertexIdToReadables.get(RuntimeIdManager.getIndexFromTaskId(taskId))));
}).collect(Collectors.toList());
// Schedule everything at once
pendingTaskCollectionPointer.setToOverwrite(allTasks);
taskDispatcher.onNewPendingTaskCollectionAvailable();
}
@Override
public void updatePlan(final PhysicalPlan newPhysicalPlan) {
// TODO #227: StreamingScheduler Dynamic Optimization
throw new UnsupportedOperationException();
}
@Override
public void onTaskStateReportFromExecutor(final String executorId,
final String taskId,
final int taskAttemptIndex,
final TaskState.State newState,
@Nullable final String vertexPutOnHold,
final TaskState.RecoverableTaskFailureCause failureCause) {
planStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
// Do nothing.
break;
case ON_HOLD:
case FAILED:
case SHOULD_RETRY:
// TODO #226: StreamingScheduler Fault Tolerance
throw new UnsupportedOperationException();
case READY:
case EXECUTING:
throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
default:
throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
}
}
@Override
public void onSpeculativeExecutionCheck() {
// TODO #228: StreamingScheduler Speculative Execution
throw new UnsupportedOperationException();
}
@Override
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
taskDispatcher.onExecutorSlotAvailable();
executorRegistry.registerExecutor(executorRepresenter);
}
@Override
public void onExecutorRemoved(final String executorId) {
// TODO #226: StreamingScheduler Fault Tolerance
throw new UnsupportedOperationException();
}
@Override
public void terminate() {
this.taskDispatcher.terminate();
this.executorRegistry.terminate();
}
}