Refactor PinotTaskManager class (#12964)
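Summary of the public API renames in PinotTaskManager, derived from the hunks below (the arrow notation is editorial, not part of the patch):

    scheduleTasks()                                -> scheduleAllTasksForAllTables(minionInstanceTag)
    scheduleTasksForDatabase(database, tag)        -> scheduleAllTasksForDatabase(database, tag)
    scheduleTasks(tableNameWithType[, tag])        -> scheduleAllTasksForTable(tableNameWithType, tag)
    scheduleTask(taskType, tag)                    -> scheduleTaskForAllTables(taskType, tag)
    scheduleTask(taskType, tableNameWithType, tag) -> scheduleTaskForTable(taskType, tableNameWithType, tag)

scheduleTaskForDatabase(taskType, database, tag) keeps its name; the per-table variant also drops the bespoke precondition checks and reuses the shared list-based helper.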
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e09bde84..0d9d3a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,22 +618,20 @@
@ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
@ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName,
- @ApiParam(value = "Minion Instance tag to schedule the task explicitly on")
- @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
- @Context HttpHeaders headers) {
+ @ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
+ @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
if (taskType != null) {
// Schedule task for the given task type
- List<String> taskNames = tableName != null
- ? _pinotTaskManager.scheduleTask(taskType,
+ List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
} else {
// Schedule tasks for all task types
- Map<String, List<String>> allTaskNames = tableName != null
- ? _pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
- : _pinotTaskManager.scheduleTasksForDatabase(database, minionInstanceTag);
+ Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
+ DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
+ : _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
return allTaskNames.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
}
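For reference, a minimal sketch of exercising the updated resource method over HTTP, assuming the method is mounted at the controller's /tasks/schedule POST endpoint as in Pinot's task REST API; the host, port, table name, and tag values are hypothetical placeholders:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ScheduleTasksCall {
      public static void main(String[] args) throws Exception {
        // Schedule MergeRollupTask for one table on an explicit minion tag.
        // Query parameter names follow the annotated resource method above.
        URI uri = URI.create("http://localhost:9000/tasks/schedule"
            + "?taskType=MergeRollupTask&tableName=myTable_OFFLINE&minionInstanceTag=minionTag1");
        HttpRequest request = HttpRequest.newBuilder(uri)
            .header("database", "default")  // optional database header read by the handler
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The body is a JSON map from task type to comma-joined task names.
        System.out.println(response.body());
      }
    }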
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 8c04338..f9b250b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -65,7 +65,7 @@
return;
}
long jobStartTime = System.currentTimeMillis();
- pinotTaskManager.scheduleTask(taskType, table);
+ pinotTaskManager.scheduleTaskForTable(taskType, table, null);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4029944..97417d6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,7 +22,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -480,30 +479,72 @@
}
/**
- * Public API to schedule tasks (all task types) for all tables in all databases.
+ * Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
- public synchronized Map<String, List<String>> scheduleTasks() {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, null);
+ public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
+ return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}
/**
- * Public API to schedule tasks (all task types) for all tables in given database.
+ * Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
- public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database,
+ public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}
/**
+ * Schedules tasks (all task types) for the given table.
+ * It might be called from the non-leader controller.
+ * Returns a map from the task type to the list of tasks scheduled.
+ */
+ public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
+ @Nullable String minionInstanceTag) {
+ return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+ }
+
+ /**
+ * Schedules tasks for the given task type for all tables.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
+ }
+
+ /**
+ * Schedules tasks for the given task type for all tables in the given database.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
+ @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+ }
+
+ /**
+ * Schedules tasks for the given task type for the given table.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
+ @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
+ }
+
+ /**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
*/
- private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType,
- boolean isLeader, @Nullable String minionInstanceTag) {
+ private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
+ @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
// Scan all table configs to get the tables with tasks enabled
@@ -541,6 +582,27 @@
return tasksScheduled;
}
+ @Nullable
+ private synchronized List<String> scheduleTask(String taskType, List<String> tables,
+ @Nullable String minionInstanceTag) {
+ PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+ Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
+
+ // Scan all table configs to get the tables with task enabled
+ List<TableConfig> enabledTableConfigs = new ArrayList<>();
+ for (String tableNameWithType : tables) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+ .isTaskTypeEnabled(taskType)) {
+ enabledTableConfigs.add(tableConfig);
+ }
+ }
+
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ addTaskTypeMetricsUpdaterIfNeeded(taskType);
+ return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
+ }
+
/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
@@ -554,8 +616,8 @@
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
- String minionInstanceTag = minionInstanceTagForTask != null
- ? minionInstanceTagForTask : taskGenerator.getMinionInstanceTag(tableConfig);
+ String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask
+ : taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -622,86 +684,6 @@
return submittedTaskNames;
}
- /**
- * Public API to schedule tasks (all task types) for the given table. It might be called from the non-leader
- * controller. Returns a map from the task type to the list of tasks scheduled.
- */
- public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType) {
- return scheduleTasks(Collections.singletonList(tableNameWithType), false, null);
- }
-
- /**
- * Public API to schedule tasks (all task types) for the given table on a specific instance tag.
- * It might be called from the non-leader controller. Returns a map from the task type to the list of tasks scheduled.
- */
- public synchronized Map<String, List<String>> scheduleTasks(String tableNameWithType,
- @Nullable String minionInstanceTag) {
- return scheduleTasks(Collections.singletonList(tableNameWithType), false, minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type in all databases.
- * It might be called from the non-leader controller.
- * Returns the list of task names, or {@code null} if no task is scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTask(String taskType, @Nullable String minionInstanceTag) {
- return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type in given database.
- * It might be called from the non-leader controller.
- * Returns the list of task name, or {@code null} if no task is scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
- @Nullable String minionInstanceTag) {
- return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
- }
-
- @Nullable
- private List<String> scheduleTask(String taskType, List<String> tables, @Nullable String minionInstanceTag) {
- PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
- Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
-
- // Scan all table configs to get the tables with task enabled
- List<TableConfig> enabledTableConfigs = new ArrayList<>();
- for (String tableNameWithType : tables) {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
- .isTaskTypeEnabled(taskType)) {
- enabledTableConfigs.add(tableConfig);
- }
- }
-
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type on the given table. It might be called from the non-leader
- * controller. Returns the list of task names, or {@code null} if no task is scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTask(String taskType, String tableNameWithType,
- @Nullable String minionInstanceTag) {
- PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
- Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
-
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType);
-
- Preconditions.checkState(
- tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
- "Table: %s does not have task type: %s enabled", tableNameWithType, taskType);
-
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false, minionInstanceTag);
- }
-
@Override
protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
scheduleTasks(tableNamesWithType, true, null);
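A minimal sketch of calling the reworked methods directly, using only the signatures added above; the taskManager handle, table name, and task-type strings are hypothetical placeholders:

    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

    class SchedulerSketch {
      static void schedule(PinotTaskManager taskManager) {
        // All enabled task types for one table; the map key is the task type.
        Map<String, List<String>> byType = taskManager.scheduleAllTasksForTable("myTable_OFFLINE", null);

        // One task type across all tables, pinned to an explicit minion instance tag.
        List<String> names = taskManager.scheduleTaskForAllTables("PurgeTask", "minionTag1");
        if (names == null) {
          // Per the javadoc above, null means no task of this type was scheduled.
        }
      }
    }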
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b655416..c5be600 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -139,14 +139,14 @@
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload segments
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1, _tarDir1);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, schema, 0, _segmentDir1,
+ _tarDir1);
buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "1");
buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2");
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3);
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3,
+ _tarDir3);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0,
+ _segmentDir4, _tarDir4);
uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
@@ -160,8 +160,8 @@
schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
addSchema(schema);
TableConfig singleLevelConcatProcessAllRealtimeTableConfig =
- createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
- MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, PROCESS_ALL_MODE_KAFKA_TOPIC);
+ createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
+ PROCESS_ALL_MODE_KAFKA_TOPIC);
addTableConfig(singleLevelConcatProcessAllRealtimeTableConfig);
// Push data into Kafka
@@ -172,9 +172,8 @@
ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3), "localhost:" + getKafkaPort(),
PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(),
injectTombstones());
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles.subList(3, 9), singleLevelConcatProcessAllRealtimeTableConfig, schema, 0,
- _segmentDir5, _tarDir5);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9),
+ singleLevelConcatProcessAllRealtimeTableConfig, schema, 0, _segmentDir5, _tarDir5);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
@@ -216,14 +215,14 @@
private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig,
@Nullable SegmentPartitionConfig partitionConfig) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
- .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
- .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
- .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
- .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
- .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
- .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+ return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+ .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+ .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+ .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+ .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+ .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+ .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+ .setSegmentPartitionConfig(partitionConfig).build();
}
protected TableConfig createRealtimeTableConfigWithProcessAllMode(File sampleAvroFile, String tableName,
@@ -246,12 +245,12 @@
tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
tableTaskConfigs.put("mode", "processAll");
- return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
- .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
- .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
- .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
- .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode()).setTaskConfig(
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+ .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+ .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+ .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+ .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+ .setTaskConfig(
new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs)))
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
.setQueryConfig(getQueryConfig()).setStreamConfigs(streamConfigs)
@@ -411,17 +410,16 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -527,17 +525,16 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -636,17 +633,16 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -788,17 +784,16 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -859,8 +854,8 @@
return false;
}
// Check if the task metadata is cleaned up
- if (MinionTaskMetadataUtils
- .fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) {
+ if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, MinionConstants.MergeRollupTask.TASK_TYPE,
+ tableNameWithType) != null) {
return false;
}
return true;
@@ -921,18 +916,17 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
// assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -1027,17 +1021,16 @@
int numTasks = 0;
List<String> taskList;
for (String tasks =
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null; taskList =
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check not using watermarks
@@ -1069,11 +1062,10 @@
waitForAllDocsLoaded(600_000L);
for (String tasks =
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null; taskList =
- taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName, null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
waitForTaskToComplete();
// Check metrics
long numBucketsToProcess = MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index c4ba131..da4e856 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.integration.tests;
-import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -63,7 +62,6 @@
private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
-
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotTaskManager _taskManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -83,12 +81,8 @@
startBrokers(1);
startServers(1);
- List<String> allTables = ImmutableList.of(
- PURGE_FIRST_RUN_TABLE,
- PURGE_DELTA_PASSED_TABLE,
- PURGE_DELTA_NOT_PASSED_TABLE,
- PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
- );
+ List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
Schema schema = null;
TableConfig tableConfig = null;
for (String tableName : allTables) {
@@ -152,12 +146,9 @@
private void setRecordPurger() {
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(rawTableName -> {
- List<String> tableNames = Arrays.asList(
- PURGE_FIRST_RUN_TABLE,
- PURGE_DELTA_PASSED_TABLE,
- PURGE_DELTA_NOT_PASSED_TABLE,
- PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
- );
+ List<String> tableNames =
+ Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
if (tableNames.contains(rawTableName)) {
return row -> row.getValue("ArrTime").equals(1);
} else {
@@ -195,11 +186,12 @@
// 5. Check the purge process itself by setting an expecting number of rows
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
- assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -209,7 +201,7 @@
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
}
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -239,11 +231,12 @@
// 5. Check the purge process itself by setting an expecting number of rows
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
- assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -255,7 +248,7 @@
assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 86400000);
}
// Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -287,7 +280,7 @@
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
// No task should be schedule as the delay is not passed
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Check purge time
String purgeTime =
@@ -338,10 +331,11 @@
// schedule purge tasks
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
- assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
- assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 043c654..e6c8ce2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -134,14 +134,14 @@
Map<String, String> taskConfigsWithMetadata = new HashMap<>();
taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
- taskConfigsWithMetadata.put(
- BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+ taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
String tableWithMetadataPush = "myTable2";
schema.setSchemaName(tableWithMetadataPush);
addSchema(schema);
TableConfig realtimeMetadataTableConfig = createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush,
- new TableTaskConfig(Collections.singletonMap(
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigsWithMetadata)));
+ new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ taskConfigsWithMetadata)));
realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig);
realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
addTableConfig(realtimeMetadataTableConfig);
@@ -151,7 +151,6 @@
offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
addTableConfig(offlineMetadataTableConfig);
-
// Push data into Kafka
pushAvroIntoKafka(avroFiles);
@@ -163,7 +162,6 @@
waitForDocsLoaded(600_000L, true, tableWithMetadataPush);
-
_taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
_realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
@@ -181,8 +179,8 @@
}
_dataSmallestTimeMs = minSegmentTimeMs;
- segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
- minSegmentTimeMs = Long.MAX_VALUE;
+ segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
+ minSegmentTimeMs = Long.MAX_VALUE;
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
minSegmentTimeMs = Math.min(minSegmentTimeMs, segmentZKMetadata.getStartTimeMs());
@@ -193,29 +191,28 @@
private TableConfig createOfflineTableConfig(String tableName, @Nullable TableTaskConfig taskConfig,
@Nullable SegmentPartitionConfig partitionConfig) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
- .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
- .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
- .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
- .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
- .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
- .setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+ return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+ .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+ .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+ .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+ .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+ .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+ .setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+ .setSegmentPartitionConfig(partitionConfig).build();
}
protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String tableName, TableTaskConfig taskConfig) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
- return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
- .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
- .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
- .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
- .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
- .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
- .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+ .setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+ .setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+ .setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+ .setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+ .setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+ .setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()).setStreamConfigs(getStreamConfigs())
+ .setNullHandlingEnabled(getNullHandlingEnabled()).build();
}
-
@Test
public void testRealtimeToOfflineSegmentsTask()
throws Exception {
@@ -234,12 +231,12 @@
long expectedWatermark = _dataSmallestTimeMs + 86400000;
for (int i = 0; i < 3; i++) {
// Schedule task
- assertNotNull(_taskManager.scheduleTasks(_realtimeTableName)
+ assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- assertNull(_taskManager.scheduleTasks(_realtimeTableName)
+ assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED
@@ -286,12 +283,12 @@
_taskManager.cleanUpTask();
for (int i = 0; i < 3; i++) {
// Schedule task
- assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+ assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+ assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 241c1c0..78aa4d1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -87,8 +87,8 @@
properties.put(TASK_TYPE + MinionConstants.MAX_ATTEMPTS_PER_TASK_KEY_SUFFIX, "2");
helixResourceManager.getHelixAdmin().setConfig(
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
- .forCluster(helixResourceManager.getHelixClusterName()).build(), properties);
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ helixResourceManager.getHelixClusterName()).build(), properties);
// Add 3 offline tables, where 2 of them have TestTask enabled
addDummySchema(TABLE_NAME_1);
@@ -136,7 +136,7 @@
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
// Should create the task queues and generate a task in the same minion instance
- List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+ List<String> task1 = _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE);
assertNotNull(task1);
assertEquals(task1.size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -150,7 +150,7 @@
verifyTaskCount(task1.get(0), 0, 1, 1, 2);
// Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
// since we have one minion instance that is still running one of the sub-tasks.
- List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
+ List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null);
assertNotNull(task2);
assertEquals(task2.size(), 1);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -159,8 +159,8 @@
// Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
// Our test task generator does not generate if there are already this many sub-tasks in the
// running+waiting count already.
- assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
- assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
+ assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE));
+ assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null));
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
@@ -183,13 +183,12 @@
String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS;
String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED;
String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED;
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
== NUM_TASKS
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
// Stop the task queue
_helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
@@ -211,14 +210,12 @@
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS)
== NUM_TASKS
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
// Task deletion requires the task queue to be stopped,
// so deleting task1 here before resuming the task queue.
@@ -247,13 +244,11 @@
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
- == (NUM_TASKS - 1),
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) == (
+ NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
// Delete the task queue
_helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
@@ -263,13 +258,11 @@
STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
}
@AfterClass
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index d292ef4..5058fd4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -489,7 +489,7 @@
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+ Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
// wait for offline segments
JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 238d515..19c3ac6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -471,8 +471,8 @@
waitForAllDocsLoaded(tableName, 600_000L, 1000);
assertEquals(getScore(tableName), 3692);
waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
-
- assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -501,8 +501,8 @@
waitForAllDocsLoaded(tableName, 600_000L, 2000);
assertEquals(getScore(tableName), 3692);
waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
-
- assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -546,7 +546,8 @@
// Run segment compaction. This time, we expect that the deleting rows are still there because they are
// as part of the consuming segment
- assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -563,7 +564,8 @@
assertEquals(getNumDeletedRows(tableName), 2);
// Run segment compaction. This time, we expect that the deleting rows are cleaned up
- assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index e8389b3..08aa9ae 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -203,7 +203,7 @@
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+ Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
// wait for offline segments
JsonNode offlineSegments = TestUtils.waitForResult(() -> {