// blob: e9cb8dcc8508c6f3a1820cbfa34d7c8822424149 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;
import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
/**
 * Options which can be used to configure a Flink PortablePipelineRunner.
 *
 * <p>Avoid using {@code org.apache.flink.*} members below. This allows including the flink runner
 * without requiring flink on the classpath (e.g. to use with the direct runner).
 */
public interface FlinkPipelineOptions
    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

  /** Default value of {@link #getFlinkMaster()}; one of its special (non "host:port") values. */
  String AUTO = "[auto]";

  /** Default value of {@link #getExecutionModeForBatch()}. */
  String PIPELINED = "PIPELINED";

  /** Default value of {@link #getCheckpointingMode()}. */
  String EXACTLY_ONCE = "EXACTLY_ONCE";

  /**
   * List of local files to make available to workers.
   *
   * <p>Jars are placed on the worker's classpath.
   *
   * <p>The default value is the list of jars from the main program's classpath.
   */
  @Description(
      "Jar-Files to send to all workers and put on the classpath. "
          + "The default value is all files from the classpath.")
  List<String> getFilesToStage();

  void setFilesToStage(List<String> value);

  /**
   * The url of the Flink JobManager on which to execute pipelines. This can either be the address
   * of a cluster JobManager, in the form "host:port" or one of the special Strings "[local]",
   * "[collection]" or "[auto]". "[local]" will start a local Flink Cluster in the JVM,
   * "[collection]" will execute the pipeline on Java Collections while "[auto]" will let the
   * system decide where to execute the pipeline based on the environment.
   */
  @Description(
      "Address of the Flink Master where the Pipeline should be executed. Can"
          + " either be of the form \"host:port\" or one of the special values [local], "
          + "[collection] or [auto].")
  @Default.String(AUTO)
  String getFlinkMaster();

  void setFlinkMaster(String value);

  /**
   * Degree of parallelism used when distributing operations onto workers.
   *
   * <p>The declared default is {@code -1}. Per the option description, when the parallelism is not
   * set the configured Flink default is used, or 1 if none can be found.
   */
  @Description(
      "The degree of parallelism to be used when distributing operations onto workers. "
          + "If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.")
  @Default.Integer(-1)
  Integer getParallelism();

  void setParallelism(Integer value);

  /**
   * Pipeline-wide maximum degree of parallelism: the upper limit for dynamic scaling and the number
   * of key groups used for partitioned state. The declared default is {@code -1}.
   */
  @Description(
      "The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit "
          + "for dynamic scaling and the number of key groups used for partitioned state.")
  @Default.Integer(-1)
  Integer getMaxParallelism();

  void setMaxParallelism(Integer value);

  /**
   * Interval, in milliseconds, at which checkpoints of the running pipeline are triggered. The
   * declared default is {@code -1}; the option description states that the default is no
   * checkpointing.
   */
  @Description(
      "The interval in milliseconds at which to trigger checkpoints of the running pipeline. "
          + "Default: No checkpointing.")
  @Default.Long(-1L)
  Long getCheckpointingInterval();

  void setCheckpointingInterval(Long interval);

  /**
   * Checkpointing mode that defines the consistency guarantee. Defaults to {@link #EXACTLY_ONCE}.
   * The value is kept as a plain String so that no Flink type is referenced here.
   */
  @Description("The checkpointing mode that defines consistency guarantee.")
  @Default.String(EXACTLY_ONCE)
  String getCheckpointingMode();

  void setCheckpointingMode(String mode);

  /**
   * Maximum time, in milliseconds, that a checkpoint may take before being discarded. The declared
   * default is {@code -1}.
   */
  @Description(
      "The maximum time in milliseconds that a checkpoint may take before being discarded.")
  @Default.Long(-1L)
  Long getCheckpointTimeoutMillis();

  void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);

  /**
   * Minimal pause, in milliseconds, before the next checkpoint is triggered. The declared default
   * is {@code -1}.
   */
  @Description("The minimal pause in milliseconds before the next checkpoint is triggered.")
  @Default.Long(-1L)
  Long getMinPauseBetweenCheckpoints();

  void setMinPauseBetweenCheckpoints(Long minPauseInterval);

  /**
   * Whether a task fails when it encounters an error in its checkpointing procedure ({@code true},
   * the default) or only declines the checkpoint and keeps running ({@code false}).
   */
  @Description(
      "Sets the expected behaviour for tasks in case that they encounter an error in their "
          + "checkpointing procedure. If this is set to true, the task will fail on checkpointing error. "
          + "If this is set to false, the task will only decline the checkpoint and continue running. ")
  @Default.Boolean(true)
  Boolean getFailOnCheckpointingErrors();

  void setFailOnCheckpointingErrors(Boolean failOnCheckpointingErrors);

  /**
   * Number of times that failed tasks are re-executed. Zero effectively disables fault tolerance;
   * {@code -1} (the default) indicates that the system default from the configuration should be
   * used.
   */
  @Description(
      "Sets the number of times that failed tasks are re-executed. "
          + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
          + "that the system default value (as defined in the configuration) should be used.")
  @Default.Integer(-1)
  Integer getNumberOfExecutionRetries();

  void setNumberOfExecutionRetries(Integer retries);

  /**
   * Delay, in milliseconds, between executions. {@code -1} (the default) indicates that the
   * default value should be used.
   */
  // The description is shown verbatim as help text, so it must not contain Javadoc inline tags.
  @Description(
      "Sets the delay in milliseconds between executions. A value of -1 "
          + "indicates that the default value should be used.")
  @Default.Long(-1L)
  Long getExecutionRetryDelay();

  void setExecutionRetryDelay(Long delay);

  /** Whether object reuse is enabled. Defaults to {@code false}. */
  @Description("Sets the behavior of reusing objects.")
  @Default.Boolean(false)
  Boolean getObjectReuse();

  void setObjectReuse(Boolean reuse);

  /**
   * State backend to store Beam's state during computation. Note: Only applicable when executing in
   * streaming mode.
   *
   * <p>No {@code @Default} is declared for this option; per the option description, the Flink
   * cluster's {@code state.backend} configuration applies when it is not set.
   */
  @Description(
      "Sets the state backend factory to use in streaming mode. "
          + "Defaults to the flink cluster's state.backend configuration.")
  Class<? extends FlinkStateBackendFactory> getStateBackendFactory();

  void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

  /** Whether Beam metrics are enabled in the Flink Runner. Defaults to {@code true}. */
  @Description("Enable/disable Beam metrics in Flink Runner")
  @Default.Boolean(true)
  Boolean getEnableMetrics();

  void setEnableMetrics(Boolean enableMetrics);

  /**
   * Enables or disables externalized checkpoints. Works in conjunction with {@link
   * #getCheckpointingInterval()}. Defaults to {@code false}.
   */
  @Description(
      "Enables or disables externalized checkpoints. "
          + "Works in conjunction with CheckpointingInterval")
  @Default.Boolean(false)
  Boolean isExternalizedCheckpointsEnabled();

  void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints);

  /** Behavior of externalized checkpoints on cancellation. Defaults to {@code false}. */
  @Description("Sets the behavior of externalized checkpoints on cancellation.")
  @Default.Boolean(false)
  Boolean getRetainExternalizedCheckpointsOnCancellation();

  void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation);

  /** Maximum number of elements in a bundle. Defaults to {@code 1000}. */
  @Description("The maximum number of elements in a bundle.")
  @Default.Long(1000L)
  Long getMaxBundleSize();

  void setMaxBundleSize(Long size);

  /**
   * Maximum time, in milliseconds, to wait before finalising a bundle. Defaults to {@code 1000}.
   *
   * <p>The accessor name (ending in "Mills") is part of the public option name and is therefore
   * left as is.
   */
  @Description("The maximum time to wait before finalising a bundle (in milliseconds).")
  @Default.Long(1000L)
  Long getMaxBundleTimeMills();

  void setMaxBundleTimeMills(Long time);

  /**
   * Whether to shutdown sources when their watermark reaches {@code +Inf}. For production use cases
   * you want this to be disabled because Flink will currently (versions {@literal <=} 1.5) stop
   * doing checkpoints when any operator (which includes sources) is finished.
   *
   * <p>Please see <a href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for
   * progress on this issue.
   */
  @Description("If set, shutdown sources when their watermark reaches +Inf.")
  @Default.Boolean(false)
  Boolean isShutdownSourcesOnFinalWatermark();

  void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark);

  /**
   * Interval, in milliseconds, for sending latency tracking marks from the sources to the sinks. A
   * value {@code <= 0} disables the feature; the default is {@code 0}.
   */
  @Description(
      "Interval in milliseconds for sending latency tracking marks from the sources to the sinks. "
          + "Interval value <= 0 disables the feature.")
  @Default.Long(0L)
  Long getLatencyTrackingInterval();

  void setLatencyTrackingInterval(Long interval);

  /**
   * Interval, in milliseconds, for automatic watermark emission.
   *
   * <p>No {@code @Default} is declared for this option. NOTE(review): callers presumably have to
   * handle an unset value themselves - confirm against the code that reads this option.
   */
  @Description("The interval in milliseconds for automatic watermark emission.")
  Long getAutoWatermarkInterval();

  void setAutoWatermarkInterval(Long interval);

  /**
   * Flink mode for data exchange of batch pipelines, given as the name of an {@code
   * org.apache.flink.api.common.ExecutionMode} value. Defaults to {@link #PIPELINED}.
   *
   * <p>Per the option description, set this to {@code BATCH_FORCED} if pipelines get blocked, see
   * <a href="https://issues.apache.org/jira/browse/FLINK-10672">FLINK-10672</a>.
   */
  // The description is shown verbatim as help text, so it must not contain Javadoc inline tags.
  @Description(
      "Flink mode for data exchange of batch pipelines. "
          + "Reference org.apache.flink.api.common.ExecutionMode. "
          + "Set this to BATCH_FORCED if pipelines get blocked, see "
          + "https://issues.apache.org/jira/browse/FLINK-10672")
  @Default.String(PIPELINED)
  String getExecutionModeForBatch();

  void setExecutionModeForBatch(String executionMode);

  /**
   * Savepoint restore path. If specified, the streaming pipeline is restored from this path. No
   * {@code @Default} is declared for this option.
   */
  @Description(
      "Savepoint restore path. If specified, restores the streaming pipeline from the provided path.")
  String getSavepointPath();

  void setSavepointPath(String path);

  /**
   * Whether non restored state is allowed if the savepoint contains state for an operator that is
   * no longer part of the pipeline. Defaults to {@code false}.
   */
  @Description(
      "Flag indicating whether non restored state is allowed if the savepoint "
          + "contains state for an operator that is no longer part of the pipeline.")
  @Default.Boolean(false)
  Boolean getAllowNonRestoredState();

  void setAllowNonRestoredState(Boolean allowNonRestoredState);

  /**
   * Whether auto-balance sharding for the WriteFiles transform is enabled, so that shards are
   * spread evenly among the available workers. Defaults to {@code false}.
   */
  @Description(
      "Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. "
          + "This might prove useful in streaming use-case, where pipeline needs to write quite many events "
          + "into files, typically divided into N shards. Default behavior on Flink would be, that some workers "
          + "will receive more shards to take care of than others. This causes workers to go out of balance in "
          + "terms of processing backlog and memory usage. Enabling this feature will make shards to be spread "
          + "evenly among available workers to improve throughput and memory usage stability.")
  @Default.Boolean(false)
  Boolean isAutoBalanceWriteFilesShardingEnabled();

  void setAutoBalanceWriteFilesShardingEnabled(Boolean autoBalanceWriteFilesShardingEnabled);
}