/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.library.vertexmanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.Inflater;
/**
* Provides the common functionality shared by ShuffleVertexManager and
* FairShuffleVertexManager.
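* Concrete subclasses supply the policy pieces via the abstract hooks defined
* below: initConfiguration(), computeRouting(), postReconfigVertex(),
* getTasksToSchedule() and processPendingTasks().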
*/
@Private
@Evolving
abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
static long MB = 1024L * 1024L;
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleVertexManagerBase.class);
ComputeRoutingAction computeRoutingAction = ComputeRoutingAction.WAIT;
int totalNumBipartiteSourceTasks = 0;
int numBipartiteSourceTasksCompleted = 0;
int numVertexManagerEventsReceived = 0;
List<VertexManagerEvent> pendingVMEvents = Lists.newLinkedList();
AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
private Set<TaskIdentifier> taskWithVmEvents = Sets.newHashSet();
//Track source vertex and its finished tasks
private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newConcurrentMap();
boolean sourceVerticesScheduled = false;
@VisibleForTesting
int bipartiteSources = 0;
long completedSourceTasksOutputSize = 0;
List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
int totalTasksToSchedule = 0;
@VisibleForTesting
Configuration conf;
ShuffleVertexManagerBaseConfig config;
// requires synchronized access
final Inflater inflater;
/**
* Used when automatic parallelism is enabled.
* Initially the vertex manager starts in the WAIT state.
* After it gathers enough data, it will compute the new
* parallelism. In some special cases, it will skip the parallelism
* computation.
*/
enum ComputeRoutingAction {
WAIT, // not enough data yet.
SKIP, // skip the routing computation
COMPUTE; // compute the new routing
public boolean determined() {
return this != WAIT;
}
}
static class SourceVertexInfo {
final EdgeProperty edgeProperty;
boolean vertexIsConfigured;
final BitSet finishedTaskSet;
int numTasks;
int numVMEventsReceived;
long outputSize;
int[] statsInMB;
EdgeManagerPluginDescriptor newDescriptor;
SourceVertexInfo(final EdgeProperty edgeProperty,
int totalTasksToSchedule) {
this.edgeProperty = edgeProperty;
this.finishedTaskSet = new BitSet();
this.statsInMB = new int[totalTasksToSchedule];
}
int getNumTasks() {
return numTasks;
}
int getNumCompletedTasks() {
return finishedTaskSet.cardinality();
}
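// Linear extrapolation: scale the stats observed so far for this partition
// index from the tasks that have reported VM events up to all tasks of the
// source vertex, and convert from MB to bytes. BigInteger avoids overflow
// for very large estimates.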
BigInteger getExpectedStatsAtIndex(int index) {
return (numVMEventsReceived == 0) ?
BigInteger.ZERO :
BigInteger.valueOf(statsInMB[index]).
multiply(BigInteger.valueOf(numTasks)).
divide(BigInteger.valueOf(numVMEventsReceived)).
multiply(BigInteger.valueOf(MB));
}
}
SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty,
int numTasks) {
return new SourceVertexInfo(edgeProperty, numTasks);
}
static class PendingTaskInfo {
final private int index;
private int inputStats;
public PendingTaskInfo(int index) {
this.index = index;
}
public String toString() {
return "[index=" + index + ", inputStats=" + inputStats + "]";
}
public int getIndex() {
return index;
}
public int getInputStats() {
return inputStats;
}
// return true if stat is set.
public boolean setInputStats(int inputStats) {
if (inputStats > 0 && this.inputStats != inputStats) {
this.inputStats = inputStats;
return true;
} else {
return false;
}
}
}
static class ReconfigVertexParams {
final private int finalParallelism;
final private VertexLocationHint locationHint;
public ReconfigVertexParams(final int finalParallelism,
final VertexLocationHint locationHint) {
this.finalParallelism = finalParallelism;
this.locationHint = locationHint;
}
public int getFinalParallelism() {
return finalParallelism;
}
public VertexLocationHint getLocationHint() {
return locationHint;
}
}
public ShuffleVertexManagerBase(VertexManagerPluginContext context) {
super(context);
inflater = TezCommonUtils.newInflater();
}
@Override
public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
// examine edges after vertex started because until then these may not have been defined
Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
srcVertexInfo.put(entry.getKey(), createSourceVertexInfo(entry.getValue(),
getContext().getVertexNumTasks(getContext().getVertexName())));
// TODO what if derived class has already called this
// register for status update from all source vertices
getContext().registerForVertexStateUpdates(entry.getKey(),
EnumSet.of(VertexState.CONFIGURED));
if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
bipartiteSources++;
}
}
onVertexStartedCheck();
for (VertexStateUpdate stateUpdate : pendingStateUpdates) {
handleVertexStateUpdate(stateUpdate);
}
pendingStateUpdates.clear();
// track the tasks in this vertex
updatePendingTasks();
for (VertexManagerEvent vmEvent : pendingVMEvents) {
handleVertexManagerEvent(vmEvent);
}
pendingVMEvents.clear();
LOG.info("OnVertexStarted vertex: {} with {} source tasks and {} pending" +
" tasks", getContext().getVertexName(), totalNumBipartiteSourceTasks,
totalTasksToSchedule);
if (completions != null) {
for (TaskAttemptIdentifier attempt : completions) {
onSourceTaskCompleted(attempt);
}
}
onVertexStartedDone.set(true);
// for the special case when source has 0 tasks or min fraction == 0
processPendingTasks(null);
}
protected void onVertexStartedCheck() {
if(bipartiteSources == 0) {
throw new TezUncheckedException("At least 1 bipartite source should exist");
}
}
@Override
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
if (srcInfo.vertexIsConfigured) {
Preconditions.checkState(srcTaskId < srcInfo.numTasks,
"Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName +
" has only " + srcInfo.numTasks + " tasks");
}
//handle duplicate events and count task completions from all source vertices
BitSet completedSourceTasks = srcInfo.finishedTaskSet;
// duplicate notifications tracking
if (!completedSourceTasks.get(srcTaskId)) {
completedSourceTasks.set(srcTaskId);
// source task has completed
if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
numBipartiteSourceTasksCompleted++;
}
}
processPendingTasks(attempt);
}
@VisibleForTesting
void parsePartitionStats(SourceVertexInfo srcInfo,
RoaringBitmap partitionStats) {
Preconditions.checkState(srcInfo.statsInMB != null,
"Stats should be initialized");
Iterator<Integer> it = partitionStats.iterator();
final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
final int RANGE_LEN = RANGES.length;
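// Each set bit encodes (partitionIndex * RANGE_LEN + rangeOrdinal), i.e. the
// partition index combined with its DATA_RANGE_IN_MB bucket; decode both
// below and add the bucket's representative size to the aggregated stats.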
while (it.hasNext()) {
int pos = it.next();
int index = ((pos) / RANGE_LEN);
int rangeIndex = ((pos) % RANGE_LEN);
//Add to aggregated stats and normalize to DATA_RANGE_IN_MB.
if (RANGES[rangeIndex].getSizeInMB() > 0) {
srcInfo.statsInMB[index] += RANGES[rangeIndex].getSizeInMB();
}
}
}
void parseDetailedPartitionStats(SourceVertexInfo srcInfo,
List<Integer> partitionStats) {
for (int i=0; i<partitionStats.size(); i++) {
srcInfo.statsInMB[i] += partitionStats.get(i);
}
}
@Override
public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
if (onVertexStartedDone.get()) {
// internal data structures have been initialized - so handle the events directly
handleVertexManagerEvent(vmEvent);
} else {
// save this event for processing after vertex starts
pendingVMEvents.add(vmEvent);
}
}
private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
// currently events from multiple attempts of the same task can be ignored because
// their output will be the same.
TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
if (!taskWithVmEvents.add(producerTask)) {
LOG.info("Ignoring vertex manager event from: {}", producerTask);
return;
}
String vName = producerTask.getVertexIdentifier().getName();
SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
Preconditions.checkState(srcInfo != null,
"Unknown vmEvent from " + producerTask);
numVertexManagerEventsReceived++;
long sourceTaskOutputSize = 0;
if (vmEvent.getUserPayload() != null) {
// save output size
VertexManagerEventPayloadProto proto;
try {
proto = VertexManagerEventPayloadProto.parseFrom(
ByteString.copyFrom(vmEvent.getUserPayload()));
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException(e);
}
sourceTaskOutputSize = proto.getOutputSize();
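// Partition stats may arrive in one of two forms: a compressed RoaringBitmap
// of approximate DATA_RANGE_IN_MB buckets, or a detailed per-partition list
// of sizes in MB.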
if (proto.hasPartitionStats()) {
try {
RoaringBitmap partitionStats = new RoaringBitmap();
ByteString compressedPartitionStats = proto.getPartitionStats();
byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
compressedPartitionStats, inflater);
NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData);
partitionStats.deserialize(new DataInputStream(bin));
parsePartitionStats(srcInfo, partitionStats);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
} else if (proto.hasDetailedPartitionStats()) {
List<Integer> detailedPartitionStats =
proto.getDetailedPartitionStats().getSizeInMbList();
parseDetailedPartitionStats(srcInfo, detailedPartitionStats);
}
srcInfo.numVMEventsReceived++;
srcInfo.outputSize += sourceTaskOutputSize;
completedSourceTasksOutputSize += sourceTaskOutputSize;
}
if (LOG.isDebugEnabled()) {
LOG.debug("For attempt: {} received info of output size: {}"
+ " vertex numEventsReceived: {} vertex output size: {}"
+ " total numEventsReceived: {} total output size: {}",
vmEvent.getProducerAttemptIdentifier(), sourceTaskOutputSize,
srcInfo.numVMEventsReceived, srcInfo.outputSize,
numVertexManagerEventsReceived, completedSourceTasksOutputSize);
}
}
void updatePendingTasks() {
int tasks = getContext().getVertexNumTasks(getContext().getVertexName());
if (tasks == pendingTasks.size() || tasks <= 0) {
return;
}
pendingTasks.clear();
for (int i = 0; i < tasks; ++i) {
pendingTasks.add(new PendingTaskInfo(i));
}
totalTasksToSchedule = pendingTasks.size();
}
/**
* Beginning of functions related to how the new parallelism is determined.
* ShuffleVertexManagerBase implements the common machinery, while each
* concrete vertex manager plugin implements its own routing policy.
*/
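/**
* Decide whether to keep waiting for more source data (WAIT), skip the
* routing computation because no source task produced output (SKIP), or
* go ahead and compute the new routing (COMPUTE).
*/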
private ComputeRoutingAction getComputeRoutingAction(
float minSourceVertexCompletedTaskFraction) {
if (getNumOfTasksToSchedule(minSourceVertexCompletedTaskFraction) <= 0 &&
numBipartiteSourceTasksCompleted != totalNumBipartiteSourceTasks) {
// Wait when there aren't enough completed tasks
return ComputeRoutingAction.WAIT;
} else if (numVertexManagerEventsReceived == 0 &&
totalNumBipartiteSourceTasks > 0) {
// When source tasks don't have output data,
// there will be no VME.
return ComputeRoutingAction.SKIP;
} else if (
completedSourceTasksOutputSize < config.getDesiredTaskInputDataSize()
&& (minSourceVertexCompletedTaskFraction < config.getMaxFraction())) {
/**
* When the overall completed output size is still less than
* desiredTaskInputDataSize, we can wait for some more data to be available
* to determine better parallelism, until max.fraction is reached.
* min.fraction is just a hint to the framework and need not be
* honored strictly in this case.
*/
LOG.info("Defer scheduling tasks; vertex = {}"
+ ", totalNumBipartiteSourceTasks = {}"
+ ", completedSourceTasksOutputSize = {}"
+ ", numVertexManagerEventsReceived = {}"
+ ", numBipartiteSourceTasksCompleted = {}"
+ ", minSourceVertexCompletedTaskFraction = {}",
getContext().getVertexName(), totalNumBipartiteSourceTasks,
completedSourceTasksOutputSize, numVertexManagerEventsReceived,
numBipartiteSourceTasksCompleted,
minSourceVertexCompletedTaskFraction);
return ComputeRoutingAction.WAIT;
} else {
return ComputeRoutingAction.COMPUTE;
}
}
BigInteger getExpectedTotalBipartiteSourceTasksOutputSize() {
BigInteger expectedTotalSourceTasksOutputSize = BigInteger.ZERO;
for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
SourceVertexInfo srcInfo = vInfo.getValue();
if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
// this assumes that 1 vmEvent is received per completed task - TEZ-2961
// Estimate total size by projecting based on the current average size per event
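// expectedSrcOutputSize = outputSize * numTasks / numVMEventsReceived,
// computed with BigInteger so the intermediate product cannot overflow.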
BigInteger srcOutputSize = BigInteger.valueOf(srcInfo.outputSize);
BigInteger srcNumTasks = BigInteger.valueOf(srcInfo.numTasks);
BigInteger srcNumVMEventsReceived = BigInteger.valueOf(srcInfo.numVMEventsReceived);
BigInteger expectedSrcOutputSize = srcOutputSize.multiply(
srcNumTasks).divide(srcNumVMEventsReceived);
expectedTotalSourceTasksOutputSize =
expectedTotalSourceTasksOutputSize.add(expectedSrcOutputSize);
}
}
return expectedTotalSourceTasksOutputSize;
}
int getCurrentlyKnownStatsAtIndex(int index) {
int stats = 0;
for(SourceVertexInfo entry : getAllSourceVertexInfo()) {
stats += entry.statsInMB[index];
}
return stats;
}
long getExpectedStatsAtIndex(int index) {
BigInteger stats = BigInteger.ZERO;
for(SourceVertexInfo entry : getAllSourceVertexInfo()) {
stats = stats.add(entry.getExpectedStatsAtIndex(index));
}
if (stats.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
LOG.warn("Partition {}'s size {} exceeded Long.MAX_VALUE", index, stats);
return Long.MAX_VALUE;
} else {
return stats.longValue();
}
}
/**
* Subclass might return null to indicate there is no new routing.
*/
abstract ReconfigVertexParams computeRouting();
abstract void postReconfigVertex();
/**
* Compute optimal parallelism needed for the job
* @return true (if parallelism is determined), false otherwise
*/
@VisibleForTesting
boolean determineParallelismAndApply(
float minSourceVertexCompletedTaskFraction) {
if (computeRoutingAction.equals(ComputeRoutingAction.WAIT)) {
ComputeRoutingAction computeRoutingAction = getComputeRoutingAction(
minSourceVertexCompletedTaskFraction);
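// Note: this local shadows the field; it is committed to
// this.computeRoutingAction only after any reconfiguration below.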
if (computeRoutingAction.equals(ComputeRoutingAction.COMPUTE)) {
ReconfigVertexParams params = computeRouting();
if (params != null) {
reconfigVertex(params.getFinalParallelism());
updatePendingTasks();
postReconfigVertex();
}
}
if (!computeRoutingAction.equals(ComputeRoutingAction.WAIT)) {
getContext().doneReconfiguringVertex();
}
this.computeRoutingAction = computeRoutingAction;
}
return this.computeRoutingAction.determined();
}
private boolean determineParallelismAndApply() {
return determineParallelismAndApply(
getMinSourceVertexCompletedTaskFraction());
}
/**
* End of functions related to how new parallelism is determined.
*/
/**
* Subclass might return null or empty list to indicate no tasks
* to schedule at this point.
* @param completedSourceAttempt the completed source task attempt
* @return the list of tasks to schedule.
*/
abstract List<ScheduleTaskRequest> getTasksToSchedule(
TaskAttemptIdentifier completedSourceAttempt);
abstract void processPendingTasks();
private void schedulePendingTasks(
TaskAttemptIdentifier completedSourceAttempt) {
List<ScheduleTaskRequest> scheduledTasks =
getTasksToSchedule(completedSourceAttempt);
if (scheduledTasks != null && scheduledTasks.size() > 0) {
getContext().scheduleTasks(scheduledTasks);
}
}
Iterable<SourceVertexInfo> getAllSourceVertexInfo() {
return srcVertexInfo.values();
}
SourceVertexInfo getSourceVertexInfo(String vertexName) {
return srcVertexInfo.get(vertexName);
}
Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
return Iterables.filter(srcVertexInfo.entrySet(),
new Predicate<Map.Entry<String,SourceVertexInfo>>() {
public boolean apply(Map.Entry<String, SourceVertexInfo> input) {
return (input.getValue().edgeProperty.getDataMovementType() ==
DataMovementType.SCATTER_GATHER);
}
});
}
/**
* Verify whether every source vertex has been configured, since a source
* vertex's task count is not reliable until then.
*
* @return true if all source vertices are configured
*/
private boolean canScheduleTasks() {
for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
// need to check for vertex configured because until then we don't know
// if numTasks == 0 is valid
if (!entry.getValue().vertexIsConfigured) { // isConfigured
// this source vertex is not configured yet
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for vertex: {} in vertex: {}", entry.getKey(),
getContext().getVertexName());
}
return false;
}
}
sourceVerticesScheduled = true;
return sourceVerticesScheduled;
}
int getNumOfTasksToScheduleAndLog(float minFraction) {
int numTasksToSchedule = getNumOfTasksToSchedule(minFraction);
if (numTasksToSchedule > 0) {
// numTasksToSchedule can be negative if minFraction
// is less than slowStartMinSrcCompletionFraction.
LOG.info("Scheduling {} tasks for vertex: {} with totalTasks: {}. " +
"{} source tasks completed out of {}. " +
"MinSourceTaskCompletedFraction: {} min: {} max: {}",
numTasksToSchedule, getContext().getVertexName(),
totalTasksToSchedule, numBipartiteSourceTasksCompleted,
totalNumBipartiteSourceTasks, minFraction,
config.getMinFraction(), config.getMaxFraction());
}
return numTasksToSchedule;
}
int getNumOfTasksToSchedule(float minSourceVertexCompletedTaskFraction) {
int numPendingTasks = pendingTasks.size();
if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
LOG.info("All source tasks completed. Ramping up {} remaining tasks" +
" for vertex: {}", numPendingTasks, getContext().getVertexName());
return numPendingTasks;
}
// start scheduling when source tasks completed fraction is more than min.
// linearly increase the number of scheduled tasks such that all tasks are
// scheduled when source tasks completed fraction reaches max
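// For example (illustrative numbers): with minFraction=0.25 and
// maxFraction=0.75, a completed fraction of 0.5 gives
// (0.5 - 0.25) / 0.5 = 0.5, so roughly half of totalTasksToSchedule are
// released, minus whatever has already been scheduled.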
float tasksFractionToSchedule = 1;
float percentRange =
config.getMaxFraction() - config.getMinFraction();
if (percentRange > 0) {
tasksFractionToSchedule =
(minSourceVertexCompletedTaskFraction -
config.getMinFraction()) / percentRange;
} else {
// min and max are equal. schedule 100% on reaching min
if(minSourceVertexCompletedTaskFraction <
config.getMinFraction()) {
tasksFractionToSchedule = 0;
}
}
tasksFractionToSchedule =
Math.max(0, Math.min(1, tasksFractionToSchedule));
// round up to avoid the corner case where a single task cannot be scheduled
// until the src completed fraction reaches max
return ((int)(Math.ceil(tasksFractionToSchedule * totalTasksToSchedule)) -
(totalTasksToSchedule - numPendingTasks));
}
float getMinSourceVertexCompletedTaskFraction() {
float minSourceVertexCompletedTaskFraction = 1f;
if (numBipartiteSourceTasksCompleted != totalNumBipartiteSourceTasks) {
for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
SourceVertexInfo srcInfo = vInfo.getValue();
// canScheduleTasks check has already verified all sources are configured
Preconditions.checkState(srcInfo.vertexIsConfigured,
"Vertex: " + vInfo.getKey());
if (srcInfo.numTasks > 0) {
int numCompletedTasks = srcInfo.getNumCompletedTasks();
float completedFraction =
(float) numCompletedTasks / srcInfo.numTasks;
if (minSourceVertexCompletedTaskFraction > completedFraction) {
minSourceVertexCompletedTaskFraction = completedFraction;
}
}
}
}
return minSourceVertexCompletedTaskFraction;
}
private boolean preconditionsSatisfied() {
if (!onVertexStartedDone.get()) {
// vertex not started yet
return false;
}
if (!sourceVerticesScheduled && !canScheduleTasks()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Defer scheduling tasks for vertex: {} as not all source " +
"vertices are configured yet", getContext().getVertexName());
}
return false;
}
return true;
}
/**
* Process pending tasks when a source task has completed.
* The processing goes through 4 steps.
* Step 1: Precondition check such as whether the vertex has started.
* Step 2: Determine new parallelism if possible.
* Step 3: Process pending tasks such as sorting based on size.
* Step 4: Schedule the pending tasks.
* @param completedSourceAttempt
*/
private void processPendingTasks(TaskAttemptIdentifier completedSourceAttempt) {
if (!preconditionsSatisfied()) {
return;
}
// determine parallelism before scheduling the first time
// this is the latest we can wait before determining parallelism.
// currently this depends on task completion and so this is the best time
// to do this. This is the max time we have until we have to launch tasks
// as specified by the user. If/When we move to some other method of
// calculating parallelism or change parallelism while tasks are already
// running then we can create other parameters to trigger this calculation.
if(config.isAutoParallelismEnabled()) {
if (!determineParallelismAndApply()) {
//try to determine parallelism later when more info is available.
return;
}
}
processPendingTasks();
schedulePendingTasks(completedSourceAttempt);
}
static class ShuffleVertexManagerBaseConfig {
final private boolean enableAutoParallelism;
final private long desiredTaskInputDataSize;
final private float slowStartMinFraction;
final private float slowStartMaxFraction;
public ShuffleVertexManagerBaseConfig(final boolean enableAutoParallelism,
final long desiredTaskInputDataSize, final float slowStartMinFraction,
final float slowStartMaxFraction) {
if (slowStartMinFraction < 0 || slowStartMaxFraction > 1
|| slowStartMaxFraction < slowStartMinFraction) {
throw new IllegalArgumentException(
"Invalid values for slowStartMinFraction"
+ "/slowStartMaxFraction. Min "
+ "cannot be < 0, max cannot be > 1, and max cannot be < min"
+ ", configuredMin=" + slowStartMinFraction
+ ", configuredMax=" + slowStartMaxFraction);
}
this.enableAutoParallelism = enableAutoParallelism;
this.desiredTaskInputDataSize = desiredTaskInputDataSize;
this.slowStartMinFraction = slowStartMinFraction;
this.slowStartMaxFraction = slowStartMaxFraction;
LOG.info("Settings minFrac: {} maxFrac: {} auto: {} desiredTaskInput: {}",
slowStartMinFraction, slowStartMaxFraction, enableAutoParallelism,
desiredTaskInputDataSize);
}
public boolean isAutoParallelismEnabled() {
return this.enableAutoParallelism;
}
public long getDesiredTaskInputDataSize() {
return this.desiredTaskInputDataSize;
}
public float getMinFraction() {
return this.slowStartMinFraction;
}
public float getMaxFraction() {
return this.slowStartMaxFraction;
}
}
abstract ShuffleVertexManagerBaseConfig initConfiguration();
@Override
public void initialize() {
try {
conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
throw new TezUncheckedException(e);
}
config = initConfiguration();
updatePendingTasks();
if (config.isAutoParallelismEnabled()) {
getContext().vertexReconfigurationPlanned();
}
// don't track the source tasks here since those tasks may themselves be
// dynamically changed as the DAG progresses.
}
private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) {
Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED,
"Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: "
+ stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
Preconditions.checkArgument(srcVertexInfo.containsKey(stateUpdate.getVertexName()),
"Received incorrect vertex notification : " + stateUpdate.getVertexState() + " for vertex: "
+ stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName());
Preconditions.checkState(vInfo.vertexIsConfigured == false);
vInfo.vertexIsConfigured = true;
vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName());
if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) {
totalNumBipartiteSourceTasks += vInfo.numTasks;
}
LOG.info("Received configured notification : {}" + " for vertex: {} in" +
" vertex: {}" + " numBipartiteSourceTasks: {}",
stateUpdate.getVertexState(), stateUpdate.getVertexName(),
getContext().getVertexName(), totalNumBipartiteSourceTasks);
processPendingTasks(null);
}
@Override
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
if (stateUpdate.getVertexState() == VertexState.CONFIGURED) {
// we will not register for updates until our vertex starts.
// derived classes can make other update requests for other states that we should
// ignore. However that will not be allowed until the state change notifier supports
// multiple registrations for the same vertex
if (onVertexStartedDone.get()) {
// normally this if check will always be true because we register after vertex
// start.
handleVertexStateUpdate(stateUpdate);
} else {
// normally this code will not trigger since we are the ones who register for
// the CONFIGURED state updates, and that happens after the vertex starts.
// So this code will only trigger if a derived class also registers for updates
// for the same vertices, but multiple registrations for the same vertex are
// currently not supported by the state change notifier code. This is just
// future proofing for when that is supported.
// vertex not started yet, so edge info may not have been defined correctly yet.
pendingStateUpdates.add(stateUpdate);
}
}
}
@Override
public synchronized void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
// Not allowing this for now. Nothing to do.
}
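/**
* Apply the new parallelism: for every scatter-gather edge, build a new
* EdgeProperty that keeps the original source/destination and scheduling
* attributes but swaps in the edge manager descriptor chosen by the
* subclass (newDescriptor), then ask the framework to reconfigure the
* vertex with the final task count.
*/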
private void reconfigVertex(final int finalTaskParallelism) {
Map<String, EdgeProperty> edgeProperties =
new HashMap<String, EdgeProperty>(bipartiteSources);
Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo();
for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
String vertex = entry.getKey();
EdgeProperty oldEdgeProp = entry.getValue().edgeProperty;
EdgeProperty newEdgeProp = EdgeProperty.create(entry.getValue().newDescriptor,
oldEdgeProp.getDataSourceType(), oldEdgeProp.getSchedulingType(),
oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination());
edgeProperties.put(vertex, newEdgeProp);
}
getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties);
}
}