blob: a66bab2669f1feab37e9d5c743b0251d8eba6a3a [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.api.utils;
// TODO:- Uncomment this
// import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;
/**
 * Helps to auto tune the max spout pending value.
 *
 * <p>Callers report a progress figure through {@link #autoTune(Long)} and read the currently
 * recommended value through {@link #get()}. Each call compares the reported progress with the
 * progress remembered from an earlier call and, depending on the action taken last time,
 * increases, decreases, restores or leaves unchanged the max spout pending value.
 *
 * <p>The class performs no synchronization of its own.
 */
public class DefaultMaxSpoutPendingTuner {
  static final long CONSERVATIVE_MAX_SPOUT_PENDING = 10;
  static final int NOOP_THRESHOLD = 5;

  /** Sentinel stored in the progress fields until a real progress value has been recorded. */
  private static final long NO_PROGRESS = -1L;

  // Configuration fixed at construction time.
  private final long initialMaxSpoutPending;
  private final float autoTuneFactor;
  private final double progressBound;

  // Current recommendation and the progress it is being compared against.
  private long adjustedMaxSpoutPending;
  private long prevProgress;

  // Snapshot taken before every INCREASE/DECREASE so that RESTORE can roll back.
  private long restoreAdjustedMaxSpoutPending;
  private long restoreProgress;

  // Number of consecutive steady-state calls in which nothing was changed.
  private int callsInNoop;
  private ACTION lastAction;
  // Direction to probe once the steady state has lasted NOOP_THRESHOLD calls; alternates.
  private ACTION speculativeAction;

  /**
   * Convenience constructor for when no initial max spout pending value has been set; the
   * tuner then starts from {@link #CONSERVATIVE_MAX_SPOUT_PENDING}.
   *
   * @param autoTuneFactor fraction of the current value by which each adjustment moves it
   * @param progressBound relative difference below which two progress values count as similar
   */
  public DefaultMaxSpoutPendingTuner(float autoTuneFactor, double progressBound) {
    this(null, autoTuneFactor, progressBound);
  }

  /**
   * @param maxSpoutPending initial value, or {@code null} to start from
   *     {@link #CONSERVATIVE_MAX_SPOUT_PENDING}
   * @param autoTuneFactor fraction of the current value by which each adjustment moves it
   * @param progressBound relative difference below which two progress values count as similar
   */
  public DefaultMaxSpoutPendingTuner(Long maxSpoutPending, float autoTuneFactor,
                                     double progressBound) {
    if (maxSpoutPending == null) {
      adjustedMaxSpoutPending = CONSERVATIVE_MAX_SPOUT_PENDING;
    } else {
      adjustedMaxSpoutPending = maxSpoutPending;
    }
    initialMaxSpoutPending = adjustedMaxSpoutPending;
    prevProgress = NO_PROGRESS;
    restoreAdjustedMaxSpoutPending = adjustedMaxSpoutPending;
    restoreProgress = NO_PROGRESS;
    lastAction = ACTION.NOOP;
    callsInNoop = 0;
    this.autoTuneFactor = autoTuneFactor;
    this.progressBound = progressBound;
    speculativeAction = ACTION.INCREASE;
  }

  /**
   * Tells whether {@code val} differs from {@code prevVal} by a relative amount (measured
   * against the magnitude of {@code prevVal}) strictly smaller than {@code bound}.
   *
   * @param val the new value
   * @param prevVal the reference value
   * @param bound exclusive upper limit on the relative difference
   * @return {@code true} if the relative difference is below {@code bound}
   */
  public static boolean similarToNum(Long val, Long prevVal, double bound) {
    long current = val;
    long previous = prevVal;
    if (previous == 0) {
      // A relative difference against zero is undefined (the division yields NaN or
      // Infinity). Two zeros are a difference of zero; anything else is not similar.
      return current == 0 && 0 < bound;
    }
    // Divide by the magnitude so that a negative reference value does not flip the sign of
    // the ratio and make every pair look similar.
    return (double) Math.abs(current - previous) / Math.abs(previous) < bound;
  }

  /**
   * @return {@code true} if {@code val} is greater than {@code prevVal} and the two are not
   *     similar according to {@link #similarToNum(Long, Long, double)}
   */
  public static boolean moreThanNum(Long val, Long prevVal, double bound) {
    return !similarToNum(val, prevVal, bound) && val > prevVal;
  }

  /**
   * @return {@code true} if {@code val} is less than {@code prevVal} and the two are not
   *     similar according to {@link #similarToNum(Long, Long, double)}
   */
  public static boolean lessThanNum(Long val, Long prevVal, double bound) {
    return !similarToNum(val, prevVal, bound) && val < prevVal;
  }

  /**
   * @return the current (possibly adjusted) max spout pending value
   */
  public Long get() {
    return adjustedMaxSpoutPending;
  }

  /**
   * Tune max spout pending based on progress.
   *
   * @param progress the progress observed since the previous call; must not be {@code null}
   */
  public void autoTune(Long progress) {
    // LOG.info ("Called auto tune with progress " + progress);
    if (lastAction == ACTION.NOOP) {
      tuneAfterNoop(progress);
    } else if (lastAction == ACTION.INCREASE) {
      tuneAfterIncrease(progress);
    } else if (lastAction == ACTION.DECREASE) {
      if (moreThanNum(progress, prevProgress, progressBound)) {
        // We actually saw an increase in progress. So decrease more
        doAction(ACTION.DECREASE, autoTuneFactor, progress);
      } else {
        // Hold off any decision. We will take any decision next time around.
        doAction(ACTION.NOOP, autoTuneFactor, progress);
      }
    } else if (lastAction == ACTION.RESTORE) {
      // Lets ignore the effects of the restore and wait out for the next cycle
      doAction(ACTION.NOOP, autoTuneFactor, progress);
    }
  }

  /** Decides what to do when no action was taken on the previous call. */
  private void tuneAfterNoop(long progress) {
    if (prevProgress == NO_PROGRESS) {
      // The first time around when we are called
      doAction(ACTION.INCREASE, autoTuneFactor, progress);
    } else if (moreThanNum(progress, prevProgress, progressBound)) {
      // We have seen a sudden increase in progress in the steady state (NOOP) try to push
      // the max spout pending limit even more
      doAction(ACTION.INCREASE, autoTuneFactor, progress);
    } else if (lessThanNum(progress, prevProgress, progressBound)) {
      // We have seen a sudden drop in progress in the steady state try to decrease
      // the max spout pending proportionately to see if we can introduce an improvement
      // in progress;
      doAction(ACTION.DECREASE, proportionalDrop(progress), progress);
    } else {
      ++callsInNoop;
      // If the progress remains the same then once in a while try moving the
      // max spout pending, alternating between an increase and a decrease.
      if (callsInNoop >= NOOP_THRESHOLD) {
        doAction(speculativeAction, autoTuneFactor, progress);
        speculativeAction =
            speculativeAction == ACTION.INCREASE ? ACTION.DECREASE : ACTION.INCREASE;
      }
    }
  }

  /** Decides what to do when the previous call increased the max spout pending value. */
  private void tuneAfterIncrease(long progress) {
    if (moreThanNum(progress, prevProgress, autoTuneFactor - progressBound)) {
      // Our increase last time did result in a commensurate increase. Increase even more
      doAction(ACTION.INCREASE, autoTuneFactor, progress);
    } else if (lessThanNum(progress, prevProgress, progressBound)) {
      // Our increase last time actually resulted in a decrease.
      // Check how much we decreased. If our decrease was disproportionate
      // decrease accordingly.
      float drop = proportionalDrop(progress);
      if (drop > autoTuneFactor) {
        doAction(ACTION.DECREASE, drop, progress);
      } else {
        doAction(ACTION.RESTORE, autoTuneFactor, progress);
      }
    } else {
      // Our increase last time did not really change anything. So restore.
      doAction(ACTION.RESTORE, autoTuneFactor, progress);
    }
  }

  /** Fraction by which progress fell relative to prevProgress, never below autoTuneFactor. */
  private float proportionalDrop(long progress) {
    return Math.max((prevProgress - progress) / (float) prevProgress, autoTuneFactor);
  }

  /**
   * Applies {@code action} to the tuner state and records it as the last action taken.
   *
   * @param action what to do
   * @param factor fraction of the current value to move by; only used by INCREASE and DECREASE
   * @param currentProgress progress to remember for the next comparison; ignored by RESTORE,
   *     which reinstates the progress saved before the adjustment being undone
   */
  void doAction(ACTION action, float factor, Long currentProgress) {
    if (action == ACTION.INCREASE) {
      saveRestorePoint();
      adjustedMaxSpoutPending += adjustmentStep(factor);
      callsInNoop = 0;
      prevProgress = currentProgress;
      // LOG.info("Increased max pending to " + adjustedMaxSpoutPending);
    } else if (action == ACTION.DECREASE) {
      saveRestorePoint();
      adjustedMaxSpoutPending -= adjustmentStep(factor);
      // keep a min of CONSERVATIVE_MAX_SPOUT_PENDING or initialMaxSpoutPending tuple outstanding
      adjustedMaxSpoutPending = Math.max(
          Math.min(CONSERVATIVE_MAX_SPOUT_PENDING, initialMaxSpoutPending),
          adjustedMaxSpoutPending);
      callsInNoop = 0;
      prevProgress = currentProgress;
      // LOG.info("Decreased max pending to " + adjustedMaxSpoutPending);
    } else if (action == ACTION.RESTORE) {
      adjustedMaxSpoutPending = restoreAdjustedMaxSpoutPending;
      prevProgress = restoreProgress;
      callsInNoop = 0;
      // LOG.info("Restoring max pending to " + adjustedMaxSpoutPending);
    } else if (action == ACTION.NOOP) {
      prevProgress = currentProgress;
    }
    lastAction = action;
  }

  /** Remembers the current value and reference progress so that RESTORE can bring them back. */
  private void saveRestorePoint() {
    restoreAdjustedMaxSpoutPending = adjustedMaxSpoutPending;
    restoreProgress = prevProgress;
  }

  /** Size of one adjustment: {@code factor} times the current value, rounded, at least 1. */
  private int adjustmentStep(float factor) {
    return Math.max(1, Math.round(factor * adjustedMaxSpoutPending));
  }

  // public static Logger LOG = LoggerFactory.getLogger(DefaultMaxSpoutPendingTuner.class);
  enum ACTION {
    NOOP, INCREASE, DECREASE, RESTORE
  }
}