// blob: d97ca4b9c300216ad5c2ea3842dc1cffbeaa944f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.environment;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for configuring {@link CheckpointConfig} via {@link
* CheckpointConfig#configure(ReadableConfig)}.
*/
public class CheckpointConfigFromConfigurationTest {
private static Stream<TestSpec<?>> specs() {
return Stream.of(
TestSpec.testValue(org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(CheckpointConfig::setCheckpointingMode)
.getterVia(CheckpointConfig::getCheckpointingMode)
.nonDefaultValue(
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(CheckpointConfig::setCheckpointingConsistencyMode)
.getterVia(CheckpointConfig::getCheckpointingConsistencyMode)
.nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(
(config, v) -> {
config.setCheckpointingMode(
org.apache.flink.streaming.api.CheckpointingMode
.valueOf(v.name()));
})
.getterVia(CheckpointConfig::getCheckpointingConsistencyMode)
.nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE)
.whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE")
.viaSetter(
(config, v) -> {
config.setCheckpointingConsistencyMode(
CheckpointingMode.valueOf(v.name()));
})
.getterVia(CheckpointConfig::getCheckpointingMode)
.nonDefaultValue(
org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE),
TestSpec.testValue(10000L)
.whenSetFromFile("execution.checkpointing.interval", "10 s")
.viaSetter(CheckpointConfig::setCheckpointInterval)
.getterVia(CheckpointConfig::getCheckpointInterval)
.nonDefaultValue(100L),
TestSpec.testValue(12000L)
.whenSetFromFile("execution.checkpointing.timeout", "12 s")
.viaSetter(CheckpointConfig::setCheckpointTimeout)
.getterVia(CheckpointConfig::getCheckpointTimeout)
.nonDefaultValue(100L),
TestSpec.testValue(12)
.whenSetFromFile("execution.checkpointing.max-concurrent-checkpoints", "12")
.viaSetter(CheckpointConfig::setMaxConcurrentCheckpoints)
.getterVia(CheckpointConfig::getMaxConcurrentCheckpoints)
.nonDefaultValue(100),
TestSpec.testValue(1000L)
.whenSetFromFile("execution.checkpointing.min-pause", "1 s")
.viaSetter(CheckpointConfig::setMinPauseBetweenCheckpoints)
.getterVia(CheckpointConfig::getMinPauseBetweenCheckpoints)
.nonDefaultValue(100L),
TestSpec.testValue(
CheckpointConfig.ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION)
.whenSetFromFile(
"execution.checkpointing.externalized-checkpoint-retention",
"RETAIN_ON_CANCELLATION")
.viaSetter(CheckpointConfig::setExternalizedCheckpointCleanup)
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
.nonDefaultValue(
CheckpointConfig.ExternalizedCheckpointCleanup
.DELETE_ON_CANCELLATION),
TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
.whenSetFromFile(
"execution.checkpointing.externalized-checkpoint-retention",
"RETAIN_ON_CANCELLATION")
.viaSetter(CheckpointConfig::setExternalizedCheckpointRetention)
.getterVia(CheckpointConfig::getExternalizedCheckpointRetention)
.nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION),
TestSpec.testValue(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
.whenSetFromFile(
"execution.checkpointing.externalized-checkpoint-retention",
"RETAIN_ON_CANCELLATION")
.viaSetter(
(config, v) -> {
config.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.valueOf(
v.name()));
})
.getterVia(CheckpointConfig::getExternalizedCheckpointRetention)
.nonDefaultValue(ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION),
TestSpec.testValue(
CheckpointConfig.ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION)
.whenSetFromFile(
"execution.checkpointing.externalized-checkpoint-retention",
"RETAIN_ON_CANCELLATION")
.viaSetter(
(config, v) -> {
config.setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.valueOf(v.name()));
})
.getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
.nonDefaultValue(
CheckpointConfig.ExternalizedCheckpointCleanup
.DELETE_ON_CANCELLATION),
TestSpec.testValue(12)
.whenSetFromFile(
"execution.checkpointing.tolerable-failed-checkpoints", "12")
.viaSetter(CheckpointConfig::setTolerableCheckpointFailureNumber)
.getterVia(CheckpointConfig::getTolerableCheckpointFailureNumber)
.nonDefaultValue(100),
TestSpec.testValue(true)
.whenSetFromFile("execution.checkpointing.unaligned.enabled", "true")
.viaSetter(CheckpointConfig::enableUnalignedCheckpoints)
.getterVia(CheckpointConfig::isUnalignedCheckpointsEnabled)
.nonDefaultValue(true),
TestSpec.testValue(true)
.whenSetFromFile(
"execution.checkpointing.unaligned.interruptible-timers.enabled",
"true")
.viaSetter(CheckpointConfig::enableUnalignedCheckpointsInterruptibleTimers)
.getterVia(
CheckpointConfig::isUnalignedCheckpointsInterruptibleTimersEnabled)
.nonDefaultValue(true),
TestSpec.testValue(
(CheckpointStorage)
new FileSystemCheckpointStorage(
"file:///path/to/checkpoint/dir"))
.whenSetFromFile(
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
"file:///path/to/checkpoint/dir")
.viaSetter(CheckpointConfig::setCheckpointStorage)
.getterVia(CheckpointConfig::getCheckpointStorage)
.nonDefaultValue(
new FileSystemCheckpointStorage("file:///path/to/checkpoint/dir"))
.customMatcher(
(actualValue, expectedValue) ->
assertThat(actualValue)
.hasSameClassAs(expectedValue)
.asInstanceOf(
InstanceOfAssertFactories.type(
FileSystemCheckpointStorage.class))
.extracting(
FileSystemCheckpointStorage
::getCheckpointPath)
.isEqualTo(
((FileSystemCheckpointStorage)
expectedValue)
.getCheckpointPath())));
}
@ParameterizedTest
@MethodSource("specs")
public void testLoadingFromConfiguration(TestSpec<?> spec) {
CheckpointConfig configFromSetters = new CheckpointConfig();
CheckpointConfig configFromFile = new CheckpointConfig();
Configuration configuration = new Configuration();
configuration.setString(spec.key, spec.value);
configFromFile.configure(configuration);
spec.setValue(configFromSetters);
spec.assertEqual(configFromFile, configFromSetters);
}
@ParameterizedTest
@MethodSource("specs")
public void testNotOverridingIfNotSet(TestSpec<?> spec) {
CheckpointConfig config = new CheckpointConfig();
spec.setNonDefaultValue(config);
Configuration configuration = new Configuration();
config.configure(configuration);
spec.assertEqualNonDefault(config);
}
private static class TestSpec<T> {
private String key;
private String value;
private final T objectValue;
private T nonDefaultValue;
private BiConsumer<CheckpointConfig, T> setter;
private Function<CheckpointConfig, T> getter;
private BiConsumer<T, T> customAssertion =
(actualValue, expectedValue) -> assertThat(actualValue).isEqualTo(expectedValue);
private TestSpec(T value) {
this.objectValue = value;
}
public static <T> TestSpec<T> testValue(T value) {
return new TestSpec<>(value);
}
public TestSpec<T> whenSetFromFile(String key, String value) {
this.key = key;
this.value = value;
return this;
}
public TestSpec<T> viaSetter(BiConsumer<CheckpointConfig, T> setter) {
this.setter = setter;
return this;
}
public TestSpec<T> getterVia(Function<CheckpointConfig, T> getter) {
this.getter = getter;
return this;
}
public TestSpec<T> nonDefaultValue(T nonDefaultValue) {
this.nonDefaultValue = nonDefaultValue;
return this;
}
public TestSpec<T> customMatcher(BiConsumer<T, T> customAssertion) {
this.customAssertion = customAssertion;
return this;
}
public void setValue(CheckpointConfig config) {
setter.accept(config, objectValue);
}
public void setNonDefaultValue(CheckpointConfig config) {
setter.accept(config, nonDefaultValue);
}
public void assertEqual(
CheckpointConfig configFromFile, CheckpointConfig configFromSetters) {
customAssertion.accept(getter.apply(configFromFile), getter.apply(configFromSetters));
}
public void assertEqualNonDefault(CheckpointConfig configFromFile) {
customAssertion.accept(getter.apply(configFromFile), nonDefaultValue);
}
@Override
public String toString() {
return "key='" + key + '\'';
}
}
}