blob: 97417d6bea94f6a7637f89ffd0ba9a8afcce5a19 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.minion;
import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The class <code>PinotTaskManager</code> is the component inside Pinot Controller to periodically check the Pinot
* cluster status and schedule new tasks.
* <p><code>PinotTaskManager</code> is also responsible for checking the health status on each type of tasks, detect and
* fix issues accordingly.
*/
public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
public final static String SKIP_LATE_CRON_SCHEDULE = "SkipLateCronSchedule";
public final static String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
public final static String SCHEDULE_KEY = "schedule";
public final static String MINION_INSTANCE_TAG_CONFIG = "minionInstanceTag";
private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context";
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoAccessor _clusterInfoAccessor;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
// For cron-based scheduling
private final Scheduler _scheduler;
private final boolean _skipLateCronSchedule;
private final int _maxCronScheduleDelayInSeconds;
private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
// For metrics
private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>();
private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<>();
private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new ZkTableConfigChangeListener();
private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;
public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor,
PoolingHttpClientConnectionManager connectionManager) {
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_taskManagerStatusCache = taskManagerStatusCache;
_clusterInfoAccessor =
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
leadControllerManager, executor, connectionManager);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
_skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
_maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
try {
_scheduler = new StdSchedulerFactory().getScheduler();
_scheduler.start();
synchronized (_zkTableConfigChangeListener) {
// Subscribe child changes before reading the data to avoid missing changes
LOGGER.info("Check and subscribe to tables change under PropertyStore path: {}", TABLE_CONFIG_PARENT_PATH);
_pinotHelixResourceManager.getPropertyStore()
.subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, _zkTableConfigChangeListener);
List<String> tables = _pinotHelixResourceManager.getPropertyStore()
.getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
if (CollectionUtils.isNotEmpty(tables)) {
checkTableConfigChanges(tables);
}
}
} catch (SchedulerException e) {
throw new RuntimeException("Caught exception while setting up the scheduler", e);
}
} else {
_scheduler = null;
}
}
public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
Map<String, String> taskConfigs)
throws Exception {
if (taskName == null) {
taskName = tableName + "_" + UUID.randomUUID();
LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
}
String minionInstanceTag =
taskConfigs.getOrDefault(MINION_INSTANCE_TAG_CONFIG, CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
if (taskState != null) {
throw new TaskAlreadyExistsException(
"Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
}
List<String> tableNameWithTypes = new ArrayList<>();
if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
tableNameWithTypes.add(offlineTableName);
}
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
tableNameWithTypes.add(realtimeTableName);
}
} else {
if (_pinotHelixResourceManager.hasTable(tableName)) {
tableNameWithTypes.add(tableName);
}
}
if (tableNameWithTypes.isEmpty()) {
throw new TableNotFoundException("'tableName' " + tableName + " is not found");
}
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
// Generate each type of tasks
if (taskGenerator == null) {
throw new UnknownTaskTypeException(
"Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
}
// responseMap holds the table to task name mapping.
Map<String, String> responseMap = new HashMap<>();
for (String tableNameWithType : tableNameWithTypes) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType);
List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
if (pinotTaskConfigs.isEmpty()) {
LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
continue;
}
LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
responseMap.put(tableNameWithType,
_helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask()));
}
if (responseMap.isEmpty()) {
LOGGER.warn("No task submitted for tableName: {}", tableName);
}
return responseMap;
}
private class ZkTableConfigChangeListener implements IZkChildListener {
@Override
public synchronized void handleChildChange(String path, List<String> tableNamesWithType) {
checkTableConfigChanges(tableNamesWithType);
}
}
private void checkTableConfigChanges(List<String> tableNamesWithType) {
LOGGER.info("Checking task config changes in table configs");
// NOTE: we avoided calling _leadControllerManager::isLeaderForTable here to skip tables the current
// controller is not leader for. Because _leadControllerManager updates its leadership states based
// on a ZK event, and that ZK event may come later than this method call, making current controller
// think it's not lead for certain tables, when it should be if the leadership states are fully updated.
if (_tableTaskSchedulerUpdaterMap.isEmpty()) {
// Initial setup
for (String tableNameWithType : tableNamesWithType) {
subscribeTableConfigChanges(tableNameWithType);
}
} else {
Set<String> existingTables = new HashSet<>(_tableTaskSchedulerUpdaterMap.keySet());
Set<String> tablesToAdd = new HashSet<>();
for (String tableNameWithType : tableNamesWithType) {
if (!existingTables.remove(tableNameWithType)) {
tablesToAdd.add(tableNameWithType);
}
}
for (String tableNameWithType : tablesToAdd) {
subscribeTableConfigChanges(tableNameWithType);
}
if (!existingTables.isEmpty()) {
LOGGER.info("Found tables to clean up cron task scheduler: {}", existingTables);
for (String tableNameWithType : existingTables) {
cleanUpCronTaskSchedulerForTable(tableNameWithType);
}
}
}
}
private String getPropertyStorePathForTable(String tableWithType) {
return TABLE_CONFIG_PATH_PREFIX + tableWithType;
}
private String getPropertyStorePathForTaskQueue(String taskType) {
return String.format(TASK_QUEUE_PATH_PATTERN, taskType);
}
public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.get(tableWithType);
if (tableTaskSchedulerUpdater != null) {
_pinotHelixResourceManager.getPropertyStore()
.unsubscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
}
removeAllTasksFromCronExpressions(tableWithType);
_tableTaskSchedulerUpdaterMap.remove(tableWithType);
}
private synchronized void removeAllTasksFromCronExpressions(String tableWithType) {
Set<JobKey> jobKeys;
try {
jobKeys = _scheduler.getJobKeys(GroupMatcher.anyJobGroup());
} catch (SchedulerException e) {
LOGGER.error("Got exception when fetching all jobKeys", e);
return;
}
for (JobKey jobKey : jobKeys) {
if (jobKey.getName().equals(tableWithType)) {
try {
_scheduler.deleteJob(jobKey);
_controllerMetrics.addValueToTableGauge(getCronJobName(tableWithType, jobKey.getGroup()),
ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
} catch (SchedulerException e) {
LOGGER.error("Got exception when deleting the scheduled job - {}", jobKey, e);
}
}
}
_tableTaskTypeToCronExpressionMap.remove(tableWithType);
}
public static String getCronJobName(String tableWithType, String taskType) {
return String.format("%s.%s", tableWithType, taskType);
}
public synchronized void subscribeTableConfigChanges(String tableWithType) {
if (_tableTaskSchedulerUpdaterMap.containsKey(tableWithType)) {
return;
}
TableTaskSchedulerUpdater tableTaskSchedulerUpdater = new TableTaskSchedulerUpdater(tableWithType, this);
_pinotHelixResourceManager.getPropertyStore()
.subscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
_tableTaskSchedulerUpdaterMap.put(tableWithType, tableTaskSchedulerUpdater);
try {
updateCronTaskScheduler(tableWithType);
} catch (Exception e) {
LOGGER.error("Failed to create cron task in scheduler for table: {}", tableWithType, e);
}
}
public synchronized void updateCronTaskScheduler(String tableWithType) {
LOGGER.info("Trying to update task schedule for table: {}", tableWithType);
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableWithType);
if (tableConfig == null) {
LOGGER.info("tableConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
removeAllTasksFromCronExpressions(tableWithType);
return;
}
TableTaskConfig taskConfig = tableConfig.getTaskConfig();
if (taskConfig == null) {
LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
removeAllTasksFromCronExpressions(tableWithType);
return;
}
Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
if (taskTypeConfigsMap == null) {
LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", tableWithType);
removeAllTasksFromCronExpressions(tableWithType);
return;
}
Map<String, String> taskToCronExpressionMap = getTaskToCronExpressionMap(taskTypeConfigsMap);
LOGGER.info("Got taskToCronExpressionMap {} ", taskToCronExpressionMap);
updateCronTaskScheduler(tableWithType, taskToCronExpressionMap);
}
private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
Map<String, String> existingScheduledTasks = _tableTaskTypeToCronExpressionMap.get(tableWithType);
if (existingScheduledTasks != null && !existingScheduledTasks.isEmpty()) {
// Loop over existing tasks to identify tasks to be removed or updated
for (Map.Entry<String, String> entry : existingScheduledTasks.entrySet()) {
String existingTaskType = entry.getKey();
String newCronExpression = taskToCronExpressionMap.get(existingTaskType);
if (newCronExpression == null) {
// Task should be removed
try {
_scheduler.deleteJob(JobKey.jobKey(tableWithType, existingTaskType));
_controllerMetrics.addValueToTableGauge(getCronJobName(tableWithType, existingTaskType),
ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
} catch (SchedulerException e) {
LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType, existingTaskType,
e);
}
continue;
}
if (!entry.getValue().equalsIgnoreCase(newCronExpression)) {
// Update existing task with new cron expr
try {
TriggerKey triggerKey = TriggerKey.triggerKey(tableWithType, existingTaskType);
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(newCronExpression)).build();
_scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
LOGGER.error("Failed to update scheduled job for table {}, task type {}", tableWithType, existingTaskType,
e);
}
}
}
// Loop over new tasks to identify tasks to be added
for (Map.Entry<String, String> entry : taskToCronExpressionMap.entrySet()) {
String newTaskType = entry.getKey();
if (!existingScheduledTasks.containsKey(newTaskType)) {
String newCronExpression = entry.getValue();
try {
scheduleJob(tableWithType, newTaskType, newCronExpression);
} catch (SchedulerException e) {
LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, newTaskType,
newCronExpression, e);
}
}
}
} else {
for (String taskType : taskToCronExpressionMap.keySet()) {
// Schedule new job
String cronExpr = taskToCronExpressionMap.get(taskType);
try {
scheduleJob(tableWithType, taskType, cronExpr);
} catch (SchedulerException e) {
LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, taskType,
cronExpr, e);
}
}
}
_tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
}
private void scheduleJob(String tableWithType, String taskType, String cronExprStr)
throws SchedulerException {
boolean exists = false;
try {
exists = _scheduler.checkExists(JobKey.jobKey(tableWithType, taskType));
} catch (SchedulerException e) {
LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e);
}
if (!exists) {
LOGGER.info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", cronExprStr,
tableWithType, taskType);
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType))
.withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, _leadControllerManager);
jobDataMap.put(SKIP_LATE_CRON_SCHEDULE, _skipLateCronSchedule);
jobDataMap.put(MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, _maxCronScheduleDelayInSeconds);
JobDetail jobDetail =
JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
.build();
try {
_scheduler.scheduleJob(jobDetail, trigger);
_controllerMetrics.addValueToTableGauge(getCronJobName(tableWithType, taskType),
ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, 1L);
} catch (Exception e) {
LOGGER.error("Failed to parse Cron expression - " + cronExprStr, e);
throw e;
}
Date nextRuntime = trigger.getNextFireTime();
LOGGER.info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableWithType, taskType,
nextRuntime);
}
}
private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
Map<String, String> taskToCronExpressionMap = new HashMap<>();
for (String taskType : taskTypeConfigsMap.keySet()) {
Map<String, String> taskTypeConfig = taskTypeConfigsMap.get(taskType);
if (taskTypeConfig == null || !taskTypeConfig.containsKey(SCHEDULE_KEY)) {
continue;
}
String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
if (cronExprStr == null) {
continue;
}
taskToCronExpressionMap.put(taskType, cronExprStr);
}
return taskToCronExpressionMap;
}
/**
* Returns the cluster info accessor.
* <p>Cluster info accessor can be used to initialize the task generator.
*/
public ClusterInfoAccessor getClusterInfoAccessor() {
return _clusterInfoAccessor;
}
/**
* Returns the task generator registry.
*/
public TaskGeneratorRegistry getTaskGeneratorRegistry() {
return _taskGeneratorRegistry;
}
/**
* Registers a task generator.
* <p>This method can be used to plug in custom task generators.
*/
public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
_taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
}
/**
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}
/**
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}
/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
}
/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}
/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}
/**
* Schedules task for the given task type for the give table.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
*/
@Nullable
public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
}
/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
*/
private synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
@Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
// Scan all table configs to get the tables with tasks enabled
Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null) {
Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
for (String enabledTaskType : enabledTaskTypes) {
enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new ArrayList<>()).add(tableConfig);
}
}
}
// Generate each type of tasks
Map<String, List<String>> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
if (taskGenerator != null) {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag));
} else {
List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size());
for (TableConfig enabledTableConfig : enabledTableConfigs) {
enabledTables.add(enabledTableConfig.getTableName());
}
LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
tasksScheduled.put(taskType, null);
}
}
return tasksScheduled;
}
@Nullable
private synchronized List<String> scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
// Scan all table configs to get the tables with task enabled
List<TableConfig> enabledTableConfigs = new ArrayList<>();
for (String tableNameWithType : tables) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
.isTaskTypeEnabled(taskType)) {
enabledTableConfigs.add(tableConfig);
}
}
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag);
}
/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
*/
@Nullable
private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader, @Nullable String minionInstanceTagForTask) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>();
String taskType = taskGenerator.getTaskType();
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
String minionInstanceTag = minionInstanceTagForTask != null ? minionInstanceTagForTask
: taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
minionInstanceTagToTaskConfigs.put(minionInstanceTag, presentTaskConfig);
long successRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
// before the first task schedule, the follow two gauge metrics will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
() -> System.currentTimeMillis() - successRunTimestamp);
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
} catch (Exception e) {
StringWriter errors = new StringWriter();
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
long failureRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
errors.toString()));
// before the first task schedule, the follow gauge metric will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 1L);
LOGGER.error("Failed to generate tasks for task type {} for table {}", taskType, tableName, e);
}
}
if (!isLeader) {
taskGenerator.nonLeaderCleanUp();
}
int numErrorTasksScheduled = 0;
List<String> submittedTaskNames = new ArrayList<>();
for (String minionInstanceTag : minionInstanceTagToTaskConfigs.keySet()) {
List<PinotTaskConfig> pinotTaskConfigs = minionInstanceTagToTaskConfigs.get(minionInstanceTag);
int numTasks = pinotTaskConfigs.size();
try {
if (numTasks > 0) {
// This might lead to lot of logs, maybe sum it up and move outside the loop
LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks,
taskType, minionInstanceTag, pinotTaskConfigs);
String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask());
submittedTaskNames.add(submittedTaskName);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
} catch (Exception e) {
numErrorTasksScheduled++;
LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", taskType,
minionInstanceTag, pinotTaskConfigs, e);
}
}
if (numErrorTasksScheduled > 0) {
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
}
// No job got scheduled
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// atleast one job got scheduled
return submittedTaskNames;
}
@Override
protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
scheduleTasks(tableNamesWithType, true, null);
}
@Override
public void cleanUpTask() {
LOGGER.info("Cleaning up all task generators");
for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
_taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
}
}
@Nullable
public Scheduler getScheduler() {
return _scheduler;
}
public synchronized void reportMetrics(String taskType) {
// Reset all counters to 0
for (Map.Entry<TaskState, Integer> entry : _taskStateToCountMap.entrySet()) {
entry.setValue(0);
}
if (_helixTaskResourceManager.getTaskTypes().contains(taskType)) {
Map<String, TaskState> taskStates = _helixTaskResourceManager.getTaskStates(taskType);
for (TaskState taskState : taskStates.values()) {
_taskStateToCountMap.merge(taskState, 1, Integer::sum);
}
}
for (Map.Entry<TaskState, Integer> taskStateEntry : _taskStateToCountMap.entrySet()) {
_controllerMetrics.setValueOfTableGauge(String.format("%s.%s", taskType, taskStateEntry.getKey()),
ControllerGauge.TASK_STATUS, taskStateEntry.getValue());
}
}
private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) {
if (!_taskTypeMetricsUpdaterMap.containsKey(taskType)) {
TaskTypeMetricsUpdater taskTypeMetricsUpdater = new TaskTypeMetricsUpdater(taskType, this);
_pinotHelixResourceManager.getPropertyStore()
.subscribeDataChanges(getPropertyStorePathForTaskQueue(taskType), taskTypeMetricsUpdater);
_taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater);
}
}
}