blob: 57bab1fed0d3a5b0e2a2c627fa1262dbe52341ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.rpc.indexing;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class OverlordClientImplTest
{
private static final List<TaskStatusPlus> STATUSES = Collections.singletonList(
new TaskStatusPlus(
"taskId",
null,
null,
DateTimes.of("2000"),
DateTimes.of("2000"),
TaskState.RUNNING,
RunnerTaskState.RUNNING,
null,
TaskLocation.unknown(),
null,
null
)
);
private ObjectMapper jsonMapper;
private MockServiceClient serviceClient;
private OverlordClient overlordClient;
@Before
public void setup()
{
jsonMapper = new DefaultObjectMapper();
serviceClient = new MockServiceClient();
overlordClient = new OverlordClientImpl(serviceClient, jsonMapper);
}
@After
public void tearDown()
{
serviceClient.verify();
}
@Test
public void test_findCurrentLeader() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/leader"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
StringUtils.toUtf8("http://followTheLeader")
);
Assert.assertEquals(URI.create("http://followTheLeader"), overlordClient.findCurrentLeader().get());
}
@Test
public void test_taskStatuses_null_null_null() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/tasks"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses(null, null, null).get())
);
}
@Test
public void test_taskStatuses_RUNNING_null_null() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/tasks?state=RUNNING"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses("RUNNING", null, null).get())
);
}
@Test
public void test_taskStatuses_RUNNING_foo_null() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/tasks?state=RUNNING&datasource=foo"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses("RUNNING", "foo", null).get())
);
}
@Test
public void test_taskStatuses_null_foo_null() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/tasks?datasource=foo"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses(null, "foo", null).get())
);
}
@Test
public void test_taskStatuses_RUNNING_foo_zero() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(
HttpMethod.GET,
"/druid/indexer/v1/tasks?state=RUNNING&datasource=foo%3F&max=0"
),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses("RUNNING", "foo?", 0).get())
);
}
@Test
public void test_taskStatuses_null_null_zero() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(
HttpMethod.GET,
"/druid/indexer/v1/tasks?max=0"
),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(STATUSES)
);
Assert.assertEquals(
STATUSES,
ImmutableList.copyOf(overlordClient.taskStatuses(null, null, 0).get())
);
}
@Test
public void test_findLockedIntervals() throws Exception
{
final Map<String, List<Interval>> lockMap =
ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001")));
final List<LockFilterPolicy> requests = ImmutableList.of(
new LockFilterPolicy("foo", 3, null)
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2")
.jsonContent(jsonMapper, requests),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(lockMap)
);
Assert.assertEquals(
lockMap,
overlordClient.findLockedIntervals(requests).get()
);
}
@Test
public void test_findLockedIntervals_nullReturn() throws Exception
{
final List<LockFilterPolicy> requests = ImmutableList.of(
new LockFilterPolicy("foo", 3, null)
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2")
.jsonContent(jsonMapper, requests),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(null)
);
Assert.assertEquals(
Collections.emptyMap(),
overlordClient.findLockedIntervals(requests).get()
);
}
@Test
public void test_supervisorStatuses() throws Exception
{
final List<SupervisorStatus> statuses = ImmutableList.of(
new SupervisorStatus.Builder()
.withId("foo")
.withSource("kafka")
.withState("chill")
.build()
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/supervisor?system"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(statuses)
);
Assert.assertEquals(
statuses,
ImmutableList.copyOf(overlordClient.supervisorStatuses().get())
);
}
@Test
public void test_taskReportAsMap() throws Exception
{
final String taskId = "testTaskId";
final TaskReport.ReportMap response = TaskReport.buildTaskReports(new KillTaskReport("taskId", null));
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(response)
);
final ListenableFuture<TaskReport.ReportMap> future = overlordClient.taskReportAsMap(taskId);
Assert.assertEquals(response, future.get());
}
@Test
public void test_taskReportAsMap_notFound()
{
String taskId = "testTaskId";
String errorMsg = "No task reports were found for this task. "
+ "The task may not exist, or it may not have completed yet.";
serviceClient.expectAndThrow(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/testTaskId/reports"),
new HttpResponseException(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND),
StandardCharsets.UTF_8
).addChunk(errorMsg)
)
);
final ListenableFuture<TaskReport.ReportMap> future = overlordClient.taskReportAsMap(taskId);
final ExecutionException e = Assert.assertThrows(
ExecutionException.class,
future::get
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(HttpResponseException.class));
Assert.assertEquals(
HttpResponseStatus.NOT_FOUND.getCode(),
((HttpResponseException) e.getCause()).getResponse().getStatus().getCode()
);
}
@Test
public void test_getTaskReport_empty()
{
final String taskID = "testTaskId";
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID + "/reports"),
HttpResponseStatus.OK,
Collections.emptyMap(),
StringUtils.toUtf8("{}")
);
final TaskReport.ReportMap actualResponse =
FutureUtils.getUnchecked(overlordClient.taskReportAsMap(taskID), true);
Assert.assertEquals(Collections.emptyMap(), actualResponse);
}
@Test
public void test_getTotalWorkerCapacity() throws Exception
{
int currentClusterCapacity = 5;
int maximumCapacityWithAutoScale = 10;
// Mock response for /druid/indexer/v1/totalWorkerCapacity
final IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = new IndexingTotalWorkerCapacityInfo(
currentClusterCapacity,
maximumCapacityWithAutoScale
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity"),
HttpResponseStatus.OK,
Collections.emptyMap(),
jsonMapper.writeValueAsBytes(indexingTotalWorkerCapacityInfo)
);
Assert.assertEquals(
indexingTotalWorkerCapacityInfo,
FutureUtils.getUnchecked(overlordClient.getTotalWorkerCapacity(), true)
);
}
@Test
public void test_getWorkers() throws Exception
{
final List<IndexingWorkerInfo> workers = ImmutableList.of(
new IndexingWorkerInfo(
new IndexingWorker("http", "localhost", "1.2.3.4", 3, "2"),
0,
Collections.emptySet(),
Collections.emptyList(),
DateTimes.of("2000"),
null
)
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/workers"),
HttpResponseStatus.OK,
Collections.emptyMap(),
jsonMapper.writeValueAsBytes(workers)
);
Assert.assertEquals(
workers,
FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
);
}
@Test
public void test_killPendingSegments() throws Exception
{
serviceClient.expectAndRespond(
new RequestBuilder(
HttpMethod.DELETE,
"/druid/indexer/v1/pendingSegments/foo?interval=2000-01-01T00%3A00%3A00.000Z%2F2001-01-01T00%3A00%3A00.000Z"
),
HttpResponseStatus.OK,
Collections.emptyMap(),
jsonMapper.writeValueAsBytes(
ImmutableMap.of("numDeleted", 2L)
)
);
Assert.assertEquals(
Integer.valueOf(2),
FutureUtils.getUnchecked(overlordClient.killPendingSegments("foo", Intervals.of("2000/2001")), true)
);
}
@Test
public void test_taskPayload() throws ExecutionException, InterruptedException, JsonProcessingException
{
final String taskID = "taskId_1";
final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(
taskID,
"test",
null,
null,
null,
null,
null,
null
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID),
HttpResponseStatus.OK,
Collections.emptyMap(),
DefaultObjectMapper.INSTANCE.writeValueAsBytes(new TaskPayloadResponse(taskID, clientTaskQuery))
);
Assert.assertEquals(
clientTaskQuery,
overlordClient.taskPayload(taskID).get().getPayload()
);
}
}