blob: ff7608c837fda5f1b4685a18f9e3937f56f44d09 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.commons.cli.Options;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test for {@link ExecutionContext}.
*/
public class ExecutionContextTest {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
@Test
public void testExecutionConfig() throws Exception {
final ExecutionContext<?> context = createDefaultExecutionContext();
final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig();
assertEquals(99, config.getAutoWatermarkInterval());
final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);
final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
(RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
assertEquals(10, failureRateStrategy.getMaxFailureRate());
assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
}
@Test
public void testFunctions() throws Exception {
final ExecutionContext<?> context = createDefaultExecutionContext();
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
final String[] expected = new String[]{"scalarUDF", "tableUDF", "aggregateUDF"};
final Set<String> actual = new HashSet<>(Arrays.asList(tableEnv.listUserDefinedFunctions()));
// Arrays.sort(expected);
// Arrays.sort(actual);
// assertArrayEquals(expected, actual);
// TODO: Seems all build-in functions appears in function catalog of TableEnvironment,
// This could be a bug
Arrays.stream(expected).forEach(udf -> actual.contains(udf));
}
@Test
public void testTables() throws Exception {
final ExecutionContext<?> context = createDefaultExecutionContext();
final Map<String, TableSource> sources = context.getTableSources();
final Map<String, TableSink<?>> sinks = context.getTableSinks();
assertEquals(
new HashSet<>(Arrays.asList("TableSourceSink", "TableNumber1", "TableNumber2")),
sources.keySet());
assertEquals(
new HashSet<>(Collections.singletonList("TableSourceSink")),
sinks.keySet());
assertArrayEquals(
new String[]{"IntegerField1", "StringField1"},
sources.get("TableNumber1").getTableSchema().getFieldNames());
assertArrayEquals(
new DataType[]{DataTypes.INT, DataTypes.STRING},
sources.get("TableNumber1").getTableSchema().getFieldTypes());
assertArrayEquals(
new String[]{"IntegerField2", "StringField2"},
sources.get("TableNumber2").getTableSchema().getFieldNames());
assertArrayEquals(
new DataType[]{DataTypes.INT, DataTypes.STRING},
sources.get("TableNumber2").getTableSchema().getFieldTypes());
assertArrayEquals(
new String[]{"BooleanField", "StringField"},
sinks.get("TableSourceSink").getFieldNames());
assertArrayEquals(
new DataType[]{DataTypes.BOOLEAN, DataTypes.STRING},
sinks.get("TableSourceSink").getFieldTypes());
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
// TODO: TableEnvironment.listTables returns a sorted array(SortedSet -> Array)
final String[] expected = new String[]{"TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2"};
final String[] actual = tableEnv.listTables();
Arrays.sort(expected);
Arrays.sort(actual);
assertArrayEquals(expected, actual);
}
@Test
public void testTemporalTables() throws Exception {
final ExecutionContext<?> context = createStreamingExecutionContext();
assertEquals(
new HashSet<>(Arrays.asList("EnrichmentSource", "HistorySource")),
context.getTableSources().keySet());
final StreamTableEnvironment tableEnv = (StreamTableEnvironment) context.createEnvironmentInstance().getTableEnvironment();
final String[] expected = new String[]{"EnrichmentSource", "HistorySource", "HistoryView", "TemporalTableUsage"};
final String[] actual = tableEnv.listTables();
Arrays.sort(expected);
Arrays.sort(actual);
assertArrayEquals(expected, actual);
// assertArrayEquals(
// new String[]{"SourceTemporalTable", "ViewTemporalTable"},
// tableEnv.listUserDefinedFunctions());
final String[] expectedUdtf = new String[]{"SourceTemporalTable", "ViewTemporalTable"};
final Set<String> actualUdx = new HashSet<>(Arrays.asList(tableEnv.listUserDefinedFunctions()));
Arrays.stream(expectedUdtf).forEach(udtf -> actualUdx.contains(udtf));
assertArrayEquals(
new String[]{"integerField", "stringField", "rowtimeField", "integerField0", "stringField0", "rowtimeField0"},
tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
}
private <T> ExecutionContext<T> createExecutionContext(String file, Map<String, String> replaceVars) throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
file,
replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
final Configuration flinkConfig = new Configuration();
return new ExecutionContext<>(
env,
session,
Collections.emptyList(),
flinkConfig,
new Options(),
Collections.singletonList(new DefaultCLI(flinkConfig)));
}
private <T> ExecutionContext<T> createDefaultExecutionContext() throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
return createExecutionContext(DEFAULTS_ENVIRONMENT_FILE, replaceVars);
}
private <T> ExecutionContext<T> createStreamingExecutionContext() throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_CONNECTOR_TYPE", DummyTableSourceFactory.CONNECTOR_TYPE_VALUE);
replaceVars.put("$VAR_CONNECTOR_PROPERTY", DummyTableSourceFactory.TEST_PROPERTY);
replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", "");
return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars);
}
}