blob: ff50bc2f1d7fc22fd74dd8abc736cfaeb8437aae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.speculate;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
/**
* This estimator exponentially smooths the rate of progress versus wallclock
* time. Conceivably we could write an estimator that smooths time per
* unit progress, and get different results.
*/
public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
= new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();
private SmoothedValue smoothedValue;
private long lambda;
public enum SmoothedValue {
RATE, TIME_PER_UNIT_PROGRESS
}
ExponentiallySmoothedTaskRuntimeEstimator
(long lambda, SmoothedValue smoothedValue) {
super();
this.smoothedValue = smoothedValue;
this.lambda = lambda;
}
public ExponentiallySmoothedTaskRuntimeEstimator() {
super();
}
// immutable
private class EstimateVector {
final double value;
final float basedOnProgress;
final long atTime;
EstimateVector(double value, float basedOnProgress, long atTime) {
this.value = value;
this.basedOnProgress = basedOnProgress;
this.atTime = atTime;
}
EstimateVector incorporate(float newProgress, long newAtTime) {
if (newAtTime <= atTime || newProgress < basedOnProgress) {
return this;
}
double oldWeighting
= value < 0.0
? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda);
double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
newRead = 1.0 / newRead;
}
return new EstimateVector
(value * oldWeighting + newRead * (1.0 - oldWeighting),
newProgress, newAtTime);
}
}
private void incorporateReading
(TaskAttemptId attemptID, float newProgress, long newTime) {
//TODO: Refactor this method, it seems more complicated than necessary.
AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
if (vectorRef == null) {
estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
incorporateReading(attemptID, newProgress, newTime);
return;
}
EstimateVector oldVector = vectorRef.get();
if (oldVector == null) {
if (vectorRef.compareAndSet(null,
new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
return;
}
incorporateReading(attemptID, newProgress, newTime);
return;
}
while (!vectorRef.compareAndSet
(oldVector, oldVector.incorporate(newProgress, newTime))) {
oldVector = vectorRef.get();
}
}
private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
if (vectorRef == null) {
return null;
}
return vectorRef.get();
}
private static final long DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS
= 1000L * 60;
@Override
public void contextualize(Configuration conf, AppContext context) {
super.contextualize(conf, context);
lambda
= conf.getLong(AMConstants.EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS,
DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS);
smoothedValue
= conf.getBoolean(AMConstants.EXPONENTIAL_SMOOTHING_SMOOTH_RATE, true)
? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
}
@Override
public long estimatedRuntime(TaskAttemptId id) {
Long startTime = startTimes.get(id);
if (startTime == null) {
return -1L;
}
EstimateVector vector = getEstimateVector(id);
if (vector == null) {
return -1L;
}
long sunkTime = vector.atTime - startTime;
double value = vector.value;
float progress = vector.basedOnProgress;
if (value == 0) {
return -1L;
}
double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
if (rate == 0.0) {
return -1L;
}
double remainingTime = (1.0 - progress) / rate;
return sunkTime + (long)remainingTime;
}
@Override
public long runtimeEstimateVariance(TaskAttemptId id) {
return -1L;
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
super.updateAttempt(status, timestamp);
TaskAttemptId attemptID = status.id;
float progress = status.progress;
incorporateReading(attemptID, progress, timestamp);
}
}