/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.config;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.catalog.CatalogType;
import org.apache.flink.table.client.config.entries.CatalogEntry;
import org.apache.flink.table.client.config.entries.DeploymentEntry;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.FunctionEntry;
import org.apache.flink.table.client.config.entries.TableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Environment configuration that represents the content of an environment file. Environment files
 * define tables, user-defined functions, catalogs, execution, and deployment behavior. An environment
 * might be defined by default or as part of a session. Environments can be merged or enriched with
 * properties (e.g. from the CLI command line).
*
* <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
* allow overwriting of a deployment by a session.
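 *
 * <p>A minimal environment file might look as follows (the top-level section names correspond to the
 * setters of this class; the nested keys shown here are illustrative only and are interpreted by the
 * corresponding entry classes):
 * <pre>{@code
 * tables:
 *   - name: MyTable
 *     # ... table/connector properties ...
 * functions:
 *   - name: myUdf
 *     # ... function properties ...
 * catalogs:
 *   - name: myCatalog
 *     # ... catalog properties ...
 * execution:
 *   # ... execution properties ...
 * deployment:
 *   # ... deployment properties ...
 * }</pre>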
*/
public class Environment {
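	// top-level section names of an environment file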
public static final String EXECUTION_ENTRY = "execution";
public static final String DEPLOYMENT_ENTRY = "deployment";
private Map<String, TableEntry> tables;
private Map<String, FunctionEntry> functions;
private ExecutionEntry execution;
private DeploymentEntry deployment;
private Map<String, CatalogEntry> catalogs;
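	/**
	 * Creates an environment with no tables, functions, or catalogs and with the default
	 * execution and deployment entries.
	 */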
public Environment() {
this.tables = Collections.emptyMap();
this.functions = Collections.emptyMap();
this.catalogs = Collections.emptyMap();
this.execution = ExecutionEntry.DEFAULT_INSTANCE;
this.deployment = DeploymentEntry.DEFAULT_INSTANCE;
}
public Map<String, TableEntry> getTables() {
return tables;
}
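	/**
	 * Sets the entries of the {@code tables} section. Invoked by the YAML mapper during parsing.
	 * Fails if two tables are registered under the same name.
	 */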
public void setTables(List<Map<String, Object>> tables) {
this.tables = new LinkedHashMap<>(tables.size());
tables.forEach(config -> {
final TableEntry table = TableEntry.create(config);
if (this.tables.containsKey(table.getName())) {
throw new SqlClientException(
"Cannot create table '" + table.getName() + "' because a table with this name is already registered.");
}
this.tables.put(table.getName(), table);
});
}
public Map<String, FunctionEntry> getFunctions() {
return functions;
}
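	/**
	 * Sets the entries of the {@code functions} section. Invoked by the YAML mapper during parsing.
	 * Fails if two functions are registered under the same name.
	 */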
public void setFunctions(List<Map<String, Object>> functions) {
this.functions = new HashMap<>(functions.size());
functions.forEach(config -> {
final FunctionEntry function = FunctionEntry.create(config);
if (this.functions.containsKey(function.getName())) {
throw new SqlClientException(
"Cannot create function '" + function.getName() + "' because a function with this name is already registered.");
}
this.functions.put(function.getName(), function);
});
}
public void setExecution(Map<String, Object> config) {
this.execution = ExecutionEntry.create(config);
}
public ExecutionEntry getExecution() {
return execution;
}
public void setDeployment(Map<String, Object> config) {
this.deployment = DeploymentEntry.create(config);
}
public DeploymentEntry getDeployment() {
return deployment;
}
public Map<String, CatalogEntry> getCatalogs() {
return catalogs;
}
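	/**
	 * Sets the entries of the {@code catalogs} section. Invoked by the YAML mapper during parsing.
	 * Fails if two catalogs are registered under the same name.
	 */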
public void setCatalogs(List<Map<String, Object>> catalogList) {
this.catalogs = new HashMap<>(catalogList.size());
catalogList.forEach(config -> {
final CatalogEntry catalog = CatalogEntry.create(config);
			if (this.catalogs.containsKey(catalog.getName())) {
				throw new SqlClientException(
					"Cannot create catalog '" + catalog.getName() + "' because a catalog with this name is already registered.");
			}
			this.catalogs.put(catalog.getName(), catalog);
});
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("===================== Tables =====================\n");
tables.forEach((name, table) -> {
sb.append("- ").append(TableEntry.TABLES_NAME).append(": ").append(name).append("\n");
table.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Functions ====================\n");
functions.forEach((name, function) -> {
sb.append("- ").append(FunctionEntry.FUNCTIONS_NAME).append(": ").append(name).append("\n");
function.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
});
sb.append("=================== Execution ====================\n");
execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
sb.append("=================== Deployment ===================\n");
deployment.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
return sb.toString();
}
// --------------------------------------------------------------------------------------------
/**
 * Parses an environment file from a URL.
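 *
 * <p>A typical call (the path is only an example) might look like:
 * <pre>{@code
 * Environment env = Environment.parse(new File("conf/sql-client-defaults.yaml").toURI().toURL());
 * }</pre>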
*/
public static Environment parse(URL url) throws IOException {
try {
Environment env = new ConfigUtil.LowerCaseYamlMapper().readValue(url, Environment.class);
validateEnvironment(env);
return env;
} catch (JsonMappingException e) {
throw new SqlClientException("Could not parse environment file. Cause: " + e.getMessage());
}
}
/**
 * Parses an environment file from a string.
*/
public static Environment parse(String content) throws IOException {
try {
Environment env = new ConfigUtil.LowerCaseYamlMapper().readValue(content, Environment.class);
validateEnvironment(env);
return env;
} catch (JsonMappingException e) {
throw new SqlClientException("Could not parse environment file. Cause: " + e.getMessage());
}
}
/**
 * Checks whether the environment is valid, i.e. that no Flink tables are registered while a
 * Hive catalog is set as the default catalog.
*/
public static void validateEnvironment(Environment env) {
		if (env.tables.isEmpty()) {
			return;
		}
		final boolean isHiveCatalogDefault = env.catalogs.values().stream().anyMatch(c ->
			c.isDefaultCatalog()
				&& c.getType().isPresent()
				&& c.getType().get().equals(CatalogType.hive.name()));
		if (isHiveCatalogDefault) {
			throw new SqlClientException("Flink tables cannot be registered in a Hive catalog. "
				+ "Please set a flink_in_memory catalog as the default catalog, "
				+ "or do not set a default catalog in the YAML config file.");
		}
}
/**
* Merges two environments. The properties of the first environment might be overwritten by the second one.
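 * For example, if both environments define a table with the same name, the definition from the
 * second environment takes precedence.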
*/
public static Environment merge(Environment env1, Environment env2) {
final Environment mergedEnv = new Environment();
// merge tables
final Map<String, TableEntry> tables = new LinkedHashMap<>(env1.getTables());
tables.putAll(env2.getTables());
mergedEnv.tables = tables;
// merge functions
final Map<String, FunctionEntry> functions = new HashMap<>(env1.getFunctions());
functions.putAll(env2.getFunctions());
mergedEnv.functions = functions;
// merge execution properties
mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution());
// merge deployment properties
mergedEnv.deployment = DeploymentEntry.merge(env1.getDeployment(), env2.getDeployment());
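		// merge catalogs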
		final Map<String, CatalogEntry> catalogs = new HashMap<>(env1.getCatalogs());
		catalogs.putAll(env2.getCatalogs());
		mergedEnv.catalogs = catalogs;
validateEnvironment(mergedEnv);
return mergedEnv;
}
/**
* Enriches an environment with new/modified properties or views and returns the new instance.
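 *
 * <p>For example (the property key is illustrative only):
 * <pre>{@code
 * Map<String, String> props = Collections.singletonMap("execution.parallelism", "4");
 * Environment enriched = Environment.enrich(env, props, Collections.emptyMap());
 * }</pre>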
*/
public static Environment enrich(
Environment env,
Map<String, String> properties,
Map<String, ViewEntry> views) {
final Environment enrichedEnv = new Environment();
// merge tables
enrichedEnv.tables = new LinkedHashMap<>(env.getTables());
enrichedEnv.tables.putAll(views);
// merge functions
enrichedEnv.functions = new HashMap<>(env.getFunctions());
// enrich execution properties
enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties);
// enrich deployment properties
enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
validateEnvironment(enrichedEnv);
return enrichedEnv;
}
}