| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.overlord.hrtr; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.Collections2; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Maps; |
| import com.google.common.io.ByteSource; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListenableScheduledFuture; |
| import com.google.common.util.concurrent.ListeningScheduledExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.errorprone.annotations.concurrent.GuardedBy; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.druid.concurrent.LifecycleLock; |
| import org.apache.druid.discovery.DiscoveryDruidNode; |
| import org.apache.druid.discovery.DruidNodeDiscovery; |
| import org.apache.druid.discovery.DruidNodeDiscoveryProvider; |
| import org.apache.druid.discovery.WorkerNodeService; |
| import org.apache.druid.indexer.RunnerTaskState; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskState; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; |
| import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem; |
| import org.apache.druid.indexing.overlord.TaskRunnerListener; |
| import org.apache.druid.indexing.overlord.TaskRunnerUtils; |
| import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; |
| import org.apache.druid.indexing.overlord.TaskStorage; |
| import org.apache.druid.indexing.overlord.WorkerTaskRunner; |
| import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; |
| import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; |
| import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; |
| import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; |
| import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; |
| import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; |
| import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; |
| import org.apache.druid.indexing.worker.TaskAnnouncement; |
| import org.apache.druid.indexing.worker.Worker; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStart; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStop; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.http.client.HttpClient; |
| import org.apache.druid.java.util.http.client.Request; |
| import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; |
| import org.apache.druid.server.initialization.IndexerZkConfig; |
| import org.apache.druid.tasklogs.TaskLogStreamer; |
| import org.apache.zookeeper.KeeperException; |
| import org.jboss.netty.handler.codec.http.HttpMethod; |
| import org.joda.time.Period; |
| |
| import javax.annotation.Nullable; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| /** |
| * A remote TaskRunner that manages tasks on Middle Manager nodes, discovering them via internal discovery |
| * ({@link DruidNodeDiscoveryProvider}) and communicating with them over HTTP. |
| * Middle Managers keep the list of assigned/completed tasks on disk and expose three HTTP endpoints: |
| * 1. POST request for assigning a task. |
| * 2. POST request for shutting down a task. |
| * 3. GET request for the list of assigned, running, and completed tasks on the Middle Manager along with its enable/disable status. |
| * This endpoint supports long polling and holds the request until there is a change. This class sends the |
| * next request immediately after the previous one finishes so that the state stays up-to-date. |
| * <p> |
| * ZK_CLEANUP_TODO : As of 0.11.1, task status paths that workers create in ZK to support the deprecated |
| * RemoteTaskRunner still need to be cleaned up, so a method "scheduleCompletedTaskStatusCleanupFromZk()" was added. |
| * It should be removed in the release that removes the legacy RemoteTaskRunner ZK updates and the WorkerTaskMonitor class. |
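| * <p> |
| * Task assignment overview (see {@link #start()} and the pending-task execution loop): a task added via |
| * {@link #run} is queued in pendingTaskIds; a pending-task thread picks an eligible worker using the configured |
| * {@link WorkerSelectStrategy}, POSTs the assignment to that worker, and waits up to the task assignment timeout |
| * for the worker to acknowledge it. Worker state is kept in sync via {@link WorkerHolder}, which long-polls each |
| * Middle Manager and reports task status changes back through {@link #taskAddedOrUpdated}. |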
| */ |
| public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer |
| { |
| private static final EmittingLogger log = new EmittingLogger(HttpRemoteTaskRunner.class); |
| |
| private final LifecycleLock lifecycleLock = new LifecycleLock(); |
| |
| // Executor for assigning pending tasks to workers. |
| private final ExecutorService pendingTasksExec; |
| |
| // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem |
| // This is a ConcurrentMap as some of the reads are done without holding the lock. |
| @GuardedBy("statusLock") |
| private final ConcurrentMap<String, HttpRemoteTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>(); |
| |
| // This is the list of pending tasks in the order they arrived, manipulated/used exclusively by the thread that |
| // gives a new task to this class and by the threads in pendingTasksExec that are responsible for assigning tasks |
| // to workers. |
| @GuardedBy("statusLock") |
| private final List<String> pendingTaskIds = new ArrayList<>(); |
| |
| // All discovered workers, "host:port" -> WorkerHolder |
| private final ConcurrentMap<String, WorkerHolder> workers = new ConcurrentHashMap<>(); |
| |
| // Executor for syncing state of each worker. |
| private final ScheduledExecutorService workersSyncExec; |
| |
| // Workers that have been marked as lazy. These workers are not running any tasks and can be terminated safely by the scaling policy. |
| private final ConcurrentMap<String, WorkerHolder> lazyWorkers = new ConcurrentHashMap<>(); |
| |
| // Workers that have been blacklisted. |
| private final ConcurrentHashMap<String, WorkerHolder> blackListedWorkers = new ConcurrentHashMap<>(); |
| |
| // Workers that have been assigned a task but have not yet acknowledged it. |
| // Map: workerId -> taskId |
| // All writes are guarded by statusLock. |
| @GuardedBy("statusLock") |
| private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<>(); |
| |
| // Executor to complete cleanup of workers which have disappeared. |
| private final ListeningScheduledExecutorService cleanupExec; |
| private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>(); |
| |
| |
| private final Object statusLock = new Object(); |
| |
| // task runner listeners |
| private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>(); |
| |
| private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy; |
| private ProvisioningService provisioningService; |
| |
| private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; |
| private final HttpClient httpClient; |
| private final ObjectMapper smileMapper; |
| |
| private final Supplier<WorkerBehaviorConfig> workerConfigRef; |
| private final HttpRemoteTaskRunnerConfig config; |
| |
| private final TaskStorage taskStorage; |
| |
| // ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed. |
| private static final Joiner JOINER = Joiner.on("/"); |
| |
| @Nullable // Null, if zk is disabled |
| private final CuratorFramework cf; |
| |
| @Nullable // Null, if zk is disabled |
| private final ScheduledExecutorService zkCleanupExec; |
| private final IndexerZkConfig indexerZkConfig; |
| |
| public HttpRemoteTaskRunner( |
| ObjectMapper smileMapper, |
| HttpRemoteTaskRunnerConfig config, |
| HttpClient httpClient, |
| Supplier<WorkerBehaviorConfig> workerConfigRef, |
| ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy, |
| DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, |
| TaskStorage taskStorage, |
| @Nullable CuratorFramework cf, |
| IndexerZkConfig indexerZkConfig |
| ) |
| { |
| this.smileMapper = smileMapper; |
| this.config = config; |
| this.httpClient = httpClient; |
| this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; |
| this.taskStorage = taskStorage; |
| this.workerConfigRef = workerConfigRef; |
| |
| this.pendingTasksExec = Execs.multiThreaded( |
| config.getPendingTasksRunnerNumThreads(), |
| "hrtr-pending-tasks-runner-%d" |
| ); |
| |
| this.workersSyncExec = ScheduledExecutors.fixed( |
| config.getWorkerSyncNumThreads(), |
| "HttpRemoteTaskRunner-worker-sync-%d" |
| ); |
| |
| this.cleanupExec = MoreExecutors.listeningDecorator( |
| ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d") |
| ); |
| |
| if (cf != null) { |
| this.cf = cf; |
| this.zkCleanupExec = ScheduledExecutors.fixed( |
| 1, |
| "HttpRemoteTaskRunner-zk-cleanup-%d" |
| ); |
| } else { |
| this.cf = null; |
| this.zkCleanupExec = null; |
| } |
| |
| this.indexerZkConfig = indexerZkConfig; |
| |
| this.provisioningStrategy = provisioningStrategy; |
| } |
| |
| @Override |
| @LifecycleStart |
| public void start() |
| { |
| if (!lifecycleLock.canStart()) { |
| return; |
| } |
| |
| try { |
| log.info("Starting..."); |
| |
| scheduleCompletedTaskStatusCleanupFromZk(); |
| |
| startWorkersHandling(); |
| |
| ScheduledExecutors.scheduleAtFixedRate( |
| cleanupExec, |
| Period.ZERO.toStandardDuration(), |
| config.getWorkerBlackListCleanupPeriod().toStandardDuration(), |
| this::checkAndRemoveWorkersFromBlackList |
| ); |
| |
| provisioningService = provisioningStrategy.makeProvisioningService(this); |
| |
| scheduleSyncMonitoring(); |
| |
| startPendingTaskHandling(); |
| |
| lifecycleLock.started(); |
| |
| log.info("Started."); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| finally { |
| lifecycleLock.exitStart(); |
| } |
| } |
| |
| private void scheduleCompletedTaskStatusCleanupFromZk() |
| { |
| if (cf == null) { |
| return; |
| } |
| |
| zkCleanupExec.scheduleAtFixedRate( |
| () -> { |
| try { |
| List<String> workers; |
| try { |
| workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath()); |
| } |
| catch (KeeperException.NoNodeException e) { |
| // statusPath doesn't exist yet; can occur if no middleManagers have started. |
| workers = ImmutableList.of(); |
| } |
| |
| Set<String> knownActiveTaskIds = new HashSet<>(); |
| if (!workers.isEmpty()) { |
| for (Task task : taskStorage.getActiveTasks()) { |
| knownActiveTaskIds.add(task.getId()); |
| } |
| } |
| |
| for (String workerId : workers) { |
| String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId); |
| |
| List<String> taskIds; |
| try { |
| taskIds = cf.getChildren().forPath(workerStatusPath); |
| } |
| catch (KeeperException.NoNodeException e) { |
| taskIds = ImmutableList.of(); |
| } |
| |
| for (String taskId : taskIds) { |
| if (!knownActiveTaskIds.contains(taskId)) { |
| String taskStatusPath = JOINER.join(workerStatusPath, taskId); |
| try { |
| cf.delete().guaranteed().forPath(taskStatusPath); |
| } |
| catch (KeeperException.NoNodeException e) { |
| log.info("Failed to delete taskStatusPath[%s].", taskStatusPath); |
| } |
| } |
| } |
| } |
| } |
| catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| } |
| catch (Exception ex) { |
| log.error(ex, "Unknown error while doing task status cleanup in ZK."); |
| } |
| }, |
| 1, |
| 5, |
| TimeUnit.MINUTES |
| ); |
| } |
| |
| /** |
| * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} |
| */ |
| @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe |
| Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks() |
| { |
| // Within this class, this method is called with statusLock held. |
| // Writes to workersWithUnacknowledgedTask are always guarded by statusLock. |
| // However, writes to lazyWorkers/blackListedWorkers aren't necessarily guarded by the same lock, so technically a |
| // task could get assigned to a worker that is concurrently being marked lazy/blacklisted in another thread. That is |
| // fine, because it is equivalent to the worker being picked for the task and then marked lazy/blacklisted right |
| // afterwards, which could happen even if the two threads held a mutually exclusive lock. |
| return Maps.transformEntries( |
| Maps.filterEntries( |
| workers, |
| input -> !lazyWorkers.containsKey(input.getKey()) && |
| !workersWithUnacknowledgedTask.containsKey(input.getKey()) && |
| !blackListedWorkers.containsKey(input.getKey()) && |
| input.getValue().isInitialized() && |
| input.getValue().isEnabled() |
| ), |
| (String key, WorkerHolder value) -> value.toImmutable() |
| ); |
| } |
| |
| private ImmutableWorkerInfo findWorkerToRunTask(Task task) |
| { |
| WorkerBehaviorConfig workerConfig = workerConfigRef.get(); |
| WorkerSelectStrategy strategy; |
| if (workerConfig == null || workerConfig.getSelectStrategy() == null) { |
| strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY; |
| log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName()); |
| } else { |
| strategy = workerConfig.getSelectStrategy(); |
| } |
| |
| return strategy.findWorkerForTask( |
| config, |
| ImmutableMap.copyOf(getWorkersEligibleToRunTasks()), |
| task |
| ); |
| } |
| |
| private boolean runTaskOnWorker( |
| final HttpRemoteTaskRunnerWorkItem workItem, |
| final String workerHost |
| ) throws InterruptedException |
| { |
| String taskId = workItem.getTaskId(); |
| WorkerHolder workerHolder = workers.get(workerHost); |
| |
| if (workerHolder == null || lazyWorkers.containsKey(workerHost) || blackListedWorkers.containsKey(workerHost)) { |
| log.info("Not assigning task[%s] to removed or marked lazy/blacklisted worker[%s]", taskId, workerHost); |
| return false; |
| } |
| |
| log.info("Asking Worker[%s] to run task[%s]", workerHost, taskId); |
| |
| if (workerHolder.assignTask(workItem.getTask())) { |
| // Don't assign new tasks until the task we just assigned is actually running |
| // on a worker - this avoids overflowing a worker with tasks |
| long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); |
| long waitStart = System.currentTimeMillis(); |
| boolean isTaskAssignmentTimedOut = false; |
| synchronized (statusLock) { |
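| // Wait until the worker acknowledges the task (the work item's state moves out of pending) or the task |
| // assignment timeout elapses; statusLock is notified whenever task state changes. |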
| while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) { |
| long remaining = waitMs - (System.currentTimeMillis() - waitStart); |
| if (remaining > 0) { |
| statusLock.wait(remaining); |
| } else { |
| isTaskAssignmentTimedOut = true; |
| break; |
| } |
| } |
| } |
| |
| if (isTaskAssignmentTimedOut) { |
| log.makeAlert( |
| "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!", |
| workerHost, |
| taskId, |
| config.getTaskAssignmentTimeout() |
| ).emit(); |
| // taskComplete(..) must be called outside of statusLock, see comments on method. |
| taskComplete( |
| workItem, |
| workerHolder, |
| TaskStatus.failure( |
| taskId, |
| StringUtils.format( |
| "The worker that this task is assigned did not start it in timeout[%s]. " |
| + "See overlord and middleManager/indexer logs for more details.", |
| config.getTaskAssignmentTimeout() |
| ) |
| ) |
| ); |
| } |
| |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..), which results in TaskQueue.notifyStatus() being |
| // called because TaskQueue attaches that callback to the task result future. So, this method must not be called with |
| // "statusLock" held. See https://github.com/apache/druid/issues/6201 |
| private void taskComplete( |
| HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem, |
| WorkerHolder workerHolder, |
| TaskStatus taskStatus |
| ) |
| { |
| Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock."); |
| Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); |
| Preconditions.checkNotNull(taskStatus, "taskStatus"); |
| if (workerHolder != null) { |
| log.info( |
| "Worker[%s] completed task[%s] with status[%s]", |
| workerHolder.getWorker().getHost(), |
| taskStatus.getId(), |
| taskStatus.getStatusCode() |
| ); |
| // Worker is done with this task |
| workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc()); |
| } |
| |
| if (taskRunnerWorkItem.getResult().isDone()) { |
| // This is not the first complete event. |
| try { |
| TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode(); |
| if (taskStatus.getStatusCode() != lastKnownState) { |
| log.warn( |
| "The state of the new task complete event is different from its last known state. " |
| + "New state[%s], last known state[%s]", |
| taskStatus.getStatusCode(), |
| lastKnownState |
| ); |
| } |
| } |
| catch (InterruptedException e) { |
| log.warn(e, "Interrupted while getting the last known task status."); |
| Thread.currentThread().interrupt(); |
| } |
| catch (ExecutionException e) { |
| // This case should not really happen. |
| log.warn(e, "Failed to get the last known task status. Ignoring this failure."); |
| } |
| } else { |
| // Notify interested parties |
| taskRunnerWorkItem.setResult(taskStatus); |
| TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus); |
| |
| // Update success/failure counters; blacklist the node if there are too many failures. |
| if (workerHolder != null) { |
| blacklistWorkerIfNeeded(taskStatus, workerHolder); |
| } |
| } |
| |
| synchronized (statusLock) { |
| statusLock.notifyAll(); |
| } |
| } |
| |
| private void startWorkersHandling() throws InterruptedException |
| { |
| final CountDownLatch workerViewInitialized = new CountDownLatch(1); |
| DruidNodeDiscovery druidNodeDiscovery = |
| druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY); |
| druidNodeDiscovery.registerListener( |
| new DruidNodeDiscovery.Listener() |
| { |
| @Override |
| public void nodesAdded(Collection<DiscoveryDruidNode> nodes) |
| { |
| nodes.forEach(node -> addWorker(toWorker(node))); |
| } |
| |
| @Override |
| public void nodesRemoved(Collection<DiscoveryDruidNode> nodes) |
| { |
| nodes.forEach(node -> removeWorker(toWorker(node))); |
| } |
| |
| @Override |
| public void nodeViewInitialized() |
| { |
| // CountDownLatch.countDown() does nothing when count has already reached 0. |
| workerViewInitialized.countDown(); |
| } |
| } |
| ); |
| |
| long workerDiscoveryStartTime = System.currentTimeMillis(); |
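| // Wait for the initial snapshot of worker nodes, logging progress every 30 seconds and failing startup |
| // if it does not arrive within 5 minutes. |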
| while (!workerViewInitialized.await(30, TimeUnit.SECONDS)) { |
| if (System.currentTimeMillis() - workerDiscoveryStartTime > TimeUnit.MINUTES.toMillis(5)) { |
| throw new ISE("Couldn't discover workers."); |
| } else { |
| log.info("Waiting for worker discovery..."); |
| } |
| } |
| log.info("[%s] Workers are discovered.", workers.size()); |
| |
| // Wait until all worker state is synced so that we know which worker is running or has completed which tasks. |
| // Otherwise we could start assigning tasks that the discovered workers are about to report anyway. |
| for (WorkerHolder worker : workers.values()) { |
| log.info("Waiting for worker[%s] to sync state...", worker.getWorker().getHost()); |
| worker.waitForInitialization(); |
| } |
| log.info("Workers have sync'd state successfully."); |
| } |
| |
| private Worker toWorker(DiscoveryDruidNode node) |
| { |
| return new Worker( |
| node.getDruidNode().getServiceScheme(), |
| node.getDruidNode().getHostAndPortToUse(), |
| ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(), |
| ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(), |
| ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(), |
| ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCategory() |
| ); |
| } |
| |
| private void addWorker(final Worker worker) |
| { |
| synchronized (workers) { |
| log.info("Worker[%s] reportin' for duty!", worker.getHost()); |
| cancelWorkerCleanup(worker.getHost()); |
| |
| WorkerHolder holder = workers.get(worker.getHost()); |
| if (holder == null) { |
| List<TaskAnnouncement> expectedAnnouncements = new ArrayList<>(); |
| synchronized (statusLock) { |
| // It might be a worker that existed before, temporarily went away and came back. We might have a set of |
| // tasks that we think are running on this worker. Provide that information to the WorkerHolder that |
| // manages the task syncing with that worker. |
| for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) { |
| if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { |
| Worker w = e.getValue().getWorker(); |
| if (w != null && w.getHost().equals(worker.getHost())) { |
| expectedAnnouncements.add( |
| TaskAnnouncement.create( |
| e.getValue().getTask(), |
| TaskStatus.running(e.getKey()), |
| e.getValue().getLocation() |
| ) |
| ); |
| } |
| } |
| } |
| } |
| holder = createWorkerHolder( |
| smileMapper, |
| httpClient, |
| config, |
| workersSyncExec, |
| this::taskAddedOrUpdated, |
| worker, |
| expectedAnnouncements |
| ); |
| holder.start(); |
| workers.put(worker.getHost(), holder); |
| } else { |
| log.info("Worker[%s] already exists.", worker.getHost()); |
| } |
| } |
| |
| synchronized (statusLock) { |
| statusLock.notifyAll(); |
| } |
| } |
| |
| protected WorkerHolder createWorkerHolder( |
| ObjectMapper smileMapper, |
| HttpClient httpClient, |
| HttpRemoteTaskRunnerConfig config, |
| ScheduledExecutorService workersSyncExec, |
| WorkerHolder.Listener listener, |
| Worker worker, |
| List<TaskAnnouncement> knownAnnouncements |
| ) |
| { |
| return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements); |
| } |
| |
| private void removeWorker(final Worker worker) |
| { |
| synchronized (workers) { |
| log.info("Kaboom! Worker[%s] removed!", worker.getHost()); |
| |
| WorkerHolder workerHolder = workers.remove(worker.getHost()); |
| |
| if (workerHolder != null) { |
| try { |
| workerHolder.stop(); |
| scheduleTasksCleanupForWorker(worker.getHost()); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| finally { |
| checkAndRemoveWorkersFromBlackList(); |
| } |
| } |
| lazyWorkers.remove(worker.getHost()); |
| } |
| } |
| |
| private boolean cancelWorkerCleanup(String workerHost) |
| { |
| ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost); |
| if (previousCleanup != null) { |
| log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost); |
| previousCleanup.cancel(false); |
| } |
| return previousCleanup != null; |
| } |
| |
| private void scheduleTasksCleanupForWorker(final String workerHostAndPort) |
| { |
| cancelWorkerCleanup(workerHostAndPort); |
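| // Below: schedule a cleanup that, after the task cleanup timeout, fails all tasks still believed to be |
| // running on the disappeared worker, unless the worker reappears and the cleanup is cancelled first. |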
| |
| final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule( |
| () -> { |
| log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort); |
| try { |
| Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new HashSet<>(); |
| synchronized (statusLock) { |
| for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) { |
| if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { |
| Worker w = e.getValue().getWorker(); |
| if (w != null && w.getHost().equals(workerHostAndPort)) { |
| tasksToFail.add(e.getValue()); |
| } |
| } |
| } |
| } |
| |
| for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) { |
| if (!taskItem.getResult().isDone()) { |
| log.warn( |
| "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].", |
| taskItem.getTaskId(), |
| workerHostAndPort, |
| config.getTaskCleanupTimeout() |
| ); |
| // taskComplete(..) must be called outside of statusLock, see comments on method. |
| taskComplete( |
| taskItem, |
| null, |
| TaskStatus.failure( |
| taskItem.getTaskId(), |
| StringUtils.format( |
| "The worker that this task was assigned disappeared and " |
| + "did not report cleanup within timeout[%s]. " |
| + "See overlord and middleManager/indexer logs for more details.", |
| config.getTaskCleanupTimeout() |
| ) |
| ) |
| ); |
| } |
| } |
| } |
| catch (Exception e) { |
| log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit(); |
| throw new RuntimeException(e); |
| } |
| }, |
| config.getTaskCleanupTimeout().toStandardDuration().getMillis(), |
| TimeUnit.MILLISECONDS |
| ); |
| |
| removedWorkerCleanups.put(workerHostAndPort, cleanupTask); |
| |
| // Remove this entry from removedWorkerCleanups when done, if it's actually the one in there. |
| Futures.addCallback( |
| cleanupTask, |
| new FutureCallback<Object>() |
| { |
| @Override |
| public void onSuccess(Object result) |
| { |
| removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) |
| { |
| removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); |
| } |
| } |
| ); |
| } |
| |
| private void scheduleSyncMonitoring() |
| { |
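| // Every 5 minutes, check that each worker's HTTP sync is healthy; if not, reset it by removing and |
| // re-adding the worker, which recreates its WorkerHolder. |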
| workersSyncExec.scheduleAtFixedRate( |
| () -> { |
| log.debug("Running the Sync Monitoring."); |
| |
| try { |
| for (Map.Entry<String, WorkerHolder> e : workers.entrySet()) { |
| WorkerHolder workerHolder = e.getValue(); |
| if (!workerHolder.getUnderlyingSyncer().isOK()) { |
| synchronized (workers) { |
| // Check again that the worker is still there and only then reset. |
| if (workers.containsKey(e.getKey())) { |
| log.makeAlert( |
| "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.", |
| workerHolder.getWorker().getHost(), |
| workerHolder.getUnderlyingSyncer().getDebugInfo() |
| ).emit(); |
| removeWorker(workerHolder.getWorker()); |
| addWorker(workerHolder.getWorker()); |
| } |
| } |
| } |
| } |
| } |
| catch (Exception ex) { |
| if (ex instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } else { |
| log.makeAlert(ex, "Exception in sync monitoring.").emit(); |
| } |
| } |
| }, |
| 1, |
| 5, |
| TimeUnit.MINUTES |
| ); |
| } |
| |
| /** |
| * This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and is meant |
| * for that use only. It must not be used for any other purpose. |
| */ |
| Map<String, Object> getWorkerSyncerDebugInfo() |
| { |
| Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); |
| |
| Map<String, Object> result = Maps.newHashMapWithExpectedSize(workers.size()); |
| for (Map.Entry<String, WorkerHolder> e : workers.entrySet()) { |
| WorkerHolder serverHolder = e.getValue(); |
| result.put( |
| e.getKey(), |
| serverHolder.getUnderlyingSyncer().getDebugInfo() |
| ); |
| } |
| return result; |
| } |
| |
| private void checkAndRemoveWorkersFromBlackList() |
| { |
| boolean shouldRunPendingTasks = false; |
| |
| Iterator<Map.Entry<String, WorkerHolder>> iterator = blackListedWorkers.entrySet().iterator(); |
| while (iterator.hasNext()) { |
| Map.Entry<String, WorkerHolder> e = iterator.next(); |
| if (shouldRemoveNodeFromBlackList(e.getValue())) { |
| iterator.remove(); |
| e.getValue().resetContinuouslyFailedTasksCount(); |
| e.getValue().setBlacklistedUntil(null); |
| shouldRunPendingTasks = true; |
| } |
| } |
| |
| if (shouldRunPendingTasks) { |
| synchronized (statusLock) { |
| statusLock.notifyAll(); |
| } |
| } |
| } |
| |
| private boolean shouldRemoveNodeFromBlackList(WorkerHolder workerHolder) |
| { |
| if (!workers.containsKey(workerHolder.getWorker().getHost())) { |
| return true; |
| } |
| |
| if (blackListedWorkers.size() > workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) { |
| log.info( |
| "Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]", |
| workerHolder.getWorker(), |
| config.getMaxPercentageBlacklistWorkers() |
| ); |
| |
| return true; |
| } |
| |
| long remainingMillis = workerHolder.getBlacklistedUntil().getMillis() - System.currentTimeMillis(); |
| if (remainingMillis <= 0) { |
| log.info("Removing [%s] from blacklist because backoff time elapsed", workerHolder.getWorker()); |
| return true; |
| } |
| |
| log.info("[%s] still blacklisted for [%,ds]", workerHolder.getWorker(), remainingMillis / 1000); |
| return false; |
| } |
| |
| private void blacklistWorkerIfNeeded(TaskStatus taskStatus, WorkerHolder workerHolder) |
| { |
| synchronized (blackListedWorkers) { |
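| // A successful task clears the worker's failure streak and removes it from the blacklist. A failure |
| // increments the streak; once it exceeds the configured retry limit, the worker is blacklisted for the |
| // configured backoff period, as long as the blacklist stays below the configured percentage of all workers. |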
| if (taskStatus.isSuccess()) { |
| workerHolder.resetContinuouslyFailedTasksCount(); |
| if (blackListedWorkers.remove(workerHolder.getWorker().getHost()) != null) { |
| workerHolder.setBlacklistedUntil(null); |
| log.info("[%s] removed from blacklist because a task finished with SUCCESS", workerHolder.getWorker()); |
| } |
| } else if (taskStatus.isFailure()) { |
| workerHolder.incrementContinuouslyFailedTasksCount(); |
| } |
| |
| if (workerHolder.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() && |
| blackListedWorkers.size() <= workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) { |
| workerHolder.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime())); |
| if (blackListedWorkers.put(workerHolder.getWorker().getHost(), workerHolder) == null) { |
| log.info( |
| "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.", |
| workerHolder.getWorker(), |
| workerHolder.getBlacklistedUntil(), |
| workerHolder.getContinuouslyFailedTasksCount() |
| ); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Collection<ImmutableWorkerInfo> getWorkers() |
| { |
| return workers.values().stream().map(worker -> worker.toImmutable()).collect(Collectors.toList()); |
| } |
| |
| @Override |
| public Collection<Worker> getLazyWorkers() |
| { |
| return lazyWorkers.values().stream().map(holder -> holder.getWorker()).collect(Collectors.toList()); |
| } |
| |
| @Override |
| public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers) |
| { |
| synchronized (statusLock) { |
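| // Walk the workers and mark as lazy those that are idle (no running tasks and no unacknowledged assignment), |
| // stopping once maxWorkers have been marked, so the scaling policy can terminate them. |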
| for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) { |
| final WorkerHolder workerHolder = worker.getValue(); |
| try { |
| if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) { |
| log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost()); |
| lazyWorkers.put(worker.getKey(), workerHolder); |
| if (lazyWorkers.size() == maxWorkers) { |
| // only mark excess workers as lazy and allow their cleanup |
| break; |
| } |
| } |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return getLazyWorkers(); |
| } |
| } |
| |
| private boolean isWorkerOkForMarkingLazy(Worker worker) |
| { |
| // Check that worker is not running any tasks and no task is being assigned to it. |
| synchronized (statusLock) { |
| if (workersWithUnacknowledgedTask.containsKey(worker.getHost())) { |
| return false; |
| } |
| |
| for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) { |
| if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { |
| Worker w = e.getValue().getWorker(); |
| if (w != null && w.getHost().equals(worker.getHost())) { |
| return false; |
| } |
| } |
| } |
| |
| return true; |
| } |
| } |
| |
| @Override |
| public WorkerTaskRunnerConfig getConfig() |
| { |
| return config; |
| } |
| |
| @Override |
| public Collection<Task> getPendingTaskPayloads() |
| { |
| synchronized (statusLock) { |
| return tasks.values() |
| .stream() |
| .filter(item -> item.getState().isPending()) |
| .map(HttpRemoteTaskRunnerWorkItem::getTask) |
| .collect(Collectors.toList()); |
| } |
| } |
| |
| @Override |
| public Optional<ByteSource> streamTaskLog(String taskId, long offset) |
| { |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); |
| Worker worker = null; |
| if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { |
| worker = taskRunnerWorkItem.getWorker(); |
| } |
| |
| if (worker == null || !workers.containsKey(worker.getHost())) { |
| // Worker is not running this task; it might be available in deep storage. |
| return Optional.absent(); |
| } else { |
| // Worker is still running this task |
| final URL url = TaskRunnerUtils.makeWorkerURL( |
| worker, |
| "/druid/worker/v1/task/%s/log?offset=%s", |
| taskId, |
| Long.toString(offset) |
| ); |
| return Optional.of( |
| new ByteSource() |
| { |
| @Override |
| public InputStream openStream() throws IOException |
| { |
| try { |
| return httpClient.go( |
| new Request(HttpMethod.GET, url), |
| new InputStreamResponseHandler() |
| ).get(); |
| } |
| catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| catch (ExecutionException e) { |
| // Unwrap if possible |
| Throwables.propagateIfPossible(e.getCause(), IOException.class); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| ); |
| } |
| } |
| |
| @Override |
| public Optional<ByteSource> streamTaskReports(String taskId) |
| { |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); |
| Worker worker = null; |
| if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { |
| worker = taskRunnerWorkItem.getWorker(); |
| } |
| |
| if (worker == null || !workers.containsKey(worker.getHost())) { |
| // Worker is not running this task; it might be available in deep storage. |
| return Optional.absent(); |
| } else { |
| // Worker is still running this task |
| TaskLocation taskLocation = taskRunnerWorkItem.getLocation(); |
| final URL url = TaskRunnerUtils.makeTaskLocationURL( |
| taskLocation, |
| "/druid/worker/v1/chat/%s/liveReports", |
| taskId |
| ); |
| return Optional.of( |
| new ByteSource() |
| { |
| @Override |
| public InputStream openStream() throws IOException |
| { |
| try { |
| return httpClient.go( |
| new Request(HttpMethod.GET, url), |
| new InputStreamResponseHandler() |
| ).get(); |
| } |
| catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| catch (ExecutionException e) { |
| // Unwrap if possible |
| Throwables.propagateIfPossible(e.getCause(), IOException.class); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| ); |
| } |
| } |
| |
| @Override |
| public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() |
| { |
| return ImmutableList.of(); |
| } |
| |
| @Override |
| public void registerListener(TaskRunnerListener listener, Executor executor) |
| { |
| for (Pair<TaskRunnerListener, Executor> pair : listeners) { |
| if (pair.lhs.getListenerId().equals(listener.getListenerId())) { |
| throw new ISE("Listener [%s] already registered", listener.getListenerId()); |
| } |
| } |
| |
| final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor); |
| |
| synchronized (statusLock) { |
| for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> entry : tasks.entrySet()) { |
| if (entry.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { |
| TaskRunnerUtils.notifyLocationChanged( |
| ImmutableList.of(listenerPair), |
| entry.getKey(), |
| entry.getValue().getLocation() |
| ); |
| } |
| } |
| |
| log.info("Registered listener [%s]", listener.getListenerId()); |
| listeners.add(listenerPair); |
| } |
| } |
| |
| @Override |
| public void unregisterListener(String listenerId) |
| { |
| for (Pair<TaskRunnerListener, Executor> pair : listeners) { |
| if (pair.lhs.getListenerId().equals(listenerId)) { |
| listeners.remove(pair); |
| log.info("Unregistered listener [%s]", listenerId); |
| return; |
| } |
| } |
| } |
| |
| @Override |
| public ListenableFuture<TaskStatus> run(Task task) |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS), "not started"); |
| |
| synchronized (statusLock) { |
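| // If the task is already known (e.g. it was discovered from a worker on startup), reuse the existing work |
| // item; otherwise queue it as pending and wake up the pending-task execution threads. |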
| HttpRemoteTaskRunnerWorkItem existing = tasks.get(task.getId()); |
| |
| if (existing != null) { |
| log.info("Assigned a task[%s] that is known already. Ignored.", task.getId()); |
| if (existing.getTask() == null) { |
| // This can happen if the task was discovered from a worker on start(), since the TaskAnnouncement does |
| // not carry the whole Task instance. |
| existing.setTask(task); |
| } |
| return existing.getResult(); |
| } else { |
| log.info("Adding pending task[%s].", task.getId()); |
| HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = new HttpRemoteTaskRunnerWorkItem( |
| task.getId(), |
| null, |
| null, |
| task, |
| task.getType(), |
| HttpRemoteTaskRunnerWorkItem.State.PENDING |
| ); |
| tasks.put(task.getId(), taskRunnerWorkItem); |
| pendingTaskIds.add(task.getId()); |
| |
| statusLock.notifyAll(); |
| |
| return taskRunnerWorkItem.getResult(); |
| } |
| } |
| } |
| |
| private void startPendingTaskHandling() |
| { |
| for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { |
| pendingTasksExec.submit( |
| () -> { |
| try { |
| if (!lifecycleLock.awaitStarted()) { |
| log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); |
| return; |
| } |
| |
| pendingTasksExecutionLoop(); |
| } |
| catch (Throwable t) { |
| log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") |
| .emit(); |
| } |
| finally { |
| log.info("PendingTaskExecution loop exited."); |
| } |
| } |
| ); |
| } |
| } |
| |
| private void pendingTasksExecutionLoop() |
| { |
| while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { |
| try { |
| // Find one pending task to run and a worker to run on |
| HttpRemoteTaskRunnerWorkItem taskItem = null; |
| ImmutableWorkerInfo immutableWorker = null; |
| |
| synchronized (statusLock) { |
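| // Scan pending tasks in arrival order and pick the first one that is still pending and for which an |
| // eligible worker can be found (a task with no payload is picked too, only to be failed outside the lock). |
| // If none qualifies, wait on statusLock and retry. |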
| Iterator<String> iter = pendingTaskIds.iterator(); |
| while (iter.hasNext()) { |
| String taskId = iter.next(); |
| HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); |
| |
| if (ti == null || !ti.getState().isPending()) { |
| // Happens if the task was shut down, failed, or was observed running by a worker. |
| iter.remove(); |
| continue; |
| } |
| |
| if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { |
| // Picked up by another pending-task executor thread that is in the process of trying to run it on a |
| // worker; skip to the next one. |
| continue; |
| } |
| |
| if (ti.getTask() == null) { |
| // This is not supposed to happen except in case of a bug. We want to mark this task failed, but |
| // taskComplete(..) cannot be called while holding statusLock (see the comments on that method), |
| // so the task is marked failed afterwards, outside of the current synchronized block. |
| taskItem = ti; |
| break; |
| } |
| |
| immutableWorker = findWorkerToRunTask(ti.getTask()); |
| if (immutableWorker == null) { |
| continue; |
| } |
| |
| String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent( |
| immutableWorker.getWorker().getHost(), |
| taskId |
| ); |
| if (prevUnackedTaskId != null) { |
| log.makeAlert( |
| "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].", |
| immutableWorker.getWorker().getHost(), |
| prevUnackedTaskId, |
| taskId |
| ).emit(); |
| } |
| |
| // Set the state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked |
| // up by another task execution thread. |
| // Note that we can't simply delete this task item from pendingTaskIds: if this thread later failed to run |
| // the task for any reason, we would have to add it back at its old position in the list, which becomes |
| // complex quickly. Instead, the PENDING_WORKER_ASSIGN state tells other task execution threads not to pick |
| // this one up, and the entry is removed automatically by any task execution thread that notices |
| // ti.getState().isPending() is false (at the beginning of this loop). |
| ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN); |
| taskItem = ti; |
| break; |
| } |
| |
| if (taskItem == null) { |
| // Either no pending task was found or no suitable worker was found for any of the pending tasks. |
| // statusLock.notifyAll() is called whenever a new task shows up or whenever a task might now be able to |
| // get a worker to run on, for example when a new worker appears or a task slot opens up because some |
| // task completed. |
| statusLock.wait(TimeUnit.MINUTES.toMillis(1)); |
| continue; |
| } |
| } |
| |
| String taskId = taskItem.getTaskId(); |
| |
| if (taskItem.getTask() == null) { |
| log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit(); |
| // taskComplete(..) must be called outside of statusLock, see comments on method. |
| taskComplete( |
| taskItem, |
| null, |
| TaskStatus.failure( |
| taskId, |
| "No payload found for this task. " |
| + "See overlord logs and middleManager/indexer logs for more details." |
| ) |
| ); |
| continue; |
| } |
| |
| if (immutableWorker == null) { |
| throw new ISE("Unexpected state: null immutableWorker"); |
| } |
| |
| try { |
| // This sends an HTTP request to the worker to assign the task. |
| if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { |
| if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { |
| taskItem.revertStateFromPendingWorkerAssignToPending(); |
| } |
| } |
| } |
| catch (InterruptedException ex) { |
| log.info("Got InterruptedException while assigning task[%s].", taskId); |
| throw ex; |
| } |
| catch (Throwable th) { |
| log.makeAlert(th, "Exception while trying to assign task") |
| .addData("taskId", taskId) |
| .emit(); |
| |
| // taskComplete(..) must be called outside of statusLock, see comments on method. |
| taskComplete( |
| taskItem, |
| null, |
| TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.") |
| ); |
| } |
| finally { |
| synchronized (statusLock) { |
| workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); |
| statusLock.notifyAll(); |
| } |
| } |
| |
| } |
| catch (InterruptedException ex) { |
| log.info("Interrupted, will Exit."); |
| Thread.currentThread().interrupt(); |
| } |
| catch (Throwable th) { |
| log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit(); |
| } |
| } |
| } |
| |
| /** |
| * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} |
| */ |
| List<String> getPendingTasksList() |
| { |
| synchronized (statusLock) { |
| return ImmutableList.copyOf(pendingTaskIds); |
| } |
| } |
| |
| @Override |
| public void shutdown(String taskId, String reason) |
| { |
| if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { |
| log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId); |
| return; |
| } |
| |
| WorkerHolder workerHolderRunningTask = null; |
| synchronized (statusLock) { |
| log.info("Shutdown [%s] because: [%s]", taskId, reason); |
| HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId); |
| if (taskRunnerWorkItem != null) { |
| if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { |
| workerHolderRunningTask = workers.get(taskRunnerWorkItem.getWorker().getHost()); |
| if (workerHolderRunningTask == null) { |
| log.info("Can't shutdown! No worker running task[%s]", taskId); |
| } |
| } |
| } else { |
| log.info("Received shutdown task[%s], but can't find it. Ignored.", taskId); |
| } |
| } |
| |
| // shutdown is called outside of the lock because we don't want to hold the lock while sending an HTTP |
| // request to the worker. |
| if (workerHolderRunningTask != null) { |
| log.debug( |
| "Got shutdown request for task[%s]. Asking worker[%s] to kill it.", |
| taskId, |
| workerHolderRunningTask.getWorker().getHost() |
| ); |
| workerHolderRunningTask.shutdownTask(taskId); |
| } |
| } |
| |
| @Override |
| @LifecycleStop |
| public void stop() |
| { |
| if (!lifecycleLock.canStop()) { |
| throw new ISE("can't stop."); |
| } |
| |
| log.info("Stopping..."); |
| |
| pendingTasksExec.shutdownNow(); |
| workersSyncExec.shutdownNow(); |
| cleanupExec.shutdown(); |
| |
| log.info("Stopped."); |
| } |
| |
| @Override |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| public Collection<? extends TaskRunnerWorkItem> getRunningTasks() |
| { |
| return tasks.values() |
| .stream() |
| .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| public Collection<? extends TaskRunnerWorkItem> getPendingTasks() |
| { |
| return tasks.values() |
| .stream() |
| .filter(item -> item.getState().isPending()) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| public Collection<? extends TaskRunnerWorkItem> getKnownTasks() |
| { |
| synchronized (statusLock) { |
| return ImmutableList.copyOf(tasks.values()); |
| } |
| } |
| |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| public Collection<? extends TaskRunnerWorkItem> getCompletedTasks() |
| { |
| return tasks.values() |
| .stream() |
| .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.COMPLETE) |
| .collect(Collectors.toList()); |
| } |
| |
| @Nullable |
| @Override |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| public RunnerTaskState getRunnerTaskState(String taskId) |
| { |
| final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); |
| if (workItem == null) { |
| return null; |
| } else { |
| return workItem.getState().toRunnerTaskState(); |
| } |
| } |
| |
| @Override |
| @SuppressWarnings("GuardedBy") // Read on tasks is safe |
| public TaskLocation getTaskLocation(String taskId) |
| { |
| final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); |
| if (workItem == null) { |
| return TaskLocation.unknown(); |
| } else { |
| return workItem.getLocation(); |
| } |
| } |
| |
| public List<String> getBlacklistedWorkers() |
| { |
| return blackListedWorkers.values().stream().map( |
| (holder) -> holder.getWorker().getHost() |
| ).collect(Collectors.toList()); |
| } |
| |
| public Collection<ImmutableWorkerInfo> getBlackListedWorkers() |
| { |
| return ImmutableList.copyOf(Collections2.transform(blackListedWorkers.values(), WorkerHolder::toImmutable)); |
| } |
| |
| /** |
| * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}; used for reads only. |
| */ |
| @SuppressWarnings("GuardedBy") |
| Map<String, String> getWorkersWithUnacknowledgedTasks() |
| { |
| return workersWithUnacknowledgedTask; |
| } |
| |
| @Override |
| public Optional<ScalingStats> getScalingStats() |
| { |
| return Optional.fromNullable(provisioningService.getStats()); |
| } |
| |
| void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder workerHolder) |
| { |
| final String taskId = announcement.getTaskId(); |
| final Worker worker = workerHolder.getWorker(); |
| |
| log.debug( |
| "Worker[%s] wrote [%s] status for task [%s] on [%s]", |
| worker.getHost(), |
| announcement.getTaskStatus().getStatusCode(), |
| taskId, |
| announcement.getTaskLocation() |
| ); |
| |
| HttpRemoteTaskRunnerWorkItem taskItem; |
| boolean shouldShutdownTask = false; |
| boolean isTaskCompleted = false; |
| |
| synchronized (statusLock) { |
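| // First resolve the work item for this announcement, creating one from taskStorage if the task is unknown |
| // here but recorded as RUNNING there; then reconcile the announced status against the work item's state. |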
| taskItem = tasks.get(taskId); |
| if (taskItem == null) { |
| // Try to find information about it in the TaskStorage |
| Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId); |
| |
| if (knownStatusInStorage.isPresent()) { |
| switch (knownStatusInStorage.get().getStatusCode()) { |
| case RUNNING: |
| taskItem = new HttpRemoteTaskRunnerWorkItem( |
| taskId, |
| worker, |
| TaskLocation.unknown(), |
| null, |
| announcement.getTaskType(), |
| HttpRemoteTaskRunnerWorkItem.State.RUNNING |
| ); |
| tasks.put(taskId, taskItem); |
| break; |
| case SUCCESS: |
| case FAILED: |
| if (!announcement.getTaskStatus().isComplete()) { |
| log.info( |
| "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.", |
| worker.getHost(), |
| taskId |
| ); |
| } |
| break; |
| default: |
| log.makeAlert( |
| "Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored.", |
| knownStatusInStorage.get().getStatusCode(), |
| taskId, |
| announcement, |
| worker.getHost() |
| ).emit(); |
| } |
| } else { |
| log.warn( |
| "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.", |
| worker.getHost(), |
| announcement.getStatus(), |
| taskId |
| ); |
| } |
| } |
| |
| if (taskItem == null) { |
| if (!announcement.getTaskStatus().isComplete()) { |
| shouldShutdownTask = true; |
| } |
| } else { |
| switch (announcement.getTaskStatus().getStatusCode()) { |
| case RUNNING: |
| switch (taskItem.getState()) { |
| case PENDING: |
| case PENDING_WORKER_ASSIGN: |
| taskItem.setWorker(worker); |
| taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); |
| log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost()); |
| // fall through |
| case RUNNING: |
| if (worker.getHost().equals(taskItem.getWorker().getHost())) { |
| if (!announcement.getTaskLocation().equals(taskItem.getLocation())) { |
| log.info( |
| "Task[%s] location changed on worker[%s]. new location[%s].", |
| taskId, |
| worker.getHost(), |
| announcement.getTaskLocation() |
| ); |
| taskItem.setLocation(announcement.getTaskLocation()); |
| TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); |
| } |
| } else { |
| log.warn( |
| "Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored.", |
| worker.getHost(), |
| taskId, |
| taskItem.getWorker().getHost() |
| ); |
| shouldShutdownTask = true; |
| } |
| break; |
| case COMPLETE: |
| log.warn( |
| "Worker[%s] reported status for completed task[%s]. Ignored.", |
| worker.getHost(), |
| taskId |
| ); |
| shouldShutdownTask = true; |
| break; |
| default: |
| log.makeAlert( |
| "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", |
| taskItem.getState(), |
| taskId, |
| announcement, |
| worker.getHost() |
| ).emit(); |
| } |
| break; |
| case FAILED: |
| case SUCCESS: |
| switch (taskItem.getState()) { |
| case PENDING: |
| case PENDING_WORKER_ASSIGN: |
| taskItem.setWorker(worker); |
| taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); |
| log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost()); |
| // fall through |
| case RUNNING: |
| if (worker.getHost().equals(taskItem.getWorker().getHost())) { |
| if (!announcement.getTaskLocation().equals(taskItem.getLocation())) { |
| log.info( |
| "Task[%s] location changed on worker[%s]. new location[%s].", |
| taskId, |
| worker.getHost(), |
| announcement.getTaskLocation() |
| ); |
| taskItem.setLocation(announcement.getTaskLocation()); |
| TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); |
| } |
| |
| isTaskCompleted = true; |
| } else { |
| log.warn( |
| "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.", |
| worker.getHost(), |
| taskId, |
| taskItem.getWorker().getHost() |
| ); |
| } |
| break; |
| case COMPLETE: |
| // this can happen when a worker is restarted and reports its list of completed tasks again. |
| break; |
| default: |
| log.makeAlert( |
| "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", |
| taskItem.getState(), |
| taskId, |
| announcement, |
| worker.getHost() |
| ).emit(); |
| } |
| break; |
| default: |
| log.makeAlert( |
| "Worker[%s] reported unrecognized state[%s] for task[%s].", |
| worker.getHost(), |
| announcement.getTaskStatus().getStatusCode(), |
| taskId |
| ).emit(); |
| } |
| } |
| } |
| |
| if (isTaskCompleted) { |
| // taskComplete(..) must be called outside of statusLock, see comments on method. |
| taskComplete(taskItem, workerHolder, announcement.getTaskStatus()); |
| } |
| |
| if (shouldShutdownTask) { |
| log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost()); |
| workerHolder.shutdownTask(taskId); |
| } |
| |
| synchronized (statusLock) { |
| statusLock.notifyAll(); |
| } |
| } |
| |
| @Override |
| public long getTotalTaskSlotCount() |
| { |
| long totalPeons = 0; |
| for (ImmutableWorkerInfo worker : getWorkers()) { |
| totalPeons += worker.getWorker().getCapacity(); |
| } |
| |
| return totalPeons; |
| } |
| |
| @Override |
| public long getIdleTaskSlotCount() |
| { |
| long totalIdlePeons = 0; |
| for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { |
| totalIdlePeons += worker.getAvailableCapacity(); |
| } |
| |
| return totalIdlePeons; |
| } |
| |
| @Override |
| public long getUsedTaskSlotCount() |
| { |
| long totalUsedPeons = 0; |
| for (ImmutableWorkerInfo worker : getWorkers()) { |
| totalUsedPeons += worker.getCurrCapacityUsed(); |
| } |
| |
| return totalUsedPeons; |
| } |
| |
| @Override |
| public long getLazyTaskSlotCount() |
| { |
| long totalLazyPeons = 0; |
| for (Worker worker : getLazyWorkers()) { |
| totalLazyPeons += worker.getCapacity(); |
| } |
| |
| return totalLazyPeons; |
| } |
| |
| @Override |
| public long getBlacklistedTaskSlotCount() |
| { |
| long totalBlacklistedPeons = 0; |
| for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { |
| totalBlacklistedPeons += worker.getWorker().getCapacity(); |
| } |
| |
| return totalBlacklistedPeons; |
| } |
| |
| private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem |
| { |
| enum State |
| { |
| // Task has been given to HRTR, but a worker to run this task hasn't been identified yet. |
| PENDING(0, true, RunnerTaskState.PENDING), |
| |
| // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet |
| // or worker hasn't acknowledged the task yet. |
| PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING), |
| |
| RUNNING(2, false, RunnerTaskState.RUNNING), |
| COMPLETE(3, false, RunnerTaskState.NONE); |
| |
| private final int index; |
| private final boolean isPending; |
| private final RunnerTaskState runnerTaskState; |
| |
| State(int index, boolean isPending, RunnerTaskState runnerTaskState) |
| { |
| this.index = index; |
| this.isPending = isPending; |
| this.runnerTaskState = runnerTaskState; |
| } |
| |
| boolean isPending() |
| { |
| return isPending; |
| } |
| |
| RunnerTaskState toRunnerTaskState() |
| { |
| return runnerTaskState; |
| } |
| } |
| |
| private Task task; |
| private State state; |
| |
| HttpRemoteTaskRunnerWorkItem( |
| String taskId, |
| Worker worker, |
| TaskLocation location, |
| @Nullable Task task, |
| String taskType, |
| State state |
| ) |
| { |
| super(taskId, task == null ? null : task.getType(), worker, location, task == null ? null : task.getDataSource()); |
| this.state = Preconditions.checkNotNull(state); |
| Preconditions.checkArgument(task == null || taskType == null || taskType.equals(task.getType())); |
| |
| // The task can be null when the TaskRunner has just started and discovered this taskId from a worker; |
| // notifications don't contain the whole Task instance, just metadata about the task. |
| this.task = task; |
| } |
| |
| public Task getTask() |
| { |
| return task; |
| } |
| |
| public void setTask(Task task) |
| { |
| this.task = task; |
| if (getTaskType() == null) { |
| setTaskType(task.getType()); |
| } else { |
| Preconditions.checkArgument(getTaskType().equals(task.getType())); |
| } |
| } |
| |
| @JsonProperty |
| public State getState() |
| { |
| return state; |
| } |
| |
| @Override |
| public void setResult(TaskStatus status) |
| { |
| setState(State.COMPLETE); |
| super.setResult(status); |
| } |
| |
| public void setState(State state) |
| { |
| Preconditions.checkArgument( |
| state.index - this.state.index > 0, |
| "Invalid state transition from [%s] to [%s]", |
| this.state, |
| state |
| ); |
| |
| setStateUnconditionally(state); |
| } |
| |
| public void revertStateFromPendingWorkerAssignToPending() |
| { |
| Preconditions.checkState( |
| this.state == State.PENDING_WORKER_ASSIGN, |
| "Can't move state from [%s] to [%s]", |
| this.state, |
| State.PENDING |
| ); |
| |
| setStateUnconditionally(State.PENDING); |
| } |
| |
| private void setStateUnconditionally(State state) |
| { |
| if (log.isDebugEnabled()) { |
| // The exception is logged so that the stack trace shows what led to this call. |
| log.debug( |
| new RuntimeException("Stacktrace..."), |
| "Setting task[%s] work item state from [%s] to [%s].", |
| getTaskId(), |
| this.state, |
| state |
| ); |
| } |
| this.state = state; |
| } |
| } |
| } |