blob: ef54374740bcceec2bd158b3516b281f24e7180c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionConfigTest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
/**
* Tests for configuring {@link StreamExecutionEnvironment} via {@link
* StreamExecutionEnvironment#configure(ReadableConfig, ClassLoader)}.
*
* @see StreamExecutionEnvironmentComplexConfigurationTest
*/
@RunWith(Parameterized.class)
public class StreamExecutionEnvironmentConfigurationTest {
@Parameterized.Parameters(name = "{0}")
public static Collection<TestSpec> specs() {
return Arrays.asList(
TestSpec.testValue(TimeCharacteristic.IngestionTime)
.whenSetFromFile("pipeline.time-characteristic", "IngestionTime")
.viaSetter(StreamExecutionEnvironment::setStreamTimeCharacteristic)
.getterVia(StreamExecutionEnvironment::getStreamTimeCharacteristic)
.nonDefaultValue(TimeCharacteristic.EventTime),
TestSpec.testValue(60000L)
.whenSetFromFile("execution.buffer-timeout", "1 min")
.viaSetter(StreamExecutionEnvironment::setBufferTimeout)
.getterVia(StreamExecutionEnvironment::getBufferTimeout)
.nonDefaultValue(12000L),
TestSpec.testValue(false)
.whenSetFromFile("pipeline.operator-chaining", "false")
.viaSetter(
(env, b) -> {
if (b) {
throw new IllegalArgumentException(
"Cannot programmatically enable operator chaining");
} else {
env.disableOperatorChaining();
}
})
.getterVia(StreamExecutionEnvironment::isChainingEnabled)
.nonDefaultValue(false),
TestSpec.testValue(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL)
.whenSetFromFile("pipeline.closure-cleaner-level", "TOP_LEVEL")
.viaSetter((env, v) -> env.getConfig().setClosureCleanerLevel(v))
.getterVia(env -> env.getConfig().getClosureCleanerLevel())
.nonDefaultValue(ExecutionConfig.ClosureCleanerLevel.NONE),
TestSpec.testValue(12000L)
.whenSetFromFile("execution.checkpointing.timeout", "12 s")
.viaSetter((env, v) -> env.getCheckpointConfig().setCheckpointTimeout(v))
.getterVia(env -> env.getCheckpointConfig().getCheckpointTimeout())
.nonDefaultValue(100L));
}
@Parameterized.Parameter public TestSpec spec;
@Test
public void testLoadingFromConfiguration() {
StreamExecutionEnvironment configFromSetters =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment configFromFile =
StreamExecutionEnvironment.getExecutionEnvironment();
Configuration configuration = new Configuration();
configuration.setString(spec.key, spec.value);
configFromFile.configure(configuration, ExecutionConfigTest.class.getClassLoader());
spec.setValue(configFromSetters);
spec.assertEqual(configFromFile, configFromSetters);
}
@Test
public void testNotOverridingIfNotSet() {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
spec.setNonDefaultValue(environment);
Configuration configuration = new Configuration();
environment.configure(configuration, ExecutionConfigTest.class.getClassLoader());
spec.assertEqualNonDefault(environment);
}
private static class TestSpec<T> {
private String key;
private String value;
private final T objectValue;
private T nonDefaultValue;
private BiConsumer<StreamExecutionEnvironment, T> setter;
private Function<StreamExecutionEnvironment, T> getter;
private TestSpec(T value) {
this.objectValue = value;
}
public static <T> TestSpec<T> testValue(T value) {
return new TestSpec<>(value);
}
public TestSpec<T> whenSetFromFile(String key, String value) {
this.key = key;
this.value = value;
return this;
}
public TestSpec<T> viaSetter(BiConsumer<StreamExecutionEnvironment, T> setter) {
this.setter = setter;
return this;
}
public TestSpec<T> getterVia(Function<StreamExecutionEnvironment, T> getter) {
this.getter = getter;
return this;
}
public TestSpec<T> nonDefaultValue(T nonDefaultValue) {
this.nonDefaultValue = nonDefaultValue;
return this;
}
public void setValue(StreamExecutionEnvironment config) {
setter.accept(config, objectValue);
}
public void setNonDefaultValue(StreamExecutionEnvironment config) {
setter.accept(config, nonDefaultValue);
}
public void assertEqual(
StreamExecutionEnvironment configFromFile,
StreamExecutionEnvironment configFromSetters) {
assertThat(getter.apply(configFromFile), equalTo(getter.apply(configFromSetters)));
}
public void assertEqualNonDefault(StreamExecutionEnvironment configFromFile) {
assertThat(getter.apply(configFromFile), equalTo(nonDefaultValue));
}
@Override
public String toString() {
return "key='" + key + '\'';
}
}
}