<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.exec.async-lookup.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The max number of async I/O operations that the async lookup join can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-lookup.output-mode</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ORDERED</td>
<td><p>Enum</p></td>
<td>Output mode for asynchronous operations, which maps to AsyncDataStream.OutputMode; ORDERED by default. If set to ALLOW_UNORDERED, AsyncDataStream.OutputMode.UNORDERED will be used when it does not affect the correctness of the result, otherwise ORDERED will still be used.<br /><br />Possible values:<ul><li>"ORDERED"</li><li>"ALLOW_UNORDERED"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
<td>Duration</td>
<td>The timeout for the asynchronous lookup operation to complete.</td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The max number of async I/O operations that the async scalar function can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The max number of async retry attempts to make before the task execution fails.</td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.retry-delay</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">100 ms</td>
<td>Duration</td>
<td>The delay to wait before trying again.</td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.retry-strategy</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">FIXED_DELAY</td>
<td><p>Enum</p></td>
<td>Retry strategy to use, FIXED_DELAY by default.<br /><br />Possible values:<ul><li>"NO_RETRY"</li><li>"FIXED_DELAY"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.async-scalar.timeout</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
<td>Duration</td>
<td>The timeout for the asynchronous operation to complete.</td>
</tr>
<tr>
<td><h5>table.exec.deduplicate.insert-update-after-sensitive-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Set whether the job (especially the sinks) is sensitive to INSERT messages and UPDATE_AFTER messages. If false, Flink may sometimes (e.g. for deduplication keeping the last row) send UPDATE_AFTER instead of INSERT for the first row. If true, Flink will guarantee to send INSERT for the first row, at the cost of additional overhead. The default is true.</td>
</tr>
<tr>
<td><h5>table.exec.deduplicate.mini-batch.compact-changes-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Set whether to compact the changes sent downstream in row-time mini-batch. If true, Flink will compact changes and send only the latest change downstream. Note that if the downstream needs the details of versioned data, this optimization cannot be applied. If false, Flink will send all changes to downstream just like when the mini-batch is not enabled.</td>
</tr>
<tr>
<td><h5>table.exec.disabled-operators</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Mainly for testing. A comma-separated list of operator names, each representing a kind of operator to disable.
Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg".
By default no operator is disabled.</td>
</tr>
<tr>
<td><h5>table.exec.interval-join.min-cleanup-interval</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>Specifies a minimum time interval for how long unmatched records are retained in the interval join operator before being cleaned up. Before Flink 1.18, the default value of this option was half of the interval duration. Note: Setting this option to a value greater than 0 will cause unmatched records in outer joins to be emitted later than the watermark, which may lead to these records being discarded by downstream watermark-dependent operators, such as window operators. The default value is 0, which means unmatched records are cleaned up immediately.</td>
</tr>
<tr>
<td><h5>table.exec.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">DISABLED</td>
<td><p>Enum</p></td>
<td>Determines whether CAST will operate following the legacy behaviour or the new one that introduces various fixes and improvements.<br /><br />Possible values:<ul><li>"ENABLED": CAST will operate following the legacy behaviour.</li><li>"DISABLED": CAST will operate following the new correct behaviour.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">0.5</td>
<td>Double</td>
<td>The distinct value rate can be defined as the number of local aggregation results for the sampled data divided by the sampling threshold (see table.exec.local-hash-agg.adaptive.sampling-threshold). If the computed result is lower than the given configuration value, the remaining input records continue to do local aggregation; otherwise the remaining input records are subjected to a simple projection, whose calculation cost is less than that of local aggregation. The default value is 0.5.</td>
</tr>
<tr>
<td><h5>table.exec.local-hash-agg.adaptive.enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable adaptive local hash aggregation. Adaptive local hash aggregation is an optimization of local hash aggregation which adaptively determines whether to continue doing local hash aggregation based on the distinct value rate of the sampled data. If the distinct value rate is greater than the defined threshold (see table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold), aggregation stops and the input data is sent downstream after a simple projection. Otherwise, aggregation continues. Adaptive local hash aggregation only works in batch mode. The default value is true.</td>
</tr>
<tr>
<td><h5>table.exec.local-hash-agg.adaptive.sampling-threshold</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">500000</td>
<td>Long</td>
<td>If adaptive local hash aggregation is enabled, this value defines how many records will be used as sampled data to calculate the distinct value rate (see table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold) for the local aggregation. The higher the sampling threshold, the more accurate the distinct value rate is, but the more records must be sampled before the adaptive decision takes effect. The default value is 500000.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.allow-latency</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>The maximum latency that MiniBatch may use to buffer input records. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered when the allowed latency interval has elapsed or when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Specifies whether to enable MiniBatch optimization. MiniBatch is an optimization to buffer input records to reduce state access. This is disabled by default. To enable this, users should set this config to true. NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must be set.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.size</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered when the allowed latency interval has elapsed or when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive.</td>
</tr>
<tr>
<td><h5>table.exec.operator-fusion-codegen.enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, multiple physical operators will be compiled into a single operator by the planner, which can improve performance.</td>
</tr>
<tr>
<td><h5>table.exec.rank.topn-cache-size</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10000</td>
<td>Long</td>
<td>Rank operators have a cache which caches partial state contents to reduce state access. The cache size is the number of records cached per ranking task.</td>
</tr>
<tr>
<td><h5>table.exec.resource.default-parallelism</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Sets the default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than the parallelism of the StreamExecutionEnvironment (that is, it overrides that parallelism). A value of -1 indicates that no default parallelism is set, in which case the parallelism of the StreamExecutionEnvironment is used.</td>
</tr>
<tr>
<td><h5>table.exec.simplify-operator-name-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When true, the optimizer will simplify the operator name to the id and type of the ExecNode and keep the details in the description. The default value is true.</td>
</tr>
<tr>
<td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
<td>To minimize the distributed disorder problem that many users encounter when writing data into a table with primary keys, Flink automatically adds a keyed shuffle when the sink parallelism differs from the upstream operator and the sink parallelism is not 1. This works only when the upstream ensures the order of multiple records on the primary key; if not, the added shuffle cannot solve the problem (in this situation, it is better to first consider deduplicating the source, or to use an upsert source with a primary key definition that truly reflects the record evolution).<br />By default, the keyed shuffle is added when the sink's parallelism differs from the upstream operator. You can also choose no shuffle (NONE) or force the shuffle (FORCE).<br /><br />Possible values:<ul><li>"NONE"</li><li>"AUTO"</li><li>"FORCE"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.not-null-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ERROR</td>
<td><p>Enum</p></td>
<td>Determines how Flink enforces NOT NULL column constraints when inserting null values.<br /><br />Possible values:<ul><li>"ERROR": Throw a runtime exception when writing null values into NOT NULL column.</li><li>"DROP": Drop records silently if a null value would have to be inserted into a NOT NULL column.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.rowtime-inserter</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ENABLED</td>
<td><p>Enum</p></td>
<td>Some sink implementations require a single rowtime attribute in the input that can be inserted into the underlying stream record. This option allows disabling the timestamp insertion and avoids errors around multiple time attributes being present in the query schema.<br /><br />Possible values:<ul><li>"ENABLED": Insert a rowtime attribute (if available) into the underlying stream record. This requires at most one time attribute in the input for the sink.</li><li>"DISABLED": Do not insert the rowtime attribute into the underlying stream record.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.type-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">IGNORE</td>
<td><p>Enum</p></td>
<td>Determines whether values for columns with CHAR(&lt;length&gt;)/VARCHAR(&lt;length&gt;)/BINARY(&lt;length&gt;)/VARBINARY(&lt;length&gt;) types will be trimmed or padded (only for CHAR(&lt;length&gt;)/BINARY(&lt;length&gt;)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR/BINARY/VARBINARY column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.</li><li>"TRIM_PAD": Trim and pad string and binary values to match the length defined by the CHAR/VARCHAR/BINARY/VARBINARY length.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.upsert-materialize</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
<td>Because shuffles in a distributed system can reorder changelog data, the records received by the sink may not follow the global upsert order. Therefore, an upsert materialize operator is added before the upsert sink. It receives the upstream changelog records and generates an upsert view for the downstream.<br />By default, the materialize operator is added when a distributed disorder occurs on unique keys. You can also choose no materialization (NONE) or force materialization (FORCE).<br /><br />Possible values:<ul><li>"NONE"</li><li>"AUTO"</li><li>"FORCE"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to asynchronously merge sorted spill files.</td>
</tr>
<tr>
<td><h5>table.exec.sort.default-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Default limit when the user does not set a limit after ORDER BY. -1 indicates that this configuration is ignored.</td>
</tr>
<tr>
<td><h5>table.exec.sort.max-num-file-handles</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, intermediate merging may be needed. If it is too large, too many files will be open at the same time, consuming memory and leading to random reads.</td>
</tr>
<tr>
<td><h5>table.exec.source.cdc-events-duplicate</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Indicates whether the CDC (Change Data Capture) sources in the job will produce duplicate change events that require the framework to deduplicate them in order to get a consistent result. A CDC source refers to a source that produces full change events, including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, for example a Kafka source with the Debezium format. The value of this configuration is false by default.<br /><br />However, duplicate change events are a common case, because CDC tools (e.g. Debezium) usually work with at-least-once delivery when failover happens. Thus, in abnormal situations Debezium may deliver duplicate change events to Kafka and Flink will receive the duplicate events. This may cause the Flink query to get wrong results or unexpected exceptions.<br /><br />Therefore, it is recommended to turn on this configuration if your CDC tool delivers events at-least-once. Enabling this configuration requires defining a PRIMARY KEY on the CDC sources. The primary key will be used to deduplicate change events and generate a normalized changelog stream, at the cost of an additional stateful operator.</td>
</tr>
<tr>
<td><h5>table.exec.source.idle-timeout</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>When a source does not receive any elements for the given timeout, it will be marked as temporarily idle. This allows downstream tasks to advance their watermarks without having to wait for watermarks from this source while it is idle. The default value is 0, which means that source idleness detection is disabled.</td>
</tr>
<tr>
<td><h5>table.exec.spill-compression.block-size</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">64 kb</td>
<td>MemorySize</td>
<td>The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but more memory resources will be consumed by the job.</td>
</tr>
<tr>
<td><h5>table.exec.spill-compression.enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to compress spilled data. Currently, compressing spilled data is only supported for the sort, hash-agg, and hash-join operators.</td>
</tr>
<tr>
<td><h5>table.exec.state.ttl</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">0 ms</td>
<td>Duration</td>
<td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated) will be retained. State will never be cleared before it has been idle for at least this minimum time, and it will be cleared at some point after it has been idle. NOTE: Cleaning up state requires additional overhead for bookkeeping. The default value is 0, which means that state is never cleaned up.</td>
</tr>
<tr>
<td><h5>table.exec.uid.format</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"&lt;id&gt;_&lt;transformation&gt;"</td>
<td>String</td>
<td>Defines the format pattern for generating the UID of an ExecNode streaming transformation. The pattern can be defined globally or per-ExecNode in the compiled plan. Supported arguments are: &lt;id&gt; (from static counter), &lt;type&gt; (e.g. 'stream-exec-sink'), &lt;version&gt;, and &lt;transformation&gt; (e.g. 'constraint-validator' for a sink). In Flink 1.15.x the pattern was wrongly defined as '&lt;id&gt;_&lt;type&gt;_&lt;version&gt;_&lt;transformation&gt;' which would prevent migrations in the future.</td>
</tr>
<tr>
<td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">PLAN_ONLY</td>
<td><p>Enum</p></td>
<td>In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.<br />The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers that can take the complete topology into account for uniqueness of the IDs. See the DataStream API for more information.<br />This configuration option is for experts only and the default should be sufficient for most use cases. By default, only pipelines created from a persisted compiled plan will get UIDs assigned explicitly. Thus, these pipelines can be arbitrarily moved around within the same topology without affecting the stable UIDs.<br /><br />Possible values:<ul><li>"PLAN_ONLY": Sets UIDs on streaming transformations if and only if the pipeline definition comes from a compiled plan. Pipelines that have been constructed in the API without a compilation step will not set an explicit UID as it might not be stable across multiple translations.</li><li>"ALWAYS": Always sets UIDs on streaming transformations. This strategy is for experts only! Pipelines that have been constructed in the API without a compilation step might not be able to be restored properly. The UID generation depends on previously declared pipelines (potentially across jobs if the same JVM is used). Thus, a stable environment must be ensured. Pipeline definitions that come from a compiled plan are safe to use.</li><li>"DISABLED": No explicit UIDs will be set.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">100000</td>
<td>Integer</td>
<td>Sets the window elements buffer size limit used in the group window aggregation operator.</td>
</tr>
</tbody>
</table>
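<p>
  A minimal, illustrative sketch (not part of the generated table above) of how these execution
  options can be applied: they are plain string keys that can be set on the <code>TableConfig</code>
  of a <code>TableEnvironment</code>, or via <code>SET 'key' = 'value';</code> in the SQL Client.
  The specific keys and values below are examples only.
</p>
<pre>
// Java sketch, assuming a standard Flink Table API setup.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExecOptionsExample {
    public static void main(String[] args) {
        // Create a streaming TableEnvironment; batch-only options would require inBatchMode().
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Options use the same string keys and value notation as the table above,
        // e.g. durations such as "5 s" or "1 h".
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
        tEnv.getConfig().set("table.exec.state.ttl", "1 h");
        tEnv.getConfig().set("table.exec.sink.not-null-enforcer", "DROP");
    }
}
</pre>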