blob: c4751669b272e6cc5dfd73820ace860d4bf164e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class OverlordResourceTest
{
// The resource under test; built in setUp() from the mocks declared below.
private OverlordResource overlordResource;
// Collaborator mocks: created in setUp() and checked with EasyMock.verify() in tearDown().
private TaskMaster taskMaster;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private HttpServletRequest req;
private TaskRunner taskRunner;
private WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
// JUnit rule used by tests that expect an exception to be thrown (see testSecuredTaskPost).
@Rule
public ExpectedException expectedException = ExpectedException.none();
/**
 * Creates a fresh set of collaborator mocks and wires them into the {@link OverlordResource} under test.
 *
 * <p>The authorizer installed here grants access only to resources named {@code "allow"}; any other
 * resource name is denied. The task master is programmed to hand out the mocked task runner any
 * number of times.
 */
@Before
public void setUp()
{
  // Only the task runner is a regular mock; all other collaborators are strict mocks.
  taskRunner = EasyMock.createMock(TaskRunner.class);
  taskMaster = EasyMock.createStrictMock(TaskMaster.class);
  taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
  indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
  req = EasyMock.createStrictMock(HttpServletRequest.class);
  workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class);

  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();

  final AuthorizerMapper allowOnlyMapper = new AuthorizerMapper(null)
  {
    @Override
    public Authorizer getAuthorizer(String name)
    {
      return new Authorizer()
      {
        @Override
        public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
        {
          // Access is granted exactly when the resource is named "allow".
          return new Access(resource.getName().equals("allow"));
        }
      };
    }
  };

  overlordResource = new OverlordResource(
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      null,
      null,
      null,
      allowOnlyMapper,
      workerTaskRunnerQueryAdapter
  );
}
/**
 * Verifies, after each test, that the expectations recorded on the shared mocks were satisfied.
 */
@After
public void tearDown()
{
  final Object[] sharedMocks = {
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  };
  EasyMock.verify(sharedMocks);
}
/**
 * getLeader() must return the leader reported by the task master with a 200 status.
 */
@Test
public void testLeader()
{
  EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once();
  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Response leaderResponse = overlordResource.getLeader();

  Assert.assertEquals("boz", leaderResponse.getEntity());
  Assert.assertEquals(200, leaderResponse.getStatus());
}
/**
 * isLeader() must answer 200 with {@code leader=true} when the task master is leader and
 * 404 with {@code leader=false} when it is not.
 */
@Test
public void testIsLeader()
{
  // First call reports leadership, second call does not.
  EasyMock.expect(taskMaster.isLeader()).andReturn(true).once();
  EasyMock.expect(taskMaster.isLeader()).andReturn(false).once();
  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  // true
  final Response whenLeader = overlordResource.isLeader();
  Assert.assertEquals(ImmutableMap.of("leader", true), whenLeader.getEntity());
  Assert.assertEquals(200, whenLeader.getStatus());

  // false
  final Response whenNotLeader = overlordResource.isLeader();
  Assert.assertEquals(ImmutableMap.of("leader", false), whenNotLeader.getEntity());
  Assert.assertEquals(404, whenNotLeader.getStatus());
}
/**
 * Of four active tasks (two on "allow", two on "deny"), with id_1 and id_4 known to the runner,
 * getWaitingTasks() is expected to return only id_2.
 */
@Test
public void testSecuredGetWaitingTask()
{
  expectAuthorizationTokenCheck();

  // Active tasks in storage: id_1/id_2 on "allow", id_3/id_4 on "deny".
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "deny",
              getTaskWithIdAndDatasource("id_3", "deny")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "deny",
              getTaskWithIdAndDatasource("id_4", "deny")
          )
      )
  );

  // Tasks the runner already knows about.
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_1", null),
          new MockTaskRunnerWorkItem("id_4", null)
      )
  );

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Object entity = overlordResource.getWaitingTasks(req).getEntity();
  List<TaskStatusPlus> waitingTasks = (List<TaskStatusPlus>) entity;

  Assert.assertEquals(1, waitingTasks.size());
  Assert.assertEquals("id_2", waitingTasks.get(0).getId());
}
/**
 * Of three completed tasks (id_1 on "deny", id_2 and id_3 on "allow"), getCompleteTasks() is
 * expected to return only the two tasks on the "allow" data source, in storage order.
 */
@Test
public void testSecuredGetCompleteTasks()
{
  expectAuthorizationTokenCheck();
  List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");

  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem(tasksIds.get(0), null),
          new MockTaskRunnerWorkItem(tasksIds.get(1), null),
          new MockTaskRunnerWorkItem(tasksIds.get(2), null)
      )
  );

  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "deny",
              getTaskWithIdAndDatasource("id_1", "deny")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          )
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  // Sanity-check the fixtures. assertEquals (rather than assertTrue on ==) reports the actual size on failure.
  // NOTE(review): the direct getRunningTasks() call below also consumes the single expectation recorded
  // above - confirm whether getCompleteTasks() itself is expected to call the runner.
  Assert.assertEquals(3, taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size());
  Assert.assertEquals(3, taskRunner.getRunningTasks().size());

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getCompleteTasks(null, req).getEntity();

  Assert.assertEquals(2, responseObjects.size());
  Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
  Assert.assertEquals(tasksIds.get(2), responseObjects.get(1).getId());
}
/**
 * With id_1 (on "deny") and id_2 (on "allow") both running, getRunningTasks() is expected to
 * return only id_2.
 */
@Test
public void testSecuredGetRunningTasks()
{
  expectAuthorizationTokenCheck();
  final List<String> runningIds = ImmutableList.of("id_1", "id_2");

  // Both tasks are reported as running by the task runner.
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem(runningIds.get(0), null),
          new MockTaskRunnerWorkItem(runningIds.get(1), null)
      )
  );

  // Storage view of the same tasks: id_1 on "deny", id_2 on "allow".
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "deny",
              getTaskWithIdAndDatasource("id_1", "deny")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          )
      )
  );

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Object entity = overlordResource.getRunningTasks(null, req).getEntity();
  List<TaskStatusPlus> runningTasks = (List<TaskStatusPlus>) entity;

  Assert.assertEquals(1, runningTasks.size());
  Assert.assertEquals(runningIds.get(1), runningTasks.get(0).getId());
}
/**
 * Calls getTasks() without any filter against three completed and four active tasks spread over the
 * "allow" and "deny" data sources, and expects four tasks in the response.
 */
@Test
public void testGetTasks()
{
  expectAuthorizationTokenCheck();

  // Completed tasks: id_5 on "deny", id_6 and id_7 on "allow".
  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_5",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_5"),
              "deny",
              getTaskWithIdAndDatasource("id_5", "deny")
          ),
          new TaskInfo(
              "id_6",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_6"),
              "allow",
              getTaskWithIdAndDatasource("id_6", "allow")
          ),
          new TaskInfo(
              "id_7",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_7"),
              "allow",
              getTaskWithIdAndDatasource("id_7", "allow")
          )
      )
  );

  // Active tasks: id_1 and id_2 on "allow", id_3 and id_4 on "deny".
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "deny",
              getTaskWithIdAndDatasource("id_3", "deny")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "deny",
              getTaskWithIdAndDatasource("id_4", "deny")
          )
      )
  );

  // Runner view: id_1 and id_4 are known; id_4 is pending and id_1 is running.
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_1", null),
          new MockTaskRunnerWorkItem("id_4", null)
      )
  ).atLeastOnce();
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
      ImmutableList.of(new MockTaskRunnerWorkItem("id_4", null))
  );
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
      ImmutableList.of(new MockTaskRunnerWorkItem("id_1", null))
  );

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Object entity = overlordResource.getTasks(null, null, null, null, null, req).getEntity();
  List<TaskStatusPlus> allTasks = (List<TaskStatusPlus>) entity;

  Assert.assertEquals(4, allTasks.size());
}
/**
 * Calls getTasks() filtered to the "allow" data source against three completed and four active tasks,
 * all on "allow", and expects all seven back with the first completed task (id_5) at the head.
 */
@Test
public void testGetTasksFilterDataSource()
{
  expectAuthorizationTokenCheck();

  //completed tasks
  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow"))
          .andStubReturn(
              ImmutableList.of(
                  new TaskInfo(
                      "id_5",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_5"),
                      "allow",
                      getTaskWithIdAndDatasource("id_5", "allow")
                  ),
                  new TaskInfo(
                      "id_6",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_6"),
                      "allow",
                      getTaskWithIdAndDatasource("id_6", "allow")
                  ),
                  new TaskInfo(
                      "id_7",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_7"),
                      "allow",
                      getTaskWithIdAndDatasource("id_7", "allow")
                  )
              )
          );

  //active tasks
  // Each status carries the id of the task it belongs to (id_2 and id_3 previously reused "id_1").
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "allow",
              getTaskWithIdAndDatasource("id_4", "allow")
          )
      )
  );

  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_1", null),
          new MockTaskRunnerWorkItem("id_4", null)
      )
  ).atLeastOnce();
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_4", null)
      )
  );
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_1", null)
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks(null, "allow", null, null, null, req)
      .getEntity();

  Assert.assertEquals(7, responseObjects.size());
  Assert.assertEquals("id_5", responseObjects.get(0).getId());
  // assertEquals reports the actual data source on failure, unlike assertTrue(equals(...)).
  Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
/**
 * Calls getTasks() with state "waiting": of four active tasks, with id_1 and id_4 known to the runner,
 * only id_2 (on "allow") is expected in the response.
 */
@Test
public void testGetTasksFilterWaitingState()
{
  expectAuthorizationTokenCheck();

  // Active tasks: id_1 and id_2 on "allow", id_3 and id_4 on "deny".
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "deny",
              getTaskWithIdAndDatasource("id_3", "deny")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "deny",
              getTaskWithIdAndDatasource("id_4", "deny")
          )
      )
  );

  // Tasks already known to the runner.
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem("id_1", null),
          new MockTaskRunnerWorkItem("id_4", null)
      )
  );

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Object entity = overlordResource.getTasks("waiting", null, null, null, null, req).getEntity();
  List<TaskStatusPlus> waitingTasks = (List<TaskStatusPlus>) entity;

  Assert.assertEquals(1, waitingTasks.size());
  Assert.assertEquals("id_2", waitingTasks.get(0).getId());
}
/**
 * Calls getTasks() with state "running" and data source "allow": with id_1 and id_2 reported as
 * running, both are expected back, id_1 first, on the "allow" data source.
 */
@Test
public void testGetTasksFilterRunningState()
{
  expectAuthorizationTokenCheck();

  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "deny",
              getTaskWithIdAndDatasource("id_4", "deny")
          )
      )
  );

  List<String> tasksIds = ImmutableList.of("id_1", "id_2");
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem(tasksIds.get(0), null),
          new MockTaskRunnerWorkItem(tasksIds.get(1), null)
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks("running", "allow", null, null, null, req)
      .getEntity();

  Assert.assertEquals(2, responseObjects.size());
  Assert.assertEquals(tasksIds.get(0), responseObjects.get(0).getId());
  // assertEquals reports the actual data source on failure, unlike assertTrue(equals(...)).
  Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
/**
 * Calls getTasks() with state "pending": with id_1 (on "deny") and id_2 (on "allow") reported as
 * pending by the runner, only id_2 is expected in the response.
 */
@Test
public void testGetTasksFilterPendingState()
{
  expectAuthorizationTokenCheck();

  List<String> tasksIds = ImmutableList.of("id_1", "id_2");
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
      ImmutableList.of(
          new MockTaskRunnerWorkItem(tasksIds.get(0), null),
          new MockTaskRunnerWorkItem(tasksIds.get(1), null)
      )
  );

  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "deny",
              getTaskWithIdAndDatasource("id_1", "deny")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "allow",
              getTaskWithIdAndDatasource("id_2", "allow")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          ),
          new TaskInfo(
              "id_4",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_4"),
              "deny",
              getTaskWithIdAndDatasource("id_4", "deny")
          )
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks("pending", null, null, null, null, req)
      .getEntity();

  Assert.assertEquals(1, responseObjects.size());
  Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
  // The data source of the returned task is not asserted in this test.
}
/**
 * Calls getTasks() with state "complete": of three completed tasks (id_1 and id_3 on "allow", id_2 on
 * "deny"), two are expected back with id_1 first.
 */
@Test
public void testGetTasksFilterCompleteState()
{
  expectAuthorizationTokenCheck();

  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
      ImmutableList.of(
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              getTaskWithIdAndDatasource("id_1", "allow")
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "deny",
              getTaskWithIdAndDatasource("id_2", "deny")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          )
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks("complete", null, null, null, null, req)
      .getEntity();

  Assert.assertEquals(2, responseObjects.size());
  Assert.assertEquals("id_1", responseObjects.get(0).getId());
  // assertEquals reports the actual data source on failure, unlike assertTrue(equals(...)).
  Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
/**
 * Calls getTasks() with state "complete" and the interval string {@code "2010-01-01_P1D"}. The storage
 * adapter is stubbed for a 86400-second duration; of the three completed tasks it returns (id_1 on
 * "deny", id_2 and id_3 on "allow"), two are expected back with id_2 first.
 */
@Test
public void testGetTasksFilterCompleteStateWithInterval()
{
  expectAuthorizationTokenCheck();

  Duration duration = new Period("PT86400S").toStandardDuration();
  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null))
          .andStubReturn(
              ImmutableList.of(
                  new TaskInfo(
                      "id_1",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_1"),
                      "deny",
                      getTaskWithIdAndDatasource("id_1", "deny")
                  ),
                  new TaskInfo(
                      "id_2",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_2"),
                      "allow",
                      getTaskWithIdAndDatasource("id_2", "allow")
                  ),
                  new TaskInfo(
                      "id_3",
                      DateTime.now(ISOChronology.getInstanceUTC()),
                      TaskStatus.success("id_3"),
                      "allow",
                      getTaskWithIdAndDatasource("id_3", "allow")
                  )
              )
          );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  String interval = "2010-01-01_P1D";
  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks("complete", null, interval, null, null, req)
      .getEntity();

  Assert.assertEquals(2, responseObjects.size());
  Assert.assertEquals("id_2", responseObjects.get(0).getId());
  // assertEquals reports the actual data source on failure, unlike assertTrue(equals(...)).
  Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
/**
 * Calls getTasks() with state "complete" when the first completed task (id_1 on "allow") has a
 * {@code null} task payload. Two tasks are expected back, with id_1 first, a {@code null} type and
 * the "allow" data source.
 */
@Test
public void testGetNullCompleteTask()
{
  expectAuthorizationTokenCheck();

  EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
      ImmutableList.of(
          // id_1 deliberately has no task payload.
          new TaskInfo(
              "id_1",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_1"),
              "allow",
              null
          ),
          new TaskInfo(
              "id_2",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_2"),
              "deny",
              getTaskWithIdAndDatasource("id_2", "deny")
          ),
          new TaskInfo(
              "id_3",
              DateTime.now(ISOChronology.getInstanceUTC()),
              TaskStatus.success("id_3"),
              "allow",
              getTaskWithIdAndDatasource("id_3", "allow")
          )
      )
  );

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
      .getTasks("complete", null, null, null, null, req)
      .getEntity();

  Assert.assertEquals(2, responseObjects.size());
  Assert.assertEquals("id_1", responseObjects.get(0).getId());
  TaskStatusPlus tsp = responseObjects.get(0);
  Assert.assertNull(tsp.getType());
  // assertEquals reports the actual data source on failure, unlike assertTrue(equals(...)).
  Assert.assertEquals("DataSource Check", "allow", tsp.getDataSource());
}
/**
 * getTasks() with the unknown state "blah" must answer with an entity listing the valid states.
 */
@Test
public void testGetTasksNegativeState()
{
  // No expectations are recorded on any mock for this test.
  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Response invalidStateResponse = overlordResource.getTasks("blah", "ds_test", null, null, null, req);
  final Object entity = invalidStateResponse.getEntity();

  Assert.assertEquals(
      "Invalid state : blah, valid values are: [pending, waiting, running, complete]",
      entity.toString()
  );
}
/**
 * Posting a task created by {@code NoopTask.create()} must raise a {@link ForbiddenException}.
 */
@Test
public void testSecuredTaskPost()
{
  expectedException.expect(ForbiddenException.class);
  expectAuthorizationTokenCheck();

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Task noopTask = NoopTask.create();
  overlordResource.taskPost(noopTask, req);
}
/**
 * killPendingSegments() on the "allow" data source must report, under {@code numDeleted}, the count
 * returned by the metadata storage adapter (2 here).
 */
@Test
public void testKillPendingSegments()
{
  expectAuthorizationTokenCheck();

  EasyMock.expect(taskMaster.isLeader()).andReturn(true);
  // Any interval is accepted as long as the data source is "allow".
  EasyMock.expect(
      indexerMetadataStorageAdapter.deletePendingSegments(
          EasyMock.eq("allow"),
          EasyMock.anyObject(Interval.class)
      )
  ).andReturn(2);

  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final String everythingUntilNow = new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString();
  final Response killResponse = overlordResource.killPendingSegments("allow", everythingUntilNow, req);
  final Map<String, Integer> entity = (Map<String, Integer>) killResponse.getEntity();

  Assert.assertEquals(2, entity.get("numDeleted").intValue());
}
/**
 * getTaskPayload() must return the stored task for a known id ("mytask") and a {@code null} payload
 * for an id the storage adapter does not know ("othertask"). Responses are round-tripped through JSON
 * before being compared.
 */
@Test
public void testGetTaskPayload() throws Exception
{
  // This is disabled since OverlordResource.getTaskStatus() is annotated with TaskResourceFilter which is supposed to
  // set authorization token properly, but isn't called in this test.
  // This should be fixed in https://github.com/apache/druid/issues/6685.
  // expectAuthorizationTokenCheck();
  final NoopTask task = NoopTask.create("mydatasource");
  EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
          .andReturn(Optional.of(task));

  EasyMock.expect(taskStorageQueryAdapter.getTask("othertask"))
          .andReturn(Optional.absent());

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  // One mapper serves every (de)serialization in this test instead of building a new one per call.
  final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

  final Response response1 = overlordResource.getTaskPayload("mytask");
  final TaskPayloadResponse taskPayloadResponse1 = jsonMapper.readValue(
      jsonMapper.writeValueAsString(response1.getEntity()),
      TaskPayloadResponse.class
  );
  Assert.assertEquals(new TaskPayloadResponse("mytask", task), taskPayloadResponse1);

  final Response response2 = overlordResource.getTaskPayload("othertask");
  final TaskPayloadResponse taskPayloadResponse2 = jsonMapper.readValue(
      jsonMapper.writeValueAsString(response2.getEntity()),
      TaskPayloadResponse.class
  );
  Assert.assertEquals(new TaskPayloadResponse("othertask", null), taskPayloadResponse2);
}
/**
 * getTaskStatus() must return a RUNNING status for a task known to storage ("mytask") and a
 * {@code null} status for an unknown id ("othertask"). Responses are round-tripped through JSON
 * before being compared.
 */
@Test
public void testGetTaskStatus() throws Exception
{
  // This is disabled since OverlordResource.getTaskStatus() is annotated with TaskResourceFilter which is supposed to
  // set authorization token properly, but isn't called in this test.
  // This should be fixed in https://github.com/apache/druid/issues/6685.
  // expectAuthorizationTokenCheck();
  final Task task = NoopTask.create("mytask", 0);
  final TaskStatus status = TaskStatus.running("mytask");

  EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask"))
          .andReturn(new TaskInfo(
              task.getId(),
              DateTimes.of("2018-01-01"),
              status,
              task.getDataSource(),
              task
          ));

  EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask"))
          .andReturn(null);

  // The runner knows no tasks at all.
  EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks())
      .andReturn(ImmutableList.of());

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  // One mapper serves every (de)serialization in this test instead of building a new one per call.
  final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

  final Response response1 = overlordResource.getTaskStatus("mytask");
  final TaskStatusResponse taskStatusResponse1 = jsonMapper.readValue(
      jsonMapper.writeValueAsString(response1.getEntity()),
      TaskStatusResponse.class
  );
  TaskStatusPlus tsp = taskStatusResponse1.getStatus();
  // Both accessors of the deserialized status must agree.
  Assert.assertEquals(tsp.getStatusCode(), tsp.getStatus());
  Assert.assertEquals(
      new TaskStatusResponse(
          "mytask",
          new TaskStatusPlus(
              "mytask",
              "mytask",
              "noop",
              DateTimes.of("2018-01-01"),
              DateTimes.EPOCH,
              TaskState.RUNNING,
              RunnerTaskState.RUNNING,
              -1L,
              TaskLocation.unknown(),
              task.getDataSource(),
              null
          )
      ),
      taskStatusResponse1
  );

  final Response response2 = overlordResource.getTaskStatus("othertask");
  final TaskStatusResponse taskStatusResponse2 = jsonMapper.readValue(
      jsonMapper.writeValueAsString(response2.getEntity()),
      TaskStatusResponse.class
  );
  Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2);
}
/**
 * getDatasourceLockedIntervals() must answer 200 and return, after a JSON round trip, exactly the
 * locked intervals supplied by the storage query adapter.
 */
@Test
public void testGetLockedIntervals() throws Exception
{
  final Map<String, Integer> minTaskPriority = Collections.singletonMap("ds1", 0);
  final List<Interval> lockedForDs1 = Arrays.asList(
      Intervals.of("2012-01-01/2012-01-02"),
      Intervals.of("2012-01-02/2012-01-03")
  );
  final Map<String, List<Interval>> expectedLockedIntervals = Collections.singletonMap("ds1", lockedForDs1);

  EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority))
          .andReturn(expectedLockedIntervals);
  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
  Assert.assertEquals(200, response.getStatus());

  // Serialize the entity and read it back as a typed map before comparing.
  final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
  final String entityJson = jsonMapper.writeValueAsString(response.getEntity());
  Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
      entityJson,
      new TypeReference<Map<String, List<Interval>>>()
      {
      }
  );

  Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
}
/**
 * getDatasourceLockedIntervals() must answer 400 for both a {@code null} and an empty request body.
 */
@Test
public void testGetLockedIntervalsWithEmptyBody()
{
  // No expectations are recorded on any mock for this test.
  EasyMock.replay(
      taskRunner, taskMaster, taskStorageQueryAdapter,
      indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter
  );

  final Response nullBodyResponse = overlordResource.getDatasourceLockedIntervals(null);
  Assert.assertEquals(400, nullBodyResponse.getStatus());

  final Response emptyBodyResponse = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
  Assert.assertEquals(400, emptyBodyResponse.getStatus());
}
/**
 * doShutdown("id_1") must ask the task queue to shut the task down and echo the task id back under
 * the {@code task} key of the response entity.
 */
@Test
public void testShutdownTask()
{
  // This is disabled since OverlordResource.doShutdown is annotated with TaskResourceFilter
  // This should be fixed in https://github.com/apache/druid/issues/6685.
  // expectAuthorizationTokenCheck();
  TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class);
  EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
      Optional.of(taskRunner)
  ).anyTimes();
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(
      Optional.of(mockQueue)
  ).anyTimes();
  mockQueue.shutdown("id_1", "Shutdown request from user");
  EasyMock.expectLastCall();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      mockQueue,
      workerTaskRunnerQueryAdapter
  );

  // The entity maps "task" to the task id, so its values are strings.
  final Map<String, String> response = (Map<String, String>) overlordResource
      .doShutdown("id_1")
      .getEntity();
  Assert.assertEquals("id_1", response.get("task"));

  // mockQueue is local to this test and therefore not covered by tearDown(); verify the shutdown call here.
  EasyMock.verify(mockQueue);
}
/**
 * shutdownTasksForDataSource("datasource") must ask the task queue to shut down every active task of
 * that data source (id_1 and id_2) and echo the data source back under the {@code dataSource} key.
 */
@Test
public void testShutdownAllTasks()
{
  // This is disabled since OverlordResource.shutdownTasksForDataSource is annotated with DatasourceResourceFilter
  // This should be fixed in https://github.com/apache/druid/issues/6685.
  // expectAuthorizationTokenCheck();
  TaskQueue mockQueue = EasyMock.createMock(TaskQueue.class);
  EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
  EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
      Optional.of(taskRunner)
  ).anyTimes();
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(
      Optional.of(mockQueue)
  ).anyTimes();
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of(
      new TaskInfo(
          "id_1",
          DateTime.now(ISOChronology.getInstanceUTC()),
          TaskStatus.success("id_1"),
          "datasource",
          getTaskWithIdAndDatasource("id_1", "datasource")
      ),
      new TaskInfo(
          "id_2",
          DateTime.now(ISOChronology.getInstanceUTC()),
          TaskStatus.success("id_2"),
          "datasource",
          getTaskWithIdAndDatasource("id_2", "datasource")
      )
  ));
  mockQueue.shutdown("id_1", "Shutdown request from user");
  EasyMock.expectLastCall();
  mockQueue.shutdown("id_2", "Shutdown request from user");
  EasyMock.expectLastCall();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      mockQueue,
      workerTaskRunnerQueryAdapter
  );

  // The entity maps "dataSource" to the data source name, so its values are strings.
  final Map<String, String> response = (Map<String, String>) overlordResource
      .shutdownTasksForDataSource("datasource")
      .getEntity();
  Assert.assertEquals("datasource", response.get("dataSource"));

  // mockQueue is local to this test and therefore not covered by tearDown(); verify both shutdown calls here.
  EasyMock.verify(mockQueue);
}
/**
 * When the storage adapter reports no active tasks for the requested datasource, the resource
 * is expected to answer 404 and must not touch the task queue.
 */
@Test
public void testShutdownAllTasksForNonExistingDataSource()
{
  final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
  EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
  EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
  EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList());
  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      // Replayed with no recorded expectations: any call on the queue now fails the test
      // instead of being silently recorded as a new expectation.
      taskQueue,
      workerTaskRunnerQueryAdapter
  );
  final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
  Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
  // taskQueue is local to this test, so it is verified here.
  EasyMock.verify(taskQueue);
}
/**
 * Expects a single {@code enableWorker} call on the worker adapter and an OK response whose
 * entity maps the host to "enabled".
 */
@Test
public void testEnableWorker()
{
  final String workerHost = "worker-host";

  // Record: the adapter is asked exactly once to enable this host.
  workerTaskRunnerQueryAdapter.enableWorker(workerHost);
  EasyMock.expectLastCall().once();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  final Response enableResponse = overlordResource.enableWorker(workerHost);
  final ImmutableMap<String, String> expectedEntity = ImmutableMap.of(workerHost, "enabled");

  Assert.assertEquals(HttpResponseStatus.OK.getCode(), enableResponse.getStatus());
  Assert.assertEquals(expectedEntity, enableResponse.getEntity());
}
/**
 * Expects a single {@code disableWorker} call on the worker adapter and an OK response whose
 * entity maps the host to "disabled".
 */
@Test
public void testDisableWorker()
{
  final String workerHost = "worker-host";

  // Record: the adapter is asked exactly once to disable this host.
  workerTaskRunnerQueryAdapter.disableWorker(workerHost);
  EasyMock.expectLastCall().once();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  final Response disableResponse = overlordResource.disableWorker(workerHost);
  final ImmutableMap<String, String> expectedEntity = ImmutableMap.of(workerHost, "disabled");

  Assert.assertEquals(HttpResponseStatus.OK.getCode(), disableResponse.getStatus());
  Assert.assertEquals(expectedEntity, disableResponse.getEntity());
}
/**
 * When the adapter's {@code enableWorker} throws, the response is expected to be a 500 whose
 * entity carries the exception message under the "error" key.
 */
@Test
public void testEnableWorkerWhenWorkerAPIRaisesError()
{
  final String workerHost = "worker-host";
  final String failureMessage = "Worker API returns error!";

  // Record: the single enable call fails with the message above.
  workerTaskRunnerQueryAdapter.enableWorker(workerHost);
  EasyMock.expectLastCall().andThrow(new RE(failureMessage)).once();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  final Response errorResponse = overlordResource.enableWorker(workerHost);

  Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode(), errorResponse.getStatus());
  Assert.assertEquals(ImmutableMap.of("error", failureMessage), errorResponse.getEntity());
}
/**
 * When the adapter's {@code disableWorker} throws, the response is expected to be a 500 whose
 * entity carries the exception message under the "error" key.
 */
@Test
public void testDisableWorkerWhenWorkerAPIRaisesError()
{
  final String workerHost = "worker-host";
  final String failureMessage = "Worker API returns error!";

  // Record: the single disable call fails with the message above.
  workerTaskRunnerQueryAdapter.disableWorker(workerHost);
  EasyMock.expectLastCall().andThrow(new RE(failureMessage)).once();

  EasyMock.replay(
      taskRunner,
      taskMaster,
      taskStorageQueryAdapter,
      indexerMetadataStorageAdapter,
      req,
      workerTaskRunnerQueryAdapter
  );

  final Response errorResponse = overlordResource.disableWorker(workerHost);

  Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode(), errorResponse.getStatus());
  Assert.assertEquals(ImmutableMap.of("error", failureMessage), errorResponse.getEntity());
}
/**
 * Records, on the {@code req} mock, the attribute reads and writes expected around an
 * authorization check. The mock is left in record state; callers replay it themselves.
 */
private void expectAuthorizationTokenCheck()
{
  // Attribute reads: the unsecured-path and checked flags are absent, the authentication
  // result is present.
  EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH))
          .andReturn(null)
          .anyTimes();
  EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
          .andReturn(null)
          .atLeastOnce();
  EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
          .andReturn(new AuthenticationResult("druid", "druid", null, null))
          .atLeastOnce();

  // Attribute writes: the checked flag may be set to either value any number of times.
  req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
  EasyMock.expectLastCall().anyTimes();
  req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
  EasyMock.expectLastCall().anyTimes();
}
/**
 * Builds a stub task with the given id and datasource.
 *
 * @param id         task id passed to the {@link AbstractTask} constructor
 * @param datasource datasource passed to the {@link AbstractTask} constructor
 * @return an anonymous {@link AbstractTask} of type "test" that is never ready and whose
 *         {@code run} returns null
 */
private Task getTaskWithIdAndDatasource(String id, String datasource)
{
  final Task stubTask = new AbstractTask(id, datasource, null)
  {
    @Override
    public String getType()
    {
      return "test";
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient)
    {
      return false;
    }

    @Override
    public void stopGracefully(TaskConfig taskConfig)
    {
      // Intentionally empty: the stub has nothing to stop.
    }

    @Override
    public TaskStatus run(TaskToolbox toolbox)
    {
      return null;
    }
  };
  return stubTask;
}
/**
 * Minimal {@link TaskRunnerWorkItem} for tests: it reports an unknown location, the task type
 * "test" and the datasource "ds_test".
 */
private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem
{
  private static final String TASK_TYPE = "test";
  private static final String DATA_SOURCE = "ds_test";

  public MockTaskRunnerWorkItem(String taskId, ListenableFuture<TaskStatus> result)
  {
    super(taskId, result);
  }

  @Override
  public TaskLocation getLocation()
  {
    return TaskLocation.unknown();
  }

  @Override
  public String getTaskType()
  {
    return TASK_TYPE;
  }

  @Override
  public String getDataSource()
  {
    return DATA_SOURCE;
  }
}
}