/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.BatchQueryConfig;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.functions.AggregateFunction;
import org.apache.flink.table.api.functions.FunctionService;
import org.apache.flink.table.api.functions.ScalarFunction;
import org.apache.flink.table.api.functions.TableFunction;
import org.apache.flink.table.api.functions.UserDefinedFunction;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ReadableCatalog;
import org.apache.flink.table.client.catalog.ClientCatalogFactory;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.CatalogEntry;
import org.apache.flink.table.client.config.entries.DeploymentEntry;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.SinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceTableEntry;
import org.apache.flink.table.client.config.entries.TemporalTableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Context for executing table programs. This class caches everything that can be cached across
 * multiple queries as long as the session context does not change. It must be thread-safe, as
 * it might be reused across different query submissions.
 *
 * @param <T> the type of the cluster id
 */
public class ExecutionContext<T> {

	private final SessionContext sessionContext;
private final Environment mergedEnv;
private final List<URL> dependencies;
private final ClassLoader classLoader;
private final Map<String, TableSource> tableSources;
private final Map<String, TableSink<?>> tableSinks;
private final Map<String, UserDefinedFunction> functions;
private final Configuration flinkConfig;
private final CommandLine commandLine;
private final CustomCommandLine<T> activeCommandLine;
private final RunOptions runOptions;
private final T clusterId;
private final ClusterSpecification clusterSpec;
private final EnvironmentInstance environmentInstance;

	public ExecutionContext(
			Environment defaultEnvironment,
			SessionContext sessionContext,
			List<URL> dependencies,
			Configuration flinkConfig,
			Options commandLineOptions,
			List<CustomCommandLine<?>> availableCommandLines) {
		this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
		this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
		this.dependencies = dependencies;
		this.flinkConfig = flinkConfig;

		// create class loader
		classLoader = FlinkUserCodeClassLoaders.parentFirst(
			dependencies.toArray(new URL[dependencies.size()]),
			this.getClass().getClassLoader());

		// create table sources & sinks
		tableSources = new HashMap<>();
		tableSinks = new HashMap<>();
		mergedEnv.getTables().forEach((name, entry) -> {
			if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
				tableSources.put(name, createTableSource(mergedEnv.getExecution(), entry.asMap(), classLoader));
			}
			if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
				tableSinks.put(name, createTableSink(mergedEnv.getExecution(), entry.asMap(), classLoader));
			}
		});

		// create user-defined functions
		functions = new HashMap<>();
		mergedEnv.getFunctions().forEach((name, entry) -> {
			final UserDefinedFunction function = FunctionService.createFunction(entry.getDescriptor(), classLoader, false);
			functions.put(name, function);
		});

		// convert deployment options into command line options that describe a cluster
		commandLine = createCommandLine(mergedEnv.getDeployment(), commandLineOptions);
		activeCommandLine = findActiveCommandLine(availableCommandLines, commandLine);
		runOptions = createRunOptions(commandLine);
		clusterId = activeCommandLine.getClusterId(commandLine);
		clusterSpec = createClusterSpecification(activeCommandLine, commandLine);

		// always share environment instance
		environmentInstance = new EnvironmentInstance();
	}
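
	// Illustrative construction (a hypothetical call site, e.g. a gateway-side executor that owns
	// the session; the variable names here are assumptions, not part of this class):
	//
	//   ExecutionContext<?> context = new ExecutionContext<>(
	//       defaultEnvironment, session, dependencyJars,
	//       flinkConfig, commandLineOptions, availableCommandLines);
	//
	// The instance can then be cached per session and reused across queries for as long as the
	// session context does not change.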

	public SessionContext getSessionContext() {
		return sessionContext;
	}

	public ClassLoader getClassLoader() {
		return classLoader;
	}

	public Environment getMergedEnvironment() {
		return mergedEnv;
	}

	public ClusterSpecification getClusterSpec() {
		return clusterSpec;
	}

	public T getClusterId() {
		return clusterId;
	}

	public ClusterDescriptor<T> createClusterDescriptor() throws Exception {
		return activeCommandLine.createClusterDescriptor(commandLine);
	}

	public EnvironmentInstance createEnvironmentInstance() {
		// the shared environment instance is created eagerly in the constructor;
		// fall back to a fresh instance only if it is not available
		if (environmentInstance != null) {
			return environmentInstance;
		}
		try {
			return new EnvironmentInstance();
		} catch (Throwable t) {
			// catch everything such that a wrong environment does not affect invocations
			throw new SqlExecutionException("Could not create environment instance.", t);
		}
	}

	public Map<String, TableSource> getTableSources() {
		return tableSources;
	}

	public Map<String, TableSink<?>> getTableSinks() {
		return tableSinks;
	}

	/**
	 * Executes the given supplier using the execution context's class loader as the thread
	 * context class loader.
	 */
	public <R> R wrapClassLoader(Supplier<R> supplier) {
		final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);
		try {
			return supplier.get();
		} finally {
			Thread.currentThread().setContextClassLoader(previousClassloader);
		}
	}
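
	// Illustrative usage (a hypothetical call site, not part of this class): any Table API call
	// that may load user classes from the session's dependency JARs should be wrapped, e.g.
	//
	//   Table table = context.wrapClassLoader(() ->
	//       context.createEnvironmentInstance().getTableEnvironment().sqlQuery(query));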

	// --------------------------------------------------------------------------------------------

	private static CommandLine createCommandLine(DeploymentEntry deployment, Options commandLineOptions) {
try {
return deployment.getCommandLine(commandLineOptions);
} catch (Exception e) {
throw new SqlExecutionException("Invalid deployment options.", e);
}
}

	@SuppressWarnings("unchecked")
private static <T> CustomCommandLine<T> findActiveCommandLine(List<CustomCommandLine<?>> availableCommandLines, CommandLine commandLine) {
for (CustomCommandLine<?> cli : availableCommandLines) {
if (cli.isActive(commandLine)) {
return (CustomCommandLine<T>) cli;
}
}
throw new SqlExecutionException("Could not find a matching deployment.");
}

	private static RunOptions createRunOptions(CommandLine commandLine) {
try {
return new RunOptions(commandLine);
} catch (CliArgsException e) {
throw new SqlExecutionException("Invalid deployment run options.", e);
}
}

	private static ClusterSpecification createClusterSpecification(CustomCommandLine<?> activeCommandLine, CommandLine commandLine) {
try {
return activeCommandLine.getClusterSpecification(commandLine);
} catch (FlinkException e) {
throw new SqlExecutionException("Could not create cluster specification for the given deployment.", e);
}
}

	private static TableSource createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
if (execution.isStreamingExecution()) {
final StreamTableSourceFactory<?> factory = (StreamTableSourceFactory<?>)
TableFactoryService.find(StreamTableSourceFactory.class, sourceProperties, classLoader);
return factory.createStreamTableSource(sourceProperties);
} else if (execution.isBatchExecution()) {
final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
return factory.createBatchTableSource(sourceProperties);
}
throw new SqlExecutionException("Unsupported execution type for sources.");
}

	private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
if (execution.isStreamingExecution()) {
final StreamTableSinkFactory<?> factory = (StreamTableSinkFactory<?>)
TableFactoryService.find(StreamTableSinkFactory.class, sinkProperties, classLoader);
return factory.createStreamTableSink(sinkProperties);
} else if (execution.isBatchExecution()) {
final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
return factory.createBatchTableSink(sinkProperties);
}
throw new SqlExecutionException("Unsupported execution type for sinks.");
}

	// --------------------------------------------------------------------------------------------

	/**
	 * {@link ExecutionEnvironment} and {@link StreamExecutionEnvironment} cannot be reused
	 * across multiple queries because they are stateful. This class abstracts over the
	 * execution environment and the table environment.
	 */
	public class EnvironmentInstance {
		private final QueryConfig queryConfig;
		private final StreamExecutionEnvironment streamExecEnv;
		private final TableEnvironment tableEnv;

		private EnvironmentInstance() {
			// create environments
			streamExecEnv = createStreamExecutionEnvironment();
			if (mergedEnv.getExecution().isStreamingExecution()) {
				tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv);
			} else if (mergedEnv.getExecution().isBatchExecution()) {
				tableEnv = TableEnvironment.getBatchTableEnvironment(streamExecEnv);
			} else {
				throw new SqlExecutionException("Unsupported execution type specified.");
			}

			// create query config
			queryConfig = createQueryConfig();

			// register table sources
			tableSources.forEach(tableEnv::registerTableSource);

			// register table sinks
			tableSinks.forEach(tableEnv::registerTableSink);

			// register user-defined functions
			registerFunctions();

			// register catalogs
			mergedEnv.getCatalogs().forEach((name, catalog) -> {
				final ReadableCatalog readableCatalog = ClientCatalogFactory.createCatalog(catalog);
				if (catalog.getDefaultDatabase().isPresent()) {
					readableCatalog.setDefaultDatabaseName(catalog.getDefaultDatabase().get());
				}
				tableEnv.registerCatalog(name, readableCatalog);
			});

			// register views and temporal tables in the specified order;
			// if registering a view fails at this point, it means that it accesses
			// tables that are no longer available
			mergedEnv.getTables().forEach((name, entry) -> {
				if (entry instanceof ViewEntry) {
					registerView((ViewEntry) entry);
				} else if (entry instanceof TemporalTableEntry) {
					registerTemporalTable((TemporalTableEntry) entry);
				}
			});

			// TODO: the SQL client should parse and set the job's TableConfig from the
			// execution section of the YAML config file
			tableEnv.getConfig().getConf().setInteger(
				TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM,
				mergedEnv.getExecution().getParallelism());

			// always enable subsection optimization
			tableEnv.getConfig().setSubsectionOptimization(true);
			tableEnv.getConfig().getConf().setBoolean(
				TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED,
				true);

			setDefaultCatalog(mergedEnv.getCatalogs());
		}

		private void setDefaultCatalog(Map<String, CatalogEntry> catalogs) {
			final List<CatalogEntry> defaultCatalogs = catalogs.values().stream()
				.filter(CatalogEntry::isDefaultCatalog)
				.collect(Collectors.toList());
			if (defaultCatalogs.size() > 1) {
				throw new IllegalArgumentException(String.format(
					"Only one catalog can be set as the default catalog, but there are currently %d.",
					defaultCatalogs.size()));
			}
			if (defaultCatalogs.isEmpty()) {
				return;
			}
			tableEnv.setDefaultCatalog(defaultCatalogs.get(0).getName());
		}
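
		// For reference, a default catalog would be marked in the environment YAML roughly as
		// follows (an illustrative sketch; the exact keys depend on the client's config schema):
		//
		//   catalogs:
		//     - name: myhive
		//       type: hive
		//       default: true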

		public QueryConfig getQueryConfig() {
			return queryConfig;
		}

		public StreamExecutionEnvironment getStreamExecutionEnvironment() {
			return streamExecEnv;
		}

		public TableEnvironment getTableEnvironment() {
			return tableEnv;
		}

		public ExecutionConfig getExecutionConfig() {
			return streamExecEnv.getConfig();
		}

		public JobGraph createJobGraph(String name) {
final FlinkPlan plan = createPlan(name, flinkConfig);
return ClusterClient.getJobGraph(
flinkConfig,
plan,
dependencies,
runOptions.getClasspaths(),
runOptions.getSavepointRestoreSettings());
}
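
		// Illustrative usage (hypothetical): once an environment instance exists, the job graph of
		// a named query can be built and handed to a cluster client for submission, e.g.
		//
		//   JobGraph jobGraph = context.createEnvironmentInstance().createJobGraph("select-query");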

		private FlinkPlan createPlan(String name, Configuration flinkConfig) {
StreamGraph graph = tableEnv.generateStreamGraph();
graph.setJobName(name);
return graph;
}

		private StreamExecutionEnvironment createStreamExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
env.setParallelism(mergedEnv.getExecution().getParallelism());
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
env.getConfig().setAutoWatermarkInterval(mergedEnv.getExecution().getPeriodicWatermarksInterval());
}
return env;
}

		private QueryConfig createQueryConfig() {
if (mergedEnv.getExecution().isStreamingExecution()) {
final StreamQueryConfig config = new StreamQueryConfig();
final long minRetention = mergedEnv.getExecution().getMinStateRetention();
final long maxRetention = mergedEnv.getExecution().getMaxStateRetention();
config.withIdleStateRetentionTime(Time.milliseconds(minRetention), Time.milliseconds(maxRetention));
return config;
} else {
return new BatchQueryConfig();
}
}
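
		// For reference, the retention interval above comes from the environment YAML, e.g.
		// (an illustrative sketch; 0 disables idle-state cleanup):
		//
		//   execution:
		//     min-idle-state-retention: 0
		//     max-idle-state-retention: 0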

		private void registerFunctions() {
if (tableEnv instanceof StreamTableEnvironment) {
StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
functions.forEach((k, v) -> {
if (v instanceof ScalarFunction) {
streamTableEnvironment.registerFunction(k, (ScalarFunction) v);
} else if (v instanceof AggregateFunction) {
streamTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
} else {
BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv;
functions.forEach((k, v) -> {
if (v instanceof ScalarFunction) {
batchTableEnvironment.registerFunction(k, (ScalarFunction) v);
} else if (v instanceof AggregateFunction) {
batchTableEnvironment.registerFunction(k, (AggregateFunction<?, ?>) v);
} else if (v instanceof TableFunction) {
batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
} else {
throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
}
});
}
}

		private void registerView(ViewEntry viewEntry) {
try {
tableEnv.registerTable(viewEntry.getName(), tableEnv.sqlQuery(viewEntry.getQuery()));
} catch (Exception e) {
throw new SqlExecutionException(
"Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery()
+ "\nCause: " + e.getMessage());
}
}

		private void registerTemporalTable(TemporalTableEntry temporalTableEntry) {
try {
final Table table = tableEnv.scan(temporalTableEntry.getHistoryTable());
final TableFunction<?> function = table.createTemporalTableFunction(
temporalTableEntry.getTimeAttribute(),
String.join(",", temporalTableEntry.getPrimaryKeyFields()));
if (tableEnv instanceof StreamTableEnvironment) {
StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
streamTableEnvironment.registerFunction(temporalTableEntry.getName(), function);
} else {
BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) tableEnv;
batchTableEnvironment.registerFunction(temporalTableEntry.getName(), function);
}
} catch (Exception e) {
throw new SqlExecutionException(
"Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" +
temporalTableEntry.getHistoryTable() + ".\nCause: " + e.getMessage());
}
}
}
}