/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Throttles storage operations to minimize errors and maximize throughput.
 * This improves throughput by as much as 35% when the service throttles
 * requests due to exceeding account level ingress or egress limits.
 */
@InterfaceAudience.Private
class ClientThrottlingAnalyzer {
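  // Intended usage (a sketch; the exact wiring lives in the calling code):
  // create one analyzer per stream of traffic to be throttled, call
  // suspendIfNecessary() immediately before issuing a storage request, and
  // report the outcome with addBytesTransferred(bytes, isFailedOperation).
  // A background timer periodically recomputes sleepDuration from the
  // accumulated metrics.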
  private static final Logger LOG = LoggerFactory.getLogger(
      ClientThrottlingAnalyzer.class);
  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
      * 1000;
  private static final double SLEEP_DECREASE_FACTOR = .975;
  private static final double SLEEP_INCREASE_FACTOR = 1.05;
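
  // The error-percentage thresholds above drive the feedback loop in
  // analyzeMetricsAndUpdateSleepDuration: below MIN_ACCEPTABLE_ERROR_PERCENTAGE
  // the sleep duration is reduced (rapidly once the error-free streak exceeds
  // RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS), between the two thresholds it
  // is held steady, and above MAX_EQUILIBRIUM_ERROR_PERCENTAGE it is increased
  // by at least SLEEP_INCREASE_FACTOR.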

  private int analysisPeriodMs;
  private volatile int sleepDuration = 0;
  private long consecutiveNoErrorCount = 0;
  private String name = null;
  private Timer timer = null;
  private AtomicReference<BlobOperationMetrics> blobMetrics = null;

  private ClientThrottlingAnalyzer() {
    // hide default constructor
  }

  /**
   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
   * the specified name.
   *
   * @param name a name used to identify this instance.
   *
   * @throws IllegalArgumentException if name is null or empty.
   */
  ClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
  }

  /**
   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
   * the specified name and period.
   *
   * @param name A name used to identify this instance.
   *
   * @param period The period, in milliseconds, at which metrics are
   *               analyzed.
   *
   * @throws IllegalArgumentException
   *           If name is null or empty.
   *           If period is less than 1000 or greater than 30000 milliseconds.
   */
  ClientThrottlingAnalyzer(String name, int period)
      throws IllegalArgumentException {
    Preconditions.checkArgument(
        StringUtils.isNotEmpty(name),
        "The argument 'name' cannot be null or empty.");
    Preconditions.checkArgument(
        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
        "The argument 'period' must be between 1000 and 30000.");
    this.name = name;
    this.analysisPeriodMs = period;
    this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
        new BlobOperationMetrics(System.currentTimeMillis()));
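    // The timer is a daemon thread so it will not keep the JVM alive on
    // shutdown; the first analysis fires after one full analysis period.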
    this.timer = new Timer(
        String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
    this.timer.schedule(new TimerTaskImpl(),
        analysisPeriodMs,
        analysisPeriodMs);
  }

  /**
   * Updates metrics with results from the current storage operation.
   *
   * @param count The count of bytes transferred.
   *
   * @param isFailedOperation True if the operation failed; otherwise false.
   */
  public void addBytesTransferred(long count, boolean isFailedOperation) {
    BlobOperationMetrics metrics = blobMetrics.get();
    if (isFailedOperation) {
      metrics.bytesFailed.addAndGet(count);
      metrics.operationsFailed.incrementAndGet();
    } else {
      metrics.bytesSuccessful.addAndGet(count);
      metrics.operationsSuccessful.incrementAndGet();
    }
  }

  /**
   * Suspends the current storage operation, as necessary, to reduce throughput.
   */
  public void suspendIfNecessary() {
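    // sleepDuration is recomputed periodically by TimerTaskImpl; a value of
    // zero means no throttling is currently being applied.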
    int duration = sleepDuration;
    if (duration > 0) {
      try {
        Thread.sleep(duration);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
  }

  @VisibleForTesting
  int getSleepDuration() {
    return sleepDuration;
  }

  private int analyzeMetricsAndUpdateSleepDuration(BlobOperationMetrics metrics,
                                                   int sleepDuration) {
    final double percentageConversionFactor = 100;
    double bytesFailed = metrics.bytesFailed.get();
    double bytesSuccessful = metrics.bytesSuccessful.get();
    double operationsFailed = metrics.operationsFailed.get();
    double operationsSuccessful = metrics.operationsSuccessful.get();
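    // The error rate is measured in bytes rather than operations, so a large
    // failed transfer weighs more heavily than a small one.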
    double errorPercentage = (bytesFailed <= 0)
        ? 0
        : percentageConversionFactor
            * bytesFailed
            / (bytesFailed + bytesSuccessful);
    long periodMs = metrics.endTime - metrics.startTime;

    double newSleepDuration;

    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
      ++consecutiveNoErrorCount;
      // Decrease sleepDuration in order to increase throughput.
      double reductionFactor =
          (consecutiveNoErrorCount * analysisPeriodMs
              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
          ? RAPID_SLEEP_DECREASE_FACTOR
          : SLEEP_DECREASE_FACTOR;

      newSleepDuration = sleepDuration * reductionFactor;
    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
      // Do not modify sleepDuration in order to stabilize throughput.
      newSleepDuration = sleepDuration;
    } else {
      // Increase sleepDuration in order to minimize error rate.
      consecutiveNoErrorCount = 0;

      // Increase sleep duration in order to reduce throughput and error rate.
      // First, calculate target throughput: bytesSuccessful / periodMs.
      // Next, calculate time required to send *all* data (assuming next period
      // is similar to previous) at the target throughput: (bytesSuccessful
      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
      // get the total additional delay needed.
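      // Worked example with illustrative numbers: if bytesSuccessful = 80 MB,
      // bytesFailed = 20 MB and periodMs = 10000 ms, then
      // additionalDelayNeeded = (80 + 20) * 10000 / 80 - 10000 = 2500 ms,
      // which is then spread across the operations expected in the next
      // period (about 25 ms of extra sleep per operation if there were 100
      // operations), before the clamping below.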
      double additionalDelayNeeded = 5 * analysisPeriodMs;
      if (bytesSuccessful > 0) {
        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
            * periodMs
            / bytesSuccessful
            - periodMs;
      }

      // amortize the additional delay needed across the estimated number of
      // requests during the next period
      newSleepDuration = additionalDelayNeeded
          / (operationsFailed + operationsSuccessful);

      final double maxSleepDuration = analysisPeriodMs;
      final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;

      // Add 1 ms to avoid rounding down and to decrease proximity to the server
      // side ingress/egress limit. Ensure that the new sleep duration is
      // larger than the current one to more quickly reduce the number of
      // errors. Don't allow the sleep duration to grow unbounded, after a
      // certain point throttling won't help, for example, if there are far too
      // many tasks/containers/nodes no amount of throttling will help.
      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
    }

    if (LOG.isDebugEnabled()) {
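      // Columns: name, bytesFailed, bytesSuccessful, operationsFailed,
      // operationsSuccessful, error percentage, period ms, previous sleep
      // duration ms, new sleep duration ms.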
      LOG.debug(String.format(
          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
          name,
          (int) bytesFailed,
          (int) bytesSuccessful,
          (int) operationsFailed,
          (int) operationsSuccessful,
          errorPercentage,
          periodMs,
          (int) sleepDuration,
          (int) newSleepDuration));
    }

    return (int) newSleepDuration;
  }

  /**
   * Timer callback implementation for periodically analyzing metrics.
   */
  class TimerTaskImpl extends TimerTask {
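    // doingWork acts as a re-entrancy guard: 0 means idle, 1 means a run() is
    // in progress, so an overlapping timer firing returns immediately.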
    private AtomicInteger doingWork = new AtomicInteger(0);

    /**
     * Periodically analyzes a snapshot of the blob storage metrics and updates
     * the sleepDuration in order to appropriately throttle storage operations.
     */
    @Override
    public void run() {
      boolean doWork = false;
      try {
        doWork = doingWork.compareAndSet(0, 1);

        // prevent concurrent execution of this task
        if (!doWork) {
          return;
        }

        long now = System.currentTimeMillis();
        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
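          // Atomically swap in a fresh metrics object for the next period and
          // analyze the snapshot that was just retired.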
          BlobOperationMetrics oldMetrics = blobMetrics.getAndSet(
              new BlobOperationMetrics(now));
          oldMetrics.endTime = now;
          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
              sleepDuration);
        }
      } finally {
        if (doWork) {
          doingWork.set(0);
        }
      }
    }
  }

  /**
   * Stores blob operation metrics during each analysis period.
   */
  static class BlobOperationMetrics {
    private AtomicLong bytesFailed;
    private AtomicLong bytesSuccessful;
    private AtomicLong operationsFailed;
    private AtomicLong operationsSuccessful;
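
    // endTime is assigned by TimerTaskImpl when this snapshot is retired for
    // analysis; startTime is set when the snapshot becomes current.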
    private long endTime;
    private long startTime;

    BlobOperationMetrics(long startTime) {
      this.startTime = startTime;
      this.bytesFailed = new AtomicLong();
      this.bytesSuccessful = new AtomicLong();
      this.operationsFailed = new AtomicLong();
      this.operationsSuccessful = new AtomicLong();
    }
  }
}