/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

/** The {@link ConfigOption configuration options} for job execution. */
@PublicEvolving
public class PipelineOptions {
/** The job name used for printing and logging. */
public static final ConfigOption<String> NAME =
key("pipeline.name")
.stringType()
.noDefaultValue()
.withDescription("The job name used for printing and logging.");
/**
* A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
*/
public static final ConfigOption<List<String>> JARS =
key("pipeline.jars")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
"A semicolon-separated list of the jars to package with the job jars to be sent to the"
+ " cluster. These have to be valid paths.");
/**
* A list of URLs that are added to the classpath of each user code classloader of the program.
* Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
*/
public static final ConfigOption<List<String>> CLASSPATHS =
key("pipeline.classpaths")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
"A semicolon-separated list of the classpaths to package with the job jars to be sent to"
+ " the cluster. These have to be valid URLs.");
public static final ConfigOption<Boolean> AUTO_GENERATE_UIDS =
key("pipeline.auto-generate-uids")
.booleanType()
.defaultValue(true)
.withDescription(
Description.builder()
.text(
"When auto-generated UIDs are disabled, users are forced to manually specify UIDs on DataStream applications.")
.linebreak()
.linebreak()
.text(
"It is highly recommended that users specify UIDs before deploying to"
+ " production since they are used to match state in savepoints to operators"
+ " in a job. Because auto-generated ID's are likely to change when modifying"
+ " a job, specifying custom IDs allow an application to evolve over time"
+ " without discarding state.")
.build());
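
// A sketch of the recommended pattern when this option is false: every stateful operator
// gets an explicit, stable UID. "dedup-map" and DedupFn are hypothetical names;
// ExecutionConfig#disableAutoGeneratedUIDs and SingleOutputStreamOperator#uid are
// existing APIs.
//
//   env.getConfig().disableAutoGeneratedUIDs();
//   stream.map(new DedupFn()).uid("dedup-map");
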
public static final ConfigOption<Boolean> AUTO_TYPE_REGISTRATION =
key("pipeline.auto-type-registration")
.booleanType()
.defaultValue(true)
.withDescription(
"Controls whether Flink is automatically registering all types in the user programs"
+ " with Kryo.");
public static final ConfigOption<Duration> AUTO_WATERMARK_INTERVAL =
key("pipeline.auto-watermark-interval")
.durationType()
.defaultValue(Duration.ZERO)
.withDescription(
"The interval of the automatic watermark emission. Watermarks are used throughout"
+ " the streaming system to keep track of the progress of time. They are used, for example,"
+ " for time based windowing.");
public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
key("pipeline.closure-cleaner-level")
.enumType(ClosureCleanerLevel.class)
.defaultValue(ClosureCleanerLevel.RECURSIVE)
.withDescription("Configures the mode in which the closure cleaner works.");
public static final ConfigOption<Boolean> FORCE_AVRO =
key("pipeline.force-avro")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"Forces Flink to use the Apache Avro serializer for POJOs.")
.linebreak()
.linebreak()
.text(
"Important: Make sure to include the %s module.",
code("flink-avro"))
.build());
public static final ConfigOption<Boolean> FORCE_KRYO =
key("pipeline.force-kryo")
.booleanType()
.defaultValue(false)
.withDescription(
"If enabled, forces TypeExtractor to use Kryo serializer for POJOS even though we could"
+ " analyze as POJO. In some cases this might be preferable. For example, when using interfaces"
+ " with subclasses that cannot be analyzed as POJO.");
public static final ConfigOption<Boolean> GENERIC_TYPES =
key("pipeline.generic-types")
.booleanType()
.defaultValue(true)
.withDescription(
Description.builder()
.text(
"If the use of generic types is disabled, Flink will throw an %s whenever it encounters"
+ " a data type that would go through Kryo for serialization.",
code("UnsupportedOperationException"))
.linebreak()
.linebreak()
.text(
"Disabling generic types can be helpful to eagerly find and eliminate the use of types"
+ " that would go through Kryo serialization during runtime. Rather than checking types"
+ " individually, using this option will throw exceptions eagerly in the places where generic"
+ " types are used.")
.linebreak()
.linebreak()
.text(
"We recommend to use this option only during development and pre-production"
+ " phases, not during actual production use. The application program and/or the input data may be"
+ " such that new, previously unseen, types occur at some point. In that case, setting this option"
+ " would cause the program to fail.")
.build());
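
// A sketch of the development-time pattern described above; with generic types disabled,
// any type that would fall back to Kryo fails eagerly instead of at runtime
// (ExecutionConfig#disableGenericTypes is an existing API).
//
//   env.getConfig().disableGenericTypes();
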
public static final ConfigOption<Map<String, String>> GLOBAL_JOB_PARAMETERS =
key("pipeline.global-job-parameters")
.mapType()
.noDefaultValue()
.withDescription(
"Register a custom, serializable user configuration object. The configuration can be "
+ " accessed in operators");
public static final ConfigOption<Integer> MAX_PARALLELISM =
key("pipeline.max-parallelism")
.intType()
.defaultValue(-1)
.withDescription(
"The program-wide maximum parallelism used for operators which haven't specified a"
+ " maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and"
+ " the number of key groups used for partitioned state.");
public static final ConfigOption<Boolean> OBJECT_REUSE =
key("pipeline.object-reuse")
.booleanType()
.defaultValue(false)
.withDescription(
"When enabled objects that Flink internally uses for deserialization and passing"
+ " data to user-code functions will be reused. Keep in mind that this can lead to bugs when the"
+ " user-code function of an operation is not aware of this behaviour.");
public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
key("pipeline.default-kryo-serializers")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
+ " as Kryo default serializers")
.linebreak()
.linebreak()
.text("Example:")
.linebreak()
.add(
TextElement.code(
"class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
+ " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
.build());
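
// A sketch of the programmatic equivalent, reusing the hypothetical names from the example
// above (ExecutionConfig#addDefaultKryoSerializer is an existing API):
//
//   env.getConfig().addDefaultKryoSerializer(ExampleClass.class, ExampleSerializer1.class);
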
public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
key("pipeline.registered-kryo-types")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Semicolon separated list of types to be registered with the serialization stack. If the type"
+ " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
+ " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
+ " sure that only tags are written.")
.build());
public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
key("pipeline.registered-pojo-types")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Semicolon separated list of types to be registered with the serialization stack. If the type"
+ " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
+ " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
+ " sure that only tags are written.")
.build());
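
// A sketch of the programmatic equivalents of the two registration options above;
// MyEvent is a hypothetical user type, and both ExecutionConfig methods exist.
//
//   env.getConfig().registerKryoType(MyEvent.class);
//   env.getConfig().registerPojoType(MyEvent.class);
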
public static final ConfigOption<Boolean> OPERATOR_CHAINING =
key("pipeline.operator-chaining")
.booleanType()
.defaultValue(true)
.withDescription(
"Operator chaining allows non-shuffle operations to be co-located in the same thread "
+ "fully avoiding serialization and de-serialization.");
public static final ConfigOption<List<String>> CACHED_FILES =
key("pipeline.cached-files")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Files to be registered at the distributed cache under the given name. The files will be "
+ "accessible from any user-defined function in the (distributed) runtime under a local path. "
+ "Files may be local files (which will be distributed via BlobServer), or files in a distributed "
+ "file system. The runtime will copy the files temporarily to a local cache, if needed.")
.linebreak()
.linebreak()
.text("Example:")
.linebreak()
.add(
TextElement.code(
"name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
.build());
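
// A sketch of the programmatic equivalent, mirroring the paths from the example above;
// StreamExecutionEnvironment#registerCachedFile and DistributedCache#getFile are
// existing APIs.
//
//   env.registerCachedFile("file:///tmp/file1", "file1");
//   // inside a RichFunction:
//   File f = getRuntimeContext().getDistributedCache().getFile("file1");
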
}