Refactor PinotTaskManager class (#12964)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e09bde84..0d9d3a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,22 +618,20 @@
   @ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
   public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
       @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName,
-      @ApiParam(value = "Minion Instance tag to schedule the task explicitly on")
-      @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
-      @Context HttpHeaders headers) {
+      @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
+      @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
     String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
     if (taskType != null) {
       // Schedule task for the given task type
-      List<String> taskNames = tableName != null
-          ? _pinotTaskManager.scheduleTask(taskType,
+      List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
           DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
           : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
       return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
     } else {
       // Schedule tasks for all task types
-      Map<String, List<String>> allTaskNames = tableName != null
-          ? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
-          : _pinotTaskManager.scheduleTasksForDatabase(database, minionInstanceTag);
+      Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
+          DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
+          : _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
       return allTaskNames.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
     }
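
For context, here is the endpoint's dispatch over the renamed `PinotTaskManager` entry points, consolidated from the hunk above into one readable sketch. The wrapper class and method are illustrative only, and the `DatabaseUtils` table-name translation is elided:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

// Illustrative wrapper only; mirrors the dispatch in PinotTaskRestletResource#scheduleTasks.
final class ScheduleTasksDispatchSketch {
  static Map<String, String> dispatch(PinotTaskManager taskManager, String taskType, String tableNameWithType,
      String database, String minionInstanceTag) {
    if (taskType != null) {
      // Single task type: one table if given, otherwise every table in the database
      List<String> taskNames = tableNameWithType != null
          ? taskManager.scheduleTaskForTable(taskType, tableNameWithType, minionInstanceTag)
          : taskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
      return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
    }
    // All task types: one table if given, otherwise every table in the database
    Map<String, List<String>> allTaskNames = tableNameWithType != null
        ? taskManager.scheduleAllTasksForTable(tableNameWithType, minionInstanceTag)
        : taskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
    return allTaskNames.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
  }
}
```
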
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 8c04338..f9b250b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -65,7 +65,7 @@
         return;
       }
       long jobStartTime = System.currentTimeMillis();
-      pinotTaskManager.scheduleTask(taskType, table);
+      pinotTaskManager.scheduleTaskForTable(taskType, table, null);
       LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
           jobExecutionContext.getNextFireTime());
       pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
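
The trailing `null` keeps the cron path on each task generator's default minion instance tag (see the fallback in the `scheduleTask` helper later in this diff). A caller that wants to pin scheduling to a tagged minion pool passes the tag explicitly; a minimal sketch with illustrative task type, table, and tag values:

```java
import java.util.List;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

final class TaggedScheduleSketch {
  // Hypothetical values; only the method signature comes from this diff.
  static List<String> scheduleOnTaggedPool(PinotTaskManager taskManager) {
    return taskManager.scheduleTaskForTable("MergeRollupTask", "myTable_OFFLINE", "minionGroupA_tag");
  }
}
```
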
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4029944..97417d6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,7 +22,6 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -480,30 +479,72 @@
   }
 
   /**
-   * Public API to schedule tasks (all task types) for all tables in all databases.
+   * Schedules tasks (all task types) for all tables.
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, null);
+  public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
   }
 
   /**
-   * Public API to schedule tasks (all task types) for all tables in given database.
+   * Schedules tasks (all task types) for all tables in the given database.
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database,
+  public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
       @Nullable String minionInstanceTag) {
     return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
   }
 
   /**
+   * Schedules tasks (all task types) for the given table.
+   * It might be called from the non-leader controller.
+   * Returns a map from the task type to the list of tasks scheduled.
+   */
+  public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
+      @Nullable String minionInstanceTag) {
+    return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+  }
+
+  /**
+   * Schedules tasks for the given task type for all tables.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
+  }
+
+  /**
+   * Schedules tasks for the given task type for all tables in the given database.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
+      @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+  }
+
+  /**
+   * Schedules tasks for the given task type for the given table.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
+      @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
+  }
+
+  /**
    * Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
    * from the task type to the list of the tasks scheduled.
    */
-  private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType,
-      boolean isLeader, @Nullable String minionInstanceTag) {
+  private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
+      @Nullable String minionInstanceTag) {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
 
     // Scan all table configs to get the tables with tasks enabled
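
The two cluster-wide entry points round out the matrix of scope (all tables / database / table) times task-type selection (all types / one type). A minimal sketch, assuming an already-wired `PinotTaskManager` instance:

```java
import java.util.List;
import java.util.Map;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

final class ClusterWideScheduleSketch {
  static void scheduleEverything(PinotTaskManager taskManager) {
    // All task types for every table in the cluster; a null tag falls back to each generator's default.
    Map<String, List<String>> allScheduled = taskManager.scheduleAllTasksForAllTables(null);
    // One task type ("PurgeTask" is illustrative) for every table; may be null when nothing is scheduled.
    List<String> purgeTasks = taskManager.scheduleTaskForAllTables("PurgeTask", null);
  }
}
```
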
@@ -541,6 +582,27 @@
     return tasksScheduled;
   }
 
+  @Nullable
+  private synchronized List<String> scheduleTask(String taskType, List<String> tables,
+      @Nullable String minionInstanceTag) {
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+    // Scan all table configs to get the tables with the given task type enabled
+    List<TableConfig> enabledTableConfigs = new ArrayList<>();
+    for (String tableNameWithType : tables) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+          .isTaskTypeEnabled(taskType)) {
+        enabledTableConfigs.add(tableConfig);
+      }
+    }
+
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
+    return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
+  }
+
   /**
    * Helper method to schedule task with the given task generator for the given tables that have the task enabled.
    * Returns the list of task names, or {@code null} if no task is scheduled.
@@ -554,8 +616,8 @@
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
-        String minionInstanceTag = minionInstanceTagForTask != null
-            ? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig);
+        String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask
+            : taskGenerator.getMinionInstanceTag(tableConfig);
         List<PinotTaskConfig> presentTaskConfig =
             minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
         taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -622,86 +684,6 @@
     return submittedTaskNames;
   }
 
-  /**
-   * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
-   * controller. Returns a map from the task type to the list of tasks scheduled.
-   */
-  public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) {
-    return scheduleTasks(Collections.singletonList(tableNameWithType), false, null);
-  }
-
-  /**
-   * Public API to schedule tasks (all task types) for the given table on a specific instance tag.
-   * It might be called from the non-leader controller. Returns a map from the task type to the list of tasks scheduled.
-   */
-  public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType,
-      @Nullable String minionInstanceTag) {
-    return scheduleTasks(Collections.singletonList(tableNameWithType), false, minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type in all databases.
-   * It might be called from the non-leader controller.
-   * Returns the list of task names, or {@code null} if no task is scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTask(String taskType, @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type in given database.
-   * It might be called from the non-leader controller.
-   * Returns the list of task name, or {@code null} if no task is scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
-      @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
-  }
-
-  @Nullable
-  private List<String> scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) {
-    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
-    Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
-
-    // Scan all table configs to get the tables with task enabled
-    List<TableConfig> enabledTableConfigs = new ArrayList<>();
-    for (String tableNameWithType : tables) {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
-          .isTaskTypeEnabled(taskType)) {
-        enabledTableConfigs.add(tableConfig);
-      }
-    }
-
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type on the given table. It might be called from the non-leader
-   * controller. Returns the list of task names, or {@code null} if no task is scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTask(String taskType, String tableNameWithType,
-      @Nullable String minionInstanceTag) {
-    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
-    Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
-
-    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
-
-    Preconditions.checkState(
-        tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
-        "Table: %s does not have task type: %s enabled", tableNameWithType, taskType);
-
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false, minionInstanceTag);
-  }
-
   @Override
   protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
     scheduleTasks(tableNamesWithType, true, null);
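
Net effect of the removals above: the overloaded `scheduleTask`/`scheduleTasks` family is gone, and every public entry point now names its scope. One behavioral nuance worth noting: the old table-scoped `scheduleTask` rejected a table without the task type enabled via `Preconditions.checkState`, whereas `scheduleTaskForTable` routes through the shared filter and simply schedules nothing for such a table. A before/after summary of the renames, taken from this diff:

```java
// Old call                              -> New call (same arguments unless noted)
// scheduleTasks()                       -> scheduleAllTasksForAllTables(null)
// scheduleTasks(table)                  -> scheduleAllTasksForTable(table, null)
// scheduleTasks(table, tag)             -> scheduleAllTasksForTable(table, tag)
// scheduleTasksForDatabase(db, tag)     -> scheduleAllTasksForDatabase(db, tag)
// scheduleTask(type, tag)               -> scheduleTaskForAllTables(type, tag)
// scheduleTask(type, table, tag)        -> scheduleTaskForTable(type, table, tag)
// scheduleTaskForDatabase(type, db, tag)   (kept; moved next to the other public entry points)
```
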
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b655416..c5be600 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -139,14 +139,14 @@
     List<File> avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload segments
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1, _tarDir1);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1,
+        _tarDir1);
     buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "1");
     buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2");
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3);
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3,
+        _tarDir3);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0,
+        _segmentDir4, _tarDir4);
     uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
     uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
     uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
@@ -160,8 +160,8 @@
     schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
     addSchema(schema);
     TableConfig singleLevelConcatProcessAllRealtimeTableConfig =
-        createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
-            MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, PROCESS_ALL_MODE_KAFKA_TOPIC);
+        createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
+            PROCESS_ALL_MODE_KAFKA_TOPIC);
     addTableConfig(singleLevelConcatProcessAllRealtimeTableConfig);
 
     // Push data into Kafka
@@ -172,9 +172,8 @@
     ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3), "localhost:" + getKafkaPort(),
         PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(),
         injectTombstones());
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles.subList(3, 9), singleLevelConcatProcessAllRealtimeTableConfig, schema, 0,
-            _segmentDir5, _tarDir5);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9),
+        singleLevelConcatProcessAllRealtimeTableConfig, schema, 0, _segmentDir5, _tarDir5);
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
 
@@ -216,14 +215,14 @@
 
   private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig,
       @Nullable SegmentPartitionConfig partitionConfig) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(partitionConfig).build();
   }
 
   protected TableConfig createRealtimeTableConfigWithProcessAllMode(File sampleAvroFile, String tableName,
@@ -246,12 +245,12 @@
     tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
     tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
     tableTaskConfigs.put("mode", "processAll");
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(
             new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)))
         .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
         .setQueryConfig(getQueryConfig()).setStreamConfigs(streamConfigs)
@@ -411,17 +410,16 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -527,17 +525,16 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -636,17 +633,16 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -788,17 +784,16 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -859,8 +854,8 @@
         return false;
       }
       // Check if the task metadata is cleaned up
-      if (MinionTaskMetadataUtils
-          .fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) {
+      if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE,
+          tableNameWithType) != null) {
         return false;
       }
       return true;
@@ -921,18 +916,17 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
 //      assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(
-          taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -1027,17 +1021,16 @@
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null; taskList =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertTrue(helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(
-          taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check not using watermarks
@@ -1069,11 +1062,10 @@
     waitForAllDocsLoaded(600_000L);
 
     for (String tasks =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null; taskList =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       waitForTaskToComplete();
       // Check metrics
       long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
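
These test loops fold the schedule/advance/count steps into the `for` header, which the reformatting above makes denser still. An equivalent, unrolled sketch of the pattern; fixture names (`_taskManager`, `offlineTableName`, `MinionConstants` import, and the per-iteration assertions) are assumed from the surrounding test class:

```java
// Unrolled shape of the scheduling loop used throughout this test class.
int numTasks = 0;
String task = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
    .get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
while (task != null) {
  // ... per-task assertions and waitForTaskToComplete() go here ...
  numTasks++;
  List<String> taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, null)
      .get(MinionConstants.MergeRollupTask.TASK_TYPE);
  task = taskList != null ? taskList.get(0) : null;
}
```
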
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index c4ba131..da4e856 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,7 +62,6 @@
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
   private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
 
-
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
   protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -83,12 +81,8 @@
     startBrokers(1);
     startServers(1);
 
-    List<String> allTables = ImmutableList.of(
-        PURGE_FIRST_RUN_TABLE,
-        PURGE_DELTA_PASSED_TABLE,
-        PURGE_DELTA_NOT_PASSED_TABLE,
-        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
-    );
+    List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
     Schema schema = null;
     TableConfig tableConfig = null;
     for (String tableName : allTables) {
@@ -152,12 +146,9 @@
   private void setRecordPurger() {
     MinionContext minionContext = MinionContext.getInstance();
     minionContext.setRecordPurgerFactory(rawTableName -> {
-      List<String> tableNames = Arrays.asList(
-          PURGE_FIRST_RUN_TABLE,
-          PURGE_DELTA_PASSED_TABLE,
-          PURGE_DELTA_NOT_PASSED_TABLE,
-          PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
-      );
+      List<String> tableNames =
+          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+              PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
       if (tableNames.contains(rawTableName)) {
         return row -> row.getValue("ArrTime").equals(1);
       } else {
@@ -195,11 +186,12 @@
     // 5. Check the purge process itself by setting an expecting number of rows
 
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
-    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -209,7 +201,7 @@
           metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
     }
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -239,11 +231,12 @@
     // 5. Check the purge process itself by setting an expecting number of rows
 
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
-    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -255,7 +248,7 @@
       assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 86400000);
     }
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -287,7 +280,7 @@
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
 
     // No task should be schedule as the delay is not passed
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
       // Check purge time
       String purgeTime =
@@ -338,10 +331,11 @@
 
     // schedule purge tasks
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
-    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
-    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 043c654..e6c8ce2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -134,14 +134,14 @@
 
     Map<String, String> taskConfigsWithMetadata = new HashMap<>();
     taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
-    taskConfigsWithMetadata.put(
-        BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+    taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
     String tableWithMetadataPush = "myTable2";
     schema.setSchemaName(tableWithMetadataPush);
     addSchema(schema);
     TableConfig realtimeMetadataTableConfig = createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush,
-        new TableTaskConfig(Collections.singletonMap(
-            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigsWithMetadata)));
+        new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            taskConfigsWithMetadata)));
     realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig);
     realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
     addTableConfig(realtimeMetadataTableConfig);
@@ -151,7 +151,6 @@
     offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
     addTableConfig(offlineMetadataTableConfig);
 
-
     // Push data into Kafka
     pushAvroIntoKafka(avroFiles);
 
@@ -163,7 +162,6 @@
 
     waitForDocsLoaded(600_000L, true, tableWithMetadataPush);
 
-
     _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
     _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
@@ -181,8 +179,8 @@
     }
     _dataSmallestTimeMs = minSegmentTimeMs;
 
-   segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
-   minSegmentTimeMs = Long.MAX_VALUE;
+    segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
+    minSegmentTimeMs = Long.MAX_VALUE;
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
       if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
         minSegmentTimeMs = Math.min(minSegmentTimeMs, segmentZKMetadata.getStartTimeMs());
@@ -193,29 +191,28 @@
 
   private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTaskConfig taskConfig,
       @Nullable SegmentPartitionConfig partitionConfig) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(partitionConfig).build();
   }
 
   protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String tableName, TableTaskConfig taskConfig) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
-        .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        .setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()).setStreamConfigs(getStreamConfigs())
+        .setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
-
   @Test
   public void testRealtimeToOfflineSegmentsTask()
       throws Exception {
@@ -234,12 +231,12 @@
     long expectedWatermark = _dataSmallestTimeMs + 86400000;
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleTasks(_realtimeTableName)
+      assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      assertNull(_taskManager.scheduleTasks(_realtimeTableName)
+      assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       // Wait at most 600 seconds for all tasks COMPLETED
@@ -286,12 +283,12 @@
     _taskManager.cleanUpTask();
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+      assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+      assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       // Wait at most 600 seconds for all tasks COMPLETED
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 241c1c0..78aa4d1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -87,8 +87,8 @@
     properties.put(TASK_TYPE + MinionConstants.MAX_ATTEMPTS_PER_TASK_KEY_SUFFIX, "2");
 
     helixResourceManager.getHelixAdmin().setConfig(
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
-            .forCluster(helixResourceManager.getHelixClusterName()).build(), properties);
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+            helixResourceManager.getHelixClusterName()).build(), properties);
 
     // Add 3 offline tables, where 2 of them have TestTask enabled
     addDummySchema(TABLE_NAME_1);
@@ -136,7 +136,7 @@
     assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
 
     // Should create the task queues and generate a task in the same minion instance
-    List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+    List<String> task1 = _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE);
     assertNotNull(task1);
     assertEquals(task1.size(), 1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -150,7 +150,7 @@
     verifyTaskCount(task1.get(0), 0, 1, 1, 2);
     // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
     // since we have one minion instance that is still running one of the sub-tasks.
-    List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
+    List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null);
     assertNotNull(task2);
     assertEquals(task2.size(), 1);
     assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -159,8 +159,8 @@
     // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
     // Our test task generator does not generate if there are already this many sub-tasks in the
     // running+waiting count already.
-    assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
-    assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
+    assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE));
+    assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null));
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
@@ -183,13 +183,12 @@
     String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS;
     String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED;
     String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED;
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
             == NUM_TASKS
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
 
     // Stop the task queue
     _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
@@ -211,14 +210,12 @@
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS)
             == NUM_TASKS
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
 
     // Task deletion requires the task queue to be stopped,
     // so deleting task1 here before resuming the task queue.
@@ -247,13 +244,11 @@
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
-            && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
-            == (NUM_TASKS - 1),
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) == (
+            NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
 
     // Delete the task queue
     _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
@@ -263,13 +258,11 @@
         STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
   }
 
   @AfterClass
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index d292ef4..5058fd4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -489,7 +489,7 @@
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
 
     // wait for offline segments
     JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 238d515..19c3ac6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -471,8 +471,8 @@
     waitForAllDocsLoaded(tableName, 600_000L, 1000);
     assertEquals(getScore(tableName), 3692);
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
-
-    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -501,8 +501,8 @@
     waitForAllDocsLoaded(tableName, 600_000L, 2000);
     assertEquals(getScore(tableName), 3692);
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
-
-    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -546,7 +546,8 @@
 
     // Run segment compaction. This time, we expect that the deleting rows are still there because they are
     // as part of the consuming segment
-    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -563,7 +564,8 @@
     assertEquals(getNumDeletedRows(tableName), 2);
 
     // Run segment compaction. This time, we expect that the deleting rows are cleaned up
-    assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index e8389b3..08aa9ae 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -203,7 +203,7 @@
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
 
     // wait for offline segments
     JsonNode offlineSegments = TestUtils.waitForResult(() -> {