blob: 108e2c88b875b5f4870fc9787184a7b8b4d06f7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
public class OverlordTest
{
private static final TaskLocation TASK_LOCATION = new TaskLocation("dummy", 1000, -1);
private TestingServer server;
private Timing timing;
private CuratorFramework curator;
private TaskMaster taskMaster;
private TaskLockbox taskLockbox;
private TaskStorage taskStorage;
private TaskActionClientFactory taskActionClientFactory;
private CountDownLatch announcementLatch;
private DruidNode druidNode;
private OverlordResource overlordResource;
private CountDownLatch[] taskCompletionCountDownLatches;
private CountDownLatch[] runTaskCountDownLatches;
private HttpServletRequest req;
private SupervisorManager supervisorManager;
private void setupServerAndCurator() throws Exception
{
server = new TestingServer();
timing = new Timing();
curator = CuratorFrameworkFactory
.builder()
.connectString(server.getConnectString())
.sessionTimeoutMs(timing.session())
.connectionTimeoutMs(timing.connection())
.retryPolicy(new RetryOneTime(1))
.compressionProvider(new PotentiallyGzippedCompressionProvider(true))
.build();
}
private void tearDownServerAndCurator()
{
CloseQuietly.close(curator);
CloseQuietly.close(server);
}
@Before
public void setUp() throws Exception
{
req = EasyMock.createMock(HttpServletRequest.class);
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
new AuthenticationResult("druid", "druid", null, null)
).anyTimes();
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
supervisorManager = EasyMock.createMock(SupervisorManager.class);
taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
taskLockbox.syncFromStorage();
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.add(EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
// for second Noop Task directly added to deep storage.
taskLockbox.add(EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskLockbox.remove(EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
taskActionClientFactory = EasyMock.createStrictMock(TaskActionClientFactory.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject()))
.andReturn(null).anyTimes();
EasyMock.replay(taskLockbox, taskActionClientFactory, req);
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
runTaskCountDownLatches = new CountDownLatch[2];
runTaskCountDownLatches[0] = new CountDownLatch(1);
runTaskCountDownLatches[1] = new CountDownLatch(1);
taskCompletionCountDownLatches = new CountDownLatch[2];
taskCompletionCountDownLatches[0] = new CountDownLatch(1);
taskCompletionCountDownLatches[1] = new CountDownLatch(1);
announcementLatch = new CountDownLatch(1);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
ServiceEmitter serviceEmitter = new NoopServiceEmitter();
taskMaster = new TaskMaster(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(1), null, new Period(10)),
new DefaultTaskConfig(),
taskLockbox,
taskStorage,
taskActionClientFactory,
druidNode,
new TaskRunnerFactory<MockTaskRunner>()
{
@Override
public MockTaskRunner build()
{
return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches);
}
},
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
announcementLatch.countDown();
}
},
new CoordinatorOverlordServiceConfig(null, null),
serviceEmitter,
supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class),
new TestDruidLeaderSelector()
);
EmittingLogger.registerEmitter(serviceEmitter);
}
@Test(timeout = 60_000L)
public void testOverlordRun() throws Exception
{
// basic task master lifecycle test
taskMaster.start();
announcementLatch.await();
while (!taskMaster.isLeader()) {
// I believe the control will never reach here and thread will never sleep but just to be on safe side
Thread.sleep(10);
}
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff
overlordResource = new OverlordResource(
taskMaster,
taskStorageQueryAdapter,
new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null),
null,
null,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
workerTaskRunnerQueryAdapter
);
Response response = overlordResource.getLeader();
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
final String taskId_0 = "0";
NoopTask task_0 = NoopTask.create(taskId_0, 0);
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());
// Task payload for task_0 should be present in taskStorage
response = overlordResource.getTaskPayload(taskId_0);
Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload());
// Task not present in taskStorage - should fail
response = overlordResource.getTaskPayload("whatever");
Assert.assertEquals(404, response.getStatus());
// Task status of the submitted task should be running
response = overlordResource.getTaskStatus(taskId_0);
Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
Assert.assertEquals(
TaskStatus.running(taskId_0).getStatusCode(),
((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
);
// Simulate completion of task_0
taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown();
// Wait for taskQueue to handle success status of task_0
waitForTaskStatus(taskId_0, TaskState.SUCCESS);
// Manually insert task in taskStorage
// Verifies sync from storage
final String taskId_1 = "1";
NoopTask task_1 = NoopTask.create(taskId_1, 0);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));
// Wait for task runner to run task_1
runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();
response = overlordResource.getRunningTasks(null, req);
// 1 task that was manually inserted should be in running state
Assert.assertEquals(1, (((List) response.getEntity()).size()));
final TaskStatusPlus taskResponseObject = ((List<TaskStatusPlus>) response
.getEntity()).get(0);
Assert.assertEquals(taskId_1, taskResponseObject.getId());
Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
// Simulate completion of task_1
taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
// Wait for taskQueue to handle success status of task_1
waitForTaskStatus(taskId_1, TaskState.SUCCESS);
// should return number of tasks which are not in running state
response = overlordResource.getCompleteTasks(null, req);
Assert.assertEquals(2, (((List) response.getEntity()).size()));
response = overlordResource.getCompleteTasks(1, req);
Assert.assertEquals(1, (((List) response.getEntity()).size()));
taskMaster.stop();
Assert.assertFalse(taskMaster.isLeader());
EasyMock.verify(taskLockbox, taskActionClientFactory);
}
/* Wait until the task with given taskId has the given Task Status
* These method will not timeout until the condition is met so calling method should ensure timeout
* This method also assumes that the task with given taskId is present
* */
private void waitForTaskStatus(String taskId, TaskState status) throws InterruptedException
{
while (true) {
Response response = overlordResource.getTaskStatus(taskId);
if (status.equals(((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode())) {
break;
}
Thread.sleep(10);
}
}
@After
public void tearDown()
{
tearDownServerAndCurator();
}
public static class MockTaskRunner implements TaskRunner
{
private CountDownLatch[] completionLatches;
private CountDownLatch[] runLatches;
private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
private List<String> runningTasks;
public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLatches)
{
this.runLatches = runLatches;
this.completionLatches = completionLatches;
this.taskRunnerWorkItems = new ConcurrentHashMap<>();
this.runningTasks = new ArrayList<>();
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
}
@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
// Overlord doesn't call this method
throw new UnsupportedOperationException();
}
@Override
public void unregisterListener(String listenerId)
{
// Overlord doesn't call this method
throw new UnsupportedOperationException();
}
@Override
public void stop()
{
// Do nothing
}
@Override
public synchronized ListenableFuture<TaskStatus> run(final Task task)
{
final String taskId = task.getId();
ListenableFuture<TaskStatus> future = MoreExecutors.listeningDecorator(
Execs.singleThreaded(
"noop_test_task_exec_%s"
)
).submit(
new Callable<TaskStatus>()
{
@Override
public TaskStatus call() throws Exception
{
// adding of task to list of runningTasks should be done before count down as
// getRunningTasks may not include the task for which latch has been counted down
// Count down to let know that task is actually running
// this is equivalent of getting process holder to run task in ForkingTaskRunner
runningTasks.add(taskId);
if (runLatches != null) {
runLatches[Integer.parseInt(taskId)].countDown();
}
// Wait for completion count down
if (completionLatches != null) {
completionLatches[Integer.parseInt(taskId)].await();
}
taskRunnerWorkItems.remove(taskId);
runningTasks.remove(taskId);
return TaskStatus.success(taskId);
}
}
);
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(taskId, future)
{
@Override
public TaskLocation getLocation()
{
return TASK_LOCATION;
}
@Override
public String getTaskType()
{
return task.getType();
}
@Override
public String getDataSource()
{
return task.getDataSource();
}
};
taskRunnerWorkItems.put(taskId, taskRunnerWorkItem);
return future;
}
@Override
public void shutdown(String taskid, String reason)
{
}
@Override
public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks()
{
return Lists.transform(
runningTasks,
new Function<String, TaskRunnerWorkItem>()
{
@Nullable
@Override
public TaskRunnerWorkItem apply(String input)
{
return taskRunnerWorkItems.get(input);
}
}
);
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return ImmutableList.of();
}
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
return taskRunnerWorkItems.values();
}
@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.absent();
}
@Override
public long getTotalTaskSlotCount()
{
throw new UnsupportedOperationException();
}
@Override
public long getIdleTaskSlotCount()
{
throw new UnsupportedOperationException();
}
@Override
public long getUsedTaskSlotCount()
{
throw new UnsupportedOperationException();
}
@Override
public long getLazyTaskSlotCount()
{
throw new UnsupportedOperationException();
}
@Override
public long getBlacklistedTaskSlotCount()
{
throw new UnsupportedOperationException();
}
@Override
public void start()
{
//Do nothing
}
}
private static class TestDruidLeaderSelector implements DruidLeaderSelector
{
private volatile Listener listener;
private volatile String leader;
@Override
public String getCurrentLeader()
{
return leader;
}
@Override
public boolean isLeader()
{
return leader != null;
}
@Override
public int localTerm()
{
return 0;
}
@Override
public void registerListener(Listener listener)
{
this.listener = listener;
leader = "what:1234";
listener.becomeLeader();
}
@Override
public void unregisterListener()
{
leader = null;
listener.stopBeingLeader();
}
}
}