| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.util; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.joda.time.Duration; |
| |
| /** |
| * A fluent builder for {@link BackOff} objects that allows customization of the retry algorithm. |
| * |
| * @see #DEFAULT for the default configuration parameters. |
| */ |
| public final class FluentBackoff { |
| |
| private static final double DEFAULT_EXPONENT = 1.5; |
| private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; |
| private static final Duration DEFAULT_MIN_BACKOFF = Duration.standardSeconds(1); |
| private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000); |
| private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; |
| private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000); |
| |
| private final double exponent; |
| private final Duration initialBackoff; |
| private final Duration maxBackoff; |
| private final Duration maxCumulativeBackoff; |
| private final int maxRetries; |
| |
| /** |
| * By default the {@link BackOff} created by this builder will use exponential backoff (base |
| * exponent 1.5) with an initial backoff of 1 second. These parameters can be overridden with |
| * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)}, respectively, and the |
| * maximum backoff after exponential increase can be capped using {@link |
| * FluentBackoff#withMaxBackoff(Duration)}. |
| * |
| * <p>The default {@link BackOff} does not limit the number of retries. To limit the backoff, the |
| * maximum total number of retries can be set using {@link #withMaxRetries(int)}. The total time |
| * spent in backoff can be time-bounded as well by configuring {@link |
| * #withMaxCumulativeBackoff(Duration)}. If either of these limits are reached, calls to {@link |
| * BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal that no more retries |
| * should continue. |
| */ |
| public static final FluentBackoff DEFAULT = |
| new FluentBackoff( |
| DEFAULT_EXPONENT, |
| DEFAULT_MIN_BACKOFF, |
| DEFAULT_MAX_BACKOFF, |
| DEFAULT_MAX_CUM_BACKOFF, |
| DEFAULT_MAX_RETRIES); |
| |
| /** |
| * Instantiates a {@link BackOff} that will obey the current configuration. |
| * |
| * @see FluentBackoff |
| */ |
| public BackOff backoff() { |
| return new BackoffImpl(this); |
| } |
| |
| /** |
| * Returns a copy of this {@link FluentBackoff} that instead uses the specified exponent to |
| * control the exponential growth of delay. |
| * |
| * <p>Does not modify this object. |
| * |
| * @see FluentBackoff |
| */ |
| public FluentBackoff withExponent(double exponent) { |
| checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent); |
| return new FluentBackoff( |
| exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); |
| } |
| |
| /** |
| * Returns a copy of this {@link FluentBackoff} that instead uses the specified initial backoff |
| * duration. |
| * |
| * <p>Does not modify this object. |
| * |
| * @see FluentBackoff |
| */ |
| public FluentBackoff withInitialBackoff(Duration initialBackoff) { |
| checkArgument( |
| initialBackoff.isLongerThan(Duration.ZERO), |
| "initialBackoff %s must be at least 1 millisecond", |
| initialBackoff); |
| return new FluentBackoff( |
| exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); |
| } |
| |
| /** |
| * Returns a copy of this {@link FluentBackoff} that limits the maximum backoff of an individual |
| * attempt to the specified duration. |
| * |
| * <p>Does not modify this object. |
| * |
| * @see FluentBackoff |
| */ |
| public FluentBackoff withMaxBackoff(Duration maxBackoff) { |
| checkArgument( |
| maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 millisecond", maxBackoff); |
| return new FluentBackoff( |
| exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); |
| } |
| |
| /** |
| * Returns a copy of this {@link FluentBackoff} that limits the total time spent in backoff |
| * returned across all calls to {@link BackOff#nextBackOffMillis()}. |
| * |
| * <p>Does not modify this object. |
| * |
| * @see FluentBackoff |
| */ |
| public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { |
| checkArgument( |
| maxCumulativeBackoff.isLongerThan(Duration.ZERO), |
| "maxCumulativeBackoff %s must be at least 1 millisecond", |
| maxCumulativeBackoff); |
| return new FluentBackoff( |
| exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); |
| } |
| |
| /** |
| * Returns a copy of this {@link FluentBackoff} that limits the total number of retries, aka the |
| * total number of calls to {@link BackOff#nextBackOffMillis()} before returning {@link |
| * BackOff#STOP}. |
| * |
| * <p>Does not modify this object. |
| * |
| * @see FluentBackoff |
| */ |
| public FluentBackoff withMaxRetries(int maxRetries) { |
| checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries); |
| return new FluentBackoff( |
| exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(FluentBackoff.class) |
| .add("exponent", exponent) |
| .add("initialBackoff", initialBackoff) |
| .add("maxBackoff", maxBackoff) |
| .add("maxRetries", maxRetries) |
| .add("maxCumulativeBackoff", maxCumulativeBackoff) |
| .toString(); |
| } |
| |
| private static class BackoffImpl implements BackOff { |
| |
| // Customization of this backoff. |
| private final FluentBackoff backoffConfig; |
| // Current state |
| private Duration currentCumulativeBackoff; |
| private int currentRetry; |
| |
| @Override |
| public void reset() { |
| currentRetry = 0; |
| currentCumulativeBackoff = Duration.ZERO; |
| } |
| |
| @Override |
| public long nextBackOffMillis() { |
| // Maximum number of retries reached. |
| if (currentRetry >= backoffConfig.maxRetries) { |
| return BackOff.STOP; |
| } |
| // Maximum cumulative backoff reached. |
| if (currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) { |
| return BackOff.STOP; |
| } |
| |
| double currentIntervalMillis = |
| Math.min( |
| backoffConfig.initialBackoff.getMillis() |
| * Math.pow(backoffConfig.exponent, currentRetry), |
| backoffConfig.maxBackoff.getMillis()); |
| double randomOffset = |
| (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; |
| long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset); |
| // Cap to limit on cumulative backoff |
| Duration remainingCumulative = |
| backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff); |
| nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis()); |
| |
| // Update state and return backoff. |
| currentCumulativeBackoff = currentCumulativeBackoff.plus(nextBackoffMillis); |
| currentRetry += 1; |
| return nextBackoffMillis; |
| } |
| |
| private BackoffImpl(FluentBackoff backoffConfig) { |
| this.backoffConfig = backoffConfig; |
| this.reset(); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(BackoffImpl.class) |
| .add("backoffConfig", backoffConfig) |
| .add("currentRetry", currentRetry) |
| .add("currentCumulativeBackoff", currentCumulativeBackoff) |
| .toString(); |
| } |
| } |
| |
| private FluentBackoff( |
| double exponent, |
| Duration initialBackoff, |
| Duration maxBackoff, |
| Duration maxCumulativeBackoff, |
| int maxRetries) { |
| this.exponent = exponent; |
| this.initialBackoff = initialBackoff; |
| this.maxBackoff = maxBackoff; |
| this.maxRetries = maxRetries; |
| this.maxCumulativeBackoff = maxCumulativeBackoff; |
| } |
| } |