// blob: b05c45ad96bf7ec236aaf32db357e3b244b66d8e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.library.vertexmanager;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.ShuffleEdgeManagerConfigPayloadProto;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* Starts scheduling tasks when the fraction of completed source tasks crosses
* <code>slowStartMinSrcCompletionFraction</code> and schedules all tasks
* when <code>slowStartMaxSrcCompletionFraction</code> is reached.
*/
@Public
@Evolving
public class ShuffleVertexManager extends ShuffleVertexManagerBase {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleVertexManager.class);
/**
* The desired size of input per task. Parallelism will be changed to meet this criterion.
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE =
"tez.shuffle-vertex-manager.desired-task-input-size";
public static final long
TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 100 * MB;
/**
* Enables automatic parallelism determination for the vertex. Based on input data
* statistics the parallelism is decreased to a desired level.
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL =
"tez.shuffle-vertex-manager.enable.auto-parallel";
public static final boolean
TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
/**
* Automatic parallelism determination will not decrease parallelism below this value
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM =
"tez.shuffle-vertex-manager.min-task-parallelism";
public static final int TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
/**
* In case of a ScatterGather connection, the fraction of source tasks which
* should complete before tasks for the current vertex are scheduled
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION =
"tez.shuffle-vertex-manager.min-src-fraction";
public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
/**
* In case of a ScatterGather connection, once this fraction of source tasks
* have completed, all tasks on the current vertex can be scheduled. Number of
* tasks ready for scheduling on the current vertex scales linearly between
* min-fraction and max-fraction. Defaults to the greater of the default value
* or tez.shuffle-vertex-manager.min-src-fraction.
*/
public static final String TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION =
"tez.shuffle-vertex-manager.max-src-fraction";
public static final float TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
// Configuration created by initConfiguration(); computeRouting() reads its
// minimum task parallelism.
ShuffleVertexManagerConfig mgrConfig;
// For each task index, the partition indices combined into that task. Filled in by
// configureTargetMapping(); stays null until then, which computePartitionSizes()
// treats as "parallelism has not changed".
private int[][] targetIndexes;
// currentParallelism / desiredTaskParallelism as computed in computeRouting(): the
// number of partitions given to every task except possibly the last one.
private int basePartitionRange;
// currentParallelism % basePartitionRange as computed in computeRouting(). A value
// of 0 means the last task also receives basePartitionRange partitions.
private int remainderRangeForLastShuffler;
/**
 * Creates the vertex manager. The context is passed to the superclass constructor
 * unchanged; no other state is initialised here.
 *
 * @param context the plugin context handed to the base class
 */
public ShuffleVertexManager(VertexManagerPluginContext context) {
super(context);
}
/**
 * Configuration holder for {@link ShuffleVertexManager}. It forwards the
 * auto-parallelism flag, desired task input size and the two slow-start fractions
 * to the base configuration and adds the minimum task parallelism.
 */
static class ShuffleVertexManagerConfig extends ShuffleVertexManagerBaseConfig {
// Lower bound that computeRouting() applies to the desired parallelism.
final int minTaskParallelism;
/**
 * Stores the minimum parallelism, logs it, and passes the remaining values to the
 * base configuration constructor.
 */
public ShuffleVertexManagerConfig(final boolean enableAutoParallelism,
final long desiredTaskInputDataSize, final float slowStartMinFraction,
final float slowStartMaxFraction, final int minTaskParallelism) {
super(enableAutoParallelism, desiredTaskInputDataSize,
slowStartMinFraction, slowStartMaxFraction);
this.minTaskParallelism = minTaskParallelism;
LOG.info("minTaskParallelism {}", this.minTaskParallelism);
}
/** Returns the minimum task parallelism given at construction time. */
int getMinTaskParallelism() {
return minTaskParallelism;
}
}
/**
 * Reads the shuffle vertex manager settings from {@code conf}, stores the resulting
 * configuration in {@code mgrConfig} and returns that same object.
 *
 * @return the configuration that was just created
 */
@Override
ShuffleVertexManagerBaseConfig initConfiguration() {
  final float minSrcFraction = conf.getFloat(
      TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
      TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
  final boolean autoParallel = conf.getBoolean(
      TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
      TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT);
  final long desiredInputSize = conf.getLong(
      TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
      TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);

  // When the max fraction is not configured it falls back to the larger of its own
  // default and the min fraction read above.
  final float maxFractionFallback = Math.max(minSrcFraction,
      TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
  final float maxSrcFraction = conf.getFloat(
      TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, maxFractionFallback);

  // A configured minimum parallelism below one is raised to one.
  final int configuredMinParallelism = conf.getInt(
      TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
      TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT);
  final int minParallelism = Math.max(1, configuredMinParallelism);

  mgrConfig = new ShuffleVertexManagerConfig(autoParallel, desiredInputSize,
      minSrcFraction, maxSrcFraction, minParallelism);
  return mgrConfig;
}
/**
 * Builds an array of {@code partitionRange} consecutive integers whose first value
 * is {@code taskIndex * offSetPerTask}.
 *
 * @param partitionRange number of consecutive indices to produce
 * @param taskIndex index of the task the range belongs to
 * @param offSetPerTask distance between the first indices of two adjacent tasks
 * @return a new array holding the consecutive indices
 */
static int[] createIndices(int partitionRange, int taskIndex,
    int offSetPerTask) {
  final int[] result = new int[partitionRange];
  int nextValue = taskIndex * offSetPerTask;
  for (int slot = 0; slot < result.length; slot++) {
    result[slot] = nextValue;
    nextValue++;
  }
  return result;
}
/**
 * Edge manager that {@code computeRouting()} installs on the bipartite source edges
 * when it reduces parallelism. Each destination task consumes a contiguous range of
 * source outputs: {@code basePartitionRange} outputs for every task but the last,
 * and {@code remainderRangeForLastShuffler} outputs for the last task.
 *
 * <p>Within one destination task, the physical inputs coming from a single source
 * task occupy one contiguous block of "partition range" slots, and the blocks are
 * laid out in source task order.
 */
public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand {
  int numSourceTaskOutputs;
  int numDestinationTasks;
  int basePartitionRange;
  int remainderRangeForLastShuffler;
  int numSourceTasks;
  // sourceIndices[dest] lists the source output indices consumed by destination task dest.
  int[][] sourceIndices;
  // targetIndices[src] lists the destination input slots used by source task src
  // in a destination task that has the base partition range.
  int[][] targetIndices;

  public CustomShuffleEdgeManager(EdgeManagerPluginContext context) {
    super(context);
  }

  /**
   * Loads the routing parameters from the user payload and caches the source vertex
   * task count.
   *
   * @throws RuntimeException if the payload is missing, empty or cannot be parsed
   * @throws IllegalStateException if the payload carries a non-positive partition
   *         range, or a destination task count that differs from the context's
   */
  @Override
  public void initialize() {
    UserPayload userPayload = getContext().getUserPayload();
    if (userPayload == null || userPayload.getPayload() == null ||
        userPayload.getPayload().limit() == 0) {
      throw new RuntimeException("Could not initialize CustomShuffleEdgeManager"
          + " from provided user payload");
    }
    CustomShuffleEdgeManagerConfig config;
    try {
      config = CustomShuffleEdgeManagerConfig.fromUserPayload(userPayload);
    } catch (InvalidProtocolBufferException e) {
      throw new RuntimeException("Could not initialize CustomShuffleEdgeManager"
          + " from provided user payload", e);
    }
    this.numSourceTaskOutputs = config.numSourceTaskOutputs;
    this.numDestinationTasks = config.numDestinationTasks;
    this.basePartitionRange = config.basePartitionRange;
    this.remainderRangeForLastShuffler = config.remainderRangeForLastShuffler;
    // Both ranges are used as divisors while routing, so reject them here instead of
    // failing later with an ArithmeticException in the middle of event routing.
    Preconditions.checkState(
        this.basePartitionRange > 0 && this.remainderRangeForLastShuffler > 0,
        "Partition ranges must be positive: basePartitionRange=" + this.basePartitionRange
            + ", remainderRangeForLastShuffler=" + this.remainderRangeForLastShuffler);
    this.numSourceTasks = getContext().getSourceVertexNumTasks();
    Preconditions.checkState(this.numDestinationTasks == getContext().getDestinationVertexNumTasks());
  }

  /**
   * Returns the number of source outputs merged into the given destination task:
   * the base range for every task before the last, the remainder range otherwise.
   * Indices beyond the last task are treated like the last task.
   */
  private int partitionRangeFor(int destinationTaskIndex) {
    return (destinationTaskIndex < numDestinationTasks - 1)
        ? basePartitionRange
        : remainderRangeForLastShuffler;
  }

  /**
   * Returns the destination input slot for one source output: the block belonging
   * to the source task plus the output's position modulo the partition range.
   */
  private int targetInputIndex(int sourceTaskIndex, int sourceIndex,
      int destinationTaskIndex) {
    int partitionRange = partitionRangeFor(destinationTaskIndex);
    return sourceTaskIndex * partitionRange + sourceIndex % partitionRange;
  }

  /**
   * Returns an unmodifiable list of the {@code partitionRange} consecutive input
   * slots that start at {@code sourceTaskIndex * partitionRange}.
   */
  private static List<Integer> inputIndexList(int sourceTaskIndex, int partitionRange) {
    int startOffset = sourceTaskIndex * partitionRange;
    List<Integer> indices = Lists.newArrayListWithCapacity(partitionRange);
    for (int i = 0; i < partitionRange; ++i) {
      indices.add(startOffset + i);
    }
    return Collections.unmodifiableList(indices);
  }

  @Override
  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
    return numSourceTasks * partitionRangeFor(destinationTaskIndex);
  }

  @Override
  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
    return numSourceTaskOutputs;
  }

  /**
   * Routes a data movement event to the single destination task that owns the
   * event's source index.
   */
  @Override
  public void routeDataMovementEventToDestination(DataMovementEvent event,
      int sourceTaskIndex, int sourceOutputIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
    int sourceIndex = event.getSourceIndex();
    int destinationTaskIndex = sourceIndex / basePartitionRange;
    int targetIndex = targetInputIndex(sourceTaskIndex, sourceIndex, destinationTaskIndex);
    destinationTaskAndInputIndices.put(
        destinationTaskIndex, Collections.singletonList(targetIndex));
  }

  /**
   * Returns the routing for one source output towards {@code destTaskIndex}, or
   * {@code null} when that output belongs to a different destination task.
   */
  @Override
  public EventRouteMetadata routeDataMovementEventToDestination(
      int sourceTaskIndex, int sourceOutputIndex, int destTaskIndex) throws Exception {
    int destinationTaskIndex = sourceOutputIndex / basePartitionRange;
    if (destinationTaskIndex != destTaskIndex) {
      return null;
    }
    int targetIndex =
        targetInputIndex(sourceTaskIndex, sourceOutputIndex, destinationTaskIndex);
    return EventRouteMetadata.create(1, new int[]{targetIndex});
  }

  /**
   * Precomputes the index tables used by composite event routing from the current
   * source and destination task counts reported by the context.
   */
  @Override
  public void prepareForRouting() throws Exception {
    // target indices derive from num src tasks
    int sourceTaskCount = getContext().getSourceVertexNumTasks();
    targetIndices = new int[sourceTaskCount][];
    for (int srcTaskIndex = 0; srcTaskIndex < sourceTaskCount; ++srcTaskIndex) {
      targetIndices[srcTaskIndex] = createIndices(basePartitionRange, srcTaskIndex,
          basePartitionRange);
    }
    // source indices derive from num dest tasks (==partitions)
    int numTargetTasks = getContext().getDestinationVertexNumTasks();
    sourceIndices = new int[numTargetTasks][];
    for (int destTaskIndex = 0; destTaskIndex < numTargetTasks; ++destTaskIndex) {
      int partitionRange = (destTaskIndex == numTargetTasks - 1)
          ? remainderRangeForLastShuffler
          : basePartitionRange;
      // skip the basePartitionRange per destination task
      sourceIndices[destTaskIndex] = createIndices(partitionRange, destTaskIndex,
          basePartitionRange);
    }
  }

  /**
   * Returns the composite routing (count, first target slot, first source output)
   * from one source task to one destination task. Relies on the tables built by
   * {@link #prepareForRouting()}.
   */
  @Override
  public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
      int sourceTaskIndex, int destinationTaskIndex)
      throws Exception {
    int partitionRange = partitionRangeFor(destinationTaskIndex);
    int firstTargetIndex;
    if (partitionRange == basePartitionRange) {
      firstTargetIndex = targetIndices[sourceTaskIndex][0];
    } else {
      // Only the first slot of the remainder block is needed, so compute it directly
      // instead of materialising a whole index array per event.
      firstTargetIndex = sourceTaskIndex * partitionRange;
    }
    return CompositeEventRouteMetadata.create(partitionRange, firstTargetIndex,
        sourceIndices[destinationTaskIndex][0]);
  }

  /**
   * Returns every input slot of {@code destinationTaskIndex} that is fed by the
   * failed source task.
   */
  @Override
  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
      int sourceTaskIndex, int destinationTaskIndex) throws Exception {
    int partitionRange = partitionRangeFor(destinationTaskIndex);
    int[] failedInputIndices = createIndices(partitionRange, sourceTaskIndex, partitionRange);
    return EventRouteMetadata.create(partitionRange, failedInputIndices);
  }

  /**
   * Fills the map with, for every destination task, the input slots fed by the
   * failed source task.
   */
  @Override
  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
    boolean lastTaskIsShorter = remainderRangeForLastShuffler < basePartitionRange;
    int tasksWithBaseRange = lastTaskIsShorter ? numDestinationTasks - 1 : numDestinationTasks;

    // All tasks with the base range share one immutable list.
    List<Integer> baseInputIndices = inputIndexList(sourceTaskIndex, basePartitionRange);
    for (int i = 0; i < tasksWithBaseRange; ++i) {
      destinationTaskAndInputIndices.put(i, baseInputIndices);
    }
    if (lastTaskIsShorter) {
      destinationTaskAndInputIndices.put(numDestinationTasks - 1,
          inputIndexList(sourceTaskIndex, remainderRangeForLastShuffler));
    }
  }

  @Override
  public int routeInputErrorEventToSource(InputReadErrorEvent event,
      int destinationTaskIndex, int destinationFailedInputIndex) {
    return routeInputErrorEventToSource(destinationTaskIndex, destinationFailedInputIndex);
  }

  /**
   * Maps a failed destination input slot back to the source task whose block
   * contains that slot.
   */
  @Override
  public int routeInputErrorEventToSource(int destinationTaskIndex,
      int destinationFailedInputIndex) {
    return destinationFailedInputIndex / partitionRangeFor(destinationTaskIndex);
  }

  @Override
  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
    return numDestinationTasks;
  }
}
/**
 * Holder for the four routing parameters of {@code CustomShuffleEdgeManager}. It
 * converts them to and from a {@link UserPayload} backed by
 * {@code ShuffleEdgeManagerConfigPayloadProto}.
 */
private static class CustomShuffleEdgeManagerConfig {
  final int numSourceTaskOutputs;
  final int numDestinationTasks;
  final int basePartitionRange;
  final int remainderRangeForLastShuffler;

  private CustomShuffleEdgeManagerConfig(int numSourceTaskOutputs,
      int numDestinationTasks,
      int basePartitionRange,
      int remainderRangeForLastShuffler) {
    this.numSourceTaskOutputs = numSourceTaskOutputs;
    this.numDestinationTasks = numDestinationTasks;
    this.basePartitionRange = basePartitionRange;
    this.remainderRangeForLastShuffler = remainderRangeForLastShuffler;
  }

  /** Serialises the four values into a protobuf message wrapped in a user payload. */
  public UserPayload toUserPayload() {
    ShuffleEdgeManagerConfigPayloadProto message =
        ShuffleEdgeManagerConfigPayloadProto.newBuilder()
            .setNumSourceTaskOutputs(numSourceTaskOutputs)
            .setNumDestinationTasks(numDestinationTasks)
            .setBasePartitionRange(basePartitionRange)
            .setRemainderRangeForLastShuffler(remainderRangeForLastShuffler)
            .build();
    byte[] serialized = message.toByteArray();
    return UserPayload.create(ByteBuffer.wrap(serialized));
  }

  /**
   * Parses a payload produced by {@link #toUserPayload()}.
   *
   * @throws InvalidProtocolBufferException if the payload bytes cannot be parsed
   */
  public static CustomShuffleEdgeManagerConfig fromUserPayload(
      UserPayload payload) throws InvalidProtocolBufferException {
    ByteString serialized = ByteString.copyFrom(payload.getPayload());
    ShuffleEdgeManagerConfigPayloadProto message =
        ShuffleEdgeManagerConfigPayloadProto.parseFrom(serialized);
    return new CustomShuffleEdgeManagerConfig(
        message.getNumSourceTaskOutputs(),
        message.getNumDestinationTasks(),
        message.getBasePartitionRange(),
        message.getRemainderRangeForLastShuffler());
  }
}
/**
 * Decides whether the vertex parallelism should be reduced and, if so, prepares the
 * custom edge manager descriptor for every bipartite source edge.
 *
 * <p>The desired parallelism is the expected total source output size divided by the
 * desired task input size (rounded up), bounded below by the configured minimum.
 * Partitions are then grouped into ranges of {@code basePartitionRange}; a last,
 * shorter range holds any remainder.
 *
 * <p>Side effects: {@code basePartitionRange} and
 * {@code remainderRangeForLastShuffler} are overwritten once a desired parallelism
 * below the current one has been found, and on success each bipartite source's
 * {@code newDescriptor} is set.
 *
 * @return the reconfiguration parameters with the reduced parallelism, or
 *         {@code null} when parallelism is left unchanged
 */
ReconfigVertexParams computeRouting() {
  final int currentParallelism = pendingTasks.size();
  // Change this to use per partition stats for more accuracy TEZ-2962.
  // Instead of aggregating overall size and then dividing equally - coalesce partitions until
  // desired per partition size is achieved.
  BigInteger expectedTotalSourceTasksOutputSize =
      getExpectedTotalBipartiteSourceTasksOutputSize();
  final long desiredInputSize = config.getDesiredTaskInputDataSize();
  LOG.info("Expected output: {} based on actual output: {} from {} vertex " +
      "manager events. desiredTaskInputSize: {} max slow start tasks: {} " +
      " num sources completed: {}", expectedTotalSourceTasksOutputSize,
      completedSourceTasksOutputSize, numVertexManagerEventsReceived,
      desiredInputSize,
      (totalNumBipartiteSourceTasks * config.getMaxFraction()),
      numBipartiteSourceTasksCompleted);

  // A zero size would make the division below throw ArithmeticException and a
  // negative one has no meaningful interpretation, so leave parallelism untouched.
  if (desiredInputSize <= 0) {
    LOG.warn("Not reducing auto parallelism for vertex: {}"
        + " since the desired task input size of {} is not positive",
        getContext().getVertexName(), desiredInputSize);
    return null;
  }

  // Calculate number of desired tasks by dividing with rounding up
  BigInteger bigDesiredTaskParallelism = expectedTotalSourceTasksOutputSize
      .add(BigInteger.valueOf(desiredInputSize - 1))
      .divide(BigInteger.valueOf(desiredInputSize));
  if (bigDesiredTaskParallelism.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0) {
    LOG.info("Not reducing auto parallelism for vertex: {}"
        + " since the desired parallelism of {} is greater than"
        + " the max parallelism of {}", getContext().getVertexName(),
        bigDesiredTaskParallelism, Integer.MAX_VALUE);
    return null;
  }

  int desiredTaskParallelism = Math.max(bigDesiredTaskParallelism.intValue(),
      mgrConfig.getMinTaskParallelism());
  if (desiredTaskParallelism >= currentParallelism) {
    LOG.info("Not reducing auto parallelism for vertex: {}"
        + " since the desired parallelism of {} is greater than or equal"
        + " to the current parallelism of {}", getContext().getVertexName(),
        desiredTaskParallelism, currentParallelism);
    return null;
  }

  // most shufflers will be assigned this range
  basePartitionRange = currentParallelism / desiredTaskParallelism;
  if (basePartitionRange <= 1) {
    // nothing to do if range is equal 1 partition. shuffler does it by default
    LOG.info("Not reducing auto parallelism for vertex: {} by less than"
        + " half since combining two inputs will potentially break the"
        + " desired task input size of {}", getContext().getVertexName(),
        desiredInputSize);
    return null;
  }

  int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
  remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
  // A non-zero remainder needs one extra task to hold the leftover partitions.
  int finalTaskParallelism = (remainderRangeForLastShuffler > 0)
      ? (numShufflersWithBaseRange + 1)
      : numShufflersWithBaseRange;
  // Check before announcing the reduction so the log never reports a change that
  // is not applied.
  if (finalTaskParallelism >= currentParallelism) {
    return null;
  }
  LOG.info("Reducing auto parallelism for vertex: {} from {} to {}",
      getContext().getVertexName(), currentParallelism,
      finalTaskParallelism);

  // The edge manager expects the last range to be positive: when there is no
  // remainder the last task simply gets a full base range.
  int lastShufflerRange = (remainderRangeForLastShuffler > 0)
      ? remainderRangeForLastShuffler
      : basePartitionRange;
  CustomShuffleEdgeManagerConfig edgeManagerConfig =
      new CustomShuffleEdgeManagerConfig(
          currentParallelism, finalTaskParallelism, basePartitionRange,
          lastShufflerRange);
  EdgeManagerPluginDescriptor descriptor =
      EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
  descriptor.setUserPayload(edgeManagerConfig.toUserPayload());

  for (Map.Entry<String, SourceVertexInfo> entry : getBipartiteInfo()) {
    entry.getValue().newDescriptor = descriptor;
  }
  return new ReconfigVertexParams(finalTaskParallelism, null);
}
/**
 * Rebuilds the task-to-partition mapping for the current number of pending tasks
 * by delegating to configureTargetMapping().
 */
@Override
void postReconfigVertex() {
configureTargetMapping(pendingTasks.size());
}
/**
 * Rebuilds {@code targetIndexes} so that entry {@code i} lists the partition
 * indices assigned to task {@code i}.
 *
 * @param tasks number of tasks to build a mapping for
 */
private void configureTargetMapping(int tasks) {
  // The final task takes the remainder range when there is one; otherwise it gets a
  // full base range like every other task.
  final int lastTaskRange = (remainderRangeForLastShuffler > 0)
      ? remainderRangeForLastShuffler
      : basePartitionRange;
  final int lastTask = tasks - 1;

  targetIndexes = new int[tasks][];
  for (int task = 0; task < tasks; task++) {
    final int range = (task == lastTask) ? lastTaskRange : basePartitionRange;
    // skip the basePartitionRange per destination task
    targetIndexes[task] = createIndices(range, task, basePartitionRange);
    if (LOG.isDebugEnabled()) {
      LOG.debug("targetIdx[{}] to {}", task,
          Arrays.toString(targetIndexes[task]));
    }
  }
}
/**
 * Picks the tasks to schedule now, based on overall source progress. Tasks are
 * taken from the front of {@code pendingTasks} and removed from it.
 *
 * @param completedSourceAttempt part of the base class signature used by other
 *        VertexManagerPlugins; it isn't used here
 * @return the schedule requests, or {@code null} when no task should be scheduled
 */
@Override
List<ScheduleTaskRequest> getTasksToSchedule(
    TaskAttemptIdentifier completedSourceAttempt) {
  int remaining = getNumOfTasksToScheduleAndLog(
      getMinSourceVertexCompletedTaskFraction());
  if (remaining <= 0) {
    return null;
  }

  List<ScheduleTaskRequest> requests = Lists.newArrayListWithCapacity(remaining);
  // Stop at whichever comes first: the quota is used up or no task is left pending.
  for (; remaining > 0 && !pendingTasks.isEmpty(); remaining--) {
    PendingTaskInfo head = pendingTasks.get(0);
    requests.add(ScheduleTaskRequest.create(head.getIndex(), null));
    pendingTasks.remove(0);
  }
  return requests;
}
/**
 * Reorders the pending tasks by input size, but only when the vertex has at least
 * one bipartite source task.
 */
@Override
void processPendingTasks() {
  if (totalNumBipartiteSourceTasks <= 0) {
    return;
  }
  // Sort in case partition stats are available
  sortPendingTasksBasedOnDataSize();
}
/**
 * Refreshes the input stats of the pending tasks and, if computePartitionSizes()
 * reports an update, sorts {@code pendingTasks} so that tasks with larger input
 * stats come first.
 */
private void sortPendingTasksBasedOnDataSize() {
  // Get partition sizes from all source vertices; skip the sort when nothing was updated.
  if (!computePartitionSizes()) {
    return;
  }

  // Descending order: the task with the larger input stats sorts first.
  Comparator<PendingTaskInfo> largestInputFirst = new Comparator<PendingTaskInfo>() {
    @Override
    public int compare(PendingTaskInfo left, PendingTaskInfo right) {
      if (left.getInputStats() > right.getInputStats()) {
        return -1;
      }
      if (left.getInputStats() == right.getInputStats()) {
        return 0;
      }
      return 1;
    }
  };
  Collections.sort(pendingTasks, largestInputFirst);

  if (!LOG.isDebugEnabled()) {
    return;
  }
  for (PendingTaskInfo sorted : pendingTasks) {
    LOG.debug("Pending task: {}", sorted.toString());
  }
}
/**
 * Updates the input stats of every pending task from the currently known
 * per-partition statistics.
 *
 * <p>When {@code targetIndexes} is set (parallelism has changed), a task's stats are
 * the sum over all partitions mapped to it; otherwise the task's own index is used
 * as the partition index.
 *
 * @return {@code true} if {@code setInputStats} returned {@code true} for at least
 *         one pending task
 */
private boolean computePartitionSizes() {
  boolean anyUpdated = false;
  for (PendingTaskInfo pending : pendingTasks) {
    final int taskIndex = pending.getIndex();
    if (targetIndexes == null) {
      // Parallelism unchanged: one partition per task.
      anyUpdated |= pending.setInputStats(
          getCurrentlyKnownStatsAtIndex(taskIndex));
      continue;
    }

    // parallelism has changed.
    Preconditions.checkState(taskIndex < targetIndexes.length,
        "index=" + taskIndex +", targetIndexes length=" + targetIndexes.length);
    int combinedStats = 0;
    for (int partition : targetIndexes[taskIndex]) {
      combinedStats += getCurrentlyKnownStatsAtIndex(partition);
    }
    anyUpdated |= pending.setInputStats(combinedStats);
  }
  return anyUpdated;
}
/**
 * Creates a builder that configures this plugin and produces its
 * {@link VertexManagerPluginDescriptor}.
 *
 * @param conf
 *          {@link Configuration} May be modified in place, since the builder's
 *          setters write into it. May be null if the configuration parameters
 *          are to be set only via code; the builder then starts from a new
 *          {@code Configuration(false)}. If configuration values may be changed
 *          at runtime via a config file then pass in a {@link Configuration}
 *          that is initialized from a config file. The parameters that are not
 *          overridden in code will be derived from the Configuration object.
 * @return {@link ShuffleVertexManagerConfigBuilder}
 */
public static ShuffleVertexManagerConfigBuilder createConfigBuilder(
@Nullable Configuration conf) {
return new ShuffleVertexManagerConfigBuilder(conf);
}
/**
 * Fluent helper for configuring {@link ShuffleVertexManager}. Every setter writes
 * one key into the wrapped {@link Configuration} and returns the builder itself;
 * {@link #build()} packs that configuration into the plugin descriptor.
 */
public static final class ShuffleVertexManagerConfigBuilder {
  private final Configuration conf;

  /** Wraps the given configuration, or a new {@code Configuration(false)} if it is null. */
  private ShuffleVertexManagerConfigBuilder(@Nullable Configuration conf) {
    this.conf = (conf == null) ? new Configuration(false) : conf;
  }

  /** Sets {@code TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL}. */
  public ShuffleVertexManagerConfigBuilder setAutoReduceParallelism(
      boolean enabled) {
    this.conf.setBoolean(TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, enabled);
    return this;
  }

  /** Sets {@code TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION}. */
  public ShuffleVertexManagerConfigBuilder
      setSlowStartMinSrcCompletionFraction(float minFraction) {
    this.conf.setFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, minFraction);
    return this;
  }

  /** Sets {@code TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION}. */
  public ShuffleVertexManagerConfigBuilder
      setSlowStartMaxSrcCompletionFraction(float maxFraction) {
    this.conf.setFloat(TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, maxFraction);
    return this;
  }

  /** Sets {@code TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE}. */
  public ShuffleVertexManagerConfigBuilder setDesiredTaskInputSize(
      long desiredTaskInputSize) {
    this.conf.setLong(TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
        desiredTaskInputSize);
    return this;
  }

  /** Sets {@code TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM}. */
  public ShuffleVertexManagerConfigBuilder setMinTaskParallelism(
      int minTaskParallelism) {
    this.conf.setInt(TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
        minTaskParallelism);
    return this;
  }

  /**
   * Creates the descriptor for {@link ShuffleVertexManager} with the wrapped
   * configuration as its user payload.
   *
   * @throws TezUncheckedException if creating the payload fails with an
   *         {@link IOException}
   */
  public VertexManagerPluginDescriptor build() {
    final String pluginClassName = ShuffleVertexManager.class.getName();
    VertexManagerPluginDescriptor descriptor =
        VertexManagerPluginDescriptor.create(pluginClassName);
    try {
      return descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(this.conf));
    } catch (IOException e) {
      throw new TezUncheckedException(e);
    }
  }
}
}