blob: 3f13e549621653240355d082c4db31de87b246a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.util;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfigOptions;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* Deal with resource config for {@link org.apache.flink.table.plan.nodes.exec.ExecNode}.
*/
public class NodeResourceUtil {
/**
* How many Bytes per MB.
*/
public static final long SIZE_IN_MB = 1024L * 1024;
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN =
key("sql.resource.infer.operator.parallelism.min")
.defaultValue(1)
.withDescription("Sets min parallelism for operators.");
public static final ConfigOption<Integer> SQL_RESOURCE_INFER_OPERATOR_MEMORY_MIN =
key("sql.resource.infer.operator.memory.min.mb")
.defaultValue(32)
.withDescription("Maybe the infer's reserved manager mem is too small, so this " +
"setting is lower limit for the infer's manager mem.");
public static final ConfigOption<Double> SQL_RESOURCE_INFER_MEM_RESERVE_PREFER_DISCOUNT =
key("sql.resource.infer.mem.reserve-prefer.discount")
.defaultValue(1.0d)
.withDescription("Sets reserve discount to prefer mem.");
public static final ConfigOption<Integer> SQL_RESOURCE_PER_REQUEST_MEM =
key("sql.resource.per-request.mem.mb")
.defaultValue(32)
.withDescription("Sets the number of per-requested buffers when the operator " +
"allocates much more segments from the floating memory pool.");
public static double getDefaultCpu(Configuration tableConf) {
return tableConf.getDouble(
TableConfigOptions.SQL_RESOURCE_DEFAULT_CPU);
}
/**
* Gets default parallelism of operator.
* @param tableConf Configuration.
* @return default parallelism of operator.
*/
public static int getOperatorDefaultParallelism(Configuration tableConf, int envParallelism) {
int parallelism = tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM);
if (parallelism <= 0) {
parallelism = envParallelism;
}
return parallelism;
}
/**
* Gets default resourceSpec for a operator that has no specific resource need.
* For converter rel.
* @param tableConf Configuration.
* @return default resourceSpec for a operator that has no specific resource need.
*/
public static ResourceSpec getDefaultResourceSpec(Configuration tableConf) {
ResourceSpec.Builder builder = new ResourceSpec.Builder();
builder.setCpuCores(getDefaultCpu(tableConf));
builder.setHeapMemoryInMB(getDefaultHeapMem(tableConf));
builder.setDirectMemoryInMB(getDefaultDirectMem(tableConf));
return builder.build();
}
/**
* Gets resourceSpec which specific heapMemory size. For sink rel.
* @param tableConf Configuration.
* @param heapMemory the specific heapMemory size.
* @return resourceSpec which specific heapMemory size.
*/
public static ResourceSpec getResourceSpec(
Configuration tableConf,
int heapMemory,
int directMemory) {
ResourceSpec.Builder builder = new ResourceSpec.Builder();
builder.setCpuCores(getDefaultCpu(tableConf));
builder.setHeapMemoryInMB(heapMemory);
builder.setDirectMemoryInMB(directMemory);
return builder.build();
}
/**
* Gets the config direct heap memory for operator.
* @param tableConf Configuration.
* @return the config direct heap memory for operator.
*/
public static int getDefaultHeapMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_DEFAULT_MEM);
}
/**
* Gets the config direct memory for operator.
* @param tableConf Configuration.
* @return the config direct memory for operator.
*/
public static int getDefaultDirectMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_DEFAULT_DIRECT_MEM);
}
/**
* Gets the config managedMemory for sort buffer.
* @param tableConf Configuration.
* @return the config managedMemory for sort buffer.
*/
public static int getSortBufferManagedMemory(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MEM);
}
/**
* Gets the preferred managedMemory for sort buffer.
* @param tableConf Configuration.
* @return the prefer managedMemory for sort buffer.
*/
public static int getSortBufferManagedPreferredMemory(Configuration tableConf) {
return tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_PREFER_MEM);
}
/**
* Gets the max managedMemory for sort buffer.
* @param tableConf Configuration.
* @return the max managedMemory for sort buffer.
*/
public static int getSortBufferManagedMaxMemory(Configuration tableConf) {
return tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_SORT_BUFFER_MAX_MEM);
}
/**
* Gets the config managedMemory for external buffer.
* @param tableConf Configuration.
* @return the config managedMemory for external buffer.
*/
public static int getExternalBufferManagedMemory(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_EXTERNAL_BUFFER_MEM);
}
/**
* Gets the config managedMemory for hashJoin table.
* @param tableConf Configuration.
* @return the config managedMemory for hashJoin table.
*/
public static int getHashJoinTableManagedMemory(Configuration tableConf) {
return tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MEM);
}
/**
* Gets the config heap memory for source.
* @param tableConf Configuration.
* @return the config heap memory for source.
*/
public static int getSourceMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SOURCE_DEFAULT_MEM);
}
/**
* Gets the config direct memory for source.
* @param tableConf Configuration.
* @return the config direct memory for source.
*/
public static int getSourceDirectMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SOURCE_DIRECT_MEM);
}
/**
* Gets the config parallelism for source.
* @param tableConf Configuration.
* @return the config parallelism for source.
*/
public static int getSourceParallelism(Configuration tableConf, int envParallelism) {
int parallelism = tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SOURCE_PARALLELISM);
if (parallelism <= 0) {
parallelism = getOperatorDefaultParallelism(tableConf, envParallelism);
}
return parallelism;
}
/**
* Gets the config parallelism for sink. If it is not set, return -1.
* @param tableConf Configuration.
* @return the config parallelism for sink.
*/
public static int getSinkParallelism(Configuration tableConf) {
return tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_SINK_PARALLELISM);
}
/**
* Gets the config heap memory for sink.
* @param tableConf Configuration.
* @return the config heap memory for sink.
*/
public static int getSinkMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SINK_DEFAULT_MEM);
}
/**
* Gets the config direct memory for sink.
* @param tableConf Configuration.
* @return the config direct memory for sink.
*/
public static int getSinkDirectMem(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_SINK_DIRECT_MEM);
}
/**
* Gets the config managedMemory for hashAgg.
* @param tableConf Configuration.
* @return the config managedMemory for hashAgg.
*/
public static int getHashAggManagedMemory(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MEM);
}
/**
* Gets the config managedMemory for hashAgg.
* @param tableConf Configuration.
* @return the config managedMemory for hashAgg.
*/
public static int getWindowAggBufferLimitSize(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
}
/**
* Gets the config row count that one partition processes.
* @param tableConf Configuration.
* @return the config row count that one partition processes.
*/
public static long getRelCountPerPartition(Configuration tableConf) {
return tableConf.getLong(
TableConfigOptions.SQL_RESOURCE_INFER_ROWS_PER_PARTITION);
}
/**
* Gets the config data size that one partition processes.
* @param tableConf Configuration.
* @return the config data size that one partition processes.
*/
public static int getSourceSizePerPartition(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_MB_PER_PARTITION);
}
/**
* Gets the config max num of source parallelism.
* @param tableConf Configuration.
* @return the config max num of source parallelism.
*/
public static int getSourceMaxParallelism(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_INFER_SOURCE_PARALLELISM_MAX);
}
/**
* Gets the config max num of operator parallelism.
* @param tableConf Configuration.
* @return the config max num of operator parallelism.
*/
public static int getOperatorMaxParallelism(Configuration tableConf) {
return tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MAX);
}
/**
* Gets the config min num of operator parallelism.
* @param tableConf Configuration.
* @return the config max num of operator parallelism.
*/
public static int getOperatorMinParallelism(Configuration tableConf) {
return tableConf.getInteger(SQL_RESOURCE_INFER_OPERATOR_PARALLELISM_MIN);
}
/**
* Calculates operator parallelism based on rowcount of the operator.
* @param rowCount rowCount of the operator
* @param tableConf Configuration.
* @return the result of operator parallelism.
*/
public static int calOperatorParallelism(double rowCount, Configuration tableConf) {
int maxParallelism = getOperatorMaxParallelism(tableConf);
int minParallelism = getOperatorMinParallelism(tableConf);
int resultParallelism = (int) (rowCount / getRelCountPerPartition(tableConf));
return Math.max(Math.min(resultParallelism, maxParallelism), minParallelism);
}
/**
* Gets the preferred managedMemory for hashJoin table.
* @param tableConf Configuration.
* @return the preferred managedMemory for hashJoin table.
*/
public static int getHashJoinTableManagedPreferredMemory(Configuration tableConf) {
int memory = tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_PREFER_MEM);
if (memory <= 0) {
memory = getHashJoinTableManagedMemory(tableConf);
}
return memory;
}
/**
* Gets the max managedMemory for hashJoin table.
* @param tableConf Configuration.
* @return the max managedMemory for hashJoin table.
*/
public static int getHashJoinTableManagedMaxMemory(Configuration tableConf) {
int memory = tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_HASH_JOIN_TABLE_MAX_MEM);
if (memory <= 0) {
memory = getHashJoinTableManagedPreferredMemory(tableConf);
}
return memory;
}
/**
* Gets the preferred managedMemory for hashAgg.
* @param tableConf Configuration.
* @return the preferred managedMemory for hashAgg.
*/
public static int getHashAggManagedPreferredMemory(Configuration tableConf) {
int memory = tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_PREFER_MEM);
if (memory <= 0) {
memory = getHashAggManagedMemory(tableConf);
}
return memory;
}
/**
* Gets the max managedMemory for hashAgg.
* @param tableConf Configuration.
* @return the max managedMemory for hashAgg.
*/
public static int getHashAggManagedMaxMemory(Configuration tableConf) {
int memory = tableConf.getInteger(TableConfigOptions.SQL_RESOURCE_HASH_AGG_TABLE_MAX_MEM);
if (memory <= 0) {
memory = getHashAggManagedPreferredMemory(tableConf);
}
return memory;
}
/**
* Gets the min managedMemory.
* @param tableConf Configuration.
* @return the min managedMemory.
*/
public static int getOperatorMinManagedMem(Configuration tableConf) {
return tableConf.getInteger(SQL_RESOURCE_INFER_OPERATOR_MEMORY_MIN);
}
/**
* get the reserved, preferred and max managed mem by the inferred mem. And they should be between the maximum
* and the minimum mem.
*
* @param tableConf Configuration.
* @param memCostInMB the infer mem of per partition.
*/
public static Tuple3<Integer, Integer, Integer> reviseAndGetInferManagedMem(Configuration tableConf, int memCostInMB) {
double reservedDiscount = tableConf.getDouble(SQL_RESOURCE_INFER_MEM_RESERVE_PREFER_DISCOUNT);
if (reservedDiscount > 1 || reservedDiscount <= 0) {
throw new IllegalArgumentException(SQL_RESOURCE_INFER_MEM_RESERVE_PREFER_DISCOUNT + " should be > 0 and <= 1");
}
int maxMem = tableConf.getInteger(
TableConfigOptions.SQL_RESOURCE_INFER_OPERATOR_MEM_MAX);
int minMem = getOperatorMinManagedMem(tableConf);
int preferMem = Math.max(Math.min(maxMem, memCostInMB), minMem);
int reservedMem = Math.max((int) (preferMem * reservedDiscount), minMem);
return new Tuple3<>(reservedMem, preferMem, maxMem);
}
/**
* Gets the managedMemory for per-allocating.
*/
public static int getPerRequestManagedMemory(Configuration tableConf) {
return tableConf.getInteger(SQL_RESOURCE_PER_REQUEST_MEM);
}
/**
* Whether to enable schedule with runningUnit.
*/
public static boolean enableRunningUnitSchedule(Configuration tableConf) {
return tableConf.getBoolean(TableConfigOptions.SQL_SCHEDULE_RUNNING_UNIT_ENABLED);
}
/**
* Infer resource mode.
*/
public enum InferMode {
NONE, ONLY_SOURCE, ALL
}
public static InferMode getInferMode(Configuration tableConf) {
String config = tableConf.getString(
TableConfigOptions.SQL_RESOURCE_INFER_MODE);
try {
return InferMode.valueOf(config);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("Infer mode can only be set: NONE, SOURCE or ALL.");
}
}
}