blob: a5fc55fb9676b6317c702acff53ac55a152eafcc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.Preconditions;
import com.esotericsoftware.kryo.Serializer;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * A config to define the behavior of the program execution. It allows to define (among other
 * options) the following settings:
 *
 * <ul>
 *   <li>The default parallelism of the program, i.e., how many parallel tasks to use for
 *       all functions that do not define a specific value directly.</li>
 *   <li>The number of retries in the case of failed executions.</li>
 *   <li>The delay between execution retries.</li>
 *   <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
 *       The default execution mode is {@link ExecutionMode#PIPELINED}</li>
 *   <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
 *       the implementations of functions. In case they are (anonymous) inner classes,
 *       it removes unused references to the enclosing class to fix certain serialization-related
 *       problems and to reduce the size of the closure.</li>
 *   <li>The config allows to register types and serializers to increase the efficiency of
 *       handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
 *       when the functions return not only the types declared in their signature, but
 *       also subclasses of those types.</li>
 *   <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
 *       the "static code analyzer". The static code analyzer pre-interprets user-defined functions
 *       in order to get implementation insights for program improvements that can be printed to
 *       the log or automatically applied.</li>
 * </ul>
 */
@Public
public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {

    private static final long serialVersionUID = 1L;

    /**
     * The constant to use for the parallelism, if the system should use the number
     * of currently available slots.
     */
    @Deprecated
    public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

    /**
     * The flag value indicating use of the default parallelism. This value can
     * be used to reset the parallelism back to the default state.
     */
    public static final int PARALLELISM_DEFAULT = -1;

    /**
     * The flag value indicating an unknown or unset parallelism. This value is
     * not a valid parallelism and indicates that the parallelism should remain
     * unchanged.
     */
    public static final int PARALLELISM_UNKNOWN = -2;

    /**
     * The default sampling rate of tracing metric based on records count.
     */
    public static final int DEFAULT_TRACING_METRICS_SAMPLE_INTERVAL = 100;

    /** Initial value of the legacy retry delay, in milliseconds. */
    private static final long DEFAULT_RESTART_DELAY = 10000L;

    // --------------------------------------------------------------------------------------------

    /** Defines how data exchange happens - batch or pipelined. */
    private ExecutionMode executionMode = ExecutionMode.PIPELINED;

    /** Whether the closure cleaner is enabled; on by default. */
    private boolean useClosureCleaner = true;

    /** Default parallelism for operations; {@link #PARALLELISM_DEFAULT} until explicitly set. */
    private int parallelism = PARALLELISM_DEFAULT;

    /**
     * The program wide maximum parallelism used for operators which haven't specified a maximum
     * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
     * number of key groups used for partitioned state.
     */
    private int maxParallelism = -1;

    /**
     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
     */
    @Deprecated
    private int numberOfExecutionRetries = -1;

    /** Whether Kryo is forced for types that could otherwise be analyzed as POJOs. */
    private boolean forceKryo = false;

    /** Flag to indicate whether generic types (through Kryo) are supported. */
    private boolean disableGenericTypes = false;

    /** Whether object reuse is enabled; off by default. */
    private boolean objectReuse = false;

    /** Whether automatic type registration is enabled; on by default. */
    private boolean autoTypeRegistrationEnabled = true;

    /** Whether the Avro serializer is forced for POJOs; off by default. */
    private boolean forceAvro = false;

    /** The code analysis mode of the program; disabled by default. */
    private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;

    /** If set to true, progress updates are printed to System.out during execution. */
    private boolean printProgressDuringExecution = true;

    /** Interval of the automatic watermark emission, in milliseconds. */
    private long autoWatermarkInterval = 0;

    /**
     * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
     * Values {@code <= 0} mean that latency tracking is disabled.
     */
    private long latencyTrackingInterval = -1;

    /**
     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
     */
    @Deprecated
    private long executionRetryDelay = DEFAULT_RESTART_DELAY;

    /** The explicitly configured restart strategy; {@code null} until one is set. */
    private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;

    /** Interval in milliseconds between consecutive attempts to cancel a running task. */
    private long taskCancellationIntervalMillis = -1;

    /**
     * Timeout after which an ongoing task cancellation will lead to a fatal
     * TaskManager error, usually killing the JVM.
     */
    private long taskCancellationTimeoutMillis = -1;

    /** This flag defines if we use compression for the state snapshot data or not. Default: false */
    private boolean useSnapshotCompression = false;

    /** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: false */
    private boolean failTaskOnCheckpointError = false;

    // ------------------------------- User code values --------------------------------------------

    /** Custom user configuration object; {@code null} until one is registered. */
    private GlobalJobParameters globalJobParameters;

    // Serializers and types registered with Kryo and the PojoSerializer
    // we store them in linked maps/sets to ensure they are registered in order in all kryo instances.

    private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();

    private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();

    private LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();

    private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();

    private LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();

    private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();

    /** Whether tracing metrics are enabled; off by default. */
    private boolean tracingMetricsEnabled = false;

    /** Sample interval for tracing metrics; always greater than zero. */
    private int tracingMetricsInterval = DEFAULT_TRACING_METRICS_SAMPLE_INTERVAL;

    // --------------------------------------------------------------------------------------------

    /**
     * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
     * that are not used. This will in most cases make closures or anonymous inner classes
     * serializable that were not serializable due to some Scala or Java implementation artifact.
     * User code must be serializable because it needs to be sent to worker nodes.
     *
     * @return This config, to allow for call chaining.
     */
    public ExecutionConfig enableClosureCleaner() {
        useClosureCleaner = true;
        return this;
    }

    /**
     * Disables the ClosureCleaner.
     *
     * @return This config, to allow for call chaining.
     * @see #enableClosureCleaner()
     */
    public ExecutionConfig disableClosureCleaner() {
        useClosureCleaner = false;
        return this;
    }

    /**
     * Returns whether the ClosureCleaner is enabled.
     *
     * @see #enableClosureCleaner()
     */
    public boolean isClosureCleanerEnabled() {
        return useClosureCleaner;
    }

    /**
     * Sets the interval of the automatic watermark emission. Watermarks are used throughout
     * the streaming system to keep track of the progress of time. They are used, for example,
     * for time based windowing.
     *
     * @param interval The interval between watermarks in milliseconds.
     * @return This config, to allow for call chaining.
     */
    @PublicEvolving
    public ExecutionConfig setAutoWatermarkInterval(long interval) {
        this.autoWatermarkInterval = interval;
        return this;
    }

    /**
     * Returns the interval of the automatic watermark emission.
     *
     * @see #setAutoWatermarkInterval(long)
     */
    @PublicEvolving
    public long getAutoWatermarkInterval() {
        return this.autoWatermarkInterval;
    }

    /**
     * Interval for sending latency tracking marks from the sources to the sinks.
     * Flink will send latency tracking marks from the sources at the specified interval.
     *
     * <p>Recommended value: 2000 (2 seconds).
     *
     * <p>Setting a tracking interval {@code <= 0} disables the latency tracking.
     *
     * @param interval Interval in milliseconds.
     * @return This config, to allow for call chaining.
     */
    @PublicEvolving
    public ExecutionConfig setLatencyTrackingInterval(long interval) {
        this.latencyTrackingInterval = interval;
        return this;
    }

    /**
     * Returns the latency tracking interval.
     *
     * @return The latency tracking interval in milliseconds
     */
    @PublicEvolving
    public long getLatencyTrackingInterval() {
        return latencyTrackingInterval;
    }

    /**
     * Returns if latency tracking is enabled.
     *
     * @return True, if the tracking is enabled, false otherwise.
     */
    @PublicEvolving
    public boolean isLatencyTrackingEnabled() {
        return latencyTrackingInterval > 0;
    }

    /**
     * Gets the parallelism with which operation are executed by default. Operations can
     * individually override this value to use a specific parallelism.
     *
     * <p>Other operations may need to run with a different parallelism - for example calling
     * a reduce operation over the entire data set will involve an operation that runs
     * with a parallelism of one (the final reduce to the single result value).
     *
     * @return The parallelism used by operations, unless they override that value. This method
     *         returns {@link #PARALLELISM_DEFAULT} if the environment's default parallelism
     *         should be used.
     */
    public int getParallelism() {
        return parallelism;
    }

    /**
     * Sets the parallelism for operations executed through this environment.
     * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
     * x parallel instances.
     *
     * <p>This method overrides the default parallelism for this environment.
     * The local execution environment uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default parallelism is the one configured for that setup.
     *
     * <p>Passing {@link #PARALLELISM_UNKNOWN} leaves the current parallelism unchanged.
     *
     * @param parallelism The parallelism to use
     * @return This config, to allow for call chaining.
     * @throws IllegalArgumentException If the value is smaller than one and is neither
     *         {@link #PARALLELISM_DEFAULT} nor {@link #PARALLELISM_UNKNOWN}.
     */
    public ExecutionConfig setParallelism(int parallelism) {
        if (parallelism == PARALLELISM_UNKNOWN) {
            // "unknown" is a request to keep whatever is currently configured
            return this;
        }
        if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
            throw new IllegalArgumentException(
                "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
        }
        this.parallelism = parallelism;
        return this;
    }

    /**
     * Gets the maximum degree of parallelism defined for the program.
     *
     * <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
     * defines the number of key groups used for partitioned state.
     *
     * @return Maximum degree of parallelism
     */
    @PublicEvolving
    public int getMaxParallelism() {
        return maxParallelism;
    }

    /**
     * Sets the maximum degree of parallelism defined for the program.
     *
     * <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
     * defines the number of key groups used for partitioned state.
     *
     * @param maxParallelism Maximum degree of parallelism to be used for the program.
     * @throws IllegalArgumentException If the value is not greater than zero.
     */
    @PublicEvolving
    public void setMaxParallelism(int maxParallelism) {
        checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
        this.maxParallelism = maxParallelism;
    }

    /**
     * Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
     */
    public long getTaskCancellationInterval() {
        return this.taskCancellationIntervalMillis;
    }

    /**
     * Sets the configuration parameter specifying the interval (in milliseconds)
     * between consecutive attempts to cancel a running task.
     *
     * @param interval the interval (in milliseconds).
     * @return This config, to allow for call chaining.
     */
    public ExecutionConfig setTaskCancellationInterval(long interval) {
        this.taskCancellationIntervalMillis = interval;
        return this;
    }

    /**
     * Returns the timeout (in milliseconds) after which an ongoing task
     * cancellation leads to a fatal TaskManager error.
     *
     * <p>The value <code>0</code> means that the timeout is disabled. In
     * this case a stuck cancellation will not lead to a fatal error.
     */
    @PublicEvolving
    public long getTaskCancellationTimeout() {
        return this.taskCancellationTimeoutMillis;
    }

    /**
     * Sets the timeout (in milliseconds) after which an ongoing task cancellation
     * is considered failed, leading to a fatal TaskManager error.
     *
     * <p>The cluster default is configured via {@link TaskManagerOptions#TASK_CANCELLATION_TIMEOUT}.
     *
     * <p>The value <code>0</code> disables the timeout. In this case a stuck
     * cancellation will not lead to a fatal error.
     *
     * @param timeout The task cancellation timeout (in milliseconds).
     * @return This config, to allow for call chaining.
     * @throws IllegalArgumentException If the timeout is negative.
     */
    @PublicEvolving
    public ExecutionConfig setTaskCancellationTimeout(long timeout) {
        checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
        this.taskCancellationTimeoutMillis = timeout;
        return this;
    }

    /**
     * Sets the restart strategy to be used for recovery.
     *
     * <pre>{@code
     * ExecutionConfig config = env.getConfig();
     *
     * config.setRestartStrategy(RestartStrategies.fixedDelayRestart(
     *     10,     // number of retries
     *     1000)); // delay between retries
     * }</pre>
     *
     * @param restartStrategyConfiguration Configuration defining the restart strategy to use
     */
    @PublicEvolving
    public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
        this.restartStrategyConfiguration = restartStrategyConfiguration;
    }

    /**
     * Returns the restart strategy which has been set for the current job.
     *
     * <p>If no strategy was set explicitly, one is derived from the deprecated retry settings:
     * a retry count of zero yields a no-restart strategy, a positive retry count (with a
     * non-negative delay) yields a fixed-delay strategy.
     *
     * @return The specified restart configuration, or {@code null} if none was set and none can
     *         be derived from the deprecated retry settings.
     */
    @PublicEvolving
    @SuppressWarnings("deprecation")
    public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        if (restartStrategyConfiguration != null) {
            return restartStrategyConfiguration;
        }

        // support the old API calls by creating a restart strategy from them
        final int retries = getNumberOfExecutionRetries();
        if (retries == 0) {
            return RestartStrategies.noRestart();
        }
        if (retries > 0 && getExecutionRetryDelay() >= 0) {
            return RestartStrategies.fixedDelayRestart(retries, getExecutionRetryDelay());
        }
        return null;
    }

    /**
     * Gets the number of times the system will try to re-execute failed tasks. A value
     * of {@code -1} indicates that the system default value (as defined in the configuration)
     * should be used.
     *
     * @return The number of times the system will try to re-execute failed tasks.
     *
     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
     */
    @Deprecated
    public int getNumberOfExecutionRetries() {
        return numberOfExecutionRetries;
    }

    /**
     * Returns the delay between execution retries.
     *
     * @return The delay between successive execution retries in milliseconds.
     *
     * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
     */
    @Deprecated
    public long getExecutionRetryDelay() {
        return executionRetryDelay;
    }

    /**
     * Sets the number of times that failed tasks are re-executed. A value of zero
     * effectively disables fault tolerance. A value of {@code -1} indicates that the system
     * default value (as defined in the configuration) should be used.
     *
     * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
     *
     * @return The current execution configuration
     *
     * @throws IllegalArgumentException If the value is smaller than {@code -1}.
     *
     * @deprecated This method will be replaced by {@link #setRestartStrategy}. The
     * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
     * execution retries.
     */
    @Deprecated
    public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
        if (numberOfExecutionRetries < -1) {
            throw new IllegalArgumentException(
                "The number of execution retries must be non-negative, or -1 (use system default)");
        }
        this.numberOfExecutionRetries = numberOfExecutionRetries;
        return this;
    }

    /**
     * Sets the delay between executions.
     *
     * @param executionRetryDelay The number of milliseconds the system will wait to retry.
     *
     * @return The current execution configuration
     *
     * @throws IllegalArgumentException If the delay is negative.
     *
     * @deprecated This method will be replaced by {@link #setRestartStrategy}. The
     * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the delay between
     * successive execution attempts.
     */
    @Deprecated
    public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
        if (executionRetryDelay < 0) {
            throw new IllegalArgumentException(
                "The delay between retries must be non-negative.");
        }
        this.executionRetryDelay = executionRetryDelay;
        return this;
    }

    /**
     * Sets the execution mode to execute the program. The execution mode defines whether
     * data exchanges are performed in a batch or on a pipelined manner.
     *
     * <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
     *
     * @param executionMode The execution mode to use.
     */
    public void setExecutionMode(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    /**
     * Gets the execution mode used to execute the program. The execution mode defines whether
     * data exchanges are performed in a batch or on a pipelined manner.
     *
     * <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
     *
     * @return The execution mode for the program.
     */
    public ExecutionMode getExecutionMode() {
        return executionMode;
    }

    /**
     * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
     * In some cases this might be preferable. For example, when using interfaces
     * with subclasses that cannot be analyzed as POJO.
     */
    public void enableForceKryo() {
        forceKryo = true;
    }

    /**
     * Disable use of Kryo serializer for all POJOs.
     */
    public void disableForceKryo() {
        forceKryo = false;
    }

    /**
     * Returns whether the use of the Kryo serializer is forced.
     *
     * @see #enableForceKryo()
     */
    public boolean isForceKryoEnabled() {
        return forceKryo;
    }

    /**
     * Enables the use of generic types which are serialized via Kryo.
     *
     * <p>Generic types are enabled by default.
     *
     * @see #disableGenericTypes()
     */
    public void enableGenericTypes() {
        disableGenericTypes = false;
    }

    /**
     * Disables the use of generic types (types that would be serialized via Kryo). If this option
     * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters
     * a data type that would go through Kryo for serialization.
     *
     * <p>Disabling generic types can be helpful to eagerly find and eliminate the use of types
     * that would go through Kryo serialization during runtime. Rather than checking types
     * individually, using this option will throw exceptions eagerly in the places where generic
     * types are used.
     *
     * <p><b>Important:</b> We recommend to use this option only during development and pre-production
     * phases, not during actual production use. The application program and/or the input data may be
     * such that new, previously unseen, types occur at some point. In that case, setting this option
     * would cause the program to fail.
     *
     * @see #enableGenericTypes()
     */
    public void disableGenericTypes() {
        disableGenericTypes = true;
    }

    /**
     * Checks whether generic types are disabled. Generic types are types that go through Kryo during
     * serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @return True, if generic types have been disabled, false otherwise.
     * @see #enableGenericTypes()
     * @see #disableGenericTypes()
     */
    public boolean hasGenericTypesDisabled() {
        return disableGenericTypes;
    }

    /**
     * Forces Flink to use the Apache Avro serializer for POJOs.
     *
     * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
     */
    public void enableForceAvro() {
        forceAvro = true;
    }

    /**
     * Disables the Apache Avro serializer as the forced serializer for POJOs.
     */
    public void disableForceAvro() {
        forceAvro = false;
    }

    /**
     * Returns whether the Apache Avro is the default serializer for POJOs.
     */
    public boolean isForceAvroEnabled() {
        return forceAvro;
    }

    /**
     * Enables reusing objects that Flink internally uses for deserialization and passing
     * data to user-code functions. Keep in mind that this can lead to bugs when the
     * user-code function of an operation is not aware of this behaviour.
     *
     * @return This config, to allow for call chaining.
     */
    public ExecutionConfig enableObjectReuse() {
        objectReuse = true;
        return this;
    }

    /**
     * Disables reusing objects that Flink internally uses for deserialization and passing
     * data to user-code functions.
     *
     * @return This config, to allow for call chaining.
     * @see #enableObjectReuse()
     */
    public ExecutionConfig disableObjectReuse() {
        objectReuse = false;
        return this;
    }

    /**
     * Returns whether object reuse has been enabled or disabled.
     *
     * @see #enableObjectReuse()
     */
    public boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    /**
     * Sets the {@link CodeAnalysisMode} of the program. Specifies to which extent user-defined
     * functions are analyzed in order to give the Flink optimizer an insight of UDF internals
     * and inform the user about common implementation mistakes. The static code analyzer pre-interprets
     * user-defined functions in order to get implementation insights for program improvements
     * that can be printed to the log, automatically applied, or disabled.
     *
     * @param codeAnalysisMode see {@link CodeAnalysisMode}
     */
    @PublicEvolving
    public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
        this.codeAnalysisMode = codeAnalysisMode;
    }

    /**
     * Returns the {@link CodeAnalysisMode} of the program.
     */
    @PublicEvolving
    public CodeAnalysisMode getCodeAnalysisMode() {
        return codeAnalysisMode;
    }

    /**
     * Enables the printing of progress update messages to {@code System.out}.
     *
     * @return The ExecutionConfig object, to allow for function chaining.
     */
    public ExecutionConfig enableSysoutLogging() {
        this.printProgressDuringExecution = true;
        return this;
    }

    /**
     * Disables the printing of progress update messages to {@code System.out}.
     *
     * @return The ExecutionConfig object, to allow for function chaining.
     */
    public ExecutionConfig disableSysoutLogging() {
        this.printProgressDuringExecution = false;
        return this;
    }

    /**
     * Gets whether progress update messages should be printed to {@code System.out}.
     *
     * @return True, if progress update messages should be printed, false otherwise.
     */
    public boolean isSysoutLoggingEnabled() {
        return this.printProgressDuringExecution;
    }

    /**
     * Returns the registered custom user configuration object.
     *
     * @return The custom user configuration object, or {@code null} if none has been registered.
     */
    public GlobalJobParameters getGlobalJobParameters() {
        return globalJobParameters;
    }

    /**
     * Register a custom, serializable user configuration object.
     *
     * @param globalJobParameters Custom user configuration object
     */
    public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
        this.globalJobParameters = globalJobParameters;
    }

    // --------------------------------------------------------------------------------------------
    //  Registry for types and serializers
    // --------------------------------------------------------------------------------------------

    /**
     * Adds a new Kryo default serializer to the Runtime.
     *
     * <p>Note that the serializer instance must be serializable (as defined by java.io.Serializable),
     * because it may be distributed to the worker nodes by java serialization.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializer The serializer to use.
     * @throws NullPointerException If the type or the serializer is null.
     */
    public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> type, T serializer) {
        Objects.requireNonNull(type, "Cannot register null class or serializer.");
        Objects.requireNonNull(serializer, "Cannot register null class or serializer.");

        defaultKryoSerializers.put(type, new SerializableSerializer<>(serializer));
    }

    /**
     * Adds a new Kryo default serializer to the Runtime.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializerClass The class of the serializer to use.
     * @throws NullPointerException If the type or the serializer class is null.
     */
    public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
        Objects.requireNonNull(type, "Cannot register null class or serializer.");
        Objects.requireNonNull(serializerClass, "Cannot register null class or serializer.");

        defaultKryoSerializerClasses.put(type, serializerClass);
    }

    /**
     * Registers the given type with a Kryo Serializer.
     *
     * <p>Note that the serializer instance must be serializable (as defined by java.io.Serializable),
     * because it may be distributed to the worker nodes by java serialization.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializer The serializer to use.
     * @throws NullPointerException If the type or the serializer is null.
     */
    public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
        Objects.requireNonNull(type, "Cannot register null class or serializer.");
        Objects.requireNonNull(serializer, "Cannot register null class or serializer.");

        registeredTypesWithKryoSerializers.put(type, new SerializableSerializer<>(serializer));
    }

    /**
     * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializerClass The class of the serializer to use.
     * @throws NullPointerException If the type or the serializer class is null.
     */
    @SuppressWarnings("rawtypes")
    public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
        Objects.requireNonNull(type, "Cannot register null class or serializer.");
        Objects.requireNonNull(serializerClass, "Cannot register null class or serializer.");

        // the raw serializer class is accepted for caller convenience and stored with a wildcard type
        @SuppressWarnings("unchecked")
        Class<? extends Serializer<?>> castedSerializerClass = (Class<? extends Serializer<?>>) serializerClass;
        registeredTypesWithKryoSerializerClasses.put(type, castedSerializerClass);
    }

    /**
     * Registers the given type with the serialization stack. If the type is eventually
     * serialized as a POJO, then the type is registered with the POJO serializer. If the
     * type ends up being serialized with Kryo, then it will be registered at Kryo to make
     * sure that only tags are written.
     *
     * <p>Registering a type that is already registered has no effect.
     *
     * @param type The class of the type to register.
     * @throws NullPointerException If the type is null.
     */
    public void registerPojoType(Class<?> type) {
        Objects.requireNonNull(type, "Cannot register null type class.");

        // the set ignores duplicates and keeps the position of the first registration
        registeredPojoTypes.add(type);
    }

    /**
     * Registers the given type with the serialization stack. If the type is eventually
     * serialized as a POJO, then the type is registered with the POJO serializer. If the
     * type ends up being serialized with Kryo, then it will be registered at Kryo to make
     * sure that only tags are written.
     *
     * @param type The class of the type to register.
     * @throws NullPointerException If the type is null.
     */
    public void registerKryoType(Class<?> type) {
        Objects.requireNonNull(type, "Cannot register null type class.");

        registeredKryoTypes.add(type);
    }

    /**
     * Returns the registered types with Kryo Serializers.
     */
    public LinkedHashMap<Class<?>, SerializableSerializer<?>> getRegisteredTypesWithKryoSerializers() {
        return registeredTypesWithKryoSerializers;
    }

    /**
     * Returns the registered types with their Kryo Serializer classes.
     */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses() {
        return registeredTypesWithKryoSerializerClasses;
    }

    /**
     * Returns the registered default Kryo Serializers.
     */
    public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() {
        return defaultKryoSerializers;
    }

    /**
     * Returns the registered default Kryo Serializer classes.
     */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses() {
        return defaultKryoSerializerClasses;
    }

    /**
     * Returns the registered Kryo types.
     *
     * <p>If Kryo is forced, the result is a new set that holds the Kryo types followed by the
     * registered POJO types. Otherwise the internal set of Kryo types is returned directly.
     */
    public LinkedHashSet<Class<?>> getRegisteredKryoTypes() {
        if (!isForceKryoEnabled()) {
            return registeredKryoTypes;
        }

        // if we force kryo, we must also return all the types that
        // were previously only registered as POJO
        LinkedHashSet<Class<?>> result = new LinkedHashSet<>(registeredKryoTypes);
        result.addAll(registeredPojoTypes);
        return result;
    }

    /**
     * Returns the registered POJO types.
     */
    public LinkedHashSet<Class<?>> getRegisteredPojoTypes() {
        return registeredPojoTypes;
    }

    /**
     * Returns whether the automatic type registration has been disabled.
     *
     * @see #disableAutoTypeRegistration()
     */
    public boolean isAutoTypeRegistrationDisabled() {
        return !autoTypeRegistrationEnabled;
    }

    /**
     * Disables the automatic registration of all types in the user programs with Kryo.
     */
    public void disableAutoTypeRegistration() {
        this.autoTypeRegistrationEnabled = false;
    }

    /**
     * Returns whether compression is used for the state snapshot data.
     */
    public boolean isUseSnapshotCompression() {
        return useSnapshotCompression;
    }

    /**
     * Sets whether compression is used for the state snapshot data.
     *
     * @param useSnapshotCompression True to use compression, false otherwise.
     */
    public void setUseSnapshotCompression(boolean useSnapshotCompression) {
        this.useSnapshotCompression = useSnapshotCompression;
    }

    /**
     * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
     * the task. This should not be called by the user, please use CheckpointConfig.isFailTaskOnCheckpointError()
     * instead.
     */
    @Internal
    public boolean isFailTaskOnCheckpointError() {
        return failTaskOnCheckpointError;
    }

    /**
     * This method is visible because of the way the configuration is currently forwarded from the checkpoint config to
     * the task. This should not be called by the user, please use CheckpointConfig.setFailOnCheckpointingErrors(...)
     * instead.
     */
    @Internal
    public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
        this.failTaskOnCheckpointError = failTaskOnCheckpointError;
    }

    /**
     * Compares this config with another object for equality of the configured values.
     *
     * <p>The two maps holding serializer <i>instances</i> are not part of the comparison, because
     * {@link SerializableSerializer} does not define value equality.
     */
    @Override
    @SuppressWarnings("deprecation")
    public boolean equals(Object obj) {
        if (!(obj instanceof ExecutionConfig)) {
            return false;
        }

        ExecutionConfig other = (ExecutionConfig) obj;
        return other.canEqual(this) &&
            Objects.equals(executionMode, other.executionMode) &&
            useClosureCleaner == other.useClosureCleaner &&
            parallelism == other.parallelism &&
            maxParallelism == other.maxParallelism &&
            numberOfExecutionRetries == other.numberOfExecutionRetries &&
            executionRetryDelay == other.executionRetryDelay &&
            Objects.equals(restartStrategyConfiguration, other.restartStrategyConfiguration) &&
            forceKryo == other.forceKryo &&
            disableGenericTypes == other.disableGenericTypes &&
            objectReuse == other.objectReuse &&
            autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
            forceAvro == other.forceAvro &&
            Objects.equals(codeAnalysisMode, other.codeAnalysisMode) &&
            printProgressDuringExecution == other.printProgressDuringExecution &&
            Objects.equals(globalJobParameters, other.globalJobParameters) &&
            autoWatermarkInterval == other.autoWatermarkInterval &&
            latencyTrackingInterval == other.latencyTrackingInterval &&
            registeredTypesWithKryoSerializerClasses.equals(other.registeredTypesWithKryoSerializerClasses) &&
            defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
            registeredKryoTypes.equals(other.registeredKryoTypes) &&
            registeredPojoTypes.equals(other.registeredPojoTypes) &&
            taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
            taskCancellationTimeoutMillis == other.taskCancellationTimeoutMillis &&
            useSnapshotCompression == other.useSnapshotCompression &&
            failTaskOnCheckpointError == other.failTaskOnCheckpointError &&
            tracingMetricsEnabled == other.tracingMetricsEnabled &&
            tracingMetricsInterval == other.tracingMetricsInterval;
    }

    /**
     * Computes the hash code from exactly the fields that {@link #equals(Object)} compares.
     */
    @Override
    @SuppressWarnings("deprecation")
    public int hashCode() {
        return Objects.hash(
            executionMode,
            useClosureCleaner,
            parallelism,
            restartStrategyConfiguration,
            forceKryo,
            disableGenericTypes,
            objectReuse,
            autoTypeRegistrationEnabled,
            forceAvro,
            codeAnalysisMode,
            printProgressDuringExecution,
            globalJobParameters,
            autoWatermarkInterval,
            registeredTypesWithKryoSerializerClasses,
            defaultKryoSerializerClasses,
            registeredKryoTypes,
            registeredPojoTypes,
            taskCancellationIntervalMillis,
            useSnapshotCompression,
            maxParallelism,
            numberOfExecutionRetries,
            executionRetryDelay,
            latencyTrackingInterval,
            taskCancellationTimeoutMillis,
            failTaskOnCheckpointError,
            tracingMetricsEnabled,
            tracingMetricsInterval);
    }

    /**
     * Hook used by {@link #equals(Object)}: the other side of a comparison is asked whether it
     * accepts being compared with the given object.
     *
     * @param obj The object that wants to compare itself with this config.
     * @return True, if the given object is an {@code ExecutionConfig}, false otherwise.
     */
    public boolean canEqual(Object obj) {
        return obj instanceof ExecutionConfig;
    }

    /**
     * Creates an {@link ArchivedExecutionConfig} from this config.
     */
    @Override
    @Internal
    public ArchivedExecutionConfig archive() {
        return new ArchivedExecutionConfig(this);
    }

    /**
     * Returns whether tracing metrics are enabled.
     */
    public boolean isTracingMetricsEnabled() {
        return tracingMetricsEnabled;
    }

    /**
     * Enables or disables tracing metrics.
     *
     * @param tracingMetricsEnabled True to enable tracing metrics, false to disable them.
     * @return This config, to allow for call chaining.
     */
    public ExecutionConfig setTracingMetricsEnabled(boolean tracingMetricsEnabled) {
        this.tracingMetricsEnabled = tracingMetricsEnabled;
        return this;
    }

    /**
     * Returns the sample interval of the tracing metrics.
     *
     * @see #DEFAULT_TRACING_METRICS_SAMPLE_INTERVAL
     */
    public int getTracingMetricsInterval() {
        return tracingMetricsInterval;
    }

    /**
     * Sets the sample interval of the tracing metrics.
     *
     * @param tracingMetricsInterval The sample interval; must be greater than zero.
     * @return This config, to allow for call chaining.
     * @throws IllegalArgumentException If the interval is not greater than zero.
     */
    public ExecutionConfig setTracingMetricsInterval(int tracingMetricsInterval) {
        Preconditions.checkArgument(tracingMetricsInterval > 0,
            "The tracing metrics sample interval must be greater than 0.");
        this.tracingMetricsInterval = tracingMetricsInterval;
        return this;
    }

    // ------------------------------ Utilities  ----------------------------------

    /**
     * Serializable holder for a Kryo serializer instance that is itself {@link Serializable}.
     *
     * @param <T> The type of the wrapped serializer.
     */
    public static class SerializableSerializer<T extends Serializer<?> & Serializable> implements Serializable {

        private static final long serialVersionUID = 4687893502781067189L;

        private T serializer;

        public SerializableSerializer(T serializer) {
            this.serializer = serializer;
        }

        /**
         * Returns the wrapped serializer instance.
         */
        public T getSerializer() {
            return serializer;
        }
    }

    /**
     * Base class for a custom user configuration object registered at the execution config.
     *
     * <p>This user config is accessible at runtime through
     * getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
     */
    public static class GlobalJobParameters implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * Convert UserConfig into a {@code Map<String, String>} representation.
         * This can be used by the runtime, for example for presenting the user config in the web frontend.
         *
         * <p>This base implementation returns an empty map.
         *
         * @return Key/Value representation of the UserConfig
         */
        public Map<String, String> toMap() {
            return Collections.emptyMap();
        }
    }
}