/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api;
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* This class holds configuration constants used by Flink's table module.
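*
* <p>A minimal usage sketch (a sketch only: it assumes a Blink-style {@code TableConfig#getConf()}
* accessor exposing the underlying {@code Configuration}; the accessor name is an assumption):
* <pre>{@code
* Configuration conf = tableEnv.getConfig().getConf();
* conf.setBoolean(TableConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED, true);
* }</pre>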
*/
public class TableConfigOptions {
// ------------------------------------------------------------------------
// Optimizer Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_OPTIMIZER_JOIN_REORDER_ENABLED =
key("sql.optimizer.join-reorder.enabled")
.defaultValue(false)
.withDescription("Enables join reorder in optimizer cbo. Default is disabled.");
public static final ConfigOption<String> SQL_OPTIMIZER_AGG_PHASE_ENFORCER =
key("sql.optimizer.agg.phase.enforcer")
.defaultValue("NONE")
.withDescription("Strategy for agg phase. Only NONE, TWO_PHASE or ONE_PHASE can be set.\n" +
"NONE: No special enforcer for aggregate stage. Whether to choose two stage aggregate or one" +
" stage aggregate depends on cost. \n" +
"TWO_PHASE: Enforce to use two stage aggregate which has localAggregate and globalAggregate. " +
"NOTE: If aggregate call does not support split into two phase, still use one stage aggregate.\n" +
"ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggregate.");
public static final ConfigOption<Integer> SQL_OPTIMIZER_CNF_NODES_LIMIT =
key("sql.optimizer.cnf.nodes.limit")
.defaultValue(-1)
.withDescription("When converting to conjunctive normal form (CNF), fail if the expression" +
" exceeds this threshold; the threshold is expressed in terms of number of nodes " +
"(only count RexCall node, including leaves and interior nodes). Negative number to" +
" use the default threshold: double of number of nodes.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED =
key("sql.optimizer.shuffle.partial-key.enabled")
.defaultValue(false)
.withDescription("Enables shuffle by partial partition keys. " +
"For example, A join with join condition: L.c1 = R.c1 and L.c2 = R.c2. " +
"If this flag is enabled, there are 3 shuffle strategy:\n " +
"1. L and R shuffle by c1 \n" +
"2. L and R shuffle by c2\n" +
"3. L and R shuffle by c1 and c2\n" +
"It can reduce some shuffle cost someTimes.");
public static final ConfigOption<Long> SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD =
key("sql.optimizer.join.null.filter.threshold")
.defaultValue(2000000L)
.withDescription("If the source of InnerJoin has nullCount more than this value, " +
"it will add a null filter (possibly be pushDowned) before the join, filter" +
" null values to avoid the impact of null values on the single join node.");
public static final ConfigOption<Double> SQL_OPTIMIZER_SEMI_JOIN_BUILD_DISTINCT_NDV_RATIO =
key("sql.optimizer.semi-join.build-distinct.ndv-ratio")
.defaultValue(0.8)
.withDescription("When the semi-side of semiJoin can distinct a lot of data in advance," +
" we will add distinct node before semiJoin.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_SUBSECTION_UNIONALL_AS_BREAKPOINT_DISABLED =
key("sql.optimizer.subsection.unionall-as-breakpoint.disabled")
.defaultValue(false)
.withDescription("Disable union all as breakpoint in subsection optimization");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_PLAN_DUMP_ENABLED =
key("sql.optimizer.plan.dump.enabled")
.defaultValue(false)
.withDescription("If dump optimized plan is enabled, the optimized plan need to be dump.");
public static final ConfigOption<String> SQL_OPTIMIZER_PLAN_DUMP_PATH =
key("sql.optimizer.plan.dump.path")
.noDefaultValue()
.withDescription("Sets the file path to dump optimized plan.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED =
key("sql.optimizer.reuse.sub-plan.enabled")
.defaultValue(false)
.withDescription("When true, the optimizer will try to find out duplicated " +
"sub-plan and reuse them.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED =
key("sql.optimizer.reuse.table-source.enabled")
.defaultValue(false)
.withDescription("When true, the optimizer will try to find out duplicated table-source and reuse them. " +
"This works only when " + SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED + " is true.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_REUSE_NONDETERMINISTIC_OPERATOR_ENABLED =
key("sql.optimizer.reuse.nondeterministic-operator.enabled")
.defaultValue(false)
.withDescription("When true, the optimizer will try to find out duplicated nondeterministic-operator and" +
" reuse them. This works only when " + SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED + " is true. \n" +
"Nondeterministic-operator contains :\n" +
"1. nondeterministic [[ScalarFunction]] (UDF, e.g. now)\n" +
"2. nondeterministic [[AggregateFunction]](UDAF)\n" +
"3. nondeterministic [[TableFunction]] (UDTF)");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_DATA_SKEW_DISTINCT_AGG =
key("sql.optimizer.data-skew.distinct-agg")
.defaultValue(false)
.withDescription("Tell the optimizer whether there exists data skew in distinct aggregation\n"
+ "so as to enable the aggregation split optimization.");
public static final ConfigOption<Integer> SQL_OPTIMIZER_DATA_SKEW_DISTINCT_AGG_BUCKET =
key("sql.optimizer.data-skew.distinct-agg.bucket")
.defaultValue(1024)
.withDescription("Configure the number of buckets when splitting distinct aggregation.");
public static final ConfigOption<Boolean> SQL_OPTIMIZER_SMJ_REMOVE_SORT_ENABLE =
key("sql.optimizer.smj.remove-sort.enable")
.defaultValue(false)
.withDescription("When true, the optimizer will try to remove redundant sort for SortMergeJoin. " +
"However that will increase optimization time. Default value is false.");
// ------------------------------------------------------------------------
// CodeGen Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> SQL_CODEGEN_LENGTH_MAX =
key("sql.codegen.length.max")
.defaultValue(48 * 1024)
.withDescription("Generated code will be split if code length exceeds this limitation.");
// ------------------------------------------------------------------------
// Sort Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_EXEC_SORT_RANGE_ENABLED =
key("sql.exec.sort.range.enabled")
.defaultValue(false)
.withDescription("Sets whether to enable range sort, use range sort to sort all data in several partitions." +
"When it is false, sorting in only one partition");
public static final ConfigOption<Integer> SQL_EXEC_SORT_DEFAULT_LIMIT =
key("sql.exec.sort.default.limit")
.defaultValue(200)
.withDescription("Default limit when user don't set a limit after order by. " +
"This default value will be invalidated if " + SQL_EXEC_SORT_RANGE_ENABLED + " is set to be true.");
public static final ConfigOption<Integer> SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM =
key("sql.exec.sort.file-handles.max.num")
.defaultValue(128)
.withDescription("Sort merge's maximum number of roads, too many roads, may cause too many files to be" +
" read at the same time, resulting in excessive use of memory.");
public static final ConfigOption<Boolean> SQL_EXEC_SORT_ASYNC_MERGE_ENABLED =
key("sql.exec.sort.async-merge.enabled")
.defaultValue(true)
.withDescription("Whether to asynchronously merge sort spill files.");
public static final ConfigOption<Boolean> SQL_EXEC_SORT_NON_TEMPORAL_ENABLED =
key("sql.exec.sort.non-temporal.enabled")
.defaultValue(false)
.withDescription("Switch on/off stream sort without temporal or limit." +
"Set whether to enable universal sort for stream. When it is false, " +
"universal sort can't use for stream, default false. Just for testing.");
// ------------------------------------------------------------------------
// Spill Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_EXEC_SPILL_COMPRESSION_ENABLED =
key("sql.exec.spill.compression.enabled")
.defaultValue(true)
.withDescription("Whether to compress spilled data. (Now include sort and hash agg and hash join)");
public static final ConfigOption<String> SQL_EXEC_SPILL_COMPRESSION_CODEC =
key("sql.exec.spill.compression.codec")
.defaultValue("lz4")
.withDescription("Use that compression codec to compress spilled file. Now we support lz4, gzip, bzip2.");
public static final ConfigOption<Integer> SQL_EXEC_SPILL_COMPRESSION_BLOCK_SIZE =
key("sql.exec.spill.compression.block-size")
.defaultValue(64 * 1024)
.withDescription("The buffer is to compress. The larger the buffer," +
" the better the compression ratio, but the more memory consumption.");
// ------------------------------------------------------------------------
// Join Options
// ------------------------------------------------------------------------
public static final ConfigOption<Long> SQL_EXEC_HASH_JOIN_BROADCAST_THRESHOLD =
key("sql.exec.hash-join.broadcast.threshold")
.defaultValue(1024 * 1024L)
.withDescription("Maximum size in bytes for data that could be broadcast to each parallel instance " +
"that holds a partition of all data when performing a hash join. " +
"Broadcast will be disabled if the value is -1.");
// ------------------------------------------------------------------------
// Agg Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> SQL_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT =
key("sql.exec.window-agg.buffer-size.limit")
.defaultValue(100 * 1000)
.withDescription("Sets the window elements buffer limit in size used in group window agg operator.");
public static final ConfigOption<Boolean> SQL_EXEC_INCREMENTAL_AGG_ENABLED =
key("sql.exec.incremental-agg.enabled")
.defaultValue(true)
.withDescription("Whether to enable incremental aggregate.");
// ------------------------------------------------------------------------
// TopN Options
// ------------------------------------------------------------------------
public static final ConfigOption<Long> SQL_EXEC_TOPN_CACHE_SIZE =
key("sql.exec.topn.cache.size")
.defaultValue(10000L)
.withDescription("Cache size of every topn task.");
// ------------------------------------------------------------------------
// Runtime Filter Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_EXEC_RUNTIME_FILTER_ENABLED =
key("sql.exec.runtime-filter.enabled")
.defaultValue(false)
.withDescription("Runtime filter for hash join. The Build side of HashJoin will " +
"build a bloomFilter in advance to filter the data on the probe side.");
public static final ConfigOption<Boolean> SQL_EXEC_RUNTIME_FILTER_WAIT =
key("sql.exec.runtime-filter.wait")
.defaultValue(false)
.withDescription("Weather to let probe side to wait bloom filter.");
public static final ConfigOption<Integer> SQL_EXEC_RUNTIME_FILTER_SIZE_MAX =
key("sql.exec.runtime-filter.size.max.mb")
.defaultValue(10)
.withDescription("The max size of MB to BloomFilter. A too large BloomFilter will cause " +
"the JobMaster bandwidth to fill up and affect scheduling.");
public static final ConfigOption<Double> SQL_EXEC_RUNTIME_FILTER_PROBE_FILTER_DEGREE_MIN =
key("sql.exec.runtime-filter.probe.filter-degree.min")
.defaultValue(0.5)
.withDescription("The minimum filtering degree of the probe side to enable runtime filter." +
"(1 - buildNdv / probeNdv) * (1 - minFpp) >= minProbeFilter.");
public static final ConfigOption<Long> SQL_EXEC_RUNTIME_FILTER_PROBE_ROW_COUNT_MIN =
key("sql.exec.runtime-filter.probe.row-count.min")
.defaultValue(100000000L)
.withDescription("The minimum row count of probe side to enable runtime filter." +
"Probe.rowCount >= minProbeRowCount.");
public static final ConfigOption<Double> SQL_EXEC_RUNTIME_FILTER_BUILD_PROBE_ROW_COUNT_RATIO_MAX =
key("sql.exec.runtime-filter.build-probe.row-count-ratio.max")
.defaultValue(0.5)
.withDescription("The rowCount of the build side and the rowCount of the probe should " +
"have a certain ratio before using the RuntimeFilter. " +
"Builder.rowCount / probe.rowCount <= maxRowCountRatio.");
public static final ConfigOption<Integer> SQL_EXEC_RUNTIME_FILTER_ROW_COUNT_NUM_BITS_RATIO =
key("sql.exec.runtime-filter.row-count.num-bits.ratio")
.defaultValue(40)
.withDescription("A ratio between the probe row count and the BloomFilter size. If the " +
"probe row count is too small, we should not use too large BloomFilter. maxBfBits = " +
"Math.min(probeRowCount / ratioOfRowAndBits, " + SQL_EXEC_RUNTIME_FILTER_SIZE_MAX + ")");
public static final ConfigOption<Double> SQL_EXEC_RUNTIME_FILTER_BUILDER_PUSH_DOWN_RATIO_MAX =
key("sql.exec.runtime-filter.builder.push-down.ratio.max")
.defaultValue(1.2)
.withDescription("If the join key is the same, we can push down the BloomFilter" +
" builder to the input node build, but we need to ensure that the NDV " +
"and row count doesn't change much. " +
"PushDownNdv / ndv <= maxRatio && pushDownRowCount / rowCount <= maxRatio.");
// ------------------------------------------------------------------------
// MiniBatch Options
// ------------------------------------------------------------------------
public static final ConfigOption<Long> SQL_EXEC_MINIBATCH_ALLOW_LATENCY =
key("sql.exec.mini-batch.allow-latency.ms")
.defaultValue(Long.MIN_VALUE)
.withDescription("MiniBatch allow latency(ms). Value > 0 means MiniBatch enabled.");
public static final ConfigOption<Long> SQL_EXEC_MINIBATCH_SIZE =
key("sql.exec.mini-batch.size")
.defaultValue(Long.MIN_VALUE)
.withDescription("The maximum number of inputs that MiniBatch buffer can accommodate.");
public static final ConfigOption<Boolean> SQL_EXEC_MINIBATCH_JOIN_ENABLED =
key("sql.exec.mini-batch.join.enabled")
.defaultValue(true)
.withDescription("Whether to enable miniBatch join.");
public static final ConfigOption<Boolean> SQL_EXEC_MINI_BATCH_FLUSH_BEFORE_SNAPSHOT =
key("sql.exec.mini-batch.flush-before-snapshot")
.defaultValue(true)
.withDescription("Whether to enable flushing buffered data before snapshot.");
// ------------------------------------------------------------------------
// Source Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED =
key("sql.exec.source.values-input.enabled")
.defaultValue(false)
.withDescription("Whether support values source input. The reason for disabling this " +
"feature is that checkpoint will not work properly when source finished.");
// ------------------------------------------------------------------------
// State Options
// ------------------------------------------------------------------------
public static final ConfigOption<Long> SQL_EXEC_STATE_TTL_MS =
key("sql.exec.state.ttl.ms")
.defaultValue(Long.MIN_VALUE)
.withDescription("The minimum time until state that was not updated will be retained. State" +
" might be cleared and removed if it was not updated for the defined period of time.");
public static final ConfigOption<Long> SQL_EXEC_STATE_TTL_MAX_MS =
key("sql.exec.state.ttl.max.ms")
.defaultValue(Long.MIN_VALUE)
.withDescription("The maximum time until state which was not updated will be retained." +
"State will be cleared and removed if it was not updated for the defined " +
"period of time.");
// ------------------------------------------------------------------------
// Other Exec Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> SQL_EXEC_DISABLED_OPERATORS =
key("sql.exec.disabled-operators")
.defaultValue("")
.withDescription("Mainly for testing. A comma-separated list of name of the OperatorType, each name " +
"means a kind of disabled operator. Its default value is empty that means no operators are disabled. " +
"If the configure's value is \"NestedLoopJoin, ShuffleHashJoin\", NestedLoopJoin and ShuffleHashJoin " +
"are disabled. If the configure's value is \"HashJoin\", " +
"ShuffleHashJoin and BroadcastHashJoin are disabled.");
public static final ConfigOption<Boolean> SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH =
key("sql.exec.data-exchange-mode.all-batch")
.defaultValue(true)
.withDescription("Sets Whether all data-exchange-mode is batch.");
public static final ConfigOption<Boolean> SQL_EXEC_OPERATOR_METRIC_DUMP_ENABLED =
key("sql.exec.operator-metric.dump.enabled")
.defaultValue(false)
.withDescription("If dump operator metric is enabled, the operator metrics need to " +
"be dump during runtime phase.");
public static final ConfigOption<String> SQL_EXEC_OPERATOR_METRIC_DUMP_PATH =
key("sql.exec.operator-metric.dump.path")
.noDefaultValue()
.withDescription("Sets the file path to dump stream graph plan with operator metrics." +
"Default is null.");
// ------------------------------------------------------------------------
// Resource Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> SQL_RESOURCE_INFER_MODE =
key("sql.resource.infer.mode")
.defaultValue("NONE")
.withDescription("Sets infer resource mode according to statics. Only NONE, ONLY_SOURCE or ALL can be set.\n" +
"If set NONE, parallelism and memory of all node are set by config.\n" +
"If set ONLY_SOURCE, only source parallelism is inferred according to statics.\n" +
"If set ALL, parallelism and memory of all node are inferred according to statics.\n");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_AGG_TABLE_MEM =
key("sql.resource.hash-agg.table.memory.mb")
.defaultValue(32)
.withDescription("Sets the table reserved memory size of hashAgg operator. It defines the lower limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_JOIN_TABLE_MEM =
key("sql.resource.hash-join.table.memory.mb")
.defaultValue(32)
.withDescription("Sets the HashTable reserved memory for hashJoin operator. It defines the lower limit for.");
public static final ConfigOption<Integer> SQL_RESOURCE_SORT_BUFFER_MEM =
key("sql.resource.sort.buffer.memory.mb")
.defaultValue(32)
.withDescription("Sets the buffer reserved memory size for sort. It defines the lower limit for the sort.");
public static final ConfigOption<Integer> SQL_RESOURCE_EXTERNAL_BUFFER_MEM =
key("sql.resource.external-buffer.memory.mb")
.defaultValue(10)
.withDescription("Sets the externalBuffer memory size that is used in sortMergeJoin and overWindow.");
// ------------------------------------------------------------------------
// Prefer and Max Memory Resource Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> SQL_RESOURCE_SORT_BUFFER_PREFER_MEM =
key("sql.resource.sort.buffer-prefer-memory-mb")
.defaultValue(128)
.withDescription("Sets the preferred buffer memory size for sort. " +
"It defines the applied memory for the sort.");
public static final ConfigOption<Integer> SQL_RESOURCE_SORT_BUFFER_MAX_MEM =
key("sql.resource.sort.buffer-max-memory-mb")
.defaultValue(512)
.withDescription("Sets the max buffer memory size for sort. It defines the upper memory for the sort.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_JOIN_TABLE_PREFER_MEM =
key("sql.resource.hash-join.table-prefer-memory-mb")
.defaultValue(128)
.withDescription("Sets the HashTable preferred memory for hashJoin operator. It defines the upper limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_JOIN_TABLE_MAX_MEM =
key("sql.resource.hash-join.table-max-memory-mb")
.defaultValue(512)
.withDescription("Sets the HashTable max memory for hashJoin operator. It defines the upper limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_AGG_TABLE_PREFER_MEM =
key("sql.resource.hash-agg.table-prefer-memory-mb")
.defaultValue(128)
.withDescription("Sets the table preferred memory size of hashAgg operator. It defines the upper limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM =
key("sql.resource.hash-agg.table-max-memory-mb")
.defaultValue(512)
.withDescription("Sets the table max memory size of hashAgg operator. It defines the upper limit.");
public static final ConfigOption<Integer> SQL_RESOURCE_SINK_DEFAULT_MEM =
key("sql.resource.sink.default.memory.mb")
.defaultValue(16)
.withDescription("Sets the heap memory size of sink operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_SINK_DIRECT_MEM =
key("sql.resource.sink.default.direct.memory.mb")
.defaultValue(0)
.withDescription("Sets the default direct memory size of sink operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_SINK_PARALLELISM =
key("sql.resource.sink.parallelism")
.defaultValue(-1)
.withDescription("Sets sink parallelism if it is set. If it is not set, " +
"sink nodes will chain with ahead nodes as far as possible.");
public static final ConfigOption<Integer> SQL_RESOURCE_SOURCE_DEFAULT_MEM =
key("sql.resource.source.default.memory.mb")
.defaultValue(16)
.withDescription("Sets the heap memory size of source operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_SOURCE_DIRECT_MEM =
key("sql.resource.source.default.direct.memory.mb")
.defaultValue(0)
.withDescription("Sets the default direct memory size of source operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_DEFAULT_PARALLELISM =
key("sql.resource.default.parallelism")
.defaultValue(-1)
.withDescription("Default parallelism of the job. If any node do not have special parallelism, use it." +
"Its default value is the num of cpu cores in the client host.");
public static final ConfigOption<Integer> SQL_RESOURCE_SOURCE_PARALLELISM =
key("sql.resource.source.parallelism")
.defaultValue(-1)
.withDescription("Sets source parallelism if " + SQL_RESOURCE_INFER_MODE + " is NONE." +
"If it is not set, use " + SQL_RESOURCE_DEFAULT_PARALLELISM + " to set source parallelism.");
public static final ConfigOption<Double> SQL_RESOURCE_DEFAULT_CPU =
key("sql.resource.default.cpu")
.defaultValue(0.3)
.withDescription("Default cpu for each operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_DEFAULT_MEM =
key("sql.resource.default.memory.mb")
.defaultValue(16)
.withDescription("Default heap memory size for each operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_DEFAULT_DIRECT_MEM =
key("sql.resource.default.direct.memory.mb")
.defaultValue(0)
.withDescription("Default direct memory size for each operator.");
public static final ConfigOption<Long> SQL_RESOURCE_INFER_ROWS_PER_PARTITION =
key("sql.resource.infer.rows-per-partition")
.defaultValue(1000000L)
.withDescription("Sets how many rows one partition processes. We will infer parallelism according " +
"to input row count.");
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX =
key("sql.resource.infer.source.parallelism.max")
.defaultValue(1000)
.withDescription("Sets max parallelism for source operator.");
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_SOURCE_MB_PER_PARTITION =
key("sql.resource.infer.source.mb-per-partition")
.defaultValue(100)
.withDescription("Sets how many data size in MB one partition processes. We will infer the source parallelism" +
" according to source data size.");
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX =
key("sql.resource.infer.operator.parallelism.max")
.defaultValue(800)
.withDescription("Sets max parallelism for all operators.");
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_OPERATOR_MEM_MAX =
key("sql.resource.infer.operator.memory.max.mb")
.defaultValue(1024)
.withDescription("Maybe inferred operator mem is too large, so this setting is upper limit for the" +
" inferred operator mem.");
public static final ConfigOption<Double> SQL_RESOURCE_RUNNING_UNIT_CPU_TOTAL =
key("sql.resource.running-unit.cpu.total")
.defaultValue(0d)
.withDescription("total cpu limit of a runningUnit. 0 means no limit.");
// ------------------------------------------------------------------------
// Schedule Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_SCHEDULE_RUNNING_UNIT_ENABLED =
key("sql.schedule.running-unit.enabled")
.defaultValue(true)
.withDescription("Whether to schedule according to runningUnits.");
// ------------------------------------------------------------------------
// Parquet Options
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> SQL_PARQUET_SINK_BLOCK_SIZE =
key("sql.parquet.sink.block.size")
.defaultValue(128 * 1024 * 1024)
.withDescription("Parquet block size in bytes, this value would be set as " +
"parquet.block.size and dfs.blocksize to improve alignment between " +
"row group and hdfs block.");
public static final ConfigOption<Boolean> SQL_PARQUET_SINK_DICTIONARY_ENABLED =
key("sql.parquet.sink.dictionary.enabled")
.defaultValue(true)
.withDescription("Enable Parquet dictionary encoding.");
public static final ConfigOption<Boolean> SQL_PARQUET_PREDICATE_PUSHDOWN_ENABLED =
key("sql.parquet.predicate-pushdown.enabled")
.defaultValue(true)
.withDescription("Allow trying to push filter down to a parquet [[TableSource]]. the default value is true, " +
"means allow the attempt.");
// ------------------------------------------------------------------------
// Orc Options
// ------------------------------------------------------------------------
public static final ConfigOption<Boolean> SQL_ORC_PREDICATE_PUSHDOWN_ENABLED =
key("sql.orc.predicate-pushdown.enabled")
.defaultValue(true)
.withDescription("Allow trying to push filter down to a orc [[TableSource]]. The default value is true, " +
"means allow the attempt.");
}