blob: 5e3a6d4236ca3930dbbdab83f15b8e1870f74cc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;
import org.apache.flink.configuration.description.Description;
import java.time.Duration;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
/**
 * Config options for restart strategies.
 *
 * <p>{@link #RESTART_STRATEGY} selects the strategy. The remaining options configure the
 * individual strategies and are grouped for documentation by their key prefix ({@code
 * restart-strategy.fixed-delay}, {@code restart-strategy.failure-rate} and {@code
 * restart-strategy.exponential-delay}).
 */
@PublicEvolving
@ConfigGroups(
        groups = {
            @ConfigGroup(
                    name = "ExponentialDelayRestartStrategy",
                    keyPrefix = "restart-strategy.exponential-delay"),
            @ConfigGroup(
                    name = "FixedDelayRestartStrategy",
                    keyPrefix = "restart-strategy.fixed-delay"),
            @ConfigGroup(
                    name = "FailureRateRestartStrategy",
                    keyPrefix = "restart-strategy.failure-rate")
        })
public class RestartStrategyOptions {

    /**
     * Key {@code restart-strategy}: the restart strategy to use in case of job failures.
     *
     * <p>The option itself has no default value. Its description documents which strategy is
     * effectively used when it is unset, depending on whether checkpointing is enabled.
     */
    public static final ConfigOption<String> RESTART_STRATEGY =
            ConfigOptions.key("restart-strategy")
                    // Declare the value type explicitly, like every other option in this class.
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Defines the restart strategy to use in case of job failures.")
                                    .linebreak()
                                    .text("Accepted values are:")
                                    .list(
                                            text(
                                                    "%s, %s, %s: No restart strategy.",
                                                    code("none"), code("off"), code("disable")),
                                            text(
                                                    "%s, %s: Fixed delay restart strategy. More details can be found %s.",
                                                    code("fixeddelay"),
                                                    code("fixed-delay"),
                                                    link(
                                                            "../../ops/state/task_failure_recovery#fixed-delay-restart-strategy",
                                                            "here")),
                                            text(
                                                    "%s, %s: Failure rate restart strategy. More details can be found %s.",
                                                    code("failurerate"),
                                                    code("failure-rate"),
                                                    link(
                                                            "../../ops/state/task_failure_recovery#failure-rate-restart-strategy",
                                                            "here")),
                                            text(
                                                    "%s, %s: Exponential delay restart strategy. More details can be found %s.",
                                                    code("exponentialdelay"),
                                                    code("exponential-delay"),
                                                    link(
                                                            "../../ops/state/task_failure_recovery#exponential-delay-restart-strategy",
                                                            "here")))
                                    .text(
                                            "If checkpointing is disabled, the default value is %s. "
                                                    + "If checkpointing is enabled, the default value is %s with %s restart attempts and '%s' delay.",
                                            code("none"),
                                            code("fixed-delay"),
                                            code("Integer.MAX_VALUE"),
                                            code("1 s"))
                                    .build());

    /**
     * Key {@code restart-strategy.fixed-delay.attempts}: number of retries for the fixed-delay
     * strategy before the job is declared failed. Defaults to {@code 1}.
     */
    public static final ConfigOption<Integer> RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS =
            ConfigOptions.key("restart-strategy.fixed-delay.attempts")
                    .intType()
                    .defaultValue(1)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "The number of times that Flink retries the execution before the job is declared as failed if %s has been set to %s.",
                                            code(RESTART_STRATEGY.key()), code("fixed-delay"))
                                    .build());

    /**
     * Key {@code restart-strategy.fixed-delay.delay}: delay between two consecutive restart
     * attempts for the fixed-delay strategy. Defaults to one second.
     */
    public static final ConfigOption<Duration> RESTART_STRATEGY_FIXED_DELAY_DELAY =
            ConfigOptions.key("restart-strategy.fixed-delay.delay")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Delay between two consecutive restart attempts if %s has been set to %s. "
                                                    + "Delaying the retries can be helpful when the program interacts with external systems where "
                                                    + "for example connections or pending transactions should reach a timeout before re-execution "
                                                    + "is attempted. It can be specified using notation: \"1 min\", \"20 s\"",
                                            code(RESTART_STRATEGY.key()), code("fixed-delay"))
                                    .build());

    /**
     * Key {@code restart-strategy.failure-rate.max-failures-per-interval}: maximum number of
     * restarts within the failure-rate interval before the job fails. Defaults to {@code 1}.
     */
    public static final ConfigOption<Integer>
            RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL =
                    ConfigOptions.key("restart-strategy.failure-rate.max-failures-per-interval")
                            // Declare the value type explicitly, matching
                            // RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS.
                            .intType()
                            .defaultValue(1)
                            .withDescription(
                                    Description.builder()
                                            .text(
                                                    "Maximum number of restarts in given time interval before failing a job if %s has been set to %s.",
                                                    code(RESTART_STRATEGY.key()),
                                                    code("failure-rate"))
                                            .build());

    /**
     * Key {@code restart-strategy.failure-rate.failure-rate-interval}: time window over which
     * failures are counted for the failure-rate strategy. Defaults to one minute.
     */
    public static final ConfigOption<Duration> RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL =
            ConfigOptions.key("restart-strategy.failure-rate.failure-rate-interval")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(1))
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Time interval for measuring failure rate if %s has been set to %s. "
                                                    + "It can be specified using notation: \"1 min\", \"20 s\"",
                                            code(RESTART_STRATEGY.key()), code("failure-rate"))
                                    .build());

    /**
     * Key {@code restart-strategy.failure-rate.delay}: delay between two consecutive restart
     * attempts for the failure-rate strategy. Defaults to one second.
     */
    public static final ConfigOption<Duration> RESTART_STRATEGY_FAILURE_RATE_DELAY =
            ConfigOptions.key("restart-strategy.failure-rate.delay")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Delay between two consecutive restart attempts if %s has been set to %s. "
                                                    + "It can be specified using notation: \"1 min\", \"20 s\"",
                                            code(RESTART_STRATEGY.key()), code("failure-rate"))
                                    .build());

    /**
     * Key {@code restart-strategy.exponential-delay.initial-backoff}: starting duration between
     * restarts for the exponential-delay strategy. Defaults to one second.
     */
    public static final ConfigOption<Duration> RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF =
            ConfigOptions.key("restart-strategy.exponential-delay.initial-backoff")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Starting duration between restarts if %s has been set to %s. "
                                                    + "It can be specified using notation: \"1 min\", \"20 s\"",
                                            code(RESTART_STRATEGY.key()), code("exponential-delay"))
                                    .build());

    /**
     * Key {@code restart-strategy.exponential-delay.max-backoff}: upper bound on the duration
     * between restarts for the exponential-delay strategy. Defaults to five minutes.
     */
    public static final ConfigOption<Duration> RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF =
            ConfigOptions.key("restart-strategy.exponential-delay.max-backoff")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(5))
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "The highest possible duration between restarts if %s has been set to %s. "
                                                    + "It can be specified using notation: \"1 min\", \"20 s\"",
                                            code(RESTART_STRATEGY.key()), code("exponential-delay"))
                                    .build());

    /**
     * Key {@code restart-strategy.exponential-delay.backoff-multiplier}: factor applied to the
     * backoff after every failure for the exponential-delay strategy. Defaults to {@code 2.0}.
     */
    public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER =
            ConfigOptions.key("restart-strategy.exponential-delay.backoff-multiplier")
                    .doubleType()
                    .defaultValue(2.0)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            // Trailing space keeps "failure," and "until" apart
                                            // after concatenation.
                                            "Backoff value is multiplied by this value after every failure, "
                                                    + "until max backoff is reached if %s has been set to %s.",
                                            code(RESTART_STRATEGY.key()), code("exponential-delay"))
                                    .build());

    /**
     * Key {@code restart-strategy.exponential-delay.reset-backoff-threshold}: how long the job
     * must run without failure before the exponential backoff is reset to its initial value.
     * Defaults to one hour.
     */
    public static final ConfigOption<Duration>
            RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD =
                    ConfigOptions.key("restart-strategy.exponential-delay.reset-backoff-threshold")
                            .durationType()
                            .defaultValue(Duration.ofHours(1))
                            .withDescription(
                                    Description.builder()
                                            .text(
                                                    "Threshold when the backoff is reset to its initial value if %s has been set to %s. "
                                                            + "It specifies how long the job must be running without failure "
                                                            + "to reset the exponentially increasing backoff to its initial value. "
                                                            + "It can be specified using notation: \"1 min\", \"20 s\"",
                                                    code(RESTART_STRATEGY.key()),
                                                    code("exponential-delay"))
                                            .build());

    /**
     * Key {@code restart-strategy.exponential-delay.jitter-factor}: jitter expressed as a portion
     * of the backoff for the exponential-delay strategy. Defaults to {@code 0.1}.
     */
    public static final ConfigOption<Double> RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR =
            ConfigOptions.key("restart-strategy.exponential-delay.jitter-factor")
                    .doubleType()
                    .defaultValue(0.1)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Jitter specified as a portion of the backoff if %s has been set to %s. "
                                                    + "It represents how large random value will be added or subtracted to the backoff. "
                                                    + "Useful when you want to avoid restarting multiple jobs at the same time.",
                                            code(RESTART_STRATEGY.key()), code("exponential-delay"))
                                    .build());

    /** Holder of constants only; instantiation is rejected. */
    private RestartStrategyOptions() {
        throw new UnsupportedOperationException("This class should never be instantiated.");
    }
}