| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.client.config; |
| |
| import org.apache.flink.table.client.SqlClientException; |
| import org.apache.flink.table.client.catalog.CatalogType; |
| import org.apache.flink.table.client.config.entries.CatalogEntry; |
| import org.apache.flink.table.client.config.entries.DeploymentEntry; |
| import org.apache.flink.table.client.config.entries.ExecutionEntry; |
| import org.apache.flink.table.client.config.entries.FunctionEntry; |
| import org.apache.flink.table.client.config.entries.TableEntry; |
| import org.apache.flink.table.client.config.entries.ViewEntry; |
| |
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException; |
| |
| import java.io.IOException; |
| import java.net.URL; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** |
| * Environment configuration that represents the content of an environment file. Environment files |
| * define tables, execution, catalogs, and deployment behavior. An environment might be defined by default or |
| * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command). |
| * |
| * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not |
| * allow overwriting of a deployment by a session. |
| */ |
| public class Environment { |
| |
| public static final String EXECUTION_ENTRY = "execution"; |
| |
| public static final String DEPLOYMENT_ENTRY = "deployment"; |
| |
| private Map<String, TableEntry> tables; |
| |
| private Map<String, FunctionEntry> functions; |
| |
| private ExecutionEntry execution; |
| |
| private DeploymentEntry deployment; |
| |
| private Map<String, CatalogEntry> catalogs; |
| |
| public Environment() { |
| this.tables = Collections.emptyMap(); |
| this.functions = Collections.emptyMap(); |
| this.catalogs = Collections.emptyMap(); |
| this.execution = ExecutionEntry.DEFAULT_INSTANCE; |
| this.deployment = DeploymentEntry.DEFAULT_INSTANCE; |
| } |
| |
| public Map<String, TableEntry> getTables() { |
| return tables; |
| } |
| |
| public void setTables(List<Map<String, Object>> tables) { |
| this.tables = new LinkedHashMap<>(tables.size()); |
| |
| tables.forEach(config -> { |
| final TableEntry table = TableEntry.create(config); |
| if (this.tables.containsKey(table.getName())) { |
| throw new SqlClientException( |
| "Cannot create table '" + table.getName() + "' because a table with this name is already registered."); |
| } |
| this.tables.put(table.getName(), table); |
| }); |
| } |
| |
| public Map<String, FunctionEntry> getFunctions() { |
| return functions; |
| } |
| |
| public void setFunctions(List<Map<String, Object>> functions) { |
| this.functions = new HashMap<>(functions.size()); |
| |
| functions.forEach(config -> { |
| final FunctionEntry function = FunctionEntry.create(config); |
| if (this.functions.containsKey(function.getName())) { |
| throw new SqlClientException( |
| "Cannot create function '" + function.getName() + "' because a function with this name is already registered."); |
| } |
| this.functions.put(function.getName(), function); |
| }); |
| } |
| |
| public void setExecution(Map<String, Object> config) { |
| this.execution = ExecutionEntry.create(config); |
| } |
| |
| public ExecutionEntry getExecution() { |
| return execution; |
| } |
| |
| public void setDeployment(Map<String, Object> config) { |
| this.deployment = DeploymentEntry.create(config); |
| } |
| |
| public DeploymentEntry getDeployment() { |
| return deployment; |
| } |
| |
| public Map<String, CatalogEntry> getCatalogs() { |
| return catalogs; |
| } |
| |
| public void setCatalogs(List<Map<String, Object>> catalogList) { |
| this.catalogs = new HashMap<>(catalogList.size()); |
| |
| catalogList.forEach(config -> { |
| final CatalogEntry catalog = CatalogEntry.create(config); |
| |
| if (catalogs.containsKey(catalog.getName())) { |
| throw new SqlClientException( |
| String.format("Catalog %s is already registered", catalog.getName())); |
| } |
| |
| catalogs.put(catalog.getName(), catalog); |
| }); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("===================== Tables =====================\n"); |
| tables.forEach((name, table) -> { |
| sb.append("- ").append(TableEntry.TABLES_NAME).append(": ").append(name).append("\n"); |
| table.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n')); |
| }); |
| sb.append("=================== Functions ====================\n"); |
| functions.forEach((name, function) -> { |
| sb.append("- ").append(FunctionEntry.FUNCTIONS_NAME).append(": ").append(name).append("\n"); |
| function.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n')); |
| }); |
| sb.append("=================== Execution ====================\n"); |
| execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); |
| sb.append("=================== Deployment ===================\n"); |
| deployment.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); |
| return sb.toString(); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Parses an environment file from an URL. |
| */ |
| public static Environment parse(URL url) throws IOException { |
| try { |
| Environment env = new ConfigUtil.LowerCaseYamlMapper().readValue(url, Environment.class); |
| validateEnvironment(env); |
| return env; |
| } catch (JsonMappingException e) { |
| throw new SqlClientException("Could not parse environment file. Cause: " + e.getMessage()); |
| } |
| } |
| |
| /** |
| * Parses an environment file from an String. |
| */ |
| public static Environment parse(String content) throws IOException { |
| try { |
| Environment env = new ConfigUtil.LowerCaseYamlMapper().readValue(content, Environment.class); |
| validateEnvironment(env); |
| return env; |
| } catch (JsonMappingException e) { |
| throw new SqlClientException("Could not parse environment file. Cause: " + e.getMessage()); |
| } |
| } |
| |
| /** |
| * check whether the environment file is valid. |
| */ |
| public static void validateEnvironment(Environment env) { |
| if (env.tables.size() < 1) { |
| return; |
| } |
| boolean isHiveCatalogDefault = env.catalogs.values().stream().filter(c -> |
| c.isDefaultCatalog() |
| && c.getType().isPresent() |
| && c.getType().get().equals(CatalogType.hive.name())).count() > 0; |
| if (isHiveCatalogDefault) { |
| throw new SqlClientException("Flink table cannot be registered into Hive catalog. " |
| + "Please set a flink_in_memory catalog as default catalog, " |
| + "or set none catalog as default in yaml config file"); |
| } |
| } |
| |
| /** |
| * Merges two environments. The properties of the first environment might be overwritten by the second one. |
| */ |
| public static Environment merge(Environment env1, Environment env2) { |
| final Environment mergedEnv = new Environment(); |
| |
| // merge tables |
| final Map<String, TableEntry> tables = new LinkedHashMap<>(env1.getTables()); |
| tables.putAll(env2.getTables()); |
| mergedEnv.tables = tables; |
| |
| // merge functions |
| final Map<String, FunctionEntry> functions = new HashMap<>(env1.getFunctions()); |
| functions.putAll(env2.getFunctions()); |
| mergedEnv.functions = functions; |
| |
| // merge execution properties |
| mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution()); |
| |
| // merge deployment properties |
| mergedEnv.deployment = DeploymentEntry.merge(env1.getDeployment(), env2.getDeployment()); |
| |
| Map<String, CatalogEntry> catalogs = new HashMap<>(); |
| catalogs.putAll(env1.getCatalogs()); |
| catalogs.putAll(env2.getCatalogs()); |
| |
| mergedEnv.catalogs = catalogs; |
| |
| validateEnvironment(mergedEnv); |
| |
| return mergedEnv; |
| } |
| |
| /** |
| * Enriches an environment with new/modified properties or views and returns the new instance. |
| */ |
| public static Environment enrich( |
| Environment env, |
| Map<String, String> properties, |
| Map<String, ViewEntry> views) { |
| final Environment enrichedEnv = new Environment(); |
| |
| // merge tables |
| enrichedEnv.tables = new LinkedHashMap<>(env.getTables()); |
| enrichedEnv.tables.putAll(views); |
| |
| // merge functions |
| enrichedEnv.functions = new HashMap<>(env.getFunctions()); |
| |
| // enrich execution properties |
| enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties); |
| |
| // enrich deployment properties |
| enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties); |
| |
| validateEnvironment(enrichedEnv); |
| |
| return enrichedEnv; |
| } |
| } |