blob: 6844ee4cd4840f2eb3670c41d438fb7d162de8a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
 * The set of configuration options relating to TaskManager and Task settings.
 */
@PublicEvolving
public class TaskManagerOptions {

	// ------------------------------------------------------------------------
	//  General TaskManager Options
	// ------------------------------------------------------------------------

	/**
	 * JVM heap size for the TaskManagers, given as a memory size string (default {@code "1024m"}).
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
	public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
			key("taskmanager.heap.size")
			.defaultValue("1024m")
			.withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
					" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
					" YARN container, minus a certain tolerance value.");

	/**
	 * JVM heap size (in megabytes) for the TaskManagers.
	 *
	 * @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
	 */
	@Deprecated
	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
			key("taskmanager.heap.mb")
			.defaultValue(1024)
			.withDescription("JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of" +
					" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
					" YARN container, minus a certain tolerance value.");

	/**
	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
	 */
	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
			key("taskmanager.jvm-exit-on-oom")
			.defaultValue(false)
			.withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

	/**
	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
	 * shuts down the actor system if it detects that it has quarantined another actor system
	 * or if it has been quarantined by another actor system.
	 */
	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
			key("taskmanager.exit-on-fatal-akka-error")
			.defaultValue(false)
			.withDescription("Whether the quarantine monitor for task managers shall be started. The quarantine monitor" +
					" shuts down the actor system if it detects that it has quarantined another actor system" +
					" or if it has been quarantined by another actor system.");

	/**
	 * The config parameter defining the task manager's hostname.
	 */
	public static final ConfigOption<String> HOST =
			key("taskmanager.host")
			.noDefaultValue()
			.withDescription("The hostname of the network interface that the TaskManager binds to. By default, the" +
					" TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers." +
					" This option can be used to define a hostname if that strategy fails for some reason. Because" +
					" different TaskManagers need different values for this option, it usually is specified in an" +
					" additional non-shared TaskManager-specific config file.");

	/**
	 * The default network port range the task manager expects incoming IPC connections. The {@code "0"} means that
	 * the TaskManager searches for a free port.
	 */
	public static final ConfigOption<String> RPC_PORT =
			key("taskmanager.rpc.port")
			.defaultValue("0")
			// ASCII quotes only: typographic quotes in the literal depend on the source encoding.
			.withDescription("The task manager's IPC port. Accepts a list of ports (\"50100,50101\"), ranges" +
					" (\"50100-50200\") or a combination of both. It is recommended to set a range of ports to avoid" +
					" collisions when multiple TaskManagers are running on the same machine.");

	/**
	 * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
	 * the TaskManager searches for a free port.
	 */
	public static final ConfigOption<Integer> DATA_PORT =
			key("taskmanager.data.port")
			.defaultValue(0)
			.withDescription("The task manager's port used for data exchange operations.");

	/**
	 * Config parameter to override SSL support for taskmanager's data transport.
	 */
	public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
			key("taskmanager.data.ssl.enabled")
			.defaultValue(true)
			.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
					" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true.");

	/**
	 * The initial registration backoff between two consecutive registration attempts. The backoff
	 * is doubled for each new registration attempt until it reaches the maximum registration backoff.
	 */
	public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
			key("taskmanager.registration.initial-backoff")
			.defaultValue("500 ms")
			.withDeprecatedKeys("taskmanager.initial-registration-pause")
			.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
					" is doubled for each new registration attempt until it reaches the maximum registration backoff.");

	/**
	 * The maximum registration backoff between two consecutive registration attempts.
	 */
	public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
			key("taskmanager.registration.max-backoff")
			.defaultValue("30 s")
			.withDeprecatedKeys("taskmanager.max-registration-pause")
			.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
					" registration backoff requires a time unit specifier (ms/s/min/h/d).");

	/**
	 * The backoff after a registration has been refused by the job manager before retrying to connect.
	 */
	public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
			key("taskmanager.registration.refused-backoff")
			.defaultValue("10 s")
			.withDeprecatedKeys("taskmanager.refused-registration-pause")
			.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");

	/**
	 * Defines the timeout it can take for the TaskManager registration. If the duration is
	 * exceeded without a successful registration, then the TaskManager terminates.
	 */
	public static final ConfigOption<String> REGISTRATION_TIMEOUT =
			key("taskmanager.registration.timeout")
			.defaultValue("5 min")
			.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
			.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
					" exceeded without a successful registration, then the TaskManager terminates.");

	/**
	 * The config parameter defining the number of task slots of a task manager.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
	public static final ConfigOption<Integer> NUM_TASK_SLOTS =
			key("taskmanager.numberOfTaskSlots")
			.defaultValue(1)
			.withDescription("The number of parallel operator or user function instances that a single TaskManager can" +
					" run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or" +
					" operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the" +
					" available memory is divided between the different operator or function instances. This value" +
					" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
					" (e.g., equal to the number of cores, or half the number of cores).");

	/**
	 * Flag indicating whether to start a thread which repeatedly logs the memory usage of the JVM.
	 */
	public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
			key("taskmanager.debug.memory.log")
			.defaultValue(false)
			.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
			.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");

	/**
	 * The interval (in milliseconds) at which the memory log thread enabled by
	 * {@link #DEBUG_MEMORY_LOG} logs the current memory usage.
	 */
	public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
			key("taskmanager.debug.memory.log-interval")
			.defaultValue(5000L)
			.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
			.withDescription("The interval (in ms) for the log thread to log the current memory usage.");

	// ------------------------------------------------------------------------
	//  Managed Memory Options
	// ------------------------------------------------------------------------

	/**
	 * Size of memory buffers used by the network stack and the memory manager.
	 */
	public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
			key("taskmanager.memory.segment-size")
			.defaultValue("32kb")
			.withDescription("Size of memory buffers used by the network stack and the memory manager.");

	/**
	 * Amount of memory to be allocated by the task manager's memory manager. If not
	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
	 */
	public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
			key("taskmanager.memory.size")
			.defaultValue("0")
			.withDescription("Amount of memory to be allocated by the task manager's memory manager." +
					" If not set, a relative fraction will be allocated.");

	/**
	 * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
	 * not set.
	 */
	public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
			key("taskmanager.memory.fraction")
			.defaultValue(0.7f)
			.withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
					" buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
					" For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
					" for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
					" created by user-defined functions. This parameter is only evaluated if " + MANAGED_MEMORY_SIZE.key() +
					" is not set.");

	/**
	 * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
	 * as well as the network buffers.
	 */
	public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
			key("taskmanager.memory.off-heap")
			.defaultValue(false)
			.withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
					" TaskManager as well as the network buffers.");

	/**
	 * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
	 */
	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
			key("taskmanager.memory.preallocate")
			.defaultValue(false)
			.withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

	// ------------------------------------------------------------------------
	//  Network Options
	// ------------------------------------------------------------------------

	/**
	 * Number of buffers used in the network stack. This defines the number of possible tasks and
	 * shuffles.
	 *
	 * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
	 * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
	 */
	@Deprecated
	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
			key("taskmanager.network.numberOfBuffers")
			.defaultValue(2048)
			// The replacement keys are spelled out as literals: referencing the options declared
			// below by simple name here would be an illegal forward reference.
			.withDescription("Number of buffers used in the network stack. This defines the number of possible" +
					" tasks and shuffles. Deprecated: use \"taskmanager.network.memory.fraction\"," +
					" \"taskmanager.network.memory.min\" and \"taskmanager.network.memory.max\" instead.");

	/**
	 * Fraction of JVM memory to use for network buffers.
	 */
	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
			key("taskmanager.network.memory.fraction")
			.defaultValue(0.1f)
			.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
					" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
					" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
					" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
					" and \"taskmanager.network.memory.max\" may override this fraction.");

	/**
	 * Minimum memory size for network buffers.
	 */
	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
			key("taskmanager.network.memory.min")
			.defaultValue("64mb")
			.withDescription("Minimum memory size for network buffers.");

	/**
	 * Maximum memory size for network buffers.
	 */
	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
			key("taskmanager.network.memory.max")
			.defaultValue("1gb")
			.withDescription("Maximum memory size for network buffers.");

	/**
	 * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
	 *
	 * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
	 */
	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
			key("taskmanager.network.memory.buffers-per-channel")
			.defaultValue(2)
			.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
					" In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
					" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
					" for parallel serialization.");

	/**
	 * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
	 */
	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
			key("taskmanager.network.memory.floating-buffers-per-gate")
			.defaultValue(8)
			.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
					" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
					" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
					" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
					" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");

	/**
	 * Minimum backoff (in milliseconds) for partition requests of input channels.
	 */
	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
			key("taskmanager.network.request-backoff.initial")
			.defaultValue(100)
			.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
			.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");

	/**
	 * Maximum backoff (in milliseconds) for partition requests of input channels.
	 */
	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
			key("taskmanager.network.request-backoff.max")
			.defaultValue(10000)
			.withDeprecatedKeys("taskmanager.net.request-backoff.max")
			.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

	/**
	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
	 * lengths.
	 */
	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
			key("taskmanager.network.detailed-metrics")
			.defaultValue(false)
			.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");

	/**
	 * Boolean flag to enable/disable network credit-based flow control.
	 *
	 * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
	 * credit-based flow control.
	 */
	@Deprecated
	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
			key("taskmanager.network.credit-model")
			.defaultValue(true)
			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
			.withDescription("Boolean flag to enable/disable network credit-based flow control.");

	// ------------------------------------------------------------------------
	//  Task Options
	// ------------------------------------------------------------------------

	/**
	 * Time interval in milliseconds between two successive task cancellation
	 * attempts.
	 */
	public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
			key("task.cancellation.interval")
			.defaultValue(30000L)
			.withDeprecatedKeys("task.cancellation-interval")
			.withDescription("Time interval between two successive task cancellation attempts in milliseconds.");

	/**
	 * Timeout in milliseconds after which a task cancellation times out and
	 * leads to a fatal TaskManager error. A value of <code>0</code> deactivates
	 * the watch dog.
	 */
	public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
			key("task.cancellation.timeout")
			.defaultValue(180000L)
			.withDescription("Timeout in milliseconds after which a task cancellation times out and" +
					" leads to a fatal TaskManager error. A value of 0 deactivates" +
					" the watch dog.");

	/**
	 * This configures how long we wait for the timers in milliseconds to finish all pending timer threads
	 * when the stream task is cancelled.
	 */
	public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS =
			key("task.cancellation.timers.timeout")
			.defaultValue(7500L)
			.withDeprecatedKeys("timerservice.exceptional.shutdown.timeout")
			.withDescription("Time we wait for the timers in milliseconds to finish all pending timer threads" +
					" when the stream task is cancelled.");

	/**
	 * The maximum number of bytes that a checkpoint alignment may buffer.
	 * If the checkpoint alignment buffers more than the configured amount of
	 * data, the checkpoint is aborted (skipped).
	 *
	 * <p>The default value of {@code -1} indicates that there is no limit.
	 */
	public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
			key("task.checkpoint.alignment.max-size")
			.defaultValue(-1L)
			.withDescription("The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint" +
					" alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped)." +
					" A value of -1 indicates that there is no limit.");

	// ------------------------------------------------------------------------

	/** Not intended to be instantiated. */
	private TaskManagerOptions() {}
}