| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.configuration; |
| |
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;
import org.apache.flink.annotation.docs.Documentation;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.configuration.ConfigOptions.key;
| |
| /** |
| * The set of configuration options for core parameters. |
| */ |
| @PublicEvolving |
| @ConfigGroups(groups = { |
| @ConfigGroup(name = "Environment", keyPrefix = "env"), |
| @ConfigGroup(name = "FileSystem", keyPrefix = "fs") |
| }) |
| public class CoreOptions { |
| |
| // ------------------------------------------------------------------------ |
| // Classloading Parameters |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Defines the class resolution strategy when loading classes from user code, |
| * meaning whether to first check the user code jar ({@code "child-first"}) or |
| * the application classpath ({@code "parent-first"}) |
| * |
| * <p>The default settings indicate to load classes first from the user code jar, |
| * which means that user code jars can include and load different dependencies than |
| * Flink uses (transitively). |
| * |
| * <p>Exceptions to the rules are defined via {@link #ALWAYS_PARENT_FIRST_LOADER_PATTERNS}. |
| */ |
| public static final ConfigOption<String> CLASSLOADER_RESOLVE_ORDER = ConfigOptions |
| .key("classloader.resolve-order") |
| .defaultValue("child-first") |
| .withDescription("Defines the class resolution strategy when loading classes from user code, meaning whether to" + |
| " first check the user code jar (\"child-first\") or the application classpath (\"parent-first\")." + |
| " The default settings indicate to load classes first from the user code jar, which means that user code" + |
| " jars can include and load different dependencies than Flink uses (transitively)."); |
| |
| /** |
| * The namespace patterns for classes that are loaded with a preference from the |
| * parent classloader, meaning the application class path, rather than any user code |
| * jar file. This option only has an effect when {@link #CLASSLOADER_RESOLVE_ORDER} is |
| * set to {@code "child-first"}. |
| * |
| * <p>It is important that all classes whose objects move between Flink's runtime and |
| * any user code (including Flink connectors that run as part of the user code) are |
| * covered by these patterns here. Otherwise it is be possible that the Flink runtime |
| * and the user code load two different copies of a class through the different class |
| * loaders. That leads to errors like "X cannot be cast to X" exceptions, where both |
| * class names are equal, or "X cannot be assigned to Y", where X should be a subclass |
| * of Y. |
| * |
| * <p>The following classes are loaded parent-first, to avoid any duplication: |
| * <ul> |
| * <li>All core Java classes (java.*), because they must never be duplicated.</li> |
| * <li>All core Scala classes (scala.*). Currently Scala is used in the Flink |
| * runtime and in the user code, and some Scala classes cross the boundary, |
| * such as the <i>FunctionX</i> classes. That may change if Scala eventually |
| * lives purely as part of the user code.</li> |
| * <li>All Flink classes (org.apache.flink.*). Note that this means that connectors |
| * and formats (flink-avro, etc) are loaded parent-first as well if they are in the |
| * core classpath.</li> |
| * <li>Java annotations and loggers, defined by the following list: |
| * javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback. |
| * This is done for convenience, to avoid duplication of annotations and multiple |
| * log bindings.</li> |
| * </ul> |
| */ |
| public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER_PATTERNS = ConfigOptions |
| .key("classloader.parent-first-patterns.default") |
| .defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback") |
| .withDeprecatedKeys("classloader.parent-first-patterns") |
| .withDescription("A (semicolon-separated) list of patterns that specifies which classes should always be" + |
| " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" + |
| " the fully qualified class name. This setting should generally not be modified. To add another pattern we" + |
| " recommend to use \"classloader.parent-first-patterns.additional\" instead."); |
| |
| public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL = ConfigOptions |
| .key("classloader.parent-first-patterns.additional") |
| .defaultValue("") |
| .withDescription("A (semicolon-separated) list of patterns that specifies which classes should always be" + |
| " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" + |
| " the fully qualified class name. These patterns are appended to \"" + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() + "\"."); |
| |
| public static String[] getParentFirstLoaderPatterns(Configuration config) { |
| String base = config.getString(ALWAYS_PARENT_FIRST_LOADER_PATTERNS); |
| String append = config.getString(ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL); |
| |
| String[] basePatterns = base.isEmpty() |
| ? new String[0] |
| : base.split(";"); |
| |
| if (append.isEmpty()) { |
| return basePatterns; |
| } else { |
| String[] appendPatterns = append.split(";"); |
| |
| String[] joinedPatterns = new String[basePatterns.length + appendPatterns.length]; |
| System.arraycopy(basePatterns, 0, joinedPatterns, 0, basePatterns.length); |
| System.arraycopy(appendPatterns, 0, joinedPatterns, basePatterns.length, appendPatterns.length); |
| return joinedPatterns; |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // process parameters |
| // ------------------------------------------------------------------------ |
| |
| public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions |
| .key("env.java.opts") |
| .defaultValue(""); |
| |
| public static final ConfigOption<String> FLINK_JM_JVM_OPTIONS = ConfigOptions |
| .key("env.java.opts.jobmanager") |
| .defaultValue(""); |
| |
| public static final ConfigOption<String> FLINK_TM_JVM_OPTIONS = ConfigOptions |
| .key("env.java.opts.taskmanager") |
| .defaultValue(""); |
| |
| /** |
| * This options is here only for documentation generation, it is only |
| * evaluated in the shell scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_LOG_DIR = ConfigOptions |
| .key("env.log.dir") |
| .noDefaultValue() |
| .withDescription("Defines the directory where the Flink logs are saved. It has to be an absolute path." + |
| " (Defaults to the log directory under Flinkās home)"); |
| |
| /** |
| * This options is here only for documentation generation, it is only |
| * evaluated in the shell scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<Integer> FLINK_LOG_MAX = ConfigOptions |
| .key("env.log.max") |
| .defaultValue(5) |
| .withDescription("The maximum number of old log files to keep."); |
| |
| /** |
| * This options is here only for documentation generation, it is only |
| * evaluated in the shell scripts. |
| */ |
| @SuppressWarnings("unused") |
| public static final ConfigOption<String> FLINK_SSH_OPTIONS = ConfigOptions |
| .key("env.ssh.opts") |
| .noDefaultValue() |
| .withDescription("Additional command line options passed to SSH clients when starting or stopping JobManager," + |
| " TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh," + |
| " stop-zookeeper-quorum.sh)."); |
| |
| // ------------------------------------------------------------------------ |
| // generic io |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * The config parameter defining the directories for temporary files, separated by |
| * ",", "|", or the system's {@link java.io.File#pathSeparator}. |
| */ |
| @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")") |
| public static final ConfigOption<String> TMP_DIRS = |
| key("io.tmp.dirs") |
| .defaultValue(System.getProperty("java.io.tmpdir")) |
| .withDeprecatedKeys("taskmanager.tmp.dirs"); |
| |
| // ------------------------------------------------------------------------ |
| // program |
| // ------------------------------------------------------------------------ |
| |
| public static final ConfigOption<Integer> DEFAULT_PARALLELISM = ConfigOptions |
| .key("parallelism.default") |
| .defaultValue(1); |
| |
| /** |
| * The default stream partitioner, used when the upstream and downstream parallelisms are not equal |
| * and partitioner is not specified. |
| */ |
| public static final ConfigOption<String> DEFAULT_PARTITIONER = ConfigOptions |
| .key("partitioner.default") |
| .defaultValue("REBALANCE") |
| .withDescription("The default stream partitioner, used when the upstream and downstream parallelisms are " + |
| "not equal and partitioner is not specified. Possible values are 'RESCALE' and 'REBALANCE'."); |
| |
| public static final ConfigOption<Boolean> CHAIN_EAGERLY_ENABLED = ConfigOptions |
| .key("chain.eagerly.enabled") |
| .defaultValue(false) |
| .withDescription("Whether operators are chained more eagerly when the parallelism is one"); |
| |
| /** |
| * The default cpu cores, used when the default resources of operators need to be set but without |
| * the default resource settings for the job. |
| */ |
| public static final ConfigOption<Double> DEFAULT_RESOURCE_CPU_CORES = ConfigOptions |
| .key("resource.cpu.cores.default") |
| .defaultValue(0.01) |
| .withDescription("CPU cores for operators, use double so we can specify cpu like 0.1."); |
| |
| /** |
| * The default heap size, used when the default resources of operators need to be set but without |
| * the default resource settings for the job. |
| */ |
| public static final ConfigOption<Integer> DEFAULT_RESOURCE_HEAP_MEMORY = ConfigOptions |
| .key("resource.heap.mb.default") |
| .defaultValue(16) |
| .withDescription("Java heap size (in megabytes) for operators."); |
| |
| // ------------------------------------------------------------------------ |
| // file systems |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * The default filesystem scheme, used for paths that do not declare a scheme explicitly. |
| */ |
| public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME = ConfigOptions |
| .key("fs.default-scheme") |
| .noDefaultValue() |
| .withDescription("The default filesystem scheme, used for paths that do not declare a scheme explicitly." + |
| " May contain an authority, e.g. host:port in case of a HDFS NameNode."); |
| |
| /** |
| * Specifies whether file output writers should overwrite existing files by default. |
| */ |
| public static final ConfigOption<Boolean> FILESYTEM_DEFAULT_OVERRIDE = |
| key("fs.overwrite-files") |
| .defaultValue(false) |
| .withDescription("Specifies whether file output writers should overwrite existing files by default. Set to" + |
| " \"true\" to overwrite by default,\"false\" otherwise."); |
| |
| /** |
| * Specifies whether the file systems should always create a directory for the output, even with a parallelism of one. |
| */ |
| public static final ConfigOption<Boolean> FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY = |
| key("fs.output.always-create-directory") |
| .defaultValue(false) |
| .withDescription("File writers running with a parallelism larger than one create a directory for the output" + |
| " file path and put the different result files (one per parallel writer task) into that directory." + |
| " If this option is set to \"true\", writers with a parallelism of 1 will also create a" + |
| " directory and place a single result file into it. If the option is set to \"false\"," + |
| " the writer will directly create the file directly at the output path, without creating a containing" + |
| " directory."); |
| |
| /** |
| * The total number of input plus output connections that a file system for the given scheme may open. |
| * Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimit(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.total").defaultValue(-1); |
| } |
| |
| /** |
| * The total number of input connections that a file system for the given scheme may open. |
| * Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimitIn(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.input").defaultValue(-1); |
| } |
| |
| /** |
| * The total number of output connections that a file system for the given scheme may open. |
| * Unlimited be default. |
| */ |
| public static ConfigOption<Integer> fileSystemConnectionLimitOut(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.output").defaultValue(-1); |
| } |
| |
| /** |
| * If any connection limit is configured, this option can be optionally set to define after |
| * which time (in milliseconds) stream opening fails with a timeout exception, if no stream |
| * connection becomes available. Unlimited timeout be default. |
| */ |
| public static ConfigOption<Long> fileSystemConnectionLimitTimeout(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.timeout").defaultValue(0L); |
| } |
| |
| /** |
| * If any connection limit is configured, this option can be optionally set to define after |
| * which time (in milliseconds) inactive streams are reclaimed. This option can help to prevent |
| * that inactive streams make up the full pool of limited connections, and no further connections |
| * can be established. Unlimited timeout be default. |
| */ |
| public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) { |
| return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Distributed architecture |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Constant value for the new execution mode. |
| */ |
| public static final String NEW_MODE = "new"; |
| |
| /** |
| * Constant value for the old execution mode. |
| */ |
| public static final String LEGACY_MODE = "legacy"; |
| |
| /** |
| * Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE} |
| * and {@link CoreOptions#LEGACY_MODE}. |
| */ |
| public static final ConfigOption<String> MODE = key("mode") |
| .defaultValue(NEW_MODE) |
| .withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'."); |
| |
| /** |
| * Disable uploading the user jars. |
| * In sql cases the main class has been stored in job graph. |
| */ |
| public static final ConfigOption<Boolean> DISABLE_UPLOAD_USER_JARS = |
| key("user-jars.upload.disabled").defaultValue(false); |
| } |