| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.flink; |
| |
| import java.util.List; |
| import org.apache.beam.sdk.options.ApplicationNameOptions; |
| import org.apache.beam.sdk.options.Default; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.StreamingOptions; |
| |
| /** |
| * Options which can be used to configure a Flink PortablePipelineRunner. |
| * |
| * <p>Avoid using `org.apache.flink.*` members below. This allows including the flink runner without |
| * requiring flink on the classpath (e.g. to use with the direct runner). |
| */ |
| public interface FlinkPipelineOptions |
| extends PipelineOptions, ApplicationNameOptions, StreamingOptions { |
| |
| String AUTO = "[auto]"; |
| String PIPELINED = "PIPELINED"; |
| String EXACTLY_ONCE = "EXACTLY_ONCE"; |
| |
| /** |
| * List of local files to make available to workers. |
| * |
| * <p>Jars are placed on the worker's classpath. |
| * |
| * <p>The default value is the list of jars from the main program's classpath. |
| */ |
| @Description( |
| "Jar-Files to send to all workers and put on the classpath. " |
| + "The default value is all files from the classpath.") |
| List<String> getFilesToStage(); |
| |
| void setFilesToStage(List<String> value); |
| |
| /** |
| * The url of the Flink JobManager on which to execute pipelines. This can either be the the |
| * address of a cluster JobManager, in the form "host:port" or one of the special Strings |
| * "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink Cluster in the JVM, |
| * "[collection]" will execute the pipeline on Java Collections while "[auto]" will let the system |
| * decide where to execute the pipeline based on the environment. |
| */ |
| @Description( |
| "Address of the Flink Master where the Pipeline should be executed. Can" |
| + " either be of the form \"host:port\" or one of the special values [local], " |
| + "[collection] or [auto].") |
| @Default.String(AUTO) |
| String getFlinkMaster(); |
| |
| void setFlinkMaster(String value); |
| |
| @Description( |
| "The degree of parallelism to be used when distributing operations onto workers. " |
| + "If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.") |
| @Default.Integer(-1) |
| Integer getParallelism(); |
| |
| void setParallelism(Integer value); |
| |
| @Description( |
| "The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit " |
| + "for dynamic scaling and the number of key groups used for partitioned state.") |
| @Default.Integer(-1) |
| Integer getMaxParallelism(); |
| |
| void setMaxParallelism(Integer value); |
| |
| @Description( |
| "The interval in milliseconds at which to trigger checkpoints of the running pipeline. " |
| + "Default: No checkpointing.") |
| @Default.Long(-1L) |
| Long getCheckpointingInterval(); |
| |
| void setCheckpointingInterval(Long interval); |
| |
| @Description("The checkpointing mode that defines consistency guarantee.") |
| @Default.String(EXACTLY_ONCE) |
| String getCheckpointingMode(); |
| |
| void setCheckpointingMode(String mode); |
| |
| @Description( |
| "The maximum time in milliseconds that a checkpoint may take before being discarded.") |
| @Default.Long(-1L) |
| Long getCheckpointTimeoutMillis(); |
| |
| void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis); |
| |
| @Description("The minimal pause in milliseconds before the next checkpoint is triggered.") |
| @Default.Long(-1L) |
| Long getMinPauseBetweenCheckpoints(); |
| |
| void setMinPauseBetweenCheckpoints(Long minPauseInterval); |
| |
| @Description( |
| "Sets the expected behaviour for tasks in case that they encounter an error in their " |
| + "checkpointing procedure. If this is set to true, the task will fail on checkpointing error. " |
| + "If this is set to false, the task will only decline a the checkpoint and continue running. ") |
| @Default.Boolean(true) |
| Boolean getFailOnCheckpointingErrors(); |
| |
| void setFailOnCheckpointingErrors(Boolean failOnCheckpointingErrors); |
| |
| @Description( |
| "Sets the number of times that failed tasks are re-executed. " |
| + "A value of zero effectively disables fault tolerance. A value of -1 indicates " |
| + "that the system default value (as defined in the configuration) should be used.") |
| @Default.Integer(-1) |
| Integer getNumberOfExecutionRetries(); |
| |
| void setNumberOfExecutionRetries(Integer retries); |
| |
| @Description( |
| "Sets the delay in milliseconds between executions. A value of {@code -1} " |
| + "indicates that the default value should be used.") |
| @Default.Long(-1L) |
| Long getExecutionRetryDelay(); |
| |
| void setExecutionRetryDelay(Long delay); |
| |
| @Description("Sets the behavior of reusing objects.") |
| @Default.Boolean(false) |
| Boolean getObjectReuse(); |
| |
| void setObjectReuse(Boolean reuse); |
| |
| /** |
| * State backend to store Beam's state during computation. Note: Only applicable when executing in |
| * streaming mode. |
| */ |
| @Description( |
| "Sets the state backend factory to use in streaming mode. " |
| + "Defaults to the flink cluster's state.backend configuration.") |
| Class<? extends FlinkStateBackendFactory> getStateBackendFactory(); |
| |
| void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory); |
| |
| @Description("Enable/disable Beam metrics in Flink Runner") |
| @Default.Boolean(true) |
| Boolean getEnableMetrics(); |
| |
| void setEnableMetrics(Boolean enableMetrics); |
| |
| /** Enables or disables externalized checkpoints. */ |
| @Description( |
| "Enables or disables externalized checkpoints. " |
| + "Works in conjunction with CheckpointingInterval") |
| @Default.Boolean(false) |
| Boolean isExternalizedCheckpointsEnabled(); |
| |
| void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints); |
| |
| @Description("Sets the behavior of externalized checkpoints on cancellation.") |
| @Default.Boolean(false) |
| Boolean getRetainExternalizedCheckpointsOnCancellation(); |
| |
| void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation); |
| |
| @Description("The maximum number of elements in a bundle.") |
| @Default.Long(1000) |
| Long getMaxBundleSize(); |
| |
| void setMaxBundleSize(Long size); |
| |
| @Description("The maximum time to wait before finalising a bundle (in milliseconds).") |
| @Default.Long(1000) |
| Long getMaxBundleTimeMills(); |
| |
| void setMaxBundleTimeMills(Long time); |
| |
| /** |
| * Whether to shutdown sources when their watermark reaches {@code +Inf}. For production use cases |
| * you want this to be disabled because Flink will currently (versions {@literal <=} 1.5) stop |
| * doing checkpoints when any operator (which includes sources) is finished. |
| * |
| * <p>Please see <a href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for |
| * progress on this issue. |
| */ |
| @Description("If set, shutdown sources when their watermark reaches +Inf.") |
| @Default.Boolean(false) |
| Boolean isShutdownSourcesOnFinalWatermark(); |
| |
| void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark); |
| |
| @Description( |
| "Interval in milliseconds for sending latency tracking marks from the sources to the sinks. " |
| + "Interval value <= 0 disables the feature.") |
| @Default.Long(0) |
| Long getLatencyTrackingInterval(); |
| |
| void setLatencyTrackingInterval(Long interval); |
| |
| @Description("The interval in milliseconds for automatic watermark emission.") |
| Long getAutoWatermarkInterval(); |
| |
| void setAutoWatermarkInterval(Long interval); |
| |
| @Description( |
| "Flink mode for data exchange of batch pipelines. " |
| + "Reference {@link org.apache.flink.api.common.ExecutionMode}. " |
| + "Set this to BATCH_FORCED if pipelines get blocked, see " |
| + "https://issues.apache.org/jira/browse/FLINK-10672") |
| @Default.String(PIPELINED) |
| String getExecutionModeForBatch(); |
| |
| void setExecutionModeForBatch(String executionMode); |
| |
| @Description( |
| "Savepoint restore path. If specified, restores the streaming pipeline from the provided path.") |
| String getSavepointPath(); |
| |
| void setSavepointPath(String path); |
| |
| @Description( |
| "Flag indicating whether non restored state is allowed if the savepoint " |
| + "contains state for an operator that is no longer part of the pipeline.") |
| @Default.Boolean(false) |
| Boolean getAllowNonRestoredState(); |
| |
| void setAllowNonRestoredState(Boolean allowNonRestoredState); |
| |
| @Description( |
| "Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. " |
| + "This might prove useful in streaming use-case, where pipeline needs to write quite many events " |
| + "into files, typically divided into N shards. Default behavior on Flink would be, that some workers " |
| + "will receive more shards to take care of than others. This cause workers to go out of balance in " |
| + "terms of processing backlog and memory usage. Enabling this feature will make shards to be spread " |
| + "evenly among available workers in improve throughput and memory usage stability.") |
| @Default.Boolean(false) |
| Boolean isAutoBalanceWriteFilesShardingEnabled(); |
| |
| void setAutoBalanceWriteFilesShardingEnabled(Boolean autoBalanceWriteFilesShardingEnabled); |
| } |