| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.app.speculate; |
| |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.mapreduce.v2.app.AMConstants; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; |
| |
| /** |
| * This estimator exponentially smooths the rate of progress versus wallclock |
| * time. Conceivably we could write an estimator that smooths time per |
| * unit progress, and get different results. |
| */ |
| public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase { |
| |
| private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates |
| = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>(); |
| |
| private SmoothedValue smoothedValue; |
| |
| private long lambda; |
| |
| public enum SmoothedValue { |
| RATE, TIME_PER_UNIT_PROGRESS |
| } |
| |
| ExponentiallySmoothedTaskRuntimeEstimator |
| (long lambda, SmoothedValue smoothedValue) { |
| super(); |
| this.smoothedValue = smoothedValue; |
| this.lambda = lambda; |
| } |
| |
| public ExponentiallySmoothedTaskRuntimeEstimator() { |
| super(); |
| } |
| |
| // immutable |
| private class EstimateVector { |
| final double value; |
| final float basedOnProgress; |
| final long atTime; |
| |
| EstimateVector(double value, float basedOnProgress, long atTime) { |
| this.value = value; |
| this.basedOnProgress = basedOnProgress; |
| this.atTime = atTime; |
| } |
| |
| EstimateVector incorporate(float newProgress, long newAtTime) { |
| if (newAtTime <= atTime || newProgress < basedOnProgress) { |
| return this; |
| } |
| |
| double oldWeighting |
| = value < 0.0 |
| ? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda); |
| |
| double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime); |
| |
| if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) { |
| newRead = 1.0 / newRead; |
| } |
| |
| return new EstimateVector |
| (value * oldWeighting + newRead * (1.0 - oldWeighting), |
| newProgress, newAtTime); |
| } |
| } |
| |
| private void incorporateReading |
| (TaskAttemptId attemptID, float newProgress, long newTime) { |
| //TODO: Refactor this method, it seems more complicated than necessary. |
| AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID); |
| |
| if (vectorRef == null) { |
| estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null)); |
| incorporateReading(attemptID, newProgress, newTime); |
| return; |
| } |
| |
| EstimateVector oldVector = vectorRef.get(); |
| |
| if (oldVector == null) { |
| if (vectorRef.compareAndSet(null, |
| new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) { |
| return; |
| } |
| |
| incorporateReading(attemptID, newProgress, newTime); |
| return; |
| } |
| |
| while (!vectorRef.compareAndSet |
| (oldVector, oldVector.incorporate(newProgress, newTime))) { |
| oldVector = vectorRef.get(); |
| } |
| } |
| |
| private EstimateVector getEstimateVector(TaskAttemptId attemptID) { |
| AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID); |
| |
| if (vectorRef == null) { |
| return null; |
| } |
| |
| return vectorRef.get(); |
| } |
| |
| private static final long DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS |
| = 1000L * 60; |
| |
| @Override |
| public void contextualize(Configuration conf, AppContext context) { |
| super.contextualize(conf, context); |
| |
| lambda |
| = conf.getLong(AMConstants.EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS, |
| DEFAULT_EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS); |
| smoothedValue |
| = conf.getBoolean(AMConstants.EXPONENTIAL_SMOOTHING_SMOOTH_RATE, true) |
| ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS; |
| } |
| |
| @Override |
| public long estimatedRuntime(TaskAttemptId id) { |
| Long startTime = startTimes.get(id); |
| |
| if (startTime == null) { |
| return -1L; |
| } |
| |
| EstimateVector vector = getEstimateVector(id); |
| |
| if (vector == null) { |
| return -1L; |
| } |
| |
| long sunkTime = vector.atTime - startTime; |
| |
| double value = vector.value; |
| float progress = vector.basedOnProgress; |
| |
| if (value == 0) { |
| return -1L; |
| } |
| |
| double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value; |
| |
| if (rate == 0.0) { |
| return -1L; |
| } |
| |
| double remainingTime = (1.0 - progress) / rate; |
| |
| return sunkTime + (long)remainingTime; |
| } |
| |
| @Override |
| public long runtimeEstimateVariance(TaskAttemptId id) { |
| return -1L; |
| } |
| |
| @Override |
| public void updateAttempt(TaskAttemptStatus status, long timestamp) { |
| super.updateAttempt(status, timestamp); |
| TaskAttemptId attemptID = status.id; |
| |
| float progress = status.progress; |
| |
| incorporateReading(attemptID, progress, timestamp); |
| } |
| } |