// blob: 8f76e05074a28fd2d3c456624587e7e3c251d7b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.speculation.legacy;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import com.google.common.base.Preconditions;
/**
 * Maintains runtime estimation statistics. Periodically updates the estimates
 * based on progress and decides when to trigger a speculative attempt.
 * Speculative attempts are triggered when the estimated runtime is more than a
 * threshold beyond the mean runtime and the original task still has enough
 * estimated runtime left that the speculative version is expected to finish
 * sooner than the original. If the original is close to completion we don't
 * start a speculation, because it would likely be a wasted attempt. There is a
 * delay between successive speculation passes.
 */
public class LegacySpeculator {

  // Magic values returned by speculationValue() for tasks that must not be
  // speculated. All of them are negative; any value >= 0 marks a candidate.
  private static final long ON_SCHEDULE = Long.MIN_VALUE;
  private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
  private static final long TOO_NEW = Long.MIN_VALUE + 2;
  private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
  private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
  private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;

  // Minimum delay (milliseconds) before the next speculation pass, depending
  // on whether the previous pass launched a speculative attempt.
  private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
  private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;

  // The number of tasks allowed to have a speculative attempt at once is the
  // largest of a fixed minimum, a fraction of all tasks in the vertex and a
  // fraction of the currently running tasks.
  private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
  private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
  private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;

  private static final Log LOG = LogFactory.getLog(LegacySpeculator.class);

  // Tasks whose most recent attempt status report was RUNNING. Maintained by
  // statusUpdate(); not otherwise read inside this class.
  private final ConcurrentMap<TezTaskID, Boolean> runningTasks =
      new ConcurrentHashMap<TezTaskID, Boolean>();

  // Used to track any TaskAttempts that aren't heart-beating for a while, so
  // that we can aggressively speculate instead of waiting for task-timeout.
  private final ConcurrentMap<TezTaskAttemptID, TaskAttemptHistoryStatistics>
      runningTaskAttemptStatistics =
          new ConcurrentHashMap<TezTaskAttemptID, TaskAttemptHistoryStatistics>();

  // Regular heartbeat from tasks is every 3 secs. So if we don't get a
  // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
  // in progress.
  private static final long MAX_WAITING_TIME_FOR_HEARTBEAT = 9 * 1000;

  // Tasks for which addSpeculativeAttempt() has been called. This is a plain
  // HashSet (not a concurrent collection).
  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();

  private final Vertex vertex;
  private final TaskRuntimeEstimator estimator;
  private final Clock clock;

  // Earliest clock time at which maybeSpeculate() will run another pass.
  private long nextSpeculateTime = Long.MIN_VALUE;

  /**
   * Creates a speculator for {@code vertex} that uses the clock of {@code context}
   * and a {@link LegacyTaskRuntimeEstimator}.
   */
  public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
    this(conf, context.getClock(), vertex);
  }

  /**
   * Creates a speculator for {@code vertex} that uses the given clock and a
   * {@link LegacyTaskRuntimeEstimator} contextualized with {@code conf}.
   */
  public LegacySpeculator(Configuration conf, Clock clock, Vertex vertex) {
    this(conf, getEstimator(conf, vertex), clock, vertex);
  }

  /** Builds the default estimator and contextualizes it for {@code vertex}. */
  private static TaskRuntimeEstimator getEstimator(Configuration conf, Vertex vertex) {
    TaskRuntimeEstimator estimator = new LegacyTaskRuntimeEstimator();
    estimator.contextualize(conf, vertex);
    return estimator;
  }

  // This constructor is designed to be called by other constructors.
  // However, it's public because we do use it in the test cases.
  // Normally we figure out our own estimator.
  // Note: conf is not read by this constructor.
  public LegacySpeculator(
      Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) {
    this.vertex = vertex;
    this.estimator = estimator;
    this.clock = clock;
  }

  /* ************************************************************* */

  /**
   * Runs one speculation pass unless the previous pass asked us to wait, then
   * schedules the earliest time of the next pass.
   */
  void maybeSpeculate() {
    long now = clock.getTime();
    if (now < nextSpeculateTime) {
      return;
    }
    int speculations = maybeScheduleASpeculation();
    long minimumRecomputeDelay = speculations > 0
        ? SOONEST_RETRY_AFTER_SPECULATE
        : SOONEST_RETRY_AFTER_NO_SPECULATE;
    // Never wait less than the time this pass itself took.
    long wait = Math.max(minimumRecomputeDelay, clock.getTime() - now);
    nextSpeculateTime = now + wait;
    if (speculations > 0) {
      LOG.info("We launched " + speculations
          + " speculations. Waiting " + wait + " milliseconds.");
    }
  }

  /* ************************************************************* */

  /** Enrolls a newly started attempt with the runtime estimator. */
  public void notifyAttemptStarted(TezTaskAttemptID taId, long timestamp) {
    estimator.enrollAttempt(taId, timestamp);
  }

  /**
   * Folds an attempt status report into the speculation data, then runs a
   * speculation pass if one is due.
   */
  public void notifyAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState reportedState,
      long timestamp) {
    statusUpdate(taId, reportedState, timestamp);
    maybeSpeculate();
  }

  /**
   * Absorbs one task attempt status report.
   *
   * @param attemptID the attempt that reported
   * @param reportedState the state the attempt reported
   * @param timestamp the time this status corresponds to. This matters
   *     because statuses contain progress.
   * @throws IllegalStateException if the vertex has no task for the attempt
   */
  private void statusUpdate(TezTaskAttemptID attemptID, TaskAttemptState reportedState,
      long timestamp) {
    TezTaskID taskID = attemptID.getTaskID();
    Task task = vertex.getTask(taskID);
    Preconditions.checkState(task != null, "Null task for attempt: %s", attemptID);
    estimator.updateAttempt(attemptID, reportedState, timestamp);
    if (reportedState == TaskAttemptState.RUNNING) {
      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
    } else {
      runningTasks.remove(taskID, Boolean.TRUE);
      // speculationValue() still treats STARTING attempts as live, so keep
      // their heartbeat history. For any other non-RUNNING state drop it, so
      // entries for attempts that have left RUNNING/STARTING do not pile up.
      if (reportedState != TaskAttemptState.STARTING) {
        runningTaskAttemptStatistics.remove(attemptID);
      }
    }
  }

  /* ************************************************************* */

  // This is the code section that runs periodically and adds speculations for
  // those tasks that need them.
  // This can return a few magic values for tasks that shouldn't speculate:
  //   returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
  //     consider speculating this task
  //   returns ALREADY_SPECULATING if more than one attempt is running or
  //     starting. This has priority.
  //   returns TOO_NEW if our companion task hasn't gotten any information
  //   returns PROGRESS_IS_GOOD if the task is sailing through
  //   returns TOO_LATE_TO_SPECULATE if a fresh attempt is not expected to
  //     finish before the current one
  //   returns NOT_RUNNING if the task is not running
  //
  // All of these values are negative. Any value that should be allowed to
  // speculate is 0 or positive: how much earlier a fresh attempt is expected
  // to finish than the current one.
  private long speculationValue(Task task, long now) {
    Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
    TezTaskID taskID = task.getTaskId();
    long acceptableRuntime = Long.MIN_VALUE;
    long result = Long.MIN_VALUE;

    // short circuit completed tasks. no need to spend time on them
    if (task.getState() == TaskState.SUCCEEDED) {
      return NOT_RUNNING;
    }

    if (!mayHaveSpeculated.contains(taskID)) {
      acceptableRuntime = estimator.thresholdRuntime(taskID);
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    TezTaskAttemptID runningTaskAttemptID = null;
    int numberRunningAttempts = 0;

    for (TaskAttempt taskAttempt : attempts.values()) {
      // Read the state once so both comparisons see the same snapshot.
      TaskAttemptState attemptState = taskAttempt.getState();
      if (attemptState != TaskAttemptState.RUNNING
          && attemptState != TaskAttemptState.STARTING) {
        continue;
      }
      if (++numberRunningAttempts > 1) {
        return ALREADY_SPECULATING;
      }
      runningTaskAttemptID = taskAttempt.getID();

      long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
      long taskAttemptStartTime = estimator.attemptEnrolledTime(runningTaskAttemptID);
      if (taskAttemptStartTime > now) {
        // This background process ran before we could process the task
        // attempt status change that chronicles the attempt start
        return TOO_NEW;
      }

      long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
      long estimatedReplacementEndTime = now + estimator.newAttemptEstimatedRuntime();

      float progress = taskAttempt.getProgress();
      TaskAttemptHistoryStatistics data =
          runningTaskAttemptStatistics.get(runningTaskAttemptID);
      if (data == null) {
        runningTaskAttemptStatistics.put(runningTaskAttemptID,
            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
      } else if (estimatedRunTime == data.getEstimatedRunTime()
          && progress == data.getProgress()) {
        // Previous stats are the same as the current stats.
        if (data.notHeartbeatedInAWhile(now)) {
          // Stats have stagnated for a while; simulate a heart-beat so the
          // estimator sees the attempt again with unchanged progress.
          statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime());
        }
      } else {
        // Stats have changed - update our data structure
        data.setEstimatedRunTime(estimatedRunTime);
        data.setProgress(progress);
        data.resetHeartBeatTime(now);
      }

      // An estimated end time already in the past is not a candidate.
      if (estimatedEndTime < now) {
        return PROGRESS_IS_GOOD;
      }
      if (estimatedReplacementEndTime >= estimatedEndTime) {
        return TOO_LATE_TO_SPECULATE;
      }
      result = estimatedEndTime - estimatedReplacementEndTime;
    }

    // If we are here, there's at most one task attempt.
    if (numberRunningAttempts == 0) {
      return NOT_RUNNING;
    }

    // The threshold was skipped above for tasks we may already have
    // speculated; check it now.
    if (acceptableRuntime == Long.MIN_VALUE) {
      acceptableRuntime = estimator.thresholdRuntime(taskID);
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    return result;
  }

  /** Asks the vertex to schedule a speculative attempt for {@code taskID}. */
  protected void addSpeculativeAttempt(TezTaskID taskID) {
    LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
    vertex.scheduleSpeculativeTask(taskID);
    mayHaveSpeculated.add(taskID);
  }

  /**
   * Scores every task of the vertex and, if the number of tasks already
   * speculating is below the allowed cap, speculates the best-scoring one.
   *
   * @return the number of speculative attempts requested (0 or 1)
   */
  private int maybeScheduleASpeculation() {
    int successes = 0;
    long now = clock.getTime();
    int numberSpeculationsAlready = 0;
    int numberRunningTasks = 0;

    Map<TezTaskID, Task> tasks = vertex.getTasks();
    int numberAllowedSpeculativeTasks = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
        PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());

    TezTaskID bestTaskID = null;
    // Only values >= 0 can beat this, so magic (negative) values never win.
    long bestSpeculationValue = -1L;

    // this loop is potentially pricey.
    // TODO track the tasks that are potentially worth looking at
    for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
      long mySpeculationValue = speculationValue(taskEntry.getValue(), now);
      if (mySpeculationValue == ALREADY_SPECULATING) {
        ++numberSpeculationsAlready;
      }
      if (mySpeculationValue != NOT_RUNNING) {
        ++numberRunningTasks;
      }
      if (mySpeculationValue > bestSpeculationValue) {
        bestTaskID = taskEntry.getKey();
        bestSpeculationValue = mySpeculationValue;
      }
    }

    numberAllowedSpeculativeTasks = (int) Math.max(numberAllowedSpeculativeTasks,
        PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);

    // If we found a speculation target, fire it off
    if (bestTaskID != null && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
      addSpeculativeAttempt(bestTaskID);
      ++successes;
    }
    return successes;
  }

  /**
   * Last observed estimate and progress of a running attempt, plus the time at
   * which either last changed (or a heartbeat was last simulated).
   */
  static class TaskAttemptHistoryStatistics {
    private long estimatedRunTime;
    private float progress;
    private long lastHeartBeatTime;

    public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
        long nonProgressStartTime) {
      this.estimatedRunTime = estimatedRunTime;
      this.progress = progress;
      resetHeartBeatTime(nonProgressStartTime);
    }

    public long getEstimatedRunTime() {
      return this.estimatedRunTime;
    }

    public float getProgress() {
      return this.progress;
    }

    public void setEstimatedRunTime(long estimatedRunTime) {
      this.estimatedRunTime = estimatedRunTime;
    }

    public void setProgress(float progress) {
      this.progress = progress;
    }

    /**
     * Returns true if more than MAX_WAITING_TIME_FOR_HEARTBEAT has elapsed
     * since the last recorded heartbeat. As a side effect, a true result also
     * resets the heartbeat time to {@code now}.
     */
    public boolean notHeartbeatedInAWhile(long now) {
      if (now - lastHeartBeatTime <= MAX_WAITING_TIME_FOR_HEARTBEAT) {
        return false;
      }
      resetHeartBeatTime(now);
      return true;
    }

    public void resetHeartBeatTime(long lastHeartBeatTime) {
      this.lastHeartBeatTime = lastHeartBeatTime;
    }
  }
}