| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.configuration; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.annotation.docs.ConfigGroup; |
| import org.apache.flink.annotation.docs.ConfigGroups; |
| import org.apache.flink.annotation.docs.Documentation; |
| import org.apache.flink.configuration.description.Description; |
| |
| import org.apache.flink.shaded.guava18.com.google.common.base.Splitter; |
| import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; |
| |
| import static org.apache.flink.configuration.ConfigOptions.key; |
| |
| /** The set of configuration options for core parameters. */ |
| @PublicEvolving |
| @ConfigGroups(groups = {@ConfigGroup(name = "Environment", keyPrefix = "env")}) |
| public class CoreOptions { |
| |
| // ------------------------------------------------------------------------ |
| // Classloading Parameters |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Defines the class resolution strategy when loading classes from user code, meaning whether to |
| * first check the user code jar ({@code "child-first"}) or the application classpath ({@code |
| * "parent-first"}) |
| * |
| * <p>The default settings indicate to load classes first from the user code jar, which means |
| * that user code jars can include and load different dependencies than Flink uses |
| * (transitively). |
| * |
| * <p>Exceptions to the rules are defined via {@link #ALWAYS_PARENT_FIRST_LOADER_PATTERNS}. |
| */ |
| @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) |
| public static final ConfigOption<String> CLASSLOADER_RESOLVE_ORDER = |
| ConfigOptions.key("classloader.resolve-order") |
| .defaultValue("child-first") |
| .withDescription( |
| "Defines the class resolution strategy when loading classes from user code, meaning whether to" |
| + " first check the user code jar (\"child-first\") or the application classpath (\"parent-first\")." |
| + " The default settings indicate to load classes first from the user code jar, which means that user code" |
| + " jars can include and load different dependencies than Flink uses (transitively)."); |
| |
| /** |
| * The namespace patterns for classes that are loaded with a preference from the parent |
| * classloader, meaning the application class path, rather than any user code jar file. This |
| * option only has an effect when {@link #CLASSLOADER_RESOLVE_ORDER} is set to {@code |
| * "child-first"}. |
| * |
| * <p>It is important that all classes whose objects move between Flink's runtime and any user |
| * code (including Flink connectors that run as part of the user code) are covered by these |
| * patterns here. Otherwise it is be possible that the Flink runtime and the user code load two |
| * different copies of a class through the different class loaders. That leads to errors like "X |
| * cannot be cast to X" exceptions, where both class names are equal, or "X cannot be assigned |
| * to Y", where X should be a subclass of Y. |
| * |
| * <p>The following classes are loaded parent-first, to avoid any duplication: |
| * |
| * <ul> |
| * <li>All core Java classes (java.*), because they must never be duplicated. |
| * <li>All core Scala classes (scala.*). Currently Scala is used in the Flink runtime and in |
| * the user code, and some Scala classes cross the boundary, such as the <i>FunctionX</i> |
| * classes. That may change if Scala eventually lives purely as part of the user code. |
| * <li>All Flink classes (org.apache.flink.*). Note that this means that connectors and |
| * formats (flink-avro, etc) are loaded parent-first as well if they are in the core |
| * classpath. |
| * <li>Java annotations and loggers, defined by the following list: |
| * javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback. |
| * This is done for convenience, to avoid duplication of annotations and multiple log |
| * bindings. |
| * </ul> |
| */ |
| @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) |
| public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER_PATTERNS = |
| ConfigOptions.key("classloader.parent-first-patterns.default") |
| .defaultValue( |
| "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c") |
| .withDeprecatedKeys("classloader.parent-first-patterns") |
| .withDescription( |
| "A (semicolon-separated) list of patterns that specifies which classes should always be" |
| + " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" |
| + " the fully qualified class name. This setting should generally not be modified. To add another pattern we" |
| + " recommend to use \"classloader.parent-first-patterns.additional\" instead."); |
| |
| @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) |
| public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL = |
| ConfigOptions.key("classloader.parent-first-patterns.additional") |
| .defaultValue("") |
| .withDescription( |
| "A (semicolon-separated) list of patterns that specifies which classes should always be" |
| + " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" |
| + " the fully qualified class name. These patterns are appended to \"" |
| + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() |
| + "\"."); |
| |
| @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) |
| public static final ConfigOption<Boolean> FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM = |
| ConfigOptions.key("classloader.fail-on-metaspace-oom-error") |
| .booleanType() |
| .defaultValue(true) |
| .withDescription( |
| "Fail Flink JVM processes if 'OutOfMemoryError: Metaspace' is " |
| + "thrown while trying to load a user code class."); |
| |
| public static String[] getParentFirstLoaderPatterns(Configuration config) { |
| String base = config.getString(ALWAYS_PARENT_FIRST_LOADER_PATTERNS); |
| String append = config.getString(ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL); |
| return parseParentFirstLoaderPatterns(base, append); |
| } |
| |
| /** |
| * Plugin-specific option of {@link #ALWAYS_PARENT_FIRST_LOADER_PATTERNS}. Plugins use this |
| * parent first list instead of the global version. |
| */ |
| @Documentation.ExcludeFromDocumentation( |
| "Plugin classloader list is considered an implementation detail. " |
| + "Configuration only included in case to mitigate unintended side-effects of this young feature.") |
| public static final ConfigOption<String> PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS = |
| ConfigOptions.key("plugin.classloader.parent-first-patterns.default") |
| .stringType() |
| .defaultValue( |
| "java.;scala.;org.apache.flink.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache" |
| + ".logging;org.apache.commons.logging;ch.qos.logback") |
| .withDescription( |
| "A (semicolon-separated) list of patterns that specifies which classes should always be" |
| + " resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked " |
| + " against the fully qualified class name. This setting should generally not be modified. To add another " |
| + " pattern we recommend to use \"plugin.classloader.parent-first-patterns.additional\" instead."); |
| |
| @Documentation.ExcludeFromDocumentation( |
| "Plugin classloader list is considered an implementation detail. " |
| + "Configuration only included in case to mitigate unintended side-effects of this young feature.") |
| public static final ConfigOption<String> PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL = |
| ConfigOptions.key("plugin.classloader.parent-first-patterns.additional") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| "A (semicolon-separated) list of patterns that specifies which classes should always be" |
| + " resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked " |
| + " against the fully qualified class name. These patterns are appended to \"" |
| + PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() |
| + "\"."); |
| |
| public static String[] getPluginParentFirstLoaderPatterns(Configuration config) { |
| String base = config.getString(PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS); |
| String append = config.getString(PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL); |
| return parseParentFirstLoaderPatterns(base, append); |
| } |
| |
| private static String[] parseParentFirstLoaderPatterns(String base, String append) { |
| Splitter splitter = Splitter.on(';').omitEmptyStrings(); |
| return Iterables.toArray( |
| Iterables.concat(splitter.split(base), splitter.split(append)), String.class); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // process parameters |
| // ------------------------------------------------------------------------ |
| |
| public static final ConfigOption<String> FLINK_JVM_OPTIONS = |
| ConfigOptions.key("env.java.opts") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| Description.builder() |
| .text( |
| "Java options to start the JVM of all Flink processes with.") |
| .build()); |
| |
| public static final ConfigOption<String> FLINK_JM_JVM_OPTIONS = |
| ConfigOptions.key("env.java.opts.jobmanager") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| Description.builder() |
| .text("Java options to start the JVM of the JobManager with.") |
| .build()); |
| |
| public static final ConfigOption<String> FLINK_TM_JVM_OPTIONS = |
| ConfigOptions.key("env.java.opts.taskmanager") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| Description.builder() |
| .text("Java options to start the JVM of the TaskManager with.") |
| .build()); |
| |
| public static final ConfigOption<String> FLINK_HS_JVM_OPTIONS = |
| ConfigOptions.key("env.java.opts.historyserver") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| Description.builder() |
| .text( |
| "Java options to start the JVM of the HistoryServer with.") |
| .build()); |
| |
| public static final ConfigOption<String> FLINK_CLI_JVM_OPTIONS = |
| ConfigOptions.key("env.java.opts.client") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| Description.builder() |
| .text("Java options to start the JVM of the Flink Client with.") |
| .build()); |
| |
| /** |
| * This options is here only for documentation generation, it is only evaluated in the shell |
| * scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_LOG_DIR = |
| ConfigOptions.key("env.log.dir") |
| .noDefaultValue() |
| .withDescription( |
| "Defines the directory where the Flink logs are saved. It has to be an absolute path." |
| + " (Defaults to the log directory under Flinkās home)"); |
| |
| /** |
| * This options is here only for documentation generation, it is only evaluated in the shell |
| * scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<Integer> FLINK_LOG_MAX = |
| ConfigOptions.key("env.log.max") |
| .defaultValue(5) |
| .withDescription("The maximum number of old log files to keep."); |
| |
| /** |
| * This options is here only for documentation generation, it is only evaluated in the shell |
| * scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_SSH_OPTIONS = |
| ConfigOptions.key("env.ssh.opts") |
| .noDefaultValue() |
| .withDescription( |
| "Additional command line options passed to SSH clients when starting or stopping JobManager," |
| + " TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh," |
| + " stop-zookeeper-quorum.sh)."); |
| |
| /** |
| * This options is here only for documentation generation, it is only evaluated in the shell |
| * scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_HADOOP_CONF_DIR = |
| ConfigOptions.key("env.hadoop.conf.dir") |
| .noDefaultValue() |
| .withDescription( |
| "Path to hadoop configuration directory. It is required to read HDFS and/or YARN" |
| + " configuration. You can also set it via environment variable."); |
| |
| /** |
| * This options is here only for documentation generation, it is only evaluated in the shell |
| * scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_YARN_CONF_DIR = |
| ConfigOptions.key("env.yarn.conf.dir") |
| .noDefaultValue() |
| .withDescription( |
| "Path to yarn configuration directory. It is required to run flink on YARN. You can also" |
| + " set it via environment variable."); |
| |
| // ------------------------------------------------------------------------ |
| // generic io |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * The config parameter defining the directories for temporary files, separated by ",", "|", or |
| * the system's {@link java.io.File#pathSeparator}. |
| */ |
| @Documentation.OverrideDefault( |
| "'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.") |
| @Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS) |
| public static final ConfigOption<String> TMP_DIRS = |
| key("io.tmp.dirs") |
| .defaultValue(System.getProperty("java.io.tmpdir")) |
| .withDeprecatedKeys("taskmanager.tmp.dirs") |
| .withDescription( |
| "Directories for temporary files, separated by\",\", \"|\", or the system's java.io.File.pathSeparator."); |
| |
| // ------------------------------------------------------------------------ |
| // program |
| // ------------------------------------------------------------------------ |
| |
| public static final ConfigOption<Integer> DEFAULT_PARALLELISM = |
| ConfigOptions.key("parallelism.default") |
| .defaultValue(1) |
| .withDescription("Default parallelism for jobs."); |
| |
| // ------------------------------------------------------------------------ |
| // file systems |
| // ------------------------------------------------------------------------ |
| |
| /** The default filesystem scheme, used for paths that do not declare a scheme explicitly. */ |
| @Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS) |
| public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME = |
| ConfigOptions.key("fs.default-scheme") |
| .noDefaultValue() |
| .withDescription( |
| "The default filesystem scheme, used for paths that do not declare a scheme explicitly." |
| + " May contain an authority, e.g. host:port in case of an HDFS NameNode."); |
| |
| @Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS) |
| public static final ConfigOption<String> ALLOWED_FALLBACK_FILESYSTEMS = |
| ConfigOptions.key("fs.allowed-fallback-filesystems") |
| .stringType() |
| .defaultValue("") |
| .withDescription( |
| "A (semicolon-separated) list of file schemes, for which Hadoop can be used instead " |
| + "of an appropriate Flink plugin. (example: s3;wasb)"); |
| |
| /** Specifies whether file output writers should overwrite existing files by default. */ |
| @Documentation.Section(Documentation.Sections.DEPRECATED_FILE_SINKS) |
| public static final ConfigOption<Boolean> FILESYTEM_DEFAULT_OVERRIDE = |
| key("fs.overwrite-files") |
| .defaultValue(false) |
| .withDescription( |
| "Specifies whether file output writers should overwrite existing files by default. Set to" |
| + " \"true\" to overwrite by default,\"false\" otherwise."); |
| |
| /** |
| * Specifies whether the file systems should always create a directory for the output, even with |
| * a parallelism of one. |
| */ |
| @Documentation.Section(Documentation.Sections.DEPRECATED_FILE_SINKS) |
| public static final ConfigOption<Boolean> FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY = |
| key("fs.output.always-create-directory") |
| .defaultValue(false) |
| .withDescription( |
| "File writers running with a parallelism larger than one create a directory for the output" |
| + " file path and put the different result files (one per parallel writer task) into that directory." |
| + " If this option is set to \"true\", writers with a parallelism of 1 will also create a" |
| + " directory and place a single result file into it. If the option is set to \"false\"," |
| + " the writer will directly create the file directly at the output path, without creating a containing" |
| + " directory."); |
| |
| /** |
| * The total number of input plus output connections that a file system for the given scheme may |
| * open. Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimit(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.total").defaultValue(-1); |
| } |
| |
| /** |
| * The total number of input connections that a file system for the given scheme may open. |
| * Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimitIn(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.input").defaultValue(-1); |
| } |
| |
| /** |
| * The total number of output connections that a file system for the given scheme may open. |
| * Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimitOut(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.output").defaultValue(-1); |
| } |
| |
| /** |
| * If any connection limit is configured, this option can be optionally set to define after |
| * which time (in milliseconds) stream opening fails with a timeout exception, if no stream |
| * connection becomes available. Unlimited timeout be default. |
| */ |
| public static ConfigOption<Long> fileSystemConnectionLimitTimeout(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.timeout").defaultValue(0L); |
| } |
| |
| /** |
| * If any connection limit is configured, this option can be optionally set to define after |
| * which time (in milliseconds) inactive streams are reclaimed. This option can help to prevent |
| * that inactive streams make up the full pool of limited connections, and no further |
| * connections can be established. Unlimited timeout be default. |
| */ |
| public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout( |
| String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L); |
| } |
| } |