/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.util.DummyCustomCommandLine;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.streaming.connectors.kafka.KafkaITService;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Contains basic tests for the {@link LocalExecutor}.
*/
public class LocalExecutorITCase extends TestLogger {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
true);
@ClassRule
public static final KafkaITService KAFKA_IT_SERVICE = new KafkaITService();
private static ClusterClient<?> clusterClient;
@BeforeClass
public static void setup() {
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
}
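// Mini cluster configuration: small managed memory, NUM_TMS task managers with NUM_SLOTS_PER_TM slots each, web submission disabled.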
private static Configuration getConfig() {
Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4 * 1024 * 1024);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
return config;
}
@Test
public void testValidateSession() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.validateSession(session);
session.addView(ViewEntry.create("AdditionalView1", "SELECT 1"));
session.addView(ViewEntry.create("AdditionalView2", "SELECT * FROM AdditionalView1"));
executor.validateSession(session);
List<String> actualTables =
executor.listTables(session).stream().sorted().collect(Collectors.toList());
List<String> expectedTables = Arrays.asList(
"AdditionalView1",
"AdditionalView2",
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2").stream().sorted().collect(Collectors.toList());
assertEquals(expectedTables, actualTables);
session.removeView("AdditionalView1");
try {
executor.validateSession(session);
fail("Validation should fail because AdditionalView2 still references the removed AdditionalView1.");
} catch (SqlExecutionException e) {
// expected: AdditionalView2 depends on AdditionalView1
}
session.removeView("AdditionalView2");
executor.validateSession(session);
actualTables =
executor.listTables(session).stream().sorted().collect(Collectors.toList());
expectedTables = Arrays.asList(
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2").stream().sorted().collect(Collectors.toList());
assertEquals(expectedTables, actualTables);
}
@Test
public void testListTables() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
final List<String> actualTables =
executor.listTables(session).stream().sorted().collect(Collectors.toList());
final List<String> expectedTables = Arrays.asList(
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2").stream().sorted().collect(Collectors.toList());
assertEquals(expectedTables, actualTables);
}
@Test
public void testListUserDefinedFunctions() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
final Set<String> actualUDXs = new HashSet<>(executor.listUserDefinedFunctions(session));
final List<String> expectedUDXs = Arrays.asList("aggregateUDF", "tableUDF", "scalarUDF");
expectedUDXs.forEach(udf -> assertTrue("Missing user-defined function: " + udf, actualUDXs.contains(udf)));
}
@Test
public void testGetSessionProperties() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
session.setSessionProperty("execution.result-mode", "changelog");
executor.getSessionProperties(session);
// modify defaults
session.setSessionProperty("execution.result-mode", "table");
final Map<String, String> actualProperties = executor.getSessionProperties(session);
final Map<String, String> expectedProperties = new HashMap<>();
expectedProperties.put("execution.type", "batch");
expectedProperties.put("execution.time-characteristic", "event-time");
expectedProperties.put("execution.periodic-watermarks-interval", "99");
expectedProperties.put("execution.parallelism", "1");
expectedProperties.put("execution.max-parallelism", "16");
expectedProperties.put("execution.max-idle-state-retention", "0");
expectedProperties.put("execution.min-idle-state-retention", "0");
expectedProperties.put("execution.result-mode", "table");
expectedProperties.put("execution.max-table-result-rows", "100");
expectedProperties.put("execution.restart-strategy.type", "failure-rate");
expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
expectedProperties.put("execution.restart-strategy.delay", "1000");
expectedProperties.put("deployment.response-timeout", "5000");
assertEquals(expectedProperties, actualProperties);
}
@Test
public void testTableSchema() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
final TableSchema actualTableSchema = executor.getTableSchema(session, "TableNumber2");
final TableSchema expectedTableSchema = new TableSchema(
new String[] {"IntegerField2", "StringField2"},
new InternalType[]{DataTypes.INT, DataTypes.STRING});
assertEquals(expectedTableSchema, actualTableSchema);
}
// TODO: statement completion is currently not picked up, hence the test is ignored.
@Test
@Ignore
public void testCompleteStatement() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
final List<String> expectedTableHints = Arrays.asList(
"TABLE",
"TableNumber1",
"TableNumber2",
"TableSourceSink");
assertEquals(expectedTableHints, executor.completeStatement(session, "SELECT * FROM Ta", 16));
final List<String> expectedClause = Collections.singletonList("WHERE");
assertEquals(expectedClause, executor.completeStatement(session, "SELECT * FROM TableNumber2 WH", 29));
final List<String> expectedField = Arrays.asList("INTERVAL", "IntegerField1");
assertEquals(expectedField, executor.completeStatement(session, "SELECT * FROM TableNumber1 WHERE Inte", 37));
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionChangelog() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
try {
// start job and retrieval
final ResultDescriptor desc = executor.executeQuery(
session,
"SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1");
assertFalse(desc.isMaterialized());
final List<String> actualResults =
retrieveChangelogResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("(true,47,Hello World)");
expectedResults.add("(true,27,Hello World)");
expectedResults.add("(true,37,Hello World)");
expectedResults.add("(true,37,Hello World)");
expectedResults.add("(true,47,Hello World)");
expectedResults.add("(true,57,Hello World!!!!)");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionTable() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableNumber1";
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47,Hello World");
expectedResults.add("27,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("47,Hello World");
expectedResults.add("57,Hello World!!!!");
executeStreamQueryTable(replaceVars, query, expectedResults);
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionLimitedTable() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "1");
final String query = "SELECT COUNT(*), StringField1 FROM TableNumber1 GROUP BY StringField1";
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("1,Hello World!!!!");
executeStreamQueryTable(replaceVars, query, expectedResults);
}
@Test(timeout = 30_000L)
public void testBatchQueryExecution() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
try {
final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TestView1");
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47");
expectedResults.add("27");
expectedResults.add("37");
expectedResults.add("37");
expectedResults.add("47");
expectedResults.add("57");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionSink() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
try {
// start job
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
verifySinkResult(csvOutputPath);
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testCreateTable() throws Exception {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.createTable(session, "CREATE TABLE TableFromDDL(field1 INT, field2 VARCHAR) WITH (type = 'type', attributeKey1 = 'value1', attributeKey2 = 'value2')");
final List<String> actualTables =
executor.listTables(session).stream().sorted().collect(Collectors.toList());
final List<String> expectedTables = Arrays.asList(
"TableFromDDL",
"TableNumber1",
"TableNumber2",
"TableSourceSink",
"TestView1",
"TestView2").stream().sorted().collect(Collectors.toList());
assertEquals(expectedTables, actualTables);
final TableSchema schema = executor.getTableSchema(session, "TableFromDDL");
final String expectedSchema =
"root\n" +
" |-- name: field1\n" +
" |-- type: IntType\n" +
" |-- isNullable: true\n" +
" |-- name: field2\n" +
" |-- type: StringType\n" +
" |-- isNullable: true\n";
assertEquals(expectedSchema, schema.toString());
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionFromDDLTable() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
// Create a table via DDL that has the same schema as TableNumber1.
executor.createTable(
session,
"CREATE TABLE TableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
final String query = "SELECT scalarUDF(IntegerField1), StringField1 FROM TableFromDDL";
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47,Hello World");
expectedResults.add("27,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("37,Hello World");
expectedResults.add("47,Hello World");
expectedResults.add("57,Hello World!!!!");
try {
// start job and retrieval
final ResultDescriptor desc = executor.executeQuery(session, query);
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionSinkToDDLTable() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.createTable(
session,
"CREATE TABLE SourceTableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
executor.createTable(
session,
"CREATE TABLE SinkTableFromDDL(BooleanField BOOLEAN, StringField VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + csvOutputPath + "')");
try {
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO SinkTableFromDDL SELECT IntegerField1 = 42, StringField1 FROM SourceTableFromDDL");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
verifySinkResult(csvOutputPath);
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionUpsertSink() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.createTable(
session,
"CREATE TABLE SourceTableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
executor.createTable(
session,
"CREATE TABLE SinkTableFromDDL(IntegerField INT, StringField VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", updateMode = 'UPSERT'" +
", path = '" + csvOutputPath + "')");
try {
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO SinkTableFromDDL SELECT SUM(IntegerField1), StringField1 FROM SourceTableFromDDL GROUP BY StringField1");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
verifyUpsertSinkResult(csvOutputPath);
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testStreamQueryExecutionRetractSink() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.createTable(
session,
"CREATE TABLE SourceTableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
executor.createTable(
session,
"CREATE TABLE SinkTableFromDDL(IntegerField INT, StringField VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", updateMode = 'RETRACT'" +
", path = '" + csvOutputPath + "')");
try {
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO SinkTableFromDDL SELECT SUM(IntegerField1), StringField1 FROM SourceTableFromDDL GROUP BY StringField1");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
verifyRetractSinkResult(csvOutputPath);
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testTableSourceSinkTableAsSource() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_SINK_PATH", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "1");
final String query = "SELECT COUNT(*), StringField FROM TableSourceSink GROUP BY StringField";
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("1,Hello World!!!!");
executeStreamQueryTable(replaceVars, query, expectedResults);
}
@Test(timeout = 30_000L)
public void testStreamWindowQueryExecutionFromDDL() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
final URL url = getClass().getClassLoader().getResource("test-data2.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
executor.createTable(
session,
"CREATE TABLE SourceTableFromDDL(IntegerField1 INT, StringField1 VARCHAR, " +
"TsField TIMESTAMP, WATERMARK FOR TsField AS withOffset(TsField, 1000))" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
executor.createTable(
session,
"CREATE TABLE SinkTableFromDDL(StringField VARCHAR, WindowStart BIGINT, IntegerField INT)" +
" WITH (" +
" type = 'CSV'" +
", updateMode = 'APPEND'" +
", path = '" + csvOutputPath + "')");
try {
final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
session,
"INSERT INTO SinkTableFromDDL " +
"SELECT StringField1, CAST(TUMBLE_START(TsField, INTERVAL '1' MINUTE) as BIGINT), SUM(IntegerField1) " +
"FROM SourceTableFromDDL " +
"GROUP BY StringField1, TUMBLE(TsField, INTERVAL '1' MINUTE)");
// wait for job completion and verify result
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
continue;
case FINISHED:
isRunning = false;
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, csvOutputPath);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("Hello World,0,170");
expectedResults.add("Hello World!!!!,0,52");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
break;
default:
fail("Unexpected job status.");
}
}
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testBatchQueryExecutionFromDDLTable() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
// Create a table via DDL that has the same schema as TableNumber1.
executor.createTable(
session,
"CREATE TABLE TableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
try {
final ResultDescriptor desc = executor.executeQuery(session, "SELECT scalarUDF(IntegerField1) FROM TableFromDDL");
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47");
expectedResults.add("27");
expectedResults.add("37");
expectedResults.add("37");
expectedResults.add("47");
expectedResults.add("57");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testBatchQueryExecutionRobustness() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
// Create a table with an unknown type, which should cause an exception in the queries below.
executor.createTable(
session,
"CREATE TABLE UnknownTable(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'UNKNOWN'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
executor.createTable(
session,
"CREATE TABLE TableFromDDL(IntegerField1 INT, StringField1 VARCHAR)" +
" WITH (" +
" type = 'CSV'" +
", path = '" + url.getPath() + "'" +
", commentsPrefix = '#')");
try {
executor.executeQuery(session, "SELECT * FROM UnknownTable");
fail("Querying a table with an unknown type should fail.");
} catch (Exception e) {
assertTrue(e instanceof SqlExecutionException);
}
try {
final ResultDescriptor desc = executor.executeQuery(session, "SELECT scalarUDF(IntegerField1) FROM TableFromDDL");
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47");
expectedResults.add("27");
expectedResults.add("37");
expectedResults.add("37");
expectedResults.add("47");
expectedResults.add("57");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 30_000L)
public void testBatchQueryExecutionFromDDLViewWithUDF() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
replaceVars.put("$VAR_RESULT_MODE", "table");
replaceVars.put("$VAR_UPDATE_MODE", "");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
// Create a scalar function
executor.createFunction(
session,
"CREATE FUNCTION scalarDDL "
+ "AS 'org.apache.flink.table.client.gateway.utils.UserDefinedFunctions$ScalarUDF'");
// Create a view
executor.createView(
session,
"CREATE VIEW TestViewDDL AS SELECT scalarDDL(IntegerField1) FROM TableNumber1");
try {
final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TestViewDDL");
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("47");
expectedResults.add("27");
expectedResults.add("37");
expectedResults.add("37");
expectedResults.add("47");
expectedResults.add("57");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
@Test(timeout = 60_000L)
public void testReadFromKafka() throws Exception {
final String sourceTopic = "testReadFromKafka";
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
String resultId = null;
try {
executor.createTable(session, String.format(
"CREATE TABLE t(key VARBINARY, msg VARBINARY, `topic` VARCHAR, `partition` INT, `offset`" +
" BIGINT) with (type = 'KAFKA010', topic = '%s', `bootstrap.servers` =" +
" '%s', `group.id` = 'test-group', startupMode = 'EARLIEST')",
sourceTopic, KafkaITService.brokerConnectionStrings()));
produceMessages(sourceTopic, KafkaITService.brokerConnectionStrings());
ResultDescriptor descriptor = executor.executeQuery(session,
"SELECT CAST(key AS VARCHAR), CAST (msg AS VARCHAR) FROM t");
TypedResult<List<Tuple2<Boolean, Row>>> result;
resultId = descriptor.getResultId();
do {
Thread.sleep(1);
result = executor.retrieveResultChanges(session, resultId);
} while (result.getType() == TypedResult.ResultType.EMPTY);
assertEquals("key_0", result.getPayload().get(0).f1.getField(0));
assertEquals("value_0", result.getPayload().get(0).f1.getField(1));
} finally {
if (resultId != null && !resultId.isEmpty()) {
executor.cancelQuery(session, resultId);
}
}
}
@Test(timeout = 60_000L)
public void testProduceToKafka() throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_RESULT_MODE", "changelog");
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
replaceVars.put("$VAR_MAX_ROWS", "100");
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
final String sinkTopic = "testProduceToKafka";
try {
executor.createTable(session, String.format(
"CREATE TABLE kafka_sink(messageKey VARBINARY, messageValue VARBINARY, PRIMARY KEY " +
"(messageKey)) with (type = 'KAFKA010', topic = '%s', `bootstrap.servers` = '%s', retries = '3')",
sinkTopic, KafkaITService.brokerConnectionStrings()));
executor.executeUpdate(session,
"INSERT INTO kafka_sink (messageKey, messageValue) VALUES (CAST('key_0' AS VARBINARY), " +
"CAST('value_0' AS VARBINARY))");
// Consume from the sink topic.
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaITService.brokerConnectionStrings());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroupId");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singleton(sinkTopic));
ConsumerRecords<String, String> consumerRecords = ConsumerRecords.empty();
// Poll until the inserted record shows up in the sink topic; the test timeout bounds this loop.
while (consumerRecords.isEmpty()) {
consumerRecords = consumer.poll(1000L);
}
ConsumerRecord<String, String> record = consumerRecords.iterator().next();
assertEquals("key_0", record.key());
assertEquals("value_0", record.value());
}
} finally {
executor.stop(session);
}
}
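// Creates the given topic (1 partition, replication factor 1) and produces 10 string records
// ("key_i" / "value_i") to partition 0, failing the test if any send callback reports an error.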
private void produceMessages(String topic, String brokerConnectionStrings) {
KafkaITService.createTopic(topic, 1, 1);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
TopicPartition tp = new TopicPartition(topic, 0);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), "key_" + i, "value_" + i),
(recordMetadata, e) -> {
if (e != null) {
fail("Failed to send message to Kafka due to: " + e);
}
});
}
}
}
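// Runs the given query with a modified executor in table result mode, retrieves the materialized
// result, and compares it against the expected rows.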
private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,
List<String> expectedResults) throws Exception {
final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
final SessionContext session = new SessionContext("test-session", new Environment());
try {
// start job and retrieval
final ResultDescriptor desc = executor.executeQuery(session, query);
assertTrue(desc.isMaterialized());
final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
} finally {
executor.stop(session);
}
}
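// Reads the CSV sink output and checks the upsert messages: every updated running sum per
// StringField1 group is recorded as an "Add" message.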
private void verifyUpsertSinkResult(String path) throws IOException {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("Add,42,Hello World");
expectedResults.add("Add,64,Hello World");
expectedResults.add("Add,96,Hello World");
expectedResults.add("Add,128,Hello World");
expectedResults.add("Add,170,Hello World");
expectedResults.add("Add,52,Hello World!!!!");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
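// Reads the CSV sink output and checks the retraction stream: each intermediate sum is added (True)
// and later retracted (False); only the final sum per group remains un-retracted.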
private void verifyRetractSinkResult(String path) throws IOException {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("True,42,Hello World");
expectedResults.add("False,42,Hello World");
expectedResults.add("True,64,Hello World");
expectedResults.add("False,64,Hello World");
expectedResults.add("True,96,Hello World");
expectedResults.add("False,96,Hello World");
expectedResults.add("True,128,Hello World");
expectedResults.add("False,128,Hello World");
expectedResults.add("True,170,Hello World");
expectedResults.add("True,52,Hello World!!!!");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
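// Reads the CSV sink output and checks the append-mode result of the "IntegerField1 = 42" projection
// (one boolean/string row per input row).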
private void verifySinkResult(String path) throws IOException {
final List<String> actualResults = new ArrayList<>();
TestBaseUtils.readAllResultLines(actualResults, path);
final List<String> expectedResults = new ArrayList<>();
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("false,Hello World");
expectedResults.add("true,Hello World");
expectedResults.add("false,Hello World!!!!");
TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
}
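// Creates a LocalExecutor from the defaults environment file with batch execution type and empty
// update-mode placeholder, using the mini cluster's Flink configuration and a dummy command line.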
private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> clusterClient) throws Exception {
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
replaceVars.put("$VAR_UPDATE_MODE", "");
replaceVars.put("$VAR_MAX_ROWS", "100");
return new LocalExecutor(
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
}
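// Creates a LocalExecutor from the defaults environment file with the caller-provided variable replacements.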
private <T> LocalExecutor createModifiedExecutor(ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
return new LocalExecutor(
EnvironmentFileUtil.parseModified(DEFAULTS_ENVIRONMENT_FILE, replaceVars),
Collections.emptyList(),
clusterClient.getFlinkConfiguration(),
new DummyCustomCommandLine<T>(clusterClient));
}
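// Polls snapshotResult() (page size 2) until the result reaches EOS; whenever a new snapshot arrives,
// clears the buffer and re-reads all result pages, so the rows of the final snapshot are returned as strings.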
private List<String> retrieveTableResult(
Executor executor,
SessionContext session,
String resultID) throws InterruptedException {
final List<String> actualResults = new ArrayList<>();
while (true) {
Thread.sleep(50); // slow the processing down
final TypedResult<Integer> result = executor.snapshotResult(session, resultID, 2);
if (result.getType() == TypedResult.ResultType.PAYLOAD) {
actualResults.clear();
IntStream.rangeClosed(1, result.getPayload()).forEach((page) -> {
for (Row row : executor.retrieveResultPage(resultID, page)) {
actualResults.add(row.toString());
}
});
} else if (result.getType() == TypedResult.ResultType.EOS) {
break;
}
}
return actualResults;
}
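// Polls retrieveResultChanges() until EOS and collects every emitted (add/retract flag, row) change as a string.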
private List<String> retrieveChangelogResult(
Executor executor,
SessionContext session,
String resultID) throws InterruptedException {
final List<String> actualResults = new ArrayList<>();
while (true) {
Thread.sleep(50); // slow the processing down
final TypedResult<List<Tuple2<Boolean, Row>>> result =
executor.retrieveResultChanges(session, resultID);
if (result.getType() == TypedResult.ResultType.PAYLOAD) {
for (Tuple2<Boolean, Row> change : result.getPayload()) {
actualResults.add(change.toString());
}
} else if (result.getType() == TypedResult.ResultType.EOS) {
break;
}
}
return actualResults;
}
}