| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| |
| /** |
| * |
| * This abstract class that represents a bucketed series of |
| * measurements of a quantity being measured in a running task |
| * attempt. |
| * |
| * <p>The sole constructor is called with a count, which is the |
| * number of buckets into which we evenly divide the spectrum of |
| * progress from 0.0D to 1.0D . In the future we may provide for |
| * custom split points that don't have to be uniform. |
| * |
| * <p>A subclass determines how we fold readings for portions of a |
| * bucket and how we interpret the readings by overriding |
| * {@code extendInternal(...)} and {@code initializeInterval()} |
| */ |
| @Private |
| @Unstable |
| public abstract class PeriodicStatsAccumulator { |
| // The range of progress from 0.0D through 1.0D is divided into |
| // count "progress segments". This object accumulates an |
| // estimate of the effective value of a time-varying value during |
| // the zero-based i'th progress segment, ranging from i/count |
| // through (i+1)/count . |
| // This is an abstract class. We have two implementations: one |
| // for monotonically increasing time-dependent variables |
| // [currently, CPU time in milliseconds and wallclock time in |
| // milliseconds] and one for quantities that can vary arbitrarily |
| // over time, currently virtual and physical memory used, in |
| // kilobytes. |
| // We carry int's here. This saves a lot of JVM heap space in the |
| // job tracker per running task attempt [200 bytes per] but it |
| // has a small downside. |
| // No task attempt can run for more than 57 days nor occupy more |
| // than two terabytes of virtual memory. |
| protected final int count; |
| protected final int[] values; |
| |
| static class StatsetState { |
| int oldValue = 0; |
| double oldProgress = 0.0D; |
| |
| double currentAccumulation = 0.0D; |
| } |
| |
| // We provide this level of indirection to reduce the memory |
| // footprint of done task attempts. When a task's progress |
| // reaches 1.0D, we delete this objecte StatsetState. |
| StatsetState state = new StatsetState(); |
| |
| PeriodicStatsAccumulator(int count) { |
| this.count = count; |
| this.values = new int[count]; |
| for (int i = 0; i < count; ++i) { |
| values[i] = -1; |
| } |
| } |
| |
| protected int[] getValues() { |
| return values; |
| } |
| |
| // The concrete implementation of this abstract function |
| // accumulates more data into the current progress segment. |
| // newProgress [from the call] and oldProgress [from the object] |
| // must be in [or at the border of] a single progress segment. |
| /** |
| * |
| * adds a new reading to the current bucket. |
| * |
| * @param newProgress the endpoint of the interval this new |
| * reading covers |
| * @param newValue the value of the reading at {@code newProgress} |
| * |
| * The class has three instance variables, {@code oldProgress} and |
| * {@code oldValue} and {@code currentAccumulation}. |
| * |
| * {@code extendInternal} can count on three things: |
| * |
| * 1: The first time it's called in a particular instance, both |
| * oldXXX's will be zero. |
| * |
| * 2: oldXXX for a later call is the value of newXXX of the |
| * previous call. This ensures continuity in accumulation from |
| * one call to the next. |
| * |
| * 3: {@code currentAccumulation} is owned by |
| * {@code initializeInterval} and {@code extendInternal}. |
| */ |
| protected abstract void extendInternal(double newProgress, int newValue); |
| |
| // What has to be done when you open a new interval |
| /** |
| * initializes the state variables to be ready for a new interval |
| */ |
| protected void initializeInterval() { |
| state.currentAccumulation = 0.0D; |
| } |
| |
| // called for each new reading |
| /** |
| * This method calls {@code extendInternal} at least once. It |
| * divides the current progress interval [from the last call's |
| * {@code newProgress} to this call's {@code newProgress} ] |
| * into one or more subintervals by splitting at any point which |
| * is an interval boundary if there are any such points. It |
| * then calls {@code extendInternal} for each subinterval, or the |
| * whole interval if there are no splitting points. |
| * |
| * <p>For example, if the value was {@code 300} last time with |
| * {@code 0.3} progress, and count is {@code 5}, and you get a |
| * new reading with the variable at {@code 700} and progress at |
| * {@code 0.7}, you get three calls to {@code extendInternal}: |
| * one extending from progress {@code 0.3} to {@code 0.4} [the |
| * next boundary] with a value of {@code 400}, the next one |
| * through {@code 0.6} with a value of {@code 600}, and finally |
| * one at {@code 700} with a progress of {@code 0.7} . |
| * |
| * @param newProgress the endpoint of the progress range this new |
| * reading covers |
| * @param newValue the value of the reading at {@code newProgress} |
| */ |
| protected void extend(double newProgress, int newValue) { |
| if (state == null || newProgress < state.oldProgress) { |
| return; |
| } |
| |
| // This correctness of this code depends on 100% * count = count. |
| int oldIndex = (int)(state.oldProgress * count); |
| int newIndex = (int)(newProgress * count); |
| int originalOldValue = state.oldValue; |
| |
| double fullValueDistance = (double)newValue - state.oldValue; |
| double fullProgressDistance = newProgress - state.oldProgress; |
| double originalOldProgress = state.oldProgress; |
| |
| // In this loop we detect each subinterval boundary within the |
| // range from the old progress to the new one. Then we |
| // interpolate the value from the old value to the new one to |
| // infer what its value might have been at each such boundary. |
| // Lastly we make the necessary calls to extendInternal to fold |
| // in the data for each trapazoid where no such trapazoid |
| // crosses a boundary. |
| for (int closee = oldIndex; closee < newIndex; ++closee) { |
| double interpolationProgress = (double)(closee + 1) / count; |
| // In floats, x * y / y might not equal y. |
| interpolationProgress = Math.min(interpolationProgress, newProgress); |
| |
| double progressLength = (interpolationProgress - originalOldProgress); |
| double interpolationProportion = progressLength / fullProgressDistance; |
| |
| double interpolationValueDistance |
| = fullValueDistance * interpolationProportion; |
| |
| // estimates the value at the next [interpolated] subsegment boundary |
| int interpolationValue |
| = (int)interpolationValueDistance + originalOldValue; |
| |
| extendInternal(interpolationProgress, interpolationValue); |
| |
| advanceState(interpolationProgress, interpolationValue); |
| |
| values[closee] = (int)state.currentAccumulation; |
| initializeInterval(); |
| |
| } |
| |
| extendInternal(newProgress, newValue); |
| advanceState(newProgress, newValue); |
| |
| if (newIndex == count) { |
| state = null; |
| } |
| } |
| |
| protected void advanceState(double newProgress, int newValue) { |
| state.oldValue = newValue; |
| state.oldProgress = newProgress; |
| } |
| |
| int getCount() { |
| return count; |
| } |
| |
| int get(int index) { |
| return values[index]; |
| } |
| } |