blob: 1caf74c6681b7050543c0ce20e4e2d7b09e96756 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Tests that {@link TaskQueue} is able to handle large numbers of concurrently-running tasks.
*/
public class TaskQueueScaleTest
{
// Datasource name given to every task created by createTestTask(), and used when looking up completed task infos.
private static final String DATASOURCE = "ds";
// Number of tasks each test submits to the queue.
private final int numTasks = 1000;
// Supplies the metadata tables config and connector used to build the IndexerSQLMetadataStorageCoordinator in setUp().
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
// Queue under test; created and started in setUp().
private TaskQueue taskQueue;
// Task storage shared by the queue and its lockbox; the tests poll it to observe active and completed tasks.
private TaskStorage taskStorage;
// Simulated task runner handed to the queue; see TestTaskRunner below.
private TestTaskRunner taskRunner;
// Collects the stop hooks registered in setUp(); closed in tearDown().
private Closer closer;
/**
 * Builds and starts the {@link TaskQueue} under test, wired to heap-memory task storage, a metadata storage
 * coordinator backed by {@link #derbyConnectorRule}, and the simulated {@link TestTaskRunner}. Stop hooks for the
 * runner and the queue are registered with {@link #closer}, in that order.
 */
@Before
public void setUp()
{
  EmittingLogger.registerEmitter(new NoopServiceEmitter());
  closer = Closer.create();

  // Be as realistic as possible; use actual classes for storage rather than mocks.
  taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(Period.hours(1)));

  taskRunner = new TestTaskRunner();
  closer.register(taskRunner::stop);

  final ObjectMapper mapper = TestHelper.makeJsonMapper();
  final IndexerSQLMetadataStorageCoordinator metadataCoordinator = new IndexerSQLMetadataStorageCoordinator(
      mapper,
      derbyConnectorRule.metadataTablesConfigSupplier().get(),
      derbyConnectorRule.getConnector()
  );

  // Not used for anything serious: every submitted action is rejected.
  final TaskActionClientFactory rejectingActionFactory =
      task -> new TaskActionClient()
      {
        @Override
        public <RetType> RetType submit(TaskAction<RetType> taskAction)
        {
          throw new UnsupportedOperationException();
        }
      };

  final TaskQueueConfig queueConfig = new TaskQueueConfig(null, Period.millis(1), null, null, null);
  final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataCoordinator);

  taskQueue = new TaskQueue(
      new TaskLockConfig(),
      queueConfig,
      new DefaultTaskConfig(),
      taskStorage,
      taskRunner,
      rejectingActionFactory,
      lockbox,
      new NoopServiceEmitter(),
      mapper,
      new NoopTaskContextEnricher()
  );

  taskQueue.start();
  closer.register(taskQueue::stop);
}
/**
 * Closes {@link #closer}, which runs every stop hook that {@link #setUp()} registered.
 */
@After
public void tearDown() throws Exception
{
  closer.close();
}
/**
 * Submits {@link #numTasks} tasks with a 2000 ms run time, checks that the queue accounts for all of them, waits
 * until task storage reports all of them as complete, and then verifies that nothing is left running, pending or
 * waiting.
 */
@Test(timeout = 60_000L) // more than enough time if the task queue is efficient
public void doMassLaunchAndExit() throws Exception
{
  Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size());
  Assert.assertEquals("no tasks should be known", 0, taskQueue.getTasks().size());
  Assert.assertEquals("no tasks should be running", 0, taskQueue.getRunningTaskCount().size());

  // Add all tasks.
  int submitted = 0;
  while (submitted < numTasks) {
    taskQueue.add(createTestTask(2000L));
    submitted++;
  }

  // in theory we can get a race here, since we fetch the counts at separate times
  Assert.assertEquals("all tasks should be known", numTasks, taskQueue.getTasks().size());
  long running = totalOf(taskQueue.getRunningTaskCount());
  long pending = totalOf(taskQueue.getPendingTaskCount());
  long waiting = totalOf(taskQueue.getWaitingTaskCount());
  Assert.assertEquals("all tasks should be known", numTasks, (running + pending + waiting));

  // Wait for all tasks to finish. The test timeout bounds this polling loop.
  final TaskLookup.CompleteTaskLookup allCompleted =
      TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1));
  while (taskStorage.getTaskInfos(allCompleted, DATASOURCE).size() < numTasks) {
    Thread.sleep(100);
  }

  // Short grace period before inspecting the queue's counters.
  Thread.sleep(100);

  Assert.assertEquals("no tasks should be active", 0, taskStorage.getActiveTasks().size());
  running = totalOf(taskQueue.getRunningTaskCount());
  pending = totalOf(taskQueue.getPendingTaskCount());
  waiting = totalOf(taskQueue.getWaitingTaskCount());
  Assert.assertEquals("no tasks should be running", 0, running);
  Assert.assertEquals("no tasks should be pending", 0, pending);
  Assert.assertEquals("no tasks should be waiting", 0, waiting);
}

/**
 * Adds up all values of a per-key task count map.
 */
private static long totalOf(Map<?, Long> countsByKey)
{
  long total = 0L;
  for (Long count : countsByKey.values()) {
    total += count.longValue();
  }
  return total;
}
/**
 * Submits {@link #numTasks} tasks with a one-hour run time, waits until task storage reports all of them as active,
 * shuts each one down through the queue, and verifies that all of them end up complete with none left active.
 */
@Test(timeout = 60_000L) // more than enough time if the task queue is efficient
public void doMassLaunchAndShutdown() throws Exception
{
  Assert.assertEquals("no tasks should be running", 0, taskRunner.getKnownTasks().size());

  // Add all tasks.
  // very long runtime millis, so we can do a shutdown
  final long veryLongRuntimeMillis = Duration.standardHours(1).getMillis();
  final List<String> submittedIds = new ArrayList<>();
  for (int n = 0; n < numTasks; n++) {
    final NoopTask longRunningTask = createTestTask(veryLongRuntimeMillis);
    taskQueue.add(longRunningTask);
    submittedIds.add(longRunningTask.getId());
  }

  // wait for all tasks to progress to running state
  while (taskStorage.getActiveTasks().size() < numTasks) {
    Thread.sleep(100);
  }
  Assert.assertEquals("all tasks should be running", numTasks, taskStorage.getActiveTasks().size());

  // Shut down all tasks.
  for (final String id : submittedIds) {
    taskQueue.shutdown(id, "test shutdown");
  }

  // Wait for all tasks to finish. The test timeout bounds this polling loop.
  while (!taskStorage.getActiveTasks().isEmpty()) {
    Thread.sleep(100);
  }
  Assert.assertEquals("no tasks should be running", 0, taskStorage.getActiveTasks().size());

  final TaskLookup.CompleteTaskLookup allCompleted =
      TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1));
  final int completed = taskStorage.getTaskInfos(allCompleted, DATASOURCE).size();
  Assert.assertEquals("all tasks should have completed", numTasks, completed);
}
/**
 * Creates a {@link NoopTask} on {@link #DATASOURCE} with an empty context.
 *
 * @param runtimeMillis value handed to the task as its run time; {@code TestTaskRunner} reads it back through
 *                      {@code getRunTime()} to decide when to report the task as successful
 */
private NoopTask createTestTask(long runtimeMillis)
{
  return new NoopTask(
      null,
      null,
      DATASOURCE,
      runtimeMillis,
      0,
      Collections.emptyMap()
  );
}
/**
 * Simulated {@link TaskRunner} that moves tasks through their lifecycle on timers instead of doing real work.
 *
 * <ul>
 * <li>{@link #run} registers the task as pending; after {@link #T_PENDING_TO_RUNNING} it is marked running, and
 * after the task's own run time it is reported as successful.</li>
 * <li>{@link #shutdown} blocks the caller for {@link #T_SHUTDOWN_ACK}, then reports failure and forgets the task
 * after {@link #T_SHUTDOWN_COMPLETE}.</li>
 * </ul>
 *
 * All access to {@link #knownTasks} is synchronized on the map itself.
 */
private static class TestTaskRunner implements TaskRunner
{
  private static final Logger log = new Logger(TestTaskRunner.class);

  /** Delay between {@link #run} being called and the task being marked as running. */
  private static final Duration T_PENDING_TO_RUNNING = Duration.standardSeconds(2);

  /** Time that {@link #shutdown} blocks its caller for a known task. */
  private static final Duration T_SHUTDOWN_ACK = Duration.millis(8);

  /** Delay between a shutdown being acknowledged and the task's failure status being published. */
  private static final Duration T_SHUTDOWN_COMPLETE = Duration.standardSeconds(2);

  @GuardedBy("knownTasks")
  private final Map<String, TestTaskRunnerWorkItem> knownTasks = new HashMap<>();

  private final ScheduledExecutorService exec = ScheduledExecutors.fixed(8, "TaskQueueScaleTest-%s");

  @Override
  public void start()
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Registers the task as pending (if not already known) and schedules its transition to running.
   *
   * @return the future that is resolved when the task succeeds or is shut down
   */
  @Override
  public ListenableFuture<TaskStatus> run(Task task)
  {
    // Production task runners generally do not take a long time to execute "run", but may take a long time to
    // go from "pending" to "running".
    synchronized (knownTasks) {
      final TestTaskRunnerWorkItem item = knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new);
      exec.schedule(() -> startRunning(task), T_PENDING_TO_RUNNING.getMillis(), TimeUnit.MILLISECONDS);
      return item.getResult();
    }
  }

  /**
   * Marks a pending task as running and schedules its successful completion after the task's run time.
   * Does nothing if the task has been removed by a shutdown in the meantime.
   */
  private void startRunning(Task task)
  {
    try {
      final String taskId = task.getId();
      synchronized (knownTasks) {
        final TestTaskRunnerWorkItem item = knownTasks.get(taskId);
        if (item == null) {
          // Shut down and removed before it ever started running; there is nothing left to complete.
          return;
        }
        if (item.getState() == RunnerTaskState.PENDING) {
          knownTasks.put(taskId, item.withState(RunnerTaskState.RUNNING));
        }
      }

      exec.schedule(
          () -> finishSuccessfully(taskId),
          ((NoopTask) task).getRunTime(),
          TimeUnit.MILLISECONDS
      );
    }
    catch (Throwable e) {
      log.error(e, "Error in scheduled executor");
    }
  }

  /**
   * Moves the task to state NONE and resolves its future with a success status.
   * Does nothing if the task has been removed by a shutdown in the meantime.
   */
  private void finishSuccessfully(String taskId)
  {
    try {
      final TestTaskRunnerWorkItem item;
      synchronized (knownTasks) {
        item = knownTasks.get(taskId);
        if (item != null) {
          knownTasks.put(taskId, item.withState(RunnerTaskState.NONE));
        }
      }

      // Resolve the future outside the lock so listeners never run while knownTasks is held.
      if (item != null) {
        item.setResult(TaskStatus.success(taskId));
      }
    }
    catch (Throwable e) {
      log.error(e, "Error in scheduled executor");
    }
  }

  /**
   * Shuts down a known task: blocks for {@link #T_SHUTDOWN_ACK}, then, unless the task already has a result,
   * schedules publication of a failure status and removal of the task after {@link #T_SHUTDOWN_COMPLETE}.
   * Unknown tasks are ignored without blocking.
   */
  @Override
  public void shutdown(String taskid, String reason)
  {
    // Production task runners take a long time to execute "shutdown" if the task is currently running.
    synchronized (knownTasks) {
      if (!knownTasks.containsKey(taskid)) {
        return;
      }
    }

    threadSleep(T_SHUTDOWN_ACK);

    final TestTaskRunnerWorkItem existingTask;
    synchronized (knownTasks) {
      existingTask = knownTasks.get(taskid);
    }

    if (existingTask == null) {
      // A parallel shutdown finished and removed the task while we were sleeping.
      return;
    }

    if (!existingTask.getResult().isDone()) {
      exec.schedule(() -> {
        existingTask.setResult(TaskStatus.failure(taskid, "stopped"));
        synchronized (knownTasks) {
          knownTasks.remove(taskid);
        }
      }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Sleeps for the given duration. If interrupted, restores the interrupt flag and throws a
   * {@link RuntimeException} wrapping the {@link InterruptedException}.
   */
  static void threadSleep(Duration duration)
  {
    try {
      Thread.sleep(duration.getMillis());
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  @Override
  public void registerListener(TaskRunnerListener listener, Executor executor)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void unregisterListener(String listenerId)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
  {
    // Nothing is ever restored by this runner.
    return Collections.emptyList();
  }

  /**
   * Stops the scheduler immediately; transitions that have not fired yet are dropped.
   */
  @Override
  public void stop()
  {
    exec.shutdownNow();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
  {
    return tasksInState(RunnerTaskState.RUNNING);
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
  {
    return tasksInState(RunnerTaskState.PENDING);
  }

  /**
   * Returns a snapshot of the known work items whose state equals {@code wanted}.
   */
  private List<TestTaskRunnerWorkItem> tasksInState(RunnerTaskState wanted)
  {
    synchronized (knownTasks) {
      return knownTasks.values()
                       .stream()
                       .filter(item -> item.getState() == wanted)
                       .collect(Collectors.toList());
    }
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
  {
    synchronized (knownTasks) {
      return ImmutableList.copyOf(knownTasks.values());
    }
  }

  @Override
  public Optional<ScalingStats> getScalingStats()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Long> getTotalTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Long> getIdleTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Long> getUsedTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Long> getLazyTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<String, Long> getBlacklistedTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }
}
/**
 * Work item tracked by {@code TestTaskRunner}. The runner state of an instance never changes;
 * {@link #withState} produces a copy with a different state that shares the same result future.
 */
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
  private final RunnerTaskState state;

  /**
   * Creates a pending work item with a fresh, unresolved result future.
   */
  public TestTaskRunnerWorkItem(final String taskId)
  {
    this(taskId, SettableFuture.create(), RunnerTaskState.PENDING);
  }

  private TestTaskRunnerWorkItem(
      final String taskId,
      final ListenableFuture<TaskStatus> resultFuture,
      final RunnerTaskState runnerState
  )
  {
    super(taskId, resultFuture);
    this.state = runnerState;
  }

  public RunnerTaskState getState()
  {
    return state;
  }

  @Override
  public TaskLocation getLocation()
  {
    return TaskLocation.unknown();
  }

  @Nullable
  @Override
  public String getTaskType()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public String getDataSource()
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Resolves the result future with the given status. The outcome of the {@code set} call is not checked.
   */
  public void setResult(final TaskStatus result)
  {
    // possibly a parallel shutdown request was issued during the
    // shutdown time; ignore it
    final SettableFuture<TaskStatus> future = (SettableFuture<TaskStatus>) getResult();
    future.set(result);
  }

  /**
   * Returns a new work item for the same task id and result future, carrying {@code newState}.
   */
  public TestTaskRunnerWorkItem withState(final RunnerTaskState newState)
  {
    final ListenableFuture<TaskStatus> sharedResult = getResult();
    return new TestTaskRunnerWorkItem(getTaskId(), sharedResult, newState);
  }
}
}