blob: e1ef7bec907a8bae7f9c59d2ae7ef56f346a65ef [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
import java.util.concurrent.atomic.AtomicReference;
/**
* Implementation of the static model for Simple exponential smoothing.
*/
public class SimpleExponentialSmoothing {
public final static double DEFAULT_FORECAST = -1.0;
private final int kMinimumReads;
private final long kStagnatedWindow;
private final long startTime;
private long timeConstant;
private AtomicReference<ForecastRecord> forecastRefEntry;
public static SimpleExponentialSmoothing createForecast(long timeConstant,
int skipCnt, long stagnatedWindow, long timeStamp) {
return new SimpleExponentialSmoothing(timeConstant, skipCnt,
stagnatedWindow, timeStamp);
}
SimpleExponentialSmoothing(long ktConstant, int skipCnt,
long stagnatedWindow, long timeStamp) {
kMinimumReads = skipCnt;
kStagnatedWindow = stagnatedWindow;
this.timeConstant = ktConstant;
this.startTime = timeStamp;
this.forecastRefEntry = new AtomicReference<ForecastRecord>(null);
}
private class ForecastRecord {
private double alpha;
private long timeStamp;
private double sample;
private double rawData;
private double forecast;
private double sseError;
private long myIndex;
ForecastRecord(double forecast, double rawData, long timeStamp) {
this(0.0, forecast, rawData, forecast, timeStamp, 0.0, 0);
}
ForecastRecord(double alpha, double sample, double rawData,
double forecast, long timeStamp, double accError, long index) {
this.timeStamp = timeStamp;
this.alpha = alpha;
this.sseError = 0.0;
this.sample = sample;
this.forecast = forecast;
this.rawData = rawData;
this.sseError = accError;
this.myIndex = index;
}
private double preProcessRawData(double rData, long newTime) {
return processRawData(this.rawData, this.timeStamp, rData, newTime);
}
public ForecastRecord append(long newTimeStamp, double rData) {
if (this.timeStamp > newTimeStamp) {
return this;
}
double newSample = preProcessRawData(rData, newTimeStamp);
long deltaTime = this.timeStamp - newTimeStamp;
if (this.myIndex == kMinimumReads) {
timeConstant = Math.max(timeConstant, newTimeStamp - startTime);
}
double smoothFactor =
1 - Math.exp(((double) deltaTime) / timeConstant);
double forecastVal =
smoothFactor * newSample + (1.0 - smoothFactor) * this.forecast;
double newSSEError =
this.sseError + Math.pow(newSample - this.forecast, 2);
return new ForecastRecord(smoothFactor, newSample, rData, forecastVal,
newTimeStamp, newSSEError, this.myIndex + 1);
}
}
public boolean isDataStagnated(long timeStamp) {
ForecastRecord rec = forecastRefEntry.get();
if (rec != null && rec.myIndex <= kMinimumReads) {
return (rec.timeStamp + kStagnatedWindow) < timeStamp;
}
return false;
}
static double processRawData(double oldRawData, long oldTime,
double newRawData, long newTime) {
double rate = (newRawData - oldRawData) / (newTime - oldTime);
return rate;
}
public void incorporateReading(long timeStamp, double rawData) {
ForecastRecord oldRec = forecastRefEntry.get();
if (oldRec == null) {
double oldForecast =
processRawData(0, startTime, rawData, timeStamp);
forecastRefEntry.compareAndSet(null,
new ForecastRecord(oldForecast, 0.0, startTime));
incorporateReading(timeStamp, rawData);
return;
}
while (!forecastRefEntry.compareAndSet(oldRec, oldRec.append(timeStamp,
rawData))) {
oldRec = forecastRefEntry.get();
}
}
public double getForecast() {
ForecastRecord rec = forecastRefEntry.get();
if (rec != null && rec.myIndex > kMinimumReads) {
return rec.forecast;
}
return DEFAULT_FORECAST;
}
public boolean isDefaultForecast(double value) {
return value == DEFAULT_FORECAST;
}
public double getSSE() {
ForecastRecord rec = forecastRefEntry.get();
if (rec != null) {
return rec.sseError;
}
return DEFAULT_FORECAST;
}
public boolean isErrorWithinBound(double bound) {
double squaredErr = getSSE();
if (squaredErr < 0) {
return false;
}
return bound > squaredErr;
}
public double getRawData() {
ForecastRecord rec = forecastRefEntry.get();
if (rec != null) {
return rec.rawData;
}
return DEFAULT_FORECAST;
}
public long getTimeStamp() {
ForecastRecord rec = forecastRefEntry.get();
if (rec != null) {
return rec.timeStamp;
}
return 0L;
}
public long getStartTime() {
return startTime;
}
public AtomicReference<ForecastRecord> getForecastRefEntry() {
return forecastRefEntry;
}
@Override
public String toString() {
String res = "NULL";
ForecastRecord rec = forecastRefEntry.get();
if (rec != null) {
res = "rec.index = " + rec.myIndex + ", forecast t: " + rec.timeStamp +
", forecast: " + rec.forecast
+ ", sample: " + rec.sample + ", raw: " + rec.rawData + ", error: "
+ rec.sseError + ", alpha: " + rec.alpha;
}
return res;
}
}