blob: f888dd76bf0f225a5a70f7ffa5cc5b0ec31809c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
static final String DISABLE_TASK_INJECT_CONTEXT_KEY = "disableInject";
static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))
);
static final AggregatorFactory[] DEFAULT_METRICS_SPEC = new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
};
static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
DEFAULT_TIMESTAMP_SPEC,
DEFAULT_DIMENSIONS_SPEC,
null,
Arrays.asList("ts", "dim", "val"),
false,
0
);
static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
Arrays.asList("ts", "dim", "val"),
null,
false,
false,
0
);
public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING =
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
5,
null,
null,
null,
null
);
protected static final double DEFAULT_TRANSIENT_TASK_FAILURE_RATE = 0.2;
protected static final double DEFAULT_TRANSIENT_API_FAILURE_RATE = 0.2;
private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final TestName testName = new TestName();
/**
* Transient task failure rate emulated by the taskKiller in {@link SimpleThreadingTaskRunner}.
* Per {@link SubTaskSpec}, there could be at most one task failure.
*
* @see #DEFAULT_TRANSIENT_TASK_FAILURE_RATE
*/
private final double transientTaskFailureRate;
/**
* Transient API call failure rate emulated by {@link LocalParallelIndexSupervisorTaskClient}.
* This will be applied to every API calls in the future.
*
* @see #DEFAULT_TRANSIENT_API_FAILURE_RATE
*/
private final double transientApiCallFailureRate;
private File localDeepStorage;
private SimpleThreadingTaskRunner taskRunner;
private ObjectMapper objectMapper;
private LocalOverlordClient indexingServiceClient;
private IntermediaryDataManager intermediaryDataManager;
private CoordinatorClient coordinatorClient;
// An executor that executes API calls using a different thread from the caller thread as if they were remote calls.
private ExecutorService remoteApiExecutor;
protected AbstractParallelIndexSupervisorTaskTest(
double transientTaskFailureRate,
double transientApiCallFailureRate
)
{
this.transientTaskFailureRate = transientTaskFailureRate;
this.transientApiCallFailureRate = transientApiCallFailureRate;
}
@Before
public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException
{
localDeepStorage = temporaryFolder.newFolder("localStorage");
taskRunner = new SimpleThreadingTaskRunner(testName.getMethodName());
objectMapper = getObjectMapper();
indexingServiceClient = new LocalOverlordClient(objectMapper, taskRunner);
final TaskConfig taskConfig = new TaskConfigBuilder()
.setShuffleDataLocations(ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)))
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
intermediaryDataManager = new LocalIntermediaryDataManager(new WorkerConfig(), taskConfig, null);
remoteApiExecutor = Execs.singleThreaded("coordinator-api-executor");
coordinatorClient = new LocalCoordinatorClient(remoteApiExecutor);
prepareObjectMapper(objectMapper, getIndexIO());
}
@After
public void tearDownAbstractParallelIndexSupervisorTaskTest()
{
remoteApiExecutor.shutdownNow();
taskRunner.shutdown();
temporaryFolder.delete();
}
protected ParallelIndexTuningConfig newTuningConfig(
PartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
boolean forceGuaranteedRollup
)
{
return new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
null,
null,
forceGuaranteedRollup,
null,
null,
null,
null,
maxNumConcurrentSubTasks,
null,
null,
null,
null,
null,
null,
null,
null,
5,
null,
null,
null,
null
);
}
protected LocalOverlordClient getIndexingServiceClient()
{
return indexingServiceClient;
}
protected CoordinatorClient getCoordinatorClient()
{
return coordinatorClient;
}
protected static class TaskContainer
{
private final Task task;
@MonotonicNonNull
private volatile Future<TaskStatus> statusFuture;
@MonotonicNonNull
private volatile TestLocalTaskActionClient actionClient;
private final CountDownLatch taskFinishLatch;
private TaskContainer(Task task, CountDownLatch taskFinishLatch)
{
this.task = task;
this.taskFinishLatch = taskFinishLatch;
}
public Task getTask()
{
return task;
}
private void setStatusFuture(Future<TaskStatus> statusFuture)
{
this.statusFuture = statusFuture;
}
private void setActionClient(TestLocalTaskActionClient actionClient)
{
this.actionClient = actionClient;
}
}
public class SimpleThreadingTaskRunner
{
private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
private final ListeningExecutorService service;
private final ScheduledExecutorService taskKiller = Execs.scheduledSingleThreaded("simple-threading-task-killer");
private final Set<String> killedSubtaskSpecs = new HashSet<>();
private volatile boolean useTaskFinishLatches = false;
private final List<CountDownLatch> allTaskLatches = new ArrayList<>();
SimpleThreadingTaskRunner(String threadNameBase)
{
service = MoreExecutors.listeningDecorator(Execs.multiThreaded(5, threadNameBase + "-%d"));
taskKiller.scheduleAtFixedRate(
() -> {
for (TaskContainer taskContainer : tasks.values()) {
boolean kill = ThreadLocalRandom.current().nextDouble() < transientTaskFailureRate;
if (kill && !taskContainer.statusFuture.isDone()) {
String subtaskSpecId = taskContainer.task instanceof AbstractBatchSubtask
? ((AbstractBatchSubtask) taskContainer.task).getSubtaskSpecId()
: null;
if (subtaskSpecId != null && !killedSubtaskSpecs.contains(subtaskSpecId)) {
killedSubtaskSpecs.add(subtaskSpecId);
taskContainer.statusFuture.cancel(true);
LOG.info(
"Transient task failure test. Killed task[%s] for spec[%s]",
taskContainer.task.getId(),
subtaskSpecId
);
}
}
}
},
100,
100,
TimeUnit.MILLISECONDS
);
}
public void shutdown()
{
service.shutdownNow();
taskKiller.shutdownNow();
}
private TaskStatus runAndWait(Task task)
{
try {
// 20 minutes should be enough for the tasks to finish.
return runTask(task).get(20, TimeUnit.MINUTES);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
private TaskStatus waitToFinish(Task task, long waitTime, TimeUnit timeUnit)
{
final TaskContainer taskContainer = tasks.get(task.getId());
if (taskContainer == null) {
throw new IAE("Unknown task[%s]", task.getId());
}
try {
while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
Thread.sleep(10);
}
return taskContainer.statusFuture.get(waitTime, timeUnit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
private TaskContainer getTaskContainer(String taskId)
{
return tasks.get(taskId);
}
private void setUseTaskFinishLatches(boolean useLatches)
{
this.useTaskFinishLatches = useLatches;
if (!useLatches) {
countDownAllTaskLatches();
}
}
private void countDownAllTaskLatches()
{
allTaskLatches.forEach(latch -> {
if (latch.getCount() > 0) {
latch.countDown();
}
});
}
private Future<TaskStatus> runTask(Task task)
{
final CountDownLatch taskFinishLatch = useTaskFinishLatches ? new CountDownLatch(1) : new CountDownLatch(0);
allTaskLatches.add(taskFinishLatch);
final TaskContainer taskContainer = new TaskContainer(task, taskFinishLatch);
if (tasks.put(task.getId(), taskContainer) != null) {
throw new ISE("Duplicate task ID[%s]", task.getId());
}
prepareTaskForLocking(task);
task.addToContextIfAbsent(
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
final ListenableFuture<TaskStatus> statusFuture = service.submit(
() -> {
try {
final TestLocalTaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
taskContainer.setActionClient(actionClient);
if (task.isReady(toolbox.getTaskActionClient())) {
TaskStatus status = task.run(toolbox);
taskFinishLatch.await();
return status;
} else {
getTaskStorage().setStatus(TaskStatus.failure(task.getId(), "Dummy task status failure for testing"));
throw new ISE("task[%s] is not ready", task.getId());
}
}
catch (Exception e) {
getTaskStorage().setStatus(TaskStatus.failure(task.getId(), e.getMessage()));
throw new RuntimeException(e);
}
}
);
taskContainer.setStatusFuture(statusFuture);
final ListenableFuture<TaskStatus> cleanupFuture = Futures.transform(
statusFuture,
status -> {
shutdownTask(task);
return status;
},
MoreExecutors.directExecutor()
);
return cleanupFuture;
}
private void cancel(String taskId)
{
final TaskContainer taskContainer = tasks.remove(taskId);
if (taskContainer != null && taskContainer.statusFuture != null) {
taskContainer.statusFuture.cancel(true);
}
}
@Nullable
public TaskStatus getStatus(String taskId)
{
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer != null && taskContainer.statusFuture != null) {
try {
if (taskContainer.statusFuture.isDone()) {
return taskContainer.statusFuture.get();
} else {
return TaskStatus.running(taskId);
}
}
catch (InterruptedException | ExecutionException | CancellationException e) {
// We don't have a way to propagate this exception to the supervisorTask yet..
// So, let's print it here.
LOG.error(e, "Task[%s] failed", taskId);
return TaskStatus.failure(taskId, e.getMessage());
}
} else {
return null;
}
}
public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String taskId)
{
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer == null || taskContainer.actionClient == null) {
return new DataSegmentsWithSchemas(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
} else {
return new DataSegmentsWithSchemas(taskContainer.actionClient.getPublishedSegments(), taskContainer.actionClient.getSegmentSchemas());
}
}
}
public class LocalOverlordClient extends NoopOverlordClient
{
private final ObjectMapper objectMapper;
private final SimpleThreadingTaskRunner taskRunner;
public LocalOverlordClient(ObjectMapper objectMapper, SimpleThreadingTaskRunner taskRunner)
{
this.objectMapper = objectMapper;
this.taskRunner = taskRunner;
}
@Override
public ListenableFuture<Void> runTask(String taskId, Object taskObject)
{
final Task task = (Task) taskObject;
taskRunner.runTask(injectIfNeeded(task));
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
return Futures.immediateFuture(getLiveReportsForTask(taskId));
}
private TaskReport.ReportMap getLiveReportsForTask(String taskId)
{
final Optional<Task> taskOptional = getTaskStorage().getTask(taskId);
if (!taskOptional.isPresent()) {
return null;
}
final Task task = taskOptional.get();
if (task instanceof SinglePhaseSubTask) {
return ((SinglePhaseSubTask) task).doGetLiveReports(true);
} else {
return ((ParallelIndexSupervisorTask) task).doGetLiveReports(true);
}
}
public TaskContainer getTaskContainer(String taskId)
{
return taskRunner.getTaskContainer(taskId);
}
public void keepTasksRunning()
{
taskRunner.setUseTaskFinishLatches(true);
}
public void allowTasksToFinish()
{
taskRunner.setUseTaskFinishLatches(false);
}
public TaskStatus runAndWait(Task task)
{
return taskRunner.runAndWait(injectIfNeeded(task));
}
public TaskStatus waitToFinish(Task task, long timeout, TimeUnit timeUnit)
{
return taskRunner.waitToFinish(task, timeout, timeUnit);
}
private Task injectIfNeeded(Task task)
{
if (!task.getContextValue(DISABLE_TASK_INJECT_CONTEXT_KEY, false)) {
try {
final byte[] json = objectMapper.writeValueAsBytes(task);
return objectMapper.readValue(json, Task.class);
}
catch (IOException e) {
LOG.error(e, "Error while serializing and deserializing task spec");
throw new RuntimeException(e);
}
} else {
return task;
}
}
@Override
public ListenableFuture<Void> cancelTask(String taskId)
{
taskRunner.cancel(taskId);
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
{
final Optional<Task> task = getTaskStorage().getTask(taskId);
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
final String taskType = task.isPresent() ? task.get().getType() : null;
final TaskStatus taskStatus = taskRunner.getStatus(taskId);
if (taskStatus != null) {
final TaskStatusResponse retVal = new TaskStatusResponse(
taskId,
new TaskStatusPlus(
taskId,
groupId,
taskType,
DateTimes.EPOCH,
DateTimes.EPOCH,
taskStatus.getStatusCode(),
taskStatus.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
null,
null
)
);
return Futures.immediateFuture(retVal);
} else {
return Futures.immediateFuture(new TaskStatusResponse(taskId, null));
}
}
public DataSegmentsWithSchemas getSegmentAndSchemas(Task task)
{
return taskRunner.getPublishedSegmentsWithSchemas(task.getId());
}
}
public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
{
final TaskConfig taskConfig = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE)
.addValue(IndexIO.class, indexIO)
.addValue(ObjectMapper.class, objectMapper)
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
.addValue(AuthConfig.class, new AuthConfig())
.addValue(AuthorizerMapper.class, null)
.addValue(RowIngestionMetersFactory.class, new DropwizardRowIngestionMetersFactory())
.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER)
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
.addValue(CoordinatorClient.class, coordinatorClient)
.addValue(SegmentCacheManagerFactory.class, new SegmentCacheManagerFactory(objectMapper))
.addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
.addValue(TaskConfig.class, taskConfig)
);
objectMapper.registerSubtypes(
new NamedType(ParallelIndexSupervisorTask.class, ParallelIndexSupervisorTask.TYPE),
new NamedType(CompactionTask.CompactionTuningConfig.class, CompactionTask.CompactionTuningConfig.TYPE),
new NamedType(SinglePhaseSubTask.class, SinglePhaseSubTask.TYPE),
new NamedType(PartialHashSegmentGenerateTask.class, PartialHashSegmentGenerateTask.TYPE),
new NamedType(PartialRangeSegmentGenerateTask.class, PartialRangeSegmentGenerateTask.TYPE),
new NamedType(PartialGenericSegmentMergeTask.class, PartialGenericSegmentMergeTask.TYPE),
new NamedType(PartialDimensionDistributionTask.class, PartialDimensionDistributionTask.TYPE),
new NamedType(PartialDimensionCardinalityTask.class, PartialDimensionCardinalityTask.TYPE)
);
}
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
TaskConfig config = new TaskConfigBuilder()
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = CentralizedDatasourceSchemaConfig.create(true);
return new TaskToolbox.Builder()
.config(config)
.taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
.taskActionClient(actionClient)
.segmentPusher(
new LocalDataSegmentPusher(
new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return localDeepStorage;
}
}
)
)
.dataSegmentKiller(new NoopDataSegmentKiller())
.joinableFactory(NoopJoinableFactory.INSTANCE)
.segmentCacheManager(newSegmentLoader(temporaryFolder.newFolder()))
.jsonMapper(objectMapper)
.taskWorkDir(temporaryFolder.newFolder(task.getId()))
.indexIO(getIndexIO())
.indexMergerV9(getIndexMergerV9Factory().create(task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, true)))
.intermediaryDataManager(intermediaryDataManager)
.taskReportFileWriter(new SingleFileTaskReportFileWriter(reportsFile))
.authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
.chatHandlerProvider(new NoopChatHandlerProvider())
.rowIngestionMetersFactory(new TestUtils().getRowIngestionMetersFactory())
.appenderatorsManager(new TestAppenderatorsManager())
.overlordClient(indexingServiceClient)
.coordinatorClient(coordinatorClient)
.supervisorTaskClientProvider(new LocalParallelIndexTaskClientProvider(taskRunner, transientApiCallFailureRate))
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.taskLogPusher(null)
.attemptId("1")
.emitter(new StubServiceEmitter())
.centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
.build();
}
static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask
{
TestParallelIndexSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context
)
{
super(
id,
null,
taskResource,
ingestionSchema,
context
);
}
}
static class LocalShuffleClient implements ShuffleClient<GenericPartitionLocation>
{
private final IntermediaryDataManager intermediaryDataManager;
LocalShuffleClient(IntermediaryDataManager intermediaryDataManager)
{
this.intermediaryDataManager = intermediaryDataManager;
}
@Override
public File fetchSegmentFile(
File partitionDir,
String supervisorTaskId,
GenericPartitionLocation location
) throws IOException
{
final java.util.Optional<ByteSource> zippedFile = intermediaryDataManager.findPartitionFile(
supervisorTaskId,
location.getSubTaskId(),
location.getInterval(),
location.getBucketId()
);
if (!zippedFile.isPresent()) {
throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
}
final File fetchedFile = new File(partitionDir, StringUtils.format("temp_%s", location.getSubTaskId()));
FileUtils.writeAtomically(
fetchedFile,
out -> zippedFile.get().copyTo(out)
);
final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
try {
FileUtils.mkdirp(unzippedDir);
CompressionUtils.unzip(fetchedFile, unzippedDir);
}
finally {
if (!fetchedFile.delete()) {
LOG.warn("Failed to delete temp file[%s]", zippedFile);
}
}
return unzippedDir;
}
}
protected TaskReport.ReportMap buildExpectedTaskReportSequential(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedDeterminePartitions,
RowIngestionMetersTotals expectedTotals
)
{
final Map<String, Object> unparseableEvents =
ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents);
Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0.0,
"processedBytes", 0.0,
"unparseable", 0.0,
"thrownAway", 0.0,
"processedWithError", 0.0
);
Map<String, Object> emptyAverages = ImmutableMap.of(
"1m", emptyAverageMinuteMap,
"5m", emptyAverageMinuteMap,
"15m", emptyAverageMinuteMap
);
final Map<String, Object> rowStats = ImmutableMap.of(
"movingAverages",
ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages),
"totals",
ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals)
);
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
taskId,
new IngestionStatsAndErrors(
IngestionState.COMPLETED,
unparseableEvents,
rowStats,
null,
false,
0L,
null,
null,
null
)
)
);
}
protected TaskReport.ReportMap buildExpectedTaskReportParallel(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedTotals
)
{
Map<String, Object> unparseableEvents = ImmutableMap.of("buildSegments", expectedUnparseableEvents);
Map<String, Object> rowStats = ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals));
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
taskId,
new IngestionStatsAndErrors(
IngestionState.COMPLETED,
unparseableEvents,
rowStats,
null,
false,
0L,
null,
null,
null
)
)
);
}
protected void compareTaskReports(
TaskReport.ReportMap expectedReports,
TaskReport.ReportMap actualReports
)
{
final java.util.Optional<IngestionStatsAndErrorsTaskReport> expectedReportOptional
= expectedReports.findReport("ingestionStatsAndErrors");
final java.util.Optional<IngestionStatsAndErrorsTaskReport> actualReportOptional
= actualReports.findReport("ingestionStatsAndErrors");
Assert.assertTrue(expectedReportOptional.isPresent());
Assert.assertTrue(actualReportOptional.isPresent());
final IngestionStatsAndErrorsTaskReport expectedReport = expectedReportOptional.get();
final IngestionStatsAndErrorsTaskReport actualReport = actualReportOptional.get();
Assert.assertEquals(expectedReport.getTaskId(), actualReport.getTaskId());
Assert.assertEquals(expectedReport.getReportKey(), actualReport.getReportKey());
final IngestionStatsAndErrors expectedPayload = expectedReport.getPayload();
final IngestionStatsAndErrors actualPayload = actualReport.getPayload();
Assert.assertEquals(expectedPayload.getIngestionState(), actualPayload.getIngestionState());
Map<String, Object> expectedTotals = expectedPayload.getRowStats();
Map<String, Object> actualTotals = actualPayload.getRowStats();
Assert.assertEquals(expectedTotals, actualTotals);
List<ParseExceptionReport> expectedParseExceptionReports =
(List<ParseExceptionReport>) (expectedPayload.getUnparseableEvents()).get("buildSegments");
List<ParseExceptionReport> actualParseExceptionReports =
(List<ParseExceptionReport>) (actualPayload.getUnparseableEvents()).get("buildSegments");
List<String> expectedMessages = expectedParseExceptionReports
.stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
List<String> actualMessages = actualParseExceptionReports
.stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
List<String> expectedInputs = expectedParseExceptionReports
.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
List<String> actualInputs = actualParseExceptionReports
.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
}
static class LocalParallelIndexTaskClientProvider implements ParallelIndexSupervisorTaskClientProvider
{
private final ConcurrentMap<String, TaskContainer> tasks;
private final double transientApiCallFailureRate;
LocalParallelIndexTaskClientProvider(SimpleThreadingTaskRunner taskRunner, double transientApiCallFailureRate)
{
this.tasks = taskRunner.tasks;
this.transientApiCallFailureRate = transientApiCallFailureRate;
}
@Override
public ParallelIndexSupervisorTaskClient build(String supervisorTaskId, Duration httpTimeout, long numRetries)
{
return new LocalParallelIndexSupervisorTaskClient(supervisorTaskId, tasks, transientApiCallFailureRate);
}
}
static class LocalParallelIndexSupervisorTaskClient implements ParallelIndexSupervisorTaskClient
{
private static final int MAX_TRANSIENT_API_FAILURES = 3;
private final String supervisorTaskId;
private final double transientFailureRate;
private final ConcurrentMap<String, TaskContainer> tasks;
LocalParallelIndexSupervisorTaskClient(
String supervisorTaskId,
ConcurrentMap<String, TaskContainer> tasks,
double transientFailureRate
)
{
this.supervisorTaskId = supervisorTaskId;
this.tasks = tasks;
this.transientFailureRate = transientFailureRate;
}
@Override
public SegmentIdWithShardSpec allocateSegment(DateTime timestamp) throws IOException
{
final TaskContainer taskContainer = tasks.get(supervisorTaskId);
final ParallelIndexSupervisorTask supervisorTask = findSupervisorTask(taskContainer);
if (supervisorTask == null) {
throw new ISE("Cannot find supervisor task for [%s]", supervisorTaskId);
}
if (!(supervisorTask.getCurrentRunner() instanceof SinglePhaseParallelIndexTaskRunner)) {
throw new ISE("Only SinglePhaseParallelIndexTaskRunner can call this API");
}
SinglePhaseParallelIndexTaskRunner runner =
(SinglePhaseParallelIndexTaskRunner) supervisorTask.getCurrentRunner();
return runner.allocateNewSegment(supervisorTask.getDataSource(), timestamp);
}
@Override
public SegmentIdWithShardSpec allocateSegment(
DateTime timestamp,
String sequenceName,
@Nullable String prevSegmentId
) throws IOException
{
final TaskContainer taskContainer = tasks.get(supervisorTaskId);
final ParallelIndexSupervisorTask supervisorTask = findSupervisorTask(taskContainer);
if (supervisorTask == null) {
throw new ISE("Cannot find supervisor task for [%s]", supervisorTaskId);
}
if (!(supervisorTask.getCurrentRunner() instanceof SinglePhaseParallelIndexTaskRunner)) {
throw new ISE("Only SinglePhaseParallelIndexTaskRunner can call this API");
}
SinglePhaseParallelIndexTaskRunner runner =
(SinglePhaseParallelIndexTaskRunner) supervisorTask.getCurrentRunner();
SegmentIdWithShardSpec newSegmentId = null;
int i = 0;
do {
SegmentIdWithShardSpec allocated = runner.allocateNewSegment(
supervisorTask.getDataSource(),
timestamp,
sequenceName,
prevSegmentId
);
if (newSegmentId == null) {
newSegmentId = allocated;
}
if (!newSegmentId.equals(allocated)) {
throw new ISE(
"Segment allocation is not idempotent. Prev id was [%s] but new id is [%s]",
newSegmentId,
allocated
);
}
} while (i++ < MAX_TRANSIENT_API_FAILURES && ThreadLocalRandom.current().nextDouble() < transientFailureRate);
return newSegmentId;
}
@Override
public void report(SubTaskReport report)
{
final TaskContainer taskContainer = tasks.get(supervisorTaskId);
final ParallelIndexSupervisorTask supervisorTask = findSupervisorTask(taskContainer);
if (supervisorTask == null) {
throw new ISE("Cannot find supervisor task for [%s]", supervisorTaskId);
}
int i = 0;
do {
supervisorTask.getCurrentRunner().collectReport(report);
} while (i++ < MAX_TRANSIENT_API_FAILURES && ThreadLocalRandom.current().nextDouble() < transientFailureRate);
}
@Nullable
private ParallelIndexSupervisorTask findSupervisorTask(TaskContainer taskContainer)
{
if (taskContainer == null) {
return null;
}
if (taskContainer.task instanceof CompactionTask) {
final Task task = ((CompactionTask) taskContainer.task).getCurrentSubTaskHolder().getTask();
if (!(task instanceof ParallelIndexSupervisorTask)) {
return null;
} else {
return (ParallelIndexSupervisorTask) task;
}
} else if (!(taskContainer.task instanceof ParallelIndexSupervisorTask)) {
return null;
} else {
return (ParallelIndexSupervisorTask) taskContainer.task;
}
}
}
class LocalCoordinatorClient extends NoopCoordinatorClient
{
private final ListeningExecutorService exec;
LocalCoordinatorClient(ExecutorService exec)
{
this.exec = MoreExecutors.listeningDecorator(exec);
}
@Override
public ListenableFuture<List<DataSegment>> fetchUsedSegments(
String dataSource,
List<Interval> intervals
)
{
return exec.submit(
() -> ImmutableList.copyOf(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE)
)
);
}
@Override
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
ImmutableDruidDataSource druidDataSource;
try {
druidDataSource = exec.submit(
() -> getSegmentsMetadataManager().getImmutableDataSourceWithUsedSegments(dataSource)
).get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
if (druidDataSource == null) {
throw new ISE("Unknown datasource[%s]", dataSource);
}
for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) {
DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
if (segment != null) {
return Futures.immediateFuture(segment);
}
}
throw new ISE("Can't find segment for id[%s]", segmentId);
}
}
}