blob: bb76bc5291f29a040b68ebf704a7191451535ff8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master.scheduler;
import com.google.common.collect.Sets;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
import org.apache.nemo.runtime.common.plan.*;
import org.apache.nemo.runtime.common.state.BlockState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.PlanAppender;
import org.apache.nemo.runtime.master.DataSkewDynOptDataHandler;
import org.apache.nemo.runtime.master.DynOptDataHandler;
import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import org.apache.nemo.common.exception.*;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
/**
* (CONCURRENCY) Only a single dedicated thread should use the public methods of this class.
* (i.e., runtimeMasterThread in RuntimeMaster)
*
* BatchScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
*/
@DriverSide
@NotThreadSafe
public final class BatchScheduler implements Scheduler {
private static final Logger LOG = LoggerFactory.getLogger(BatchScheduler.class.getName());
/**
* Components related to scheduling the given plan.
*/
private final TaskDispatcher taskDispatcher;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorRegistry executorRegistry;
private final PlanStateManager planStateManager;
/**
* Other necessary components of this {@link org.apache.nemo.runtime.master.RuntimeMaster}.
*/
private final BlockManagerMaster blockManagerMaster;
private final PubSubEventHandlerWrapper pubSubEventHandlerWrapper;
/**
* The below variables depend on the submitted plan to execute.
*/
private List<List<Stage>> sortedScheduleGroups;
private List<DynOptDataHandler> dynOptDataHandlers;
@Inject
private BatchScheduler(final TaskDispatcher taskDispatcher,
final PendingTaskCollectionPointer pendingTaskCollectionPointer,
final BlockManagerMaster blockManagerMaster,
final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
final ExecutorRegistry executorRegistry,
final PlanStateManager planStateManager) {
this.taskDispatcher = taskDispatcher;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.blockManagerMaster = blockManagerMaster;
this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
updatePhysicalPlanEventHandler.setScheduler(this);
if (pubSubEventHandlerWrapper.getPubSubEventHandler() != null) {
pubSubEventHandlerWrapper.getPubSubEventHandler()
.subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
}
this.executorRegistry = executorRegistry;
this.planStateManager = planStateManager;
this.dynOptDataHandlers = new ArrayList<>();
dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
}
/**
* Schedules a given plan.
* If multiple physical plans are submitted, they will be appended and handled as a single plan.
* TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
*
* @param submittedPhysicalPlan the physical plan to schedule.
* @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
*/
@Override
public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
final int maxScheduleAttempt) {
LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());
if (!planStateManager.isInitialized()) {
// First scheduling.
taskDispatcher.run();
updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
planStateManager.storeJSON("submitted");
} else {
// Append the submitted plan to the original plan.
final PhysicalPlan appendedPlan =
PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
updatePlan(appendedPlan, maxScheduleAttempt);
planStateManager.storeJSON("appended");
}
doSchedule();
}
@Override
public void updatePlan(final PhysicalPlan newPhysicalPlan) {
// update the physical plan in the scheduler.
// NOTE: what's already been executed is not modified in the new physical plan.
// TODO #182: Consider reshaping in run-time optimization. At now, we only consider plan appending.
updatePlan(newPhysicalPlan, planStateManager.getMaxScheduleAttempt());
}
/**
* Update the physical plan in the scheduler.
*
* @param newPhysicalPlan the new physical plan to update.
* @param maxScheduleAttempt the maximum number of task scheduling attempt.
*/
private void updatePlan(final PhysicalPlan newPhysicalPlan,
final int maxScheduleAttempt) {
planStateManager.updatePlan(newPhysicalPlan, maxScheduleAttempt);
this.sortedScheduleGroups = newPhysicalPlan.getStageDAG().getVertices().stream()
.collect(Collectors.groupingBy(Stage::getScheduleGroup))
.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList());
}
/**
* Handles task state transition notifications sent from executors.
* Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
* We ignore such late-arriving notifications, and only handle notifications for the current task attempt.
*
* @param executorId the id of the executor where the message was sent from.
* @param taskId whose state has changed
* @param taskAttemptIndex of the task whose state has changed
* @param newState the state to change to
* @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
*/
public void onTaskStateReportFromExecutor(final String executorId,
final String taskId,
final int taskAttemptIndex,
final TaskState.State newState,
@Nullable final String vertexPutOnHold,
final TaskState.RecoverableTaskFailureCause failureCause) {
// Do change state, as this notification is for the current task attempt.
planStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
onTaskExecutionComplete(executorId, taskId);
break;
case SHOULD_RETRY:
// SHOULD_RETRY from an executor means that the task ran into a recoverable failure
onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
break;
case ON_HOLD:
onTaskExecutionOnHold(executorId, taskId);
break;
case FAILED:
throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
taskId, executorId)));
case READY:
case EXECUTING:
throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
default:
throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
}
// Invoke doSchedule()
switch (newState) {
case COMPLETE:
case ON_HOLD:
// If the stage has completed
final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
if (planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) {
if (!planStateManager.isPlanDone()) {
doSchedule();
}
}
break;
case SHOULD_RETRY:
// Do retry
doSchedule();
break;
default:
break;
}
// Invoke taskDispatcher.onExecutorSlotAvailable()
switch (newState) {
// These three states mean that a slot is made available.
case COMPLETE:
case ON_HOLD:
case SHOULD_RETRY:
taskDispatcher.onExecutorSlotAvailable();
break;
default:
break;
}
}
@Override
public void onSpeculativeExecutionCheck() {
MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
// Only if the ClonedSchedulingProperty is set...
stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
final int parallelism = stage.getParallelism();
final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
// Only after the fraction of the tasks are done...
// Delayed cloning (aggressive)
if (completedTaskTimes.length > 0
&& completedTaskTimes.length >= Math.round(parallelism * fractionToWaitFor)) {
Arrays.sort(completedTaskTimes);
final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
// Only if the running task is considered a 'straggler'....
final long runningTime = entry.getValue();
if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
final String taskId = entry.getKey();
final boolean isCloned = planStateManager.setNumOfClones(
stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
if (isCloned) {
LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+ "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
medianTime, medianTimeMultiplier);
}
isNumOfCloneChanged.setValue(isCloned);
}
}
}
}
});
});
});
if (isNumOfCloneChanged.booleanValue()) {
doSchedule(); // Do schedule the new clone.
}
}
@Override
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
executorRegistry.registerExecutor(executorRepresenter);
taskDispatcher.onExecutorSlotAvailable();
}
@Override
public void onExecutorRemoved(final String executorId) {
LOG.info("{} removed", executorId);
blockManagerMaster.removeWorker(executorId);
// These are tasks that were running at the time of executor removal.
final Set<String> interruptedTasks = new HashSet<>();
executorRegistry.updateExecutor(executorId, (executor, state) -> {
interruptedTasks.addAll(executor.onExecutorFailed());
return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
});
// Blocks of the interrupted tasks are failed.
interruptedTasks.forEach(blockManagerMaster::onProducerTaskFailed);
// Retry the interrupted tasks (and required parents)
retryTasksAndRequiredParents(interruptedTasks);
// Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
doSchedule();
}
@Override
public void terminate() {
this.taskDispatcher.terminate();
this.executorRegistry.terminate();
}
////////////////////////////////////////////////////////////////////// Key methods for scheduling
/**
* The main entry point for task scheduling.
* This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
* <p>
* These are the reasons why.
* - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
* - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
*/
private void doSchedule() {
final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
if (earliest.isPresent()) {
final List<Task> tasksToSchedule = earliest.get().stream()
.flatMap(stage -> selectSchedulableTasks(stage).stream())
.collect(Collectors.toList());
if (!tasksToSchedule.isEmpty()) {
LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
.map(Task::getTaskId)
.map(RuntimeIdManager::getStageIdFromTaskId)
.collect(Collectors.toSet()));
// Set the pointer to the schedulable tasks.
pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
// Notify the dispatcher that a new collection is available.
taskDispatcher.onNewPendingTaskCollectionAvailable();
}
} else {
LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
}
}
private Optional<List<Stage>> selectEarliestSchedulableGroup() {
if (sortedScheduleGroups == null) {
return Optional.empty();
}
return sortedScheduleGroups.stream()
.filter(scheduleGroup -> scheduleGroup.stream()
.map(Stage::getId)
.map(planStateManager::getStageState)
.anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // any incomplete stage in the group
.findFirst(); // selects the one with the smallest scheduling group index.
}
private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
if (stageToSchedule.getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).orElse(false)) {
// Ignore ghost stage.
for (final String taskId : planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId())) {
planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
}
return Collections.emptyList();
}
final List<StageEdge> stageIncomingEdges =
planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
final List<StageEdge> stageOutgoingEdges =
planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
// Create and return tasks.
final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
taskIdsToSchedule.forEach(taskId -> {
final Set<String> blockIds = getOutputBlockIds(taskId);
blockManagerMaster.onProducerTaskScheduled(taskId, blockIds);
final int taskIdx = RuntimeIdManager.getIndexFromTaskId(taskId);
tasks.add(new Task(
planStateManager.getPhysicalPlan().getPlanId(),
taskId,
stageToSchedule.getExecutionProperties(),
stageToSchedule.getSerializedIRDAG(),
stageIncomingEdges,
stageOutgoingEdges,
vertexIdToReadables.get(taskIdx)));
});
return tasks;
}
////////////////////////////////////////////////////////////////////// Task state change handlers
/**
* Action after task execution has been completed.
* Note this method should not be invoked when the previous state of the task is ON_HOLD.
*
* @param executorId id of the executor.
* @param taskId the ID of the task completed.
*/
private void onTaskExecutionComplete(final String executorId,
final String taskId) {
LOG.debug("{} completed in {}", taskId, executorId);
executorRegistry.updateExecutor(executorId, (executor, state) -> {
executor.onTaskExecutionComplete(taskId);
return Pair.of(executor, state);
});
}
/**
* Get the target edge of dynamic optimization.
* The edge is annotated with {@link MetricCollectionProperty}, which is an outgoing edge of
* a parent of the stage put on hold.
*
* See {@link org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
* for setting the target edge of dynamic optimization.
*
* @param taskId the task ID that sent stage-level aggregated metric for dynamic optimization.
* @return the edge to optimize.
*/
private StageEdge getEdgeToOptimize(final String taskId) {
// Get a stage including the given task
final Stage stagePutOnHold = planStateManager.getPhysicalPlan().getStageDAG().getVertices().stream()
.filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
.findFirst()
.orElseThrow(() -> new RuntimeException());
// Stage put on hold, i.e. stage with vertex containing AggregateMetricTransform
// should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
final List<Stage> parentStages = planStateManager.getPhysicalPlan().getStageDAG()
.getParents(stagePutOnHold.getId());
if (parentStages.size() > 1) {
throw new RuntimeException("Error in setting target edge of dynamic optimization!");
}
// Get outgoing edges of that stage with MetricCollectionProperty
final List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
.getOutgoingEdgesOf(parentStages.get(0));
for (StageEdge edge : stageEdges) {
if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
return edge;
}
}
return null;
}
/**
* Action for after task execution is put on hold.
*
* @param executorId the ID of the executor.
* @param taskId the ID of the task.
*/
private void onTaskExecutionOnHold(final String executorId,
final String taskId) {
LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
executorRegistry.updateExecutor(executorId, (executor, state) -> {
executor.onTaskExecutionComplete(taskId);
return Pair.of(executor, state);
});
final String stageIdForTaskUponCompletion = RuntimeIdManager.getStageIdFromTaskId(taskId);
final boolean stageComplete =
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
final StageEdge targetEdge = getEdgeToOptimize(taskId);
if (targetEdge == null) {
throw new RuntimeException("No edges specified for data skew optimization");
}
if (stageComplete) {
final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
.filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
.findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
pubSubEventHandlerWrapper.getPubSubEventHandler()
.onNext(new DynamicOptimizationEvent(planStateManager.getPhysicalPlan(), dynOptDataHandler.getDynOptData(),
taskId, executorId, targetEdge));
}
}
/**
* Action for after task execution has failed but it's recoverable.
*
* @param executorId the ID of the executor
* @param taskId the ID of the task
* @param failureCause the cause of failure
*/
private void onTaskExecutionFailedRecoverable(final String executorId,
final String taskId,
final TaskState.RecoverableTaskFailureCause failureCause) {
LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
executorRegistry.updateExecutor(executorId, (executor, state) -> {
executor.onTaskExecutionFailed(taskId);
return Pair.of(executor, state);
});
switch (failureCause) {
// Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
case INPUT_READ_FAILURE:
// TODO #54: Handle remote data fetch failures
case OUTPUT_WRITE_FAILURE:
blockManagerMaster.onProducerTaskFailed(taskId);
break;
default:
throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
}
retryTasksAndRequiredParents(Collections.singleton(taskId));
}
////////////////////////////////////////////////////////////////////// Helper methods
private void retryTasksAndRequiredParents(final Set<String> tasks) {
final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
LOG.info("Will be retried: {}", tasksToRetry);
tasksToRetry.forEach(
taskToReExecute -> planStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
}
private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
if (children.isEmpty()) {
return Collections.emptySet();
}
final DAG<Stage, StageEdge> stageDAG = planStateManager.getPhysicalPlan().getStageDAG();
final Map<String, StageEdge> idToIncomingEdges = children.stream()
.map(RuntimeIdManager::getStageIdFromTaskId)
.flatMap(stageId -> stageDAG.getIncomingEdgesOf(stageId).stream())
// Ignore duplicates with the mergeFunction in toMap(_,_,mergeFunction)
.collect(Collectors.toMap(StageEdge::getId, Function.identity(), (l, r) -> l));
final Set<String> parentsWithLostBlocks = children.stream()
.flatMap(child -> getInputBlockIds(child).stream()) // child task id -> parent block ids
.map(RuntimeIdManager::getWildCardFromBlockId) // parent block id -> parent block wildcard
.collect(Collectors.toSet()).stream() // remove duplicate wildcards
.filter(parentBlockWildcard -> // lost block = no matching AVAILABLE block attempt for the wildcard
blockManagerMaster.getBlockHandlers(parentBlockWildcard, BlockState.State.AVAILABLE).isEmpty())
.flatMap(lostParentBlockWildcard -> {
// COMPLETE task attempts of the lostParentBlockWildcard must become SHOULD_RETRY
final String inEdgeId = RuntimeIdManager.getRuntimeEdgeIdFromBlockId(lostParentBlockWildcard);
final String parentStageId = idToIncomingEdges.get(inEdgeId).getSrc().getId();
final int parentTaskIndex = RuntimeIdManager.getTaskIndexFromBlockId(lostParentBlockWildcard);
return planStateManager.getAllTaskAttemptsOfStage(parentStageId)
.stream()
.filter(taskId -> RuntimeIdManager.getStageIdFromTaskId(taskId).equals(parentStageId)
&& RuntimeIdManager.getIndexFromTaskId(taskId) == parentTaskIndex)
// COMPLETE -> SHOULD_RETRY
.filter(taskId -> planStateManager.getTaskState(taskId).equals(TaskState.State.COMPLETE));
})
.collect(Collectors.toSet());
// Recursive call
return Sets.union(parentsWithLostBlocks, recursivelyGetParentTasksForLostBlocks(parentsWithLostBlocks));
}
private Set<String> getOutputBlockIds(final String taskId) {
return planStateManager.getPhysicalPlan().getStageDAG()
.getOutgoingEdgesOf(RuntimeIdManager.getStageIdFromTaskId(taskId))
.stream()
.map(stageEdge -> RuntimeIdManager.generateBlockId(stageEdge.getId(), taskId))
.collect(Collectors.toSet()); // ids of blocks this task will produce
}
private Set<String> getInputBlockIds(final String childTaskId) {
final String stageIdOfChildTask = RuntimeIdManager.getStageIdFromTaskId(childTaskId);
return planStateManager.getPhysicalPlan().getStageDAG().getIncomingEdgesOf(stageIdOfChildTask)
.stream()
.flatMap(inStageEdge -> {
final Set<String> parentTaskIds = planStateManager.getAllTaskAttemptsOfStage(inStageEdge.getSrc().getId());
switch (inStageEdge.getDataCommunicationPattern()) {
case Shuffle:
case BroadCast:
// All of the parent stage's tasks
return parentTaskIds.stream()
.map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
case OneToOne:
// Same-index tasks of the parent stage
return parentTaskIds.stream()
.filter(parentTaskId ->
RuntimeIdManager.getIndexFromTaskId(parentTaskId) == RuntimeIdManager.getIndexFromTaskId(childTaskId))
.map(parentTaskId -> RuntimeIdManager.generateBlockId(inStageEdge.getId(), parentTaskId));
default:
throw new IllegalStateException(inStageEdge.toString());
}
})
.collect(Collectors.toSet());
}
/**
* Update the data for dynamic optimization.
*
* @param dynOptData the data to update.
*/
public void updateDynOptData(final Object dynOptData) {
final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
.findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
dynOptDataHandler.updateDynOptData(dynOptData);
}
}