| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.overlord; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.errorprone.annotations.concurrent.GuardedBy; |
| import org.apache.druid.annotations.SuppressFBWarnings; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.Counters; |
| import org.apache.druid.indexing.common.actions.TaskActionClientFactory; |
| import org.apache.druid.indexing.common.task.IndexTaskUtils; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.common.task.Tasks; |
| import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException; |
| import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner; |
| import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; |
| import org.apache.druid.indexing.overlord.config.TaskLockConfig; |
| import org.apache.druid.indexing.overlord.config.TaskQueueConfig; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStart; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStop; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; |
| import org.apache.druid.metadata.EntryExistsException; |
| import org.apache.druid.utils.CollectionUtils; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Interface between task producers and the task runner. |
| * <p/> |
| * This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a |
| * {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready |
| * in time (based on its {@link Task#isReady} method). |
| * <p/> |
| * For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object. |
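| * <p/> |
| * A minimal usage sketch (error handling elided; the queue is normally constructed and |
| * lifecycle-managed by the Overlord, and {@code someTask} stands in for any concrete {@link Task}): |
| * <pre>{@code |
| * taskQueue.start();       // begin accepting tasks and managing the TaskRunner |
| * taskQueue.add(someTask); // persist the task and wake the management loop |
| * taskQueue.stop();        // shut down; persisted tasks remain in TaskStorage |
| * }</pre> |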
| */ |
| public class TaskQueue |
| { |
| private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); |
| private static final long MIN_WAIT_TIME_MS = 100; |
| |
| // Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up). |
| @GuardedBy("giant") |
| private final LinkedHashMap<String, Task> tasks = new LinkedHashMap<>(); |
| |
| // Task ID -> Future from the TaskRunner |
| @GuardedBy("giant") |
| private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<>(); |
| |
| // Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them. |
| @GuardedBy("giant") |
| private final Set<String> recentlyCompletedTasks = new HashSet<>(); |
| |
| private final TaskLockConfig lockConfig; |
| private final TaskQueueConfig config; |
| private final DefaultTaskConfig defaultTaskConfig; |
| private final TaskStorage taskStorage; |
| private final TaskRunner taskRunner; |
| private final TaskActionClientFactory taskActionClientFactory; |
| private final TaskLockbox taskLockbox; |
| private final ServiceEmitter emitter; |
| |
| private final ReentrantLock giant = new ReentrantLock(true); |
| @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") |
| private final BlockingQueue<Object> managementMayBeNecessary = new ArrayBlockingQueue<>(8); |
| private final ExecutorService managerExec = Executors.newSingleThreadExecutor( |
| new ThreadFactoryBuilder() |
| .setDaemon(false) |
| .setNameFormat("TaskQueue-Manager").build() |
| ); |
| private final ScheduledExecutorService storageSyncExec = Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setDaemon(false) |
| .setNameFormat("TaskQueue-StorageSync").build() |
| ); |
| |
| private volatile boolean active = false; |
| |
| private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); |
| |
| private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>(); |
| private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>(); |
| @GuardedBy("totalSuccessfulTaskCount") |
| private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>(); |
| @GuardedBy("totalFailedTaskCount") |
| private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>(); |
| |
| public TaskQueue( |
| TaskLockConfig lockConfig, |
| TaskQueueConfig config, |
| DefaultTaskConfig defaultTaskConfig, |
| TaskStorage taskStorage, |
| TaskRunner taskRunner, |
| TaskActionClientFactory taskActionClientFactory, |
| TaskLockbox taskLockbox, |
| ServiceEmitter emitter |
| ) |
| { |
| this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig"); |
| this.config = Preconditions.checkNotNull(config, "config"); |
| this.defaultTaskConfig = Preconditions.checkNotNull(defaultTaskConfig, "defaultTaskConfig"); |
| this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage"); |
| this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner"); |
| this.taskActionClientFactory = Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory"); |
| this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox"); |
| this.emitter = Preconditions.checkNotNull(emitter, "emitter"); |
| } |
| |
| @VisibleForTesting |
| void setActive(boolean active) |
| { |
| this.active = active; |
| } |
| |
| /** |
| * Starts this task queue. Allows {@link #add(Task)} to accept new tasks. |
| */ |
| @LifecycleStart |
| public void start() |
| { |
| giant.lock(); |
| |
| try { |
| Preconditions.checkState(!active, "queue must be stopped"); |
| active = true; |
| syncFromStorage(); |
| managerExec.submit( |
| new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| while (true) { |
| try { |
| manage(); |
| break; |
| } |
| catch (InterruptedException e) { |
| log.info("Interrupted, exiting!"); |
| break; |
| } |
| catch (Exception e) { |
| final long restartDelay = config.getRestartDelay().getMillis(); |
| log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit(); |
| try { |
| Thread.sleep(restartDelay); |
| } |
| catch (InterruptedException e2) { |
| log.info("Interrupted, exiting!"); |
| break; |
| } |
| } |
| } |
| } |
| } |
| ); |
| ScheduledExecutors.scheduleAtFixedRate( |
| storageSyncExec, |
| config.getStorageSyncRate(), |
| new Callable<ScheduledExecutors.Signal>() |
| { |
| @Override |
| public ScheduledExecutors.Signal call() |
| { |
| try { |
| syncFromStorage(); |
| } |
| catch (Exception e) { |
| if (active) { |
| log.makeAlert(e, "Failed to sync with storage").emit(); |
| } |
| } |
| if (active) { |
| return ScheduledExecutors.Signal.REPEAT; |
| } else { |
| return ScheduledExecutors.Signal.STOP; |
| } |
| } |
| } |
| ); |
| requestManagement(); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| /** |
| * Shuts down the queue. |
| */ |
| @LifecycleStop |
| public void stop() |
| { |
| giant.lock(); |
| |
| try { |
| tasks.clear(); |
| taskFutures.clear(); |
| active = false; |
| managerExec.shutdownNow(); |
| storageSyncExec.shutdownNow(); |
| requestManagement(); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| public boolean isActive() |
| { |
| return active; |
| } |
| |
| /** |
| * Request that the management thread perform a management pass. Non-blocking. |
| * |
| * Other callers (such as notifyStatus) should trigger activity on the |
| * TaskQueue thread by requesting management here. |
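| * |
| * Because {@link #awaitManagementNanos} drains the queue after each poll, bursts of calls to this |
| * method coalesce into a single wake-up of the management thread. |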
| */ |
| void requestManagement() |
| { |
| // We use a BlockingQueue since the offer/poll/wait behaviour is simple |
| // and very easy to reason about. |
| |
| // The request has to be an offer (non-blocking), since the caller might be |
| // requesting management while already holding the giant lock. |
| |
| // We do not care whether the item fits into the queue: |
| // if the queue is already full, a management run has been triggered anyway. |
| managementMayBeNecessary.offer(this); |
| } |
| |
| /** |
| * Await an event that requires a management pass. |
| * |
| * This should only be called from the management thread to wait for activity. |
| * |
| * @param nanos maximum time to wait, in nanoseconds |
| * @throws InterruptedException if the thread is interrupted while waiting |
| */ |
| @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value") |
| void awaitManagementNanos(long nanos) throws InterruptedException |
| { |
| // Mitigate a busy loop; management can get quite busy when there are many task starts/stops. |
| try { |
| Thread.sleep(MIN_WAIT_TIME_MS); |
| } |
| catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| // Wait for an item; if an item arrives (or is already available), complete immediately. |
| // (It does not actually matter what the item is.) |
| managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS); |
| |
| // There may have been multiple requests; clear them all. |
| managementMayBeNecessary.clear(); |
| } |
| |
| /** |
| * Main task runner management loop. Meant to run forever, or, at least until we're stopped. |
| */ |
| private void manage() throws InterruptedException |
| { |
| log.info("Beginning management in %s.", config.getStartDelay()); |
| Thread.sleep(config.getStartDelay().getMillis()); |
| |
| // Ignore the return value; we'll get the IDs and futures from getKnownTasks later. |
| taskRunner.restore(); |
| |
| while (active) { |
| manageInternal(); |
| |
| // awaitNanos because management may become necessary without this condition signalling, |
| // due to e.g. tasks becoming ready when other threads modify the TaskLockbox. |
| awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); |
| } |
| } |
| |
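| /** |
| * Runs a single management pass: the critical section (under the giant lock) launches tasks that |
| * are ready and collects the known task IDs and runner futures, then the post-critical section |
| * asks the taskRunner to shut down tasks that should no longer be running. |
| */ |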
| @VisibleForTesting |
| void manageInternal() |
| { |
| Set<String> knownTaskIds = new HashSet<>(); |
| Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>(); |
| |
| giant.lock(); |
| |
| try { |
| manageInternalCritical(knownTaskIds, runnerTaskFutures); |
| } |
| finally { |
| giant.unlock(); |
| } |
| |
| manageInternalPostCritical(knownTaskIds, runnerTaskFutures); |
| } |
| |
| |
| /** |
| * Management loop critical section tasks. |
| * |
| * @param knownTaskIds will be modified - filled with known task IDs |
| * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks |
| */ |
| @GuardedBy("giant") |
| private void manageInternalCritical( |
| final Set<String> knownTaskIds, |
| final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures |
| ) |
| { |
| // Task futures available from the taskRunner |
| for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { |
| if (!recentlyCompletedTasks.contains(workItem.getTaskId())) { |
| // Don't do anything with tasks that have recently finished; notifyStatus will handle it. |
| runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); |
| } |
| } |
| // Obtain futures for all active tasks (assuming they are ready to run). |
| // Copy tasks list, as notifyStatus may modify it. |
| for (final Task task : ImmutableList.copyOf(tasks.values())) { |
| if (recentlyCompletedTasks.contains(task.getId())) { |
| // Don't do anything with tasks that have recently finished; notifyStatus will handle it. |
| continue; |
| } |
| |
| knownTaskIds.add(task.getId()); |
| |
| if (!taskFutures.containsKey(task.getId())) { |
| final ListenableFuture<TaskStatus> runnerTaskFuture; |
| if (runnerTaskFutures.containsKey(task.getId())) { |
| runnerTaskFuture = runnerTaskFutures.get(task.getId()); |
| } else { |
| // Task should be running, so run it. |
| final boolean taskIsReady; |
| try { |
| taskIsReady = task.isReady(taskActionClientFactory.create(task)); |
| } |
| catch (Exception e) { |
| log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); |
| final String errorMessage; |
| if (e instanceof MaxAllowedLocksExceededException) { |
| errorMessage = e.getMessage(); |
| } else { |
| errorMessage = "Failed while waiting for the task to be ready to run. " |
| + "See overlord logs for more details."; |
| } |
| notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); |
| continue; |
| } |
| if (taskIsReady) { |
| log.info("Asking taskRunner to run: %s", task.getId()); |
| runnerTaskFuture = taskRunner.run(task); |
| } else { |
| // Task.isReady() can internally lock intervals or segments. |
| // We should release them if the task is not ready. |
| taskLockbox.unlockAll(task); |
| continue; |
| } |
| } |
| taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture)); |
| } else if (isTaskPending(task)) { |
| // If taskFutures already contains this task but the task is still pending, ask the taskRunner |
| // to run it again, to guarantee that it will actually be assigned to run; |
| // see https://github.com/apache/druid/pull/6991 |
| taskRunner.run(task); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| private void manageInternalPostCritical( |
| final Set<String> knownTaskIds, |
| final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures |
| ) |
| { |
| // Kill tasks that shouldn't be running |
| final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); |
| if (!tasksToKill.isEmpty()) { |
| log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); |
| |
| // On large installations running several thousands of tasks, |
| // concatenating the list of known task ids can be computationally expensive. |
| final boolean logKnownTaskIds = log.isDebugEnabled(); |
| final String reason = logKnownTaskIds |
| ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds) |
| : "Task is not in knownTaskIds"; |
| |
| for (final String taskId : tasksToKill) { |
| try { |
| taskRunner.shutdown(taskId, reason); |
| } |
| catch (Exception e) { |
| log.warn(e, "TaskRunner failed to clean up task: %s", taskId); |
| } |
| } |
| } |
| } |
| |
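| /** |
| * Returns whether the given task is in the taskRunner's list of pending tasks, i.e. known to the |
| * runner but not yet assigned to run. |
| */ |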
| private boolean isTaskPending(Task task) |
| { |
| return taskRunner.getPendingTasks() |
| .stream() |
| .anyMatch(workItem -> workItem.getTaskId().equals(task.getId())); |
| } |
| |
| /** |
| * Adds some work to the queue and the underlying task storage facility with a generic "running" status. |
| * |
| * @param task task to add |
| * |
| * @return {@code true} if the task was added; failures throw an exception rather than returning {@code false} |
| * |
| * @throws EntryExistsException if the task already exists |
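| * |
| * Illustrative sketch (here {@code someTask} stands in for any concrete {@link Task}): |
| * <pre>{@code |
| * try { |
| *   taskQueue.add(someTask); |
| * } |
| * catch (EntryExistsException e) { |
| *   // a task with the same ID was already submitted |
| * } |
| * }</pre> |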
| */ |
| public boolean add(final Task task) throws EntryExistsException |
| { |
| if (taskStorage.getTask(task.getId()).isPresent()) { |
| throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId())); |
| } |
| |
| // Set forceTimeChunkLock before adding the task spec to taskStorage, so that we always see a consistent task spec. |
| task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock()); |
| defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent); |
| // Every task should use the lineage-based segment allocation protocol unless it is explicitly |
| // configured to use the legacy protocol. |
| task.addToContextIfAbsent( |
| SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, |
| SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION |
| ); |
| |
| giant.lock(); |
| |
| try { |
| Preconditions.checkState(active, "Queue is not active!"); |
| Preconditions.checkNotNull(task, "task"); |
| Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %s)", config.getMaxSize()); |
| |
| // If this throws any sort of exception, including EntryExistsException, we don't want to |
| // insert the task into our queue. So don't catch it. |
| taskStorage.insert(task, TaskStatus.running(task.getId())); |
| addTaskInternal(task); |
| requestManagement(); |
| return true; |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| @GuardedBy("giant") |
| private void addTaskInternal(final Task task) |
| { |
| final Task existingTask = tasks.putIfAbsent(task.getId(), task); |
| |
| if (existingTask == null) { |
| taskLockbox.add(task); |
| } else if (!existingTask.equals(task)) { |
| throw new ISE("Cannot add task with ID [%s]: a different task with the same ID has already been added", task.getId()); |
| } |
| } |
| |
| /** |
| * Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. Returns whether the task was |
| * removed or not. |
| */ |
| @GuardedBy("giant") |
| private boolean removeTaskInternal(final String taskId) |
| { |
| final Task task = tasks.remove(taskId); |
| if (task != null) { |
| taskLockbox.remove(task); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * Shuts down a task if it has not yet finished. |
| * |
| * @param taskId task to kill |
| * @param reasonFormat A format string indicating the shutdown reason |
| * @param args arguments for reasonFormat |
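| * |
| * For example (the task ID here is illustrative): |
| * <pre>{@code |
| * taskQueue.shutdown("index_wikipedia_2023-01-01", "Canceled by operator [%s]", "alice"); |
| * }</pre> |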
| */ |
| public void shutdown(final String taskId, String reasonFormat, Object... args) |
| { |
| giant.lock(); |
| |
| try { |
| final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); |
| if (task != null) { |
| notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); |
| } |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| /** |
| * Shuts down a task, but records the task status as a success, unlike {@link #shutdown(String, String, Object...)}. |
| * |
| * @param taskId task to shutdown |
| * @param reasonFormat A format string indicating the shutdown reason |
| * @param args arguments for reasonFormat |
| */ |
| public void shutdownWithSuccess(final String taskId, String reasonFormat, Object... args) |
| { |
| giant.lock(); |
| |
| try { |
| final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); |
| if (task != null) { |
| notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); |
| } |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| /** |
| * Notify this queue that some task has an updated status. Incomplete statuses are ignored. For a |
| * completed status, the status is persisted in the task storage facility, the task is unlocked, |
| * and no further updates will be accepted. |
| * |
| * @param task task to update |
| * @param taskStatus new task status |
| * |
| * @throws NullPointerException if task or status is null |
| * @throws IllegalArgumentException if the task ID does not match the status ID |
| * @throws IllegalStateException if this queue is currently shut down |
| */ |
| private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args) |
| { |
| Preconditions.checkNotNull(task, "task"); |
| Preconditions.checkNotNull(taskStatus, "status"); |
| Preconditions.checkState(active, "Queue is not active!"); |
| Preconditions.checkArgument( |
| task.getId().equals(taskStatus.getId()), |
| "Mismatching task ids[%s/%s]", |
| task.getId(), |
| taskStatus.getId() |
| ); |
| |
| if (!taskStatus.isComplete()) { |
| // Nothing to do for incomplete statuses. |
| return; |
| } |
| |
| // Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. |
| giant.lock(); |
| try { |
| recentlyCompletedTasks.add(task.getId()); |
| } |
| finally { |
| giant.unlock(); |
| } |
| |
| final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); |
| |
| // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor |
| // remembers that this task has completed. |
| try { |
| final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId()); |
| if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { |
| log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); |
| } else { |
| taskStorage.setStatus(taskStatus.withLocation(taskLocation)); |
| } |
| } |
| catch (Throwable e) { |
| // If persist fails, even after the retries performed in taskStorage, then metadata store and actual cluster |
| // state have diverged. Send out an alert and continue with the task shutdown routine. |
| log.makeAlert(e, "Failed to persist status for task") |
| .addData("task", task.getId()) |
| .addData("statusCode", taskStatus.getStatusCode()) |
| .emit(); |
| } |
| |
| // Inform taskRunner that this task can be shut down. |
| try { |
| taskRunner.shutdown(task.getId(), reasonFormat, args); |
| } |
| catch (Throwable e) { |
| // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to |
| // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map. |
| log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); |
| } |
| |
| // Critical section: remove this task from all of our tracking data structures. |
| giant.lock(); |
| try { |
| if (removeTaskInternal(task.getId())) { |
| taskFutures.remove(task.getId()); |
| } else { |
| log.warn("Unknown task completed: %s", task.getId()); |
| } |
| |
| recentlyCompletedTasks.remove(task.getId()); |
| requestManagement(); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| /** |
| * Attach success and failure handlers to a task status future, such that when it completes, we perform the |
| * appropriate updates. |
| * |
| * @param task the task whose status is being tracked |
| * @param statusFuture a task status future |
| * |
| * @return the same future, for convenience |
| */ |
| private ListenableFuture<TaskStatus> attachCallbacks(final Task task, final ListenableFuture<TaskStatus> statusFuture) |
| { |
| final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); |
| IndexTaskUtils.setTaskDimensions(metricBuilder, task); |
| |
| Futures.addCallback( |
| statusFuture, |
| new FutureCallback<TaskStatus>() |
| { |
| @Override |
| public void onSuccess(final TaskStatus status) |
| { |
| log.info("Received %s status for task: %s", status.getStatusCode(), status.getId()); |
| handleStatus(status); |
| } |
| |
| @Override |
| public void onFailure(final Throwable t) |
| { |
| log.makeAlert(t, "Failed to run task") |
| .addData("task", task.getId()) |
| .addData("type", task.getType()) |
| .addData("dataSource", task.getDataSource()) |
| .emit(); |
| handleStatus( |
| TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.") |
| ); |
| } |
| |
| private void handleStatus(final TaskStatus status) |
| { |
| try { |
| // If we're not supposed to be running anymore, don't do anything. Somewhat racy if the flag gets set |
| // after we check and before we commit the database transaction, but better than nothing. |
| if (!active) { |
| log.info("Abandoning task due to shutdown: %s", task.getId()); |
| return; |
| } |
| |
| notifyStatus(task, status, "notified status change from task"); |
| |
| // Emit event and log, if the task is done |
| if (status.isComplete()) { |
| IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status); |
| emitter.emit(metricBuilder.build("task/run/time", status.getDuration())); |
| |
| log.info( |
| "Task [%s] completed with status [%s] in %,dms.", |
| task.getId(), |
| status.getStatusCode(), |
| status.getDuration() |
| ); |
| |
| if (status.isSuccess()) { |
| Counters.incrementAndGetLong(totalSuccessfulTaskCount, task.getDataSource()); |
| } else { |
| Counters.incrementAndGetLong(totalFailedTaskCount, task.getDataSource()); |
| } |
| } |
| } |
| catch (Exception e) { |
| log.makeAlert(e, "Failed to handle task status") |
| .addData("task", task.getId()) |
| .addData("statusCode", status.getStatusCode()) |
| .emit(); |
| } |
| } |
| } |
| ); |
| return statusFuture; |
| } |
| |
| /** |
| * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state |
| * corresponds to the storage facility even if the latter is manually modified. |
| */ |
| private void syncFromStorage() |
| { |
| giant.lock(); |
| |
| try { |
| if (active) { |
| final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks()); |
| final int tasksSynced = newTasks.size(); |
| final Map<String, Task> oldTasks = new HashMap<>(tasks); |
| |
| // Calculate differences on IDs instead of Task objects. |
| Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet())); |
| for (String taskID : commonIds) { |
| newTasks.remove(taskID); |
| oldTasks.remove(taskID); |
| } |
| Collection<Task> addedTasks = newTasks.values(); |
| Collection<Task> removedTasks = oldTasks.values(); |
| |
| // Clean up removed Tasks |
| for (Task task : removedTasks) { |
| removeTaskInternal(task.getId()); |
| } |
| |
| // Add newly added tasks to the queue |
| for (Task task : addedTasks) { |
| addTaskInternal(task); |
| } |
| |
| log.info( |
| "Synced %d tasks from storage (%d tasks added, %d tasks removed).", |
| tasksSynced, |
| addedTasks.size(), |
| removedTasks.size() |
| ); |
| requestManagement(); |
| } else { |
| log.info("Not active. Skipping storage sync."); |
| } |
| } |
| catch (Exception e) { |
| log.warn(e, "Failed to sync tasks from storage!"); |
| throw new RuntimeException(e); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| private static Map<String, Task> toTaskIDMap(List<Task> taskList) |
| { |
| Map<String, Task> rv = new HashMap<>(); |
| for (Task task : taskList) { |
| rv.put(task.getId(), task); |
| } |
| return rv; |
| } |
| |
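| /** |
| * Computes, per key, the difference between a current total and a previously reported total. |
| * Keys absent from {@code prev} count as zero; for example, a total of {"wiki": 5} with a prev of |
| * {"wiki": 3} yields a delta of {"wiki": 2}. |
| */ |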
| private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev) |
| { |
| return total.entrySet() |
| .stream() |
| .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); |
| } |
| |
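| /** |
| * Returns, per datasource, the number of tasks that succeeded since the previous call to this method. |
| */ |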
| public Map<String, Long> getSuccessfulTaskCount() |
| { |
| Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); |
| synchronized (totalSuccessfulTaskCount) { |
| Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); |
| prevTotalSuccessfulTaskCount = total; |
| return delta; |
| } |
| } |
| |
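| /** |
| * Returns, per datasource, the number of tasks that failed since the previous call to this method. |
| */ |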
| public Map<String, Long> getFailedTaskCount() |
| { |
| Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); |
| synchronized (totalFailedTaskCount) { |
| Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount); |
| prevTotalFailedTaskCount = total; |
| return delta; |
| } |
| } |
| |
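| /** |
| * Returns a snapshot of the tasks currently in the queue, as a map from task ID to datasource. |
| */ |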
| Map<String, String> getCurrentTaskDatasources() |
| { |
| giant.lock(); |
| try { |
| return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
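| /** |
| * Returns the number of tasks currently running on the taskRunner, grouped by datasource. |
| */ |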
| public Map<String, Long> getRunningTaskCount() |
| { |
| Map<String, String> taskDatasources = getCurrentTaskDatasources(); |
| return taskRunner.getRunningTasks() |
| .stream() |
| .collect(Collectors.toMap( |
| e -> taskDatasources.getOrDefault(e.getTaskId(), ""), |
| e -> 1L, |
| Long::sum |
| )); |
| } |
| |
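| /** |
| * Returns the number of tasks currently pending on the taskRunner, grouped by datasource. |
| */ |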
| public Map<String, Long> getPendingTaskCount() |
| { |
| Map<String, String> taskDatasources = getCurrentTaskDatasources(); |
| return taskRunner.getPendingTasks() |
| .stream() |
| .collect(Collectors.toMap( |
| e -> taskDatasources.getOrDefault(e.getTaskId(), ""), |
| e -> 1L, |
| Long::sum |
| )); |
| } |
| |
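| /** |
| * Returns the number of tasks in the queue that the taskRunner does not know about yet, grouped |
| * by datasource. |
| */ |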
| public Map<String, Long> getWaitingTaskCount() |
| { |
| Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks() |
| .stream() |
| .map(TaskRunnerWorkItem::getTaskId) |
| .collect(Collectors.toSet()); |
| |
| giant.lock(); |
| try { |
| return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) |
| .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| |
| @VisibleForTesting |
| List<Task> getTasks() |
| { |
| giant.lock(); |
| try { |
| return new ArrayList<>(tasks.values()); |
| } |
| finally { |
| giant.unlock(); |
| } |
| } |
| } |