[FLINK-15959][flink-runtime] Add min number of slots configuration to limit total number of slots
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 8ae5ab1..576e6e7 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -129,12 +129,30 @@
             <td>Maximum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.</td>
         </tr>
         <tr>
+            <td><h5>slotmanager.min-total-resource.cpu</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Double</td>
+            <td>Minimum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.</td>
+        </tr>
+        <tr>
+            <td><h5>slotmanager.min-total-resource.memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Minimum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.</td>
+        </tr>
+        <tr>
             <td><h5>slotmanager.number-of-slots.max</h5></td>
             <td style="word-wrap: break-word;">infinite</td>
             <td>Integer</td>
             <td>Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
         </tr>
         <tr>
+            <td><h5>slotmanager.number-of-slots.min</h5></td>
+            <td style="word-wrap: break-word;">infinite</td>
+            <td>Integer</td>
+            <td>Defines the minimum number of slots that the Flink cluster allocates. This configuration option is meant for cluster to initialize certain workers in best efforts when starting. This can be used to speed up a job startup process. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
+        </tr>
+        <tr>
             <td><h5>slow-task-detector.check-interval</h5></td>
             <td style="word-wrap: break-word;">1 s</td>
             <td>Duration</td>
diff --git a/docs/layouts/shortcodes/generated/resource_manager_configuration.html b/docs/layouts/shortcodes/generated/resource_manager_configuration.html
index fe6388e..24f6973 100644
--- a/docs/layouts/shortcodes/generated/resource_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/resource_manager_configuration.html
@@ -69,12 +69,30 @@
             <td>Maximum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.max'.</td>
         </tr>
         <tr>
+            <td><h5>slotmanager.min-total-resource.cpu</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Double</td>
+            <td>Minimum cpu cores the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.</td>
+        </tr>
+        <tr>
+            <td><h5>slotmanager.min-total-resource.memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Minimum memory size the Flink cluster allocates for slots. Resources for JobManager and TaskManager framework are excluded. If not configured, it will be derived from 'slotmanager.number-of-slots.min'.</td>
+        </tr>
+        <tr>
             <td><h5>slotmanager.number-of-slots.max</h5></td>
             <td style="word-wrap: break-word;">infinite</td>
             <td>Integer</td>
             <td>Defines the maximum number of slots that the Flink cluster allocates. This configuration option is meant for limiting the resource consumption for batch workloads. It is not recommended to configure this option for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
         </tr>
         <tr>
+            <td><h5>slotmanager.number-of-slots.min</h5></td>
+            <td style="word-wrap: break-word;">infinite</td>
+            <td>Integer</td>
+            <td>Defines the minimum number of slots that the Flink cluster allocates. This configuration option is meant for cluster to initialize certain workers in best efforts when starting. This can be used to speed up a job startup process. Note that this configuration option does not take effect for standalone clusters, where how many slots are allocated is not controlled by Flink.</td>
+        </tr>
+        <tr>
             <td><h5>slotmanager.redundant-taskmanager-num</h5></td>
             <td style="word-wrap: break-word;">0</td>
             <td>Integer</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 5e6e4b8..b28797e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -64,6 +64,18 @@
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     @Documentation.OverrideDefault("infinite")
+    public static final ConfigOption<Integer> MIN_SLOT_NUM =
+            ConfigOptions.key("slotmanager.number-of-slots.min")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Defines the minimum number of slots that the Flink cluster allocates. This configuration option "
+                                    + "is meant for cluster to initialize certain workers in best efforts when starting. This can "
+                                    + "be used to speed up a job startup process. Note that this configuration option does not take "
+                                    + "effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    @Documentation.OverrideDefault("infinite")
     public static final ConfigOption<Integer> MAX_SLOT_NUM =
             ConfigOptions.key("slotmanager.number-of-slots.max")
                     .intType()
@@ -75,6 +87,18 @@
                                     + "effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Double> MIN_TOTAL_CPU =
+            ConfigOptions.key("slotmanager.min-total-resource.cpu")
+                    .doubleType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Minimum cpu cores the Flink cluster allocates for slots. Resources "
+                                    + "for JobManager and TaskManager framework are excluded. If "
+                                    + "not configured, it will be derived from '"
+                                    + MIN_SLOT_NUM.key()
+                                    + "'.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     public static final ConfigOption<Double> MAX_TOTAL_CPU =
             ConfigOptions.key("slotmanager.max-total-resource.cpu")
                     .doubleType()
@@ -87,6 +111,18 @@
                                     + "'.");
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<MemorySize> MIN_TOTAL_MEM =
+            ConfigOptions.key("slotmanager.min-total-resource.memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Minimum memory size the Flink cluster allocates for slots. Resources "
+                                    + "for JobManager and TaskManager framework are excluded. If "
+                                    + "not configured, it will be derived from '"
+                                    + MIN_SLOT_NUM.key()
+                                    + "'.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     public static final ConfigOption<MemorySize> MAX_TOTAL_MEM =
             ConfigOptions.key("slotmanager.max-total-resource.memory")
                     .memoryType()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index c6ca3bb..330a65b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -30,6 +30,7 @@
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import java.math.RoundingMode;
 import java.time.Duration;
 
 /** Configuration for the {@link SlotManager}. */
@@ -43,8 +44,11 @@
     private final boolean evenlySpreadOutSlots;
     private final WorkerResourceSpec defaultWorkerResourceSpec;
     private final int numSlotsPerWorker;
+    private final int minSlotNum;
     private final int maxSlotNum;
+    private final CPUResource minTotalCpu;
     private final CPUResource maxTotalCpu;
+    private final MemorySize minTotalMem;
     private final MemorySize maxTotalMem;
     private final int redundantTaskManagerNum;
 
@@ -58,8 +62,11 @@
             boolean evenlySpreadOutSlots,
             WorkerResourceSpec defaultWorkerResourceSpec,
             int numSlotsPerWorker,
+            int minSlotNum,
             int maxSlotNum,
+            CPUResource minTotalCpu,
             CPUResource maxTotalCpu,
+            MemorySize minTotalMem,
             MemorySize maxTotalMem,
             int redundantTaskManagerNum) {
 
@@ -72,15 +79,102 @@
         this.evenlySpreadOutSlots = evenlySpreadOutSlots;
         this.defaultWorkerResourceSpec = Preconditions.checkNotNull(defaultWorkerResourceSpec);
         Preconditions.checkState(numSlotsPerWorker > 0);
-        Preconditions.checkState(maxSlotNum > 0);
         this.numSlotsPerWorker = numSlotsPerWorker;
+        checkSlotNumResource(minSlotNum, maxSlotNum, defaultWorkerResourceSpec);
+        checkTotalCPUResource(minTotalCpu, maxTotalCpu, defaultWorkerResourceSpec);
+        checkTotalMemoryResource(minTotalMem, maxTotalMem, defaultWorkerResourceSpec);
+        this.minSlotNum = minSlotNum;
         this.maxSlotNum = maxSlotNum;
-        this.maxTotalCpu = Preconditions.checkNotNull(maxTotalCpu);
-        this.maxTotalMem = Preconditions.checkNotNull(maxTotalMem);
+        this.minTotalCpu = minTotalCpu;
+        this.maxTotalCpu = maxTotalCpu;
+        this.minTotalMem = minTotalMem;
+        this.maxTotalMem = maxTotalMem;
         Preconditions.checkState(redundantTaskManagerNum >= 0);
         this.redundantTaskManagerNum = redundantTaskManagerNum;
     }
 
+    private void checkSlotNumResource(
+            int minSlotNum, int maxSlotNum, WorkerResourceSpec workerResourceSpec) {
+        Preconditions.checkState(minSlotNum >= 0 && minSlotNum <= maxSlotNum);
+        Preconditions.checkState(maxSlotNum > 0);
+
+        if (minSlotNum == 0) {
+            // no need to check resource stability
+            return;
+        }
+
+        // cluster resource stability check.
+        int minSlotWorkerNum =
+                (int) Math.ceil((double) minSlotNum / workerResourceSpec.getNumSlots());
+        int maxSlotWorkerNum =
+                (int) Math.floor((double) maxSlotNum / workerResourceSpec.getNumSlots());
+
+        Preconditions.checkState(minSlotWorkerNum <= maxSlotWorkerNum);
+    }
+
+    private void checkTotalCPUResource(
+            CPUResource minTotalCpu,
+            CPUResource maxTotalCpu,
+            WorkerResourceSpec workerResourceSpec) {
+        Preconditions.checkNotNull(minTotalCpu);
+        Preconditions.checkNotNull(maxTotalCpu);
+        Preconditions.checkState(maxTotalCpu.compareTo(minTotalCpu) >= 0);
+
+        if (minTotalCpu.isZero()) {
+            // no need to check resource stability
+            return;
+        }
+
+        // cluster resource stability check.
+        int minCPUWorkerNum =
+                (int)
+                        minTotalCpu
+                                .getValue()
+                                .divide(
+                                        workerResourceSpec.getCpuCores().getValue(),
+                                        0,
+                                        RoundingMode.CEILING)
+                                .doubleValue();
+
+        int maxCPUWorkerNum =
+                (int)
+                        maxTotalCpu
+                                .getValue()
+                                .divide(
+                                        workerResourceSpec.getCpuCores().getValue(),
+                                        0,
+                                        RoundingMode.FLOOR)
+                                .doubleValue();
+
+        Preconditions.checkState(minCPUWorkerNum <= maxCPUWorkerNum);
+    }
+
+    private void checkTotalMemoryResource(
+            MemorySize minTotalMem, MemorySize maxTotalMem, WorkerResourceSpec workerResourceSpec) {
+        Preconditions.checkNotNull(minTotalMem);
+        Preconditions.checkNotNull(maxTotalMem);
+        Preconditions.checkState(maxTotalMem.compareTo(minTotalMem) >= 0);
+
+        if (minTotalMem.compareTo(MemorySize.ZERO) == 0) {
+            // no need to check resource stability
+            return;
+        }
+
+        // cluster resource stability check.
+        int minMemoryWorkerNum =
+                (int)
+                        Math.ceil(
+                                (double) minTotalMem.getBytes()
+                                        / workerResourceSpec.getTotalMemSize().getBytes());
+
+        int maxMemoryWorkerNum =
+                (int)
+                        Math.floor(
+                                (double) maxTotalMem.getBytes()
+                                        / workerResourceSpec.getTotalMemSize().getBytes());
+        Preconditions.checkState(minMemoryWorkerNum <= maxMemoryWorkerNum);
+    }
+
     public Time getTaskManagerRequestTimeout() {
         return taskManagerRequestTimeout;
     }
@@ -117,14 +211,26 @@
         return numSlotsPerWorker;
     }
 
+    public int getMinSlotNum() {
+        return minSlotNum;
+    }
+
     public int getMaxSlotNum() {
         return maxSlotNum;
     }
 
+    public CPUResource getMinTotalCpu() {
+        return minTotalCpu;
+    }
+
     public CPUResource getMaxTotalCpu() {
         return maxTotalCpu;
     }
 
+    public MemorySize getMinTotalMem() {
+        return minTotalMem;
+    }
+
     public MemorySize getMaxTotalMem() {
         return maxTotalMem;
     }
@@ -163,6 +269,7 @@
 
         int numSlotsPerWorker = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
+        int minSlotNum = configuration.getInteger(ResourceManagerOptions.MIN_SLOT_NUM);
         int maxSlotNum = configuration.getInteger(ResourceManagerOptions.MAX_SLOT_NUM);
 
         int redundantTaskManagerNum =
@@ -178,12 +285,32 @@
                 evenlySpreadOutSlots,
                 defaultWorkerResourceSpec,
                 numSlotsPerWorker,
+                minSlotNum,
                 maxSlotNum,
+                getMinTotalCpu(configuration, defaultWorkerResourceSpec, minSlotNum),
                 getMaxTotalCpu(configuration, defaultWorkerResourceSpec, maxSlotNum),
+                getMinTotalMem(configuration, defaultWorkerResourceSpec, minSlotNum),
                 getMaxTotalMem(configuration, defaultWorkerResourceSpec, maxSlotNum),
                 redundantTaskManagerNum);
     }
 
+    private static CPUResource getMinTotalCpu(
+            final Configuration configuration,
+            final WorkerResourceSpec defaultWorkerResourceSpec,
+            final int minSlotNum) {
+        return configuration
+                .getOptional(ResourceManagerOptions.MIN_TOTAL_CPU)
+                .map(CPUResource::new)
+                .orElseGet(
+                        () ->
+                                minSlotNum == 0
+                                        ? new CPUResource(Double.MIN_VALUE)
+                                        : defaultWorkerResourceSpec
+                                                .getCpuCores()
+                                                .multiply(minSlotNum)
+                                                .divide(defaultWorkerResourceSpec.getNumSlots()));
+    }
+
     private static CPUResource getMaxTotalCpu(
             final Configuration configuration,
             final WorkerResourceSpec defaultWorkerResourceSpec,
@@ -201,6 +328,22 @@
                                                 .divide(defaultWorkerResourceSpec.getNumSlots()));
     }
 
+    private static MemorySize getMinTotalMem(
+            final Configuration configuration,
+            final WorkerResourceSpec defaultWorkerResourceSpec,
+            final int minSlotNum) {
+        return configuration
+                .getOptional(ResourceManagerOptions.MIN_TOTAL_MEM)
+                .orElseGet(
+                        () ->
+                                minSlotNum == 0
+                                        ? MemorySize.ZERO
+                                        : defaultWorkerResourceSpec
+                                                .getTotalMemSize()
+                                                .multiply(minSlotNum)
+                                                .divide(defaultWorkerResourceSpec.getNumSlots()));
+    }
+
     private static MemorySize getMaxTotalMem(
             final Configuration configuration,
             final WorkerResourceSpec defaultWorkerResourceSpec,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
index bfc291a..57492af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerBuilder.java
@@ -44,6 +44,7 @@
     private WorkerResourceSpec defaultWorkerResourceSpec;
     private int numSlotsPerWorker;
     private SlotManagerMetricGroup slotManagerMetricGroup;
+    private int minSlotNum;
     private int maxSlotNum;
     private int redundantTaskManagerNum;
     private ResourceTracker resourceTracker;
@@ -61,6 +62,7 @@
         this.numSlotsPerWorker = 1;
         this.slotManagerMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredSlotManagerMetricGroup();
+        this.minSlotNum = ResourceManagerOptions.MIN_SLOT_NUM.defaultValue();
         this.maxSlotNum = ResourceManagerOptions.MAX_SLOT_NUM.defaultValue();
         this.redundantTaskManagerNum =
                 ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
@@ -113,6 +115,11 @@
         return this;
     }
 
+    public DeclarativeSlotManagerBuilder setMinSlotNum(int minSlotNum) {
+        this.minSlotNum = minSlotNum;
+        return this;
+    }
+
     public DeclarativeSlotManagerBuilder setMaxSlotNum(int maxSlotNum) {
         this.maxSlotNum = maxSlotNum;
         return this;
@@ -158,8 +165,11 @@
                         evenlySpreadOutSlots,
                         defaultWorkerResourceSpec,
                         numSlotsPerWorker,
+                        minSlotNum,
                         maxSlotNum,
+                        new CPUResource(Double.MIN_VALUE),
                         new CPUResource(Double.MAX_VALUE),
+                        MemorySize.ZERO,
                         MemorySize.MAX_VALUE,
                         redundantTaskManagerNum);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
index 745ae5b..d5d70e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationBuilder.java
@@ -36,8 +36,11 @@
     private boolean waitResultConsumedBeforeRelease;
     private WorkerResourceSpec defaultWorkerResourceSpec;
     private int numSlotsPerWorker;
+    private int minSlotNum;
     private int maxSlotNum;
+    private CPUResource minTotalCpu;
     private CPUResource maxTotalCpu;
+    private MemorySize minTotalMem;
     private MemorySize maxTotalMem;
     private int redundantTaskManagerNum;
     private boolean evenlySpreadOutSlots;
@@ -51,8 +54,11 @@
         this.waitResultConsumedBeforeRelease = true;
         this.defaultWorkerResourceSpec = WorkerResourceSpec.ZERO;
         this.numSlotsPerWorker = 1;
+        this.minSlotNum = ResourceManagerOptions.MIN_SLOT_NUM.defaultValue();
         this.maxSlotNum = ResourceManagerOptions.MAX_SLOT_NUM.defaultValue();
+        this.minTotalCpu = new CPUResource(Double.MIN_VALUE);
         this.maxTotalCpu = new CPUResource(Double.MAX_VALUE);
+        this.minTotalMem = MemorySize.ZERO;
         this.maxTotalMem = MemorySize.MAX_VALUE;
         this.redundantTaskManagerNum =
                 ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue();
@@ -103,16 +109,28 @@
         return this;
     }
 
+    public void setMinSlotNum(int minSlotNum) {
+        this.minSlotNum = minSlotNum;
+    }
+
     public SlotManagerConfigurationBuilder setMaxSlotNum(int maxSlotNum) {
         this.maxSlotNum = maxSlotNum;
         return this;
     }
 
+    public void setMinTotalCpu(CPUResource minTotalCpu) {
+        this.minTotalCpu = minTotalCpu;
+    }
+
     public SlotManagerConfigurationBuilder setMaxTotalCpu(CPUResource maxTotalCpu) {
         this.maxTotalCpu = maxTotalCpu;
         return this;
     }
 
+    public void setMinTotalMem(MemorySize minTotalMem) {
+        this.minTotalMem = minTotalMem;
+    }
+
     public SlotManagerConfigurationBuilder setMaxTotalMem(MemorySize maxTotalMem) {
         this.maxTotalMem = maxTotalMem;
         return this;
@@ -141,8 +159,11 @@
                 evenlySpreadOutSlots,
                 defaultWorkerResourceSpec,
                 numSlotsPerWorker,
+                minSlotNum,
                 maxSlotNum,
+                minTotalCpu,
                 maxTotalCpu,
+                minTotalMem,
                 maxTotalMem,
                 redundantTaskManagerNum);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
index 65ef366..0ebfaf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java
@@ -28,10 +28,29 @@
 import java.math.BigDecimal;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
 /** Tests for {@link SlotManagerConfiguration}. */
 class SlotManagerConfigurationTest {
     @Test
+    void testComputeMinTotalCpu() throws Exception {
+        final Configuration configuration = new Configuration();
+        final int minSlotNum = 9;
+        final int numSlots = 3;
+        final double cpuCores = 10;
+        configuration.set(ResourceManagerOptions.MIN_SLOT_NUM, minSlotNum);
+        final SlotManagerConfiguration slotManagerConfiguration =
+                SlotManagerConfiguration.fromConfiguration(
+                        configuration,
+                        new WorkerResourceSpec.Builder()
+                                .setNumSlots(numSlots)
+                                .setCpuCores(cpuCores)
+                                .build());
+        assertThat(slotManagerConfiguration.getMinTotalCpu().getValue().doubleValue())
+                .isEqualTo(cpuCores * minSlotNum / numSlots);
+    }
+
+    @Test
     void testComputeMaxTotalCpu() throws Exception {
         final Configuration configuration = new Configuration();
         final int maxSlotNum = 9;
@@ -50,6 +69,29 @@
     }
 
     @Test
+    void testComputeMinTotalMemory() throws Exception {
+        final Configuration configuration = new Configuration();
+        final int minSlotNum = 1000;
+        final int numSlots = 10;
+        final int totalTaskManagerMB =
+                MemorySize.parse("1", MemorySize.MemoryUnit.TERA_BYTES).getMebiBytes();
+        configuration.set(ResourceManagerOptions.MIN_SLOT_NUM, minSlotNum);
+        final SlotManagerConfiguration slotManagerConfiguration =
+                SlotManagerConfiguration.fromConfiguration(
+                        configuration,
+                        new WorkerResourceSpec.Builder()
+                                .setNumSlots(numSlots)
+                                .setTaskHeapMemoryMB(totalTaskManagerMB)
+                                .build());
+        assertThat(slotManagerConfiguration.getMinTotalMem().getBytes())
+                .isEqualTo(
+                        BigDecimal.valueOf(MemorySize.ofMebiBytes(totalTaskManagerMB).getBytes())
+                                .multiply(BigDecimal.valueOf(minSlotNum))
+                                .divide(BigDecimal.valueOf(numSlots))
+                                .longValue());
+    }
+
+    @Test
     void testComputeMaxTotalMemory() throws Exception {
         final Configuration configuration = new Configuration();
         final int maxSlotNum = 1_000_000;
@@ -71,4 +113,90 @@
                                 .divide(BigDecimal.valueOf(numSlots))
                                 .longValue());
     }
+
+    @Test
+    void testComputeMinMaxSlotNumIsValid() throws Exception {
+        final Configuration configuration = new Configuration();
+        final int minSlotNum = 9;
+        final int maxSlotNum = 12;
+        final int numSlots = 3;
+        final double cpuCores = 10;
+        configuration.set(ResourceManagerOptions.MIN_SLOT_NUM, minSlotNum);
+        configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+
+        final SlotManagerConfiguration slotManagerConfiguration =
+                SlotManagerConfiguration.fromConfiguration(
+                        configuration,
+                        new WorkerResourceSpec.Builder()
+                                .setNumSlots(numSlots)
+                                .setCpuCores(cpuCores)
+                                .build());
+        assertThat(slotManagerConfiguration.getMinTotalCpu().getValue().doubleValue())
+                .isEqualTo(cpuCores * minSlotNum / numSlots);
+        assertThat(slotManagerConfiguration.getMaxTotalCpu().getValue().doubleValue())
+                .isEqualTo(cpuCores * maxSlotNum / numSlots);
+    }
+
+    @Test
+    void testComputeMinMaxSlotNumIsInvalid() {
+        final Configuration configuration = new Configuration();
+        final int minSlotNum = 10;
+        final int maxSlotNum = 11;
+        final int numSlots = 3;
+        configuration.set(ResourceManagerOptions.MIN_SLOT_NUM, minSlotNum);
+        configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
+
+        assertThatIllegalStateException()
+                .isThrownBy(
+                        () ->
+                                SlotManagerConfiguration.fromConfiguration(
+                                        configuration,
+                                        new WorkerResourceSpec.Builder()
+                                                .setNumSlots(numSlots)
+                                                .build()));
+    }
+
+    @Test
+    void testComputeMinMaxCpuIsInvalid() {
+        final Configuration configuration = new Configuration();
+        final double minTotalCpu = 10.0;
+        final double maxTotalCpu = 11.0;
+        final int numSlots = 3;
+        final double cpuCores = 3;
+        configuration.set(ResourceManagerOptions.MIN_TOTAL_CPU, minTotalCpu);
+        configuration.set(ResourceManagerOptions.MAX_TOTAL_CPU, maxTotalCpu);
+
+        assertThatIllegalStateException()
+                .isThrownBy(
+                        () ->
+                                SlotManagerConfiguration.fromConfiguration(
+                                        configuration,
+                                        new WorkerResourceSpec.Builder()
+                                                .setNumSlots(numSlots)
+                                                .setCpuCores(cpuCores)
+                                                .build()));
+    }
+
+    @Test
+    void testComputeMinMaxMemoryIsInvalid() {
+        final Configuration configuration = new Configuration();
+        final MemorySize minMemorySize = MemorySize.ofMebiBytes(500);
+        final MemorySize maxMemorySize = MemorySize.ofMebiBytes(700);
+        final int numSlots = 3;
+        configuration.set(ResourceManagerOptions.MIN_TOTAL_MEM, minMemorySize);
+        configuration.set(ResourceManagerOptions.MAX_TOTAL_MEM, maxMemorySize);
+
+        assertThatIllegalStateException()
+                .isThrownBy(
+                        () ->
+                                SlotManagerConfiguration.fromConfiguration(
+                                        configuration,
+                                        new WorkerResourceSpec.Builder()
+                                                .setNumSlots(numSlots)
+                                                .setTaskHeapMemoryMB(100)
+                                                .setManagedMemoryMB(100)
+                                                .setNetworkMemoryMB(100)
+                                                .setTaskOffHeapMemoryMB(100)
+                                                .build()));
+    }
 }