blob: 9fa8a28957ac45b93c4911541c0d428bf31a7f30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
/**
* Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} which support task restoration.
*/
public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWorkItem> implements TaskRunner
{
protected static final EmittingLogger LOG = new EmittingLogger(BaseRestorableTaskRunner.class);
protected static final String TASK_RESTORE_FILENAME = "restore.json";
protected final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
/**
* Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting.
*/
protected final ConcurrentHashMap<String, WorkItemType> tasks = new ConcurrentHashMap<>();
protected final ObjectMapper jsonMapper;
protected final TaskConfig taskConfig;
public BaseRestorableTaskRunner(
ObjectMapper jsonMapper,
TaskConfig taskConfig
)
{
this.jsonMapper = jsonMapper;
this.taskConfig = taskConfig;
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
final File restoreFile = getRestoreFile();
final TaskRestoreInfo taskRestoreInfo;
if (restoreFile.exists()) {
try {
taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
return ImmutableList.of();
}
} else {
return ImmutableList.of();
}
final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new ArrayList<>();
for (final String taskId : taskRestoreInfo.getRunningTasks()) {
try {
final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
final Task task = jsonMapper.readValue(taskFile, Task.class);
if (!task.getId().equals(taskId)) {
throw new ISE("Task[%s] restore file had wrong id[%s]", taskId, task.getId());
}
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
LOG.info("Restoring task[%s].", task.getId());
retVal.add(Pair.of(task, run(task)));
}
}
catch (Exception e) {
LOG.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
}
}
if (!retVal.isEmpty()) {
LOG.info("Restored %,d tasks: %s", retVal.size(), Joiner.on(", ").join(retVal));
}
return retVal;
}
@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
for (Pair<TaskRunnerListener, Executor> pair : listeners) {
if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
throw new ISE("Listener [%s] already registered", listener.getListenerId());
}
}
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
synchronized (tasks) {
for (TaskRunnerWorkItem item : tasks.values()) {
TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
}
listeners.add(listenerPair);
LOG.debug("Registered listener [%s]", listener.getListenerId());
}
}
@Override
public void unregisterListener(String listenerId)
{
for (Pair<TaskRunnerListener, Executor> pair : listeners) {
if (pair.lhs.getListenerId().equals(listenerId)) {
listeners.remove(pair);
LOG.debug("Unregistered listener [%s]", listenerId);
return;
}
}
}
@Override
public abstract Collection<TaskRunnerWorkItem> getRunningTasks();
@Override
public abstract Collection<TaskRunnerWorkItem> getPendingTasks();
@Nullable
@Override
public abstract RunnerTaskState getRunnerTaskState(String taskId);
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
synchronized (tasks) {
return Lists.newArrayList(tasks.values());
}
}
/**
* Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
* occur while saving.
*/
@GuardedBy("tasks")
protected void saveRunningTasks()
{
final File restoreFile = getRestoreFile();
final List<String> theTasks = new ArrayList<>();
for (TaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
}
try {
Files.createParentDirs(restoreFile);
jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
}
catch (Exception e) {
LOG.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
}
}
protected File getRestoreFile()
{
return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
}
protected static class TaskRestoreInfo
{
@JsonProperty
private final List<String> runningTasks;
@JsonCreator
public TaskRestoreInfo(
@JsonProperty("runningTasks") List<String> runningTasks
)
{
this.runningTasks = runningTasks;
}
public List<String> getRunningTasks()
{
return runningTasks;
}
}
}