/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.api.config;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/**
 * This class holds configuration constants used by Flink's table module.
 *
 * <p>NOTE: All option keys in this class must start with "table.exec".
 */
@PublicEvolving
public class ExecutionConfigOptions {

    // ------------------------------------------------------------------------
    // State Options
    // ------------------------------------------------------------------------

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
            key("table.exec.state.ttl")
                    .durationType()
                    .defaultValue(Duration.ofMillis(0))
                    .withDescription(
                            "Specifies a minimum time interval for how long idle state "
                                    + "(i.e. state which was not updated) will be retained. "
                                    + "State will never be cleared until it was idle for at least "
                                    + "the minimum time, and will be cleared at some time after it was idle. "
                                    + "NOTE: Cleaning up state requires additional overhead for bookkeeping. "
                                    + "Default value is 0, which means that the state will never be cleaned up.");

    // ------------------------------------------------------------------------
    // Source Options
    // ------------------------------------------------------------------------

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Duration> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
            key("table.exec.source.idle-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMillis(0))
                    .withDescription(
                            "When a source does not receive any elements for the given timeout, "
                                    + "it will be marked as temporarily idle. This allows downstream "
                                    + "tasks to advance their watermarks without the need to wait for "
                                    + "watermarks from this source while it is idle. "
                                    + "Default value is 0, which means detecting source idleness is not enabled.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Boolean> TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE =
            key("table.exec.source.cdc-events-duplicate")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Indicates whether the CDC (Change Data Capture) sources "
                                                    + "in the job will produce duplicate change events that require the "
                                                    + "framework to deduplicate them in order to get a consistent result. "
                                                    + "A CDC source refers to a source that produces full change events, "
                                                    + "including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, "
                                                    + "for example a Kafka source with the Debezium format. "
                                                    + "The value of this configuration is false by default.")
                                    .linebreak()
                                    .linebreak()
                                    .text(
                                            "However, duplicate change events are a common case, because CDC tools "
                                                    + "(e.g. Debezium) usually work in at-least-once delivery mode "
                                                    + "when a failover happens. Thus, in abnormal situations Debezium may deliver "
                                                    + "duplicate change events to Kafka, and Flink will receive these duplicates. "
                                                    + "This may cause a Flink query to produce wrong results or unexpected exceptions.")
                                    .linebreak()
                                    .linebreak()
                                    .text(
                                            "Therefore, it is recommended to turn on this configuration if your CDC tool "
                                                    + "provides at-least-once delivery. Enabling this configuration requires defining a "
                                                    + "PRIMARY KEY on the CDC sources. The primary key will be used to deduplicate "
                                                    + "change events and generate a normalized changelog stream, at the cost of "
                                                    + "an additional stateful operator.")
                                    .build());
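
    // A minimal usage sketch: enabling deduplication for an at-least-once CDC source. The
    // table name, schema, and connector options below are illustrative assumptions only.
    //
    //   tEnv.getConfig().set(
    //           ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);
    //   // Deduplication requires a PRIMARY KEY on the CDC source:
    //   tEnv.executeSql(
    //           "CREATE TABLE orders ("
    //                   + "  order_id BIGINT,"
    //                   + "  amount DOUBLE,"
    //                   + "  PRIMARY KEY (order_id) NOT ENFORCED"
    //                   + ") WITH (/* Kafka + Debezium connector options */)");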

    // ------------------------------------------------------------------------
    // Sink Options
    // ------------------------------------------------------------------------

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<NotNullEnforcer> TABLE_EXEC_SINK_NOT_NULL_ENFORCER =
            key("table.exec.sink.not-null-enforcer")
                    .enumType(NotNullEnforcer.class)
                    .defaultValue(NotNullEnforcer.ERROR)
                    .withDescription(
                            "Determines how Flink enforces NOT NULL column constraints when inserting null values.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<TypeLengthEnforcer> TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER =
            key("table.exec.sink.type-length-enforcer")
                    .enumType(TypeLengthEnforcer.class)
                    .defaultValue(TypeLengthEnforcer.IGNORE)
                    .withDescription(
                            "Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)"
                                    + "/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded "
                                    + "(padding applies only to CHAR(<length>)/BINARY(<length>)), so that their "
                                    + "length matches the one defined by the length of their respective "
                                    + "CHAR/VARCHAR/BINARY/VARBINARY column type.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
            key("table.exec.sink.upsert-materialize")
                    .enumType(UpsertMaterialize.class)
                    .defaultValue(UpsertMaterialize.AUTO)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Because changelog records can be disordered by shuffling in a distributed system, "
                                                    + "the data received by the sink may not arrive in the global upsert order. "
                                                    + "Therefore, an upsert materialize operator is added before the upsert sink. It receives "
                                                    + "the upstream changelog records and generates an upsert view for the downstream.")
                                    .linebreak()
                                    .text(
                                            "By default, the materialize operator will be added when a distributed disorder "
                                                    + "occurs on unique keys. You can also choose no materialization (NONE) "
                                                    + "or forced materialization (FORCE).")
                                    .build());
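
    // A minimal usage sketch, assuming a streaming TableEnvironment named "tEnv": always
    // materialize before the upsert sink, regardless of detected disorder.
    //
    //   tEnv.getConfig().set(
    //           ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
    //           ExecutionConfigOptions.UpsertMaterialize.FORCE);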

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
            key("table.exec.sink.keyed-shuffle")
                    .enumType(SinkKeyedShuffle.class)
                    .defaultValue(SinkKeyedShuffle.AUTO)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "In order to minimize the distributed disorder problem that many users suffer from "
                                                    + "when writing data into a table with primary keys, Flink will automatically add a keyed shuffle "
                                                    + "when the sink's parallelism differs from the upstream operator and the upstream is append-only. "
                                                    + "This works only when the upstream ensures the order of multiple records on the primary key; if not, the added shuffle cannot solve "
                                                    + "the problem (in this situation, a more proper way is to consider a deduplicate operation on the source first, or to use an "
                                                    + "upsert source with a primary key definition that truly reflects the record evolution).")
                                    .linebreak()
                                    .text(
                                            "By default, the keyed shuffle will be added when the sink's parallelism differs from the upstream operator. "
                                                    + "You can set it to no shuffle (NONE) or forced shuffle (FORCE).")
                                    .build());

    // ------------------------------------------------------------------------
    // Sort Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<Integer> TABLE_EXEC_SORT_DEFAULT_LIMIT =
            key("table.exec.sort.default-limit")
                    .intType()
                    .defaultValue(-1)
                    .withDescription(
                            "Default limit when the user doesn't set a limit after ORDER BY. "
                                    + "A value of -1 indicates that this configuration is ignored.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<Integer> TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES =
            key("table.exec.sort.max-num-file-handles")
                    .intType()
                    .defaultValue(128)
                    .withDescription(
                            "The maximal fan-in for external merge sort. It limits the number of file handles per operator. "
                                    + "If it is too small, it may cause intermediate merging. If it is too large, "
                                    + "too many files will be opened at the same time, consuming memory and leading to random reads.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<Boolean> TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED =
            key("table.exec.sort.async-merge-enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to asynchronously merge sorted spill files.");

    // ------------------------------------------------------------------------
    // Spill Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<Boolean> TABLE_EXEC_SPILL_COMPRESSION_ENABLED =
            key("table.exec.spill-compression.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether to compress spilled data. "
                                    + "Currently, compressing spilled data is only supported for the sort, hash-agg, and hash-join operators.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<MemorySize> TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE =
            key("table.exec.spill-compression.block-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64 kb"))
                    .withDescription(
                            "The memory size used to compress data when spilling. "
                                    + "The larger the memory, the higher the compression ratio, "
                                    + "but more memory resources will be consumed by the job.");

    // ------------------------------------------------------------------------
    // Resource Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM =
            key("table.exec.resource.default-parallelism")
                    .intType()
                    .defaultValue(-1)
                    .withDescription(
                            "Sets the default parallelism for all operators "
                                    + "(such as aggregate, join, filter) to run with parallel instances. "
                                    + "This config has a higher priority than the parallelism of "
                                    + "StreamExecutionEnvironment (actually, this config overrides the parallelism "
                                    + "of StreamExecutionEnvironment). A value of -1 indicates that no "
                                    + "default parallelism is set, in which case it will fall back to the parallelism "
                                    + "of StreamExecutionEnvironment.");

    @Documentation.ExcludeFromDocumentation(
            "Beginning from Flink 1.10, this is interpreted as a weight hint "
                    + "instead of an absolute memory requirement. Users should not need to change these carefully tuned weight hints.")
    public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY =
            key("table.exec.resource.external-buffer-memory")
                    .memoryType()
                    .defaultValue(MemorySize.parse("10 mb"))
                    .withDescription(
                            "Sets the external buffer memory size that is used in the sort-merge join,"
                                    + " nested-loop join, and over window operators. Note: the memory size is only a weight hint;"
                                    + " it affects the weight of memory that can be allocated to a single operator"
                                    + " in the task. The actual memory used depends on the running environment.");

    @Documentation.ExcludeFromDocumentation(
            "Beginning from Flink 1.10, this is interpreted as a weight hint "
                    + "instead of an absolute memory requirement. Users should not need to change these carefully tuned weight hints.")
    public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY =
            key("table.exec.resource.hash-agg.memory")
                    .memoryType()
                    .defaultValue(MemorySize.parse("128 mb"))
                    .withDescription(
                            "Sets the managed memory size of the hash aggregate operator."
                                    + " Note: the memory size is only a weight hint; it affects the weight of memory"
                                    + " that can be allocated to a single operator in the task. The actual memory used"
                                    + " depends on the running environment.");

    @Documentation.ExcludeFromDocumentation(
            "Beginning from Flink 1.10, this is interpreted as a weight hint "
                    + "instead of an absolute memory requirement. Users should not need to change these carefully tuned weight hints.")
    public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY =
            key("table.exec.resource.hash-join.memory")
                    .memoryType()
                    // in sync with other weights from Table API and DataStream API
                    .defaultValue(MemorySize.ofMebiBytes(128))
                    .withDescription(
                            "Sets the managed memory for the hash join operator. It defines the lower"
                                    + " limit. Note: the memory size is only a weight hint; it affects the weight of"
                                    + " memory that can be allocated to a single operator in the task. The actual"
                                    + " memory used depends on the running environment.");

    @Documentation.ExcludeFromDocumentation(
            "Beginning from Flink 1.10, this is interpreted as a weight hint "
                    + "instead of an absolute memory requirement. Users should not need to change these carefully tuned weight hints.")
    public static final ConfigOption<MemorySize> TABLE_EXEC_RESOURCE_SORT_MEMORY =
            key("table.exec.resource.sort.memory")
                    .memoryType()
                    // in sync with other weights from Table API and DataStream API
                    .defaultValue(MemorySize.ofMebiBytes(128))
                    .withDescription(
                            "Sets the managed buffer memory size for the sort operator. Note: the memory"
                                    + " size is only a weight hint; it affects the weight of memory that can be"
                                    + " allocated to a single operator in the task. The actual memory used depends on"
                                    + " the running environment.");

    // ------------------------------------------------------------------------
    // Agg Options
    // ------------------------------------------------------------------------

    /** See {@code org.apache.flink.table.runtime.operators.window.grouping.HeapWindowsGrouping}. */
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<Integer> TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT =
            key("table.exec.window-agg.buffer-size-limit")
                    .intType()
                    .defaultValue(100 * 1000)
                    .withDescription(
                            "Sets the window elements buffer size limit used in the group window agg operator.");

    // ------------------------------------------------------------------------
    // Async Lookup Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY =
            key("table.exec.async-lookup.buffer-capacity")
                    .intType()
                    .defaultValue(100)
                    .withDescription(
                            "The max number of async i/o operations that the async lookup join can trigger.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT =
            key("table.exec.async-lookup.timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(3))
                    .withDescription(
                            "The timeout for the asynchronous lookup operation to complete.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE =
            key("table.exec.async-lookup.output-mode")
                    .enumType(AsyncOutputMode.class)
                    .defaultValue(AsyncOutputMode.ORDERED)
                    .withDescription(
                            "Output mode for asynchronous operations, which will be converted to AsyncDataStream.OutputMode; ORDERED by default. "
                                    + "If set to ALLOW_UNORDERED, Flink will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not "
                                    + "affect the correctness of the result; otherwise ORDERED will still be used.");

    // ------------------------------------------------------------------------
    // MiniBatch Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Boolean> TABLE_EXEC_MINIBATCH_ENABLED =
            key("table.exec.mini-batch.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies whether to enable MiniBatch optimization. "
                                    + "MiniBatch is an optimization to buffer input records to reduce state access. "
                                    + "This is disabled by default. "
                                    + "NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and "
                                    + "'table.exec.mini-batch.size' must be set.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Duration> TABLE_EXEC_MINIBATCH_ALLOW_LATENCY =
            key("table.exec.mini-batch.allow-latency")
                    .durationType()
                    .defaultValue(Duration.ofMillis(0))
                    .withDescription(
                            "The maximum latency for which MiniBatch can buffer input records. "
                                    + "MiniBatch is an optimization to buffer input records to reduce state access. "
                                    + "MiniBatch is triggered either when the allowed latency interval elapses or when the maximum number of buffered records is reached. "
                                    + "NOTE: If "
                                    + TABLE_EXEC_MINIBATCH_ENABLED.key()
                                    + " is set to true, its value must be greater than zero.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Long> TABLE_EXEC_MINIBATCH_SIZE =
            key("table.exec.mini-batch.size")
                    .longType()
                    .defaultValue(-1L)
                    .withDescription(
                            "The maximum number of input records that can be buffered for MiniBatch. "
                                    + "MiniBatch is an optimization to buffer input records to reduce state access. "
                                    + "MiniBatch is triggered either when the allowed latency interval elapses or when the maximum number of buffered records is reached. "
                                    + "NOTE: MiniBatch currently only works for non-windowed aggregations. If "
                                    + TABLE_EXEC_MINIBATCH_ENABLED.key()
                                    + " is set to true, its value must be positive.");

    // ------------------------------------------------------------------------
    // Other Exec Options
    // ------------------------------------------------------------------------
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<String> TABLE_EXEC_DISABLED_OPERATORS =
            key("table.exec.disabled-operators")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Mainly for testing. A comma-separated list of operator names, each name "
                                    + "representing a kind of operator to disable.\n"
                                    + "Operators that can be disabled include \"NestedLoopJoin\", \"ShuffleHashJoin\", \"BroadcastHashJoin\", "
                                    + "\"SortMergeJoin\", \"HashAgg\", \"SortAgg\".\n"
                                    + "By default no operator is disabled.");

    /** @deprecated Use {@link ExecutionOptions#BATCH_SHUFFLE_MODE} instead. */
    @Deprecated
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
    public static final ConfigOption<String> TABLE_EXEC_SHUFFLE_MODE =
            key("table.exec.shuffle-mode")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("Sets the exec shuffle mode.")
                                    .linebreak()
                                    .text("Accepted values are:")
                                    .list(
                                            text(
                                                    "%s: All edges will use blocking shuffle.",
                                                    code("ALL_EDGES_BLOCKING")),
                                            text(
                                                    "%s: Forward edges will use pipelined shuffle, others blocking.",
                                                    code("FORWARD_EDGES_PIPELINED")),
                                            text(
                                                    "%s: Pointwise edges will use pipelined shuffle, others blocking. "
                                                            + "Pointwise edges include forward and rescale edges.",
                                                    code("POINTWISE_EDGES_PIPELINED")),
                                            text(
                                                    "%s: All edges will use pipelined shuffle.",
                                                    code("ALL_EDGES_PIPELINED")),
                                            text(
                                                    "%s: the same as %s. Deprecated.",
                                                    code("batch"), code("ALL_EDGES_BLOCKING")),
                                            text(
                                                    "%s: the same as %s. Deprecated.",
                                                    code("pipelined"), code("ALL_EDGES_PIPELINED")))
                                    .text(
                                            "Note: Blocking shuffle means data will be fully produced before being sent to consumer tasks. "
                                                    + "Pipelined shuffle means data will be sent to consumer tasks as soon as it is produced.")
                                    .build());
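
    // Since this option is deprecated, new code should set the replacement option instead
    // (a sketch, assuming a Configuration named "config"):
    //
    //   config.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);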

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<LegacyCastBehaviour> TABLE_EXEC_LEGACY_CAST_BEHAVIOUR =
            key("table.exec.legacy-cast-behaviour")
                    .enumType(LegacyCastBehaviour.class)
                    .defaultValue(LegacyCastBehaviour.DISABLED)
                    .withDescription(
                            "Determines whether CAST will operate following the legacy behaviour "
                                    + "or the new one that introduces various fixes and improvements.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Long> TABLE_EXEC_RANK_TOPN_CACHE_SIZE =
            ConfigOptions.key("table.exec.rank.topn-cache-size")
                    .longType()
                    .defaultValue(10000L)
                    .withDeprecatedKeys("table.exec.topn-cache-size")
                    .withDescription(
                            "Rank operators have a cache which caches partial state contents "
                                    + "to reduce state access. The cache size is the number of records "
                                    + "cached in each ranking task.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED =
            key("table.exec.simplify-operator-name-enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "When true, the optimizer will simplify the operator name using the id and type of the ExecNode and keep the details in the description. Default value is true.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Boolean>
            TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE_ENABLED =
                    key("table.exec.deduplicate.insert-update-after-sensitive-enabled")
                            .booleanType()
                            .defaultValue(true)
                            .withDeprecatedKeys(
                                    "table.exec.deduplicate.insert-and-updateafter-sensitive.enabled")
                            .withDescription(
                                    "Sets whether the job (especially the sinks) is sensitive to "
                                            + "INSERT messages and UPDATE_AFTER messages. "
                                            + "If false, Flink may sometimes (e.g. in deduplication "
                                            + "for last row) send UPDATE_AFTER instead of INSERT "
                                            + "for the first row. If true, Flink guarantees to "
                                            + "send INSERT for the first row, at the cost of "
                                            + "additional overhead. Default is true.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<Boolean>
            TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED =
                    ConfigOptions.key("table.exec.deduplicate.mini-batch.compact-changes-enabled")
                            .booleanType()
                            .defaultValue(false)
                            .withDeprecatedKeys(
                                    "table.exec.deduplicate.mini-batch.compact-changes.enabled")
                            .withDescription(
                                    "Sets whether to compact the changes sent downstream in row-time "
                                            + "mini-batch. If true, Flink will compact changes and send "
                                            + "only the latest change downstream. Note that if the "
                                            + "downstream needs the details of versioned data, this "
                                            + "optimization cannot be applied. If false, Flink will send "
                                            + "all changes downstream just like when mini-batch is "
                                            + "not enabled.");

    /** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    @Deprecated
    public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
            key("table.exec.legacy-transformation-uids")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "This flag has been replaced by table.exec.uid.generation. Use the enum "
                                    + "value DISABLED to restore legacy behavior. However, the new "
                                    + "default value should be sufficient for most use cases as "
                                    + "only pipelines from compiled plans get UIDs assigned.");

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
            key("table.exec.uid.generation")
                    .enumType(UidGeneration.class)
                    .defaultValue(UidGeneration.PLAN_ONLY)
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "In order to remap state to operators during a restore, "
                                                    + "it is required that the pipeline's streaming "
                                                    + "transformations get a UID assigned.")
                                    .linebreak()
                                    .text(
                                            "The planner can generate and assign explicit UIDs. If no "
                                                    + "UIDs have been set by the planner, the UIDs will "
                                                    + "be auto-generated by lower layers that can take "
                                                    + "the complete topology into account for uniqueness "
                                                    + "of the IDs. See the DataStream API for more information.")
                                    .linebreak()
                                    .text(
                                            "This configuration option is for experts only and the default "
                                                    + "should be sufficient for most use cases. By default, "
                                                    + "only pipelines created from a persisted compiled plan will "
                                                    + "get UIDs assigned explicitly. Thus, these pipelines can "
                                                    + "be arbitrarily moved around within the same topology without "
                                                    + "affecting the stable UIDs.")
                                    .build());

    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
    public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT =
            key("table.exec.uid.format")
                    .stringType()
                    .defaultValue("<id>_<transformation>")
                    .withDescription(
                            "Defines the format pattern for generating the UID of an ExecNode streaming transformation. "
                                    + "The pattern can be defined globally or per-ExecNode in the compiled plan. "
                                    + "Supported arguments are: <id> (from a static counter), <type> (e.g. 'stream-exec-sink'), "
                                    + "<version>, and <transformation> (e.g. 'constraint-validator' for a sink). "
                                    + "In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>', "
                                    + "which would prevent migrations in the future.");

    // ------------------------------------------------------------------------------------------
    // Enum option types
    // ------------------------------------------------------------------------------------------

    /** The enforcer to guarantee the NOT NULL column constraint when writing data into a sink. */
    @PublicEvolving
    public enum NotNullEnforcer implements DescribedEnum {
        ERROR(text("Throw a runtime exception when writing null values into a NOT NULL column.")),
        DROP(
                text(
                        "Drop records silently if a null value would have to be inserted "
                                + "into a NOT NULL column."));

        private final InlineElement description;

        NotNullEnforcer(InlineElement description) {
            this.description = description;
        }

        @Internal
        @Override
        public InlineElement getDescription() {
            return description;
        }
    }

    /**
     * The enforcer to guarantee that the length of CHAR/VARCHAR/BINARY/VARBINARY columns is
     * respected when writing data into a sink.
     */
    @PublicEvolving
    public enum TypeLengthEnforcer implements DescribedEnum {
        IGNORE(
                text(
                        "Don't apply any trimming and padding, and instead "
                                + "ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.")),
        TRIM_PAD(
                text(
                        "Trim and pad string and binary values to match the length "
                                + "defined by the CHAR/VARCHAR/BINARY/VARBINARY length."));

        private final InlineElement description;

        TypeLengthEnforcer(InlineElement description) {
            this.description = description;
        }

        @Internal
        @Override
        public InlineElement getDescription() {
            return description;
        }
    }

    /** Upsert materialize strategy before the sink. */
    @PublicEvolving
    public enum UpsertMaterialize {

        /** In no case will a materialize operator be added. */
        NONE,

        /** Add a materialize operator when a distributed disorder occurs on unique keys. */
        AUTO,

        /** Add a materialize operator in any case. */
        FORCE
    }

    /** Shuffle by primary key before the sink. */
    @PublicEvolving
    public enum SinkKeyedShuffle {

        /** No keyed shuffle will be added for the sink. */
        NONE,

        /**
         * Automatically add a keyed shuffle when the sink's parallelism differs from the upstream
         * operator.
         */
        AUTO,

        /** Add a keyed shuffle in any case, except with single parallelism. */
        FORCE
    }

    /** Output mode for asynchronous operations, equivalent to {@code AsyncDataStream.OutputMode}. */
    @PublicEvolving
    public enum AsyncOutputMode {

        /** Ordered output mode, equivalent to {@code AsyncDataStream.OutputMode.ORDERED}. */
        ORDERED,

        /**
         * Allow unordered output mode. Flink will attempt to use {@code
         * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the
         * result; otherwise ORDERED will still be used.
         */
        ALLOW_UNORDERED
    }

    /** Determines whether CAST operates using the legacy behaviour or the new one. */
    @Deprecated
    public enum LegacyCastBehaviour implements DescribedEnum {
        ENABLED(true, text("CAST will operate following the legacy behaviour.")),
        DISABLED(false, text("CAST will operate following the new correct behaviour."));

        private final boolean enabled;
        private final InlineElement description;

        LegacyCastBehaviour(boolean enabled, InlineElement description) {
            this.enabled = enabled;
            this.description = description;
        }

        @Internal
        @Override
        public InlineElement getDescription() {
            return description;
        }

        public boolean isEnabled() {
            return enabled;
        }
    }

    /**
     * Strategy for generating transformation UIDs for remapping state to operators during
     * restore.
     */
    @PublicEvolving
    public enum UidGeneration implements DescribedEnum {
        PLAN_ONLY(
                text(
                        "Sets UIDs on streaming transformations if and only if the pipeline definition "
                                + "comes from a compiled plan. Pipelines that have been constructed in "
                                + "the API without a compilation step will not set an explicit UID as "
                                + "it might not be stable across multiple translations.")),
        ALWAYS(
                text(
                        "Always sets UIDs on streaming transformations. This strategy is for experts only! "
                                + "Pipelines that have been constructed in the API without a compilation "
                                + "step might not be able to be restored properly. The UID generation "
                                + "depends on previously declared pipelines (potentially across jobs "
                                + "if the same JVM is used). Thus, a stable environment must be ensured. "
                                + "Pipeline definitions that come from a compiled plan are safe to use.")),
        DISABLED(text("No explicit UIDs will be set."));

        private final InlineElement description;

        UidGeneration(InlineElement description) {
            this.description = description;
        }

        @Internal
        @Override
        public InlineElement getDescription() {
            return description;
        }
    }
}