blob: 57c117da5787528b0edbd626888fd97828a2a23a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.MonitorEntry;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Base class for different implementations of {@link ParallelIndexTaskRunner}.
* It creates sub tasks, schedule them, and monitor their status.
*/
public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTaskReportType extends SubTaskReport>
implements ParallelIndexTaskRunner<SubTaskType, SubTaskReportType>
{
private static final Logger LOG = new Logger(ParallelIndexPhaseRunner.class);
private final TaskToolbox toolbox;
private final String taskId;
private final String groupId;
private final ParallelIndexTuningConfig tuningConfig;
private final Map<String, Object> context;
/**
* Max number of sub tasks which can be executed concurrently at the same time.
*/
private final int maxNumConcurrentSubTasks;
private final IndexingServiceClient indexingServiceClient;
private final BlockingQueue<SubTaskCompleteEvent<SubTaskType>> taskCompleteEvents = new LinkedBlockingDeque<>();
// subTaskId -> report
private final ConcurrentHashMap<String, SubTaskReportType> reportsMap = new ConcurrentHashMap<>();
private volatile boolean subTaskScheduleAndMonitorStopped;
private volatile TaskMonitor<SubTaskType> taskMonitor;
private int nextSpecId = 0;
ParallelIndexPhaseRunner(
TaskToolbox toolbox,
String taskId,
String groupId,
ParallelIndexTuningConfig tuningConfig,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
this.toolbox = toolbox;
this.taskId = taskId;
this.groupId = groupId;
this.tuningConfig = tuningConfig;
this.context = context;
this.maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
}
/**
* Returns an iterator for {@link SubTaskSpec}s of this phase.
*/
abstract Iterator<SubTaskSpec<SubTaskType>> subTaskSpecIterator() throws IOException;
/**
* Returns the total number of sub tasks required to execute this phase.
*/
abstract int estimateTotalNumSubTasks() throws IOException;
@Override
public TaskState run() throws Exception
{
final CountingSubTaskSpecIterator subTaskSpecIterator = new CountingSubTaskSpecIterator(subTaskSpecIterator());
if (!subTaskSpecIterator.hasNext()) {
LOG.warn("There's no input split to process");
return TaskState.SUCCESS;
}
final long taskStatusCheckingPeriod = tuningConfig.getTaskStatusCheckPeriodMs();
taskMonitor = new TaskMonitor<>(
Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"),
tuningConfig.getMaxRetry(),
estimateTotalNumSubTasks()
);
TaskState state = TaskState.RUNNING;
taskMonitor.start(taskStatusCheckingPeriod);
try {
LOG.info("Submitting initial tasks");
// Submit initial tasks
while (isRunning() && subTaskSpecIterator.hasNext() && taskMonitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
submitNewTask(taskMonitor, subTaskSpecIterator.next());
}
LOG.info("Waiting for subTasks to be completed");
while (isRunning()) {
final SubTaskCompleteEvent<SubTaskType> taskCompleteEvent = taskCompleteEvents.poll(
taskStatusCheckingPeriod,
TimeUnit.MILLISECONDS
);
if (taskCompleteEvent != null) {
final TaskState completeState = taskCompleteEvent.getLastState();
switch (completeState) {
case SUCCESS:
final TaskStatusPlus completeStatus = taskCompleteEvent.getLastStatus();
if (completeStatus == null) {
throw new ISE("Last status of complete task is missing!");
}
// Pushed segments of complete tasks are supposed to be already reported.
if (!reportsMap.containsKey(completeStatus.getId())) {
throw new ISE("Missing reports from task[%s]!", completeStatus.getId());
}
if (!subTaskSpecIterator.hasNext()) {
// We have no more subTasks to run
if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) {
subTaskScheduleAndMonitorStopped = true;
if (subTaskSpecIterator.count == taskMonitor.getNumSucceededTasks()) {
// Succeeded
state = TaskState.SUCCESS;
} else {
// Failed
final ParallelIndexingPhaseProgress monitorStatus = taskMonitor.getProgress();
throw new ISE(
"Expected [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
subTaskSpecIterator.count,
monitorStatus.getSucceeded(),
monitorStatus.getFailed()
);
}
}
} else if (taskMonitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
// We have more subTasks to run
submitNewTask(taskMonitor, subTaskSpecIterator.next());
} else {
// We have more subTasks to run, but don't have enough available task slots
// do nothing
}
break;
case FAILED:
// TaskMonitor already tried everything it can do for failed tasks. We failed.
state = TaskState.FAILED;
subTaskScheduleAndMonitorStopped = true;
final TaskStatusPlus lastStatus = taskCompleteEvent.getLastStatus();
if (lastStatus != null) {
LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
} else {
final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
LOG.error("Failed to run sub tasks for inputSplits[%s]", spec.getInputSplit());
}
break;
default:
throw new ISE("spec[%s] is in an invalid state[%s]", taskCompleteEvent.getSpec().getId(), completeState);
}
}
}
}
finally {
stopInternal();
if (!state.isComplete()) {
state = TaskState.FAILED;
}
}
return state;
}
private class CountingSubTaskSpecIterator implements Iterator<SubTaskSpec<SubTaskType>>
{
private final Iterator<SubTaskSpec<SubTaskType>> delegate;
private int count;
private CountingSubTaskSpecIterator(Iterator<SubTaskSpec<SubTaskType>> delegate)
{
this.delegate = delegate;
}
@Override
public boolean hasNext()
{
return delegate.hasNext();
}
@Override
public SubTaskSpec<SubTaskType> next()
{
if (!delegate.hasNext()) {
throw new NoSuchElementException();
}
count++;
return delegate.next();
}
}
private boolean isRunning()
{
return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted();
}
private void submitNewTask(
TaskMonitor<SubTaskType> taskMonitor,
SubTaskSpec<SubTaskType> spec
)
{
LOG.info("Submit a new task for spec[%s] and inputSplit[%s]", spec.getId(), spec.getInputSplit());
final ListenableFuture<SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);
Futures.addCallback(
future,
new FutureCallback<SubTaskCompleteEvent<SubTaskType>>()
{
@Override
public void onSuccess(SubTaskCompleteEvent<SubTaskType> completeEvent)
{
// this callback is called if a task completed whether it succeeded or not.
taskCompleteEvents.offer(completeEvent);
}
@Override
public void onFailure(Throwable t)
{
// this callback is called only when there were some problems in TaskMonitor.
LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
);
}
private static List<InputSplit> getSplitsIfSplittable(
InputSource inputSource,
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
) throws IOException
{
if (inputSource instanceof SplittableInputSource) {
final SplittableInputSource<?> splittableInputSource = (SplittableInputSource) inputSource;
return splittableInputSource.createSplits(inputFormat, splitHintSpec).collect(Collectors.toList());
} else {
throw new ISE("inputSource[%s] is not splittable", inputSource.getClass().getSimpleName());
}
}
@Override
public void stopGracefully()
{
subTaskScheduleAndMonitorStopped = true;
stopInternal();
}
/**
* Stop task scheduling and monitoring, and kill all running tasks.
* This method is thread-safe.
*/
private void stopInternal()
{
LOG.info("Cleaning up resources");
taskCompleteEvents.clear();
if (taskMonitor != null) {
taskMonitor.stop();
}
}
@Override
public void collectReport(SubTaskReportType report)
{
// subTasks might send their reports multiple times because of the HTTP retry.
// Here, we simply make sure the current report is exactly same with the previous one.
reportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
if (prevReport != null) {
Preconditions.checkState(
prevReport.equals(report),
"task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]",
taskId,
prevReport,
report
);
}
return report;
});
}
@Override
public Map<String, SubTaskReportType> getReports()
{
return reportsMap;
}
@Override
public ParallelIndexingPhaseProgress getProgress()
{
return taskMonitor == null ? ParallelIndexingPhaseProgress.notRunning() : taskMonitor.getProgress();
}
@Override
public Set<String> getRunningTaskIds()
{
return taskMonitor == null ? Collections.emptySet() : taskMonitor.getRunningTaskIds();
}
@Override
public List<SubTaskSpec<SubTaskType>> getSubTaskSpecs()
{
if (taskMonitor != null) {
final List<SubTaskSpec<SubTaskType>> runningSubTaskSpecs = taskMonitor.getRunningSubTaskSpecs();
final List<SubTaskSpec<SubTaskType>> completeSubTaskSpecs = taskMonitor
.getCompleteSubTaskSpecs();
// Deduplicate subTaskSpecs because some subTaskSpec might exist both in runningSubTaskSpecs and
// completeSubTaskSpecs.
final Map<String, SubTaskSpec<SubTaskType>> subTaskSpecMap = Maps.newHashMapWithExpectedSize(
runningSubTaskSpecs.size() + completeSubTaskSpecs.size()
);
runningSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
completeSubTaskSpecs.forEach(spec -> subTaskSpecMap.put(spec.getId(), spec));
return new ArrayList<>(subTaskSpecMap.values());
} else {
return Collections.emptyList();
}
}
@Override
public List<SubTaskSpec<SubTaskType>> getRunningSubTaskSpecs()
{
return taskMonitor == null ? Collections.emptyList() : taskMonitor.getRunningSubTaskSpecs();
}
@Override
public List<SubTaskSpec<SubTaskType>> getCompleteSubTaskSpecs()
{
return taskMonitor == null ? Collections.emptyList() : taskMonitor.getCompleteSubTaskSpecs();
}
@Nullable
@Override
public SubTaskSpec<SubTaskType> getSubTaskSpec(String subTaskSpecId)
{
if (taskMonitor != null) {
// Running tasks should be checked first because, in taskMonitor, subTaskSpecs are removed from runningTasks after
// adding them to taskHistory.
final MonitorEntry monitorEntry = taskMonitor.getRunningTaskMonitorEntry(subTaskSpecId);
final TaskHistory<SubTaskType> taskHistory = taskMonitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
final SubTaskSpec<SubTaskType> subTaskSpec;
if (monitorEntry != null) {
subTaskSpec = monitorEntry.getSpec();
} else {
if (taskHistory != null) {
subTaskSpec = taskHistory.getSpec();
} else {
subTaskSpec = null;
}
}
return subTaskSpec;
} else {
return null;
}
}
@Nullable
@Override
public SubTaskSpecStatus getSubTaskState(String subTaskSpecId)
{
if (taskMonitor == null) {
return null;
} else {
// Running tasks should be checked first because, in taskMonitor, subTaskSpecs are removed from runningTasks after
// adding them to taskHistory.
final MonitorEntry monitorEntry = taskMonitor.getRunningTaskMonitorEntry(subTaskSpecId);
final TaskHistory<SubTaskType> taskHistory = taskMonitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
final SubTaskSpecStatus subTaskSpecStatus;
if (monitorEntry != null) {
subTaskSpecStatus = new SubTaskSpecStatus(
(SinglePhaseSubTaskSpec) monitorEntry.getSpec(),
monitorEntry.getRunningStatus(),
monitorEntry.getTaskHistory()
);
} else {
if (taskHistory != null && !taskHistory.isEmpty()) {
subTaskSpecStatus = new SubTaskSpecStatus(
(SinglePhaseSubTaskSpec) taskHistory.getSpec(),
null,
taskHistory.getAttemptHistory()
);
} else {
subTaskSpecStatus = null;
}
}
return subTaskSpecStatus;
}
}
@Nullable
@Override
public TaskHistory<SubTaskType> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId)
{
if (taskMonitor == null) {
return null;
} else {
return taskMonitor.getCompleteSubTaskSpecHistory(subTaskSpecId);
}
}
String getTaskId()
{
return taskId;
}
String getGroupId()
{
return groupId;
}
Map<String, Object> getContext()
{
return context;
}
ParallelIndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
@VisibleForTesting
TaskToolbox getToolbox()
{
return toolbox;
}
@VisibleForTesting
@Nullable
TaskMonitor<SubTaskType> getTaskMonitor()
{
return taskMonitor;
}
@VisibleForTesting
int getAndIncrementNextSpecId()
{
return nextSpecId++;
}
@VisibleForTesting
IndexingServiceClient getIndexingServiceClient()
{
return indexingServiceClient;
}
}