/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.client.indexing;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
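
/**
 * HTTP-based implementation of {@link IndexingServiceClient} that forwards every call to the
 * Overlord (the indexing service leader) discovered via {@link DruidLeaderClient}, using the
 * {@code /druid/indexer/v1/*} HTTP APIs.
 * <p>
 * A minimal usage sketch, assuming the client is obtained through Guice injection (the task ID
 * prefix and priority below are illustrative values only):
 * <pre>{@code
 * @Inject HttpIndexingServiceClient client;
 *
 * String taskId = client.compactSegments("compact", segments, 25, null, null, null, null);
 * TaskStatusResponse status = client.getTaskStatus(taskId);
 * }</pre>
 */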
public class HttpIndexingServiceClient implements IndexingServiceClient
{
  private final DruidLeaderClient druidLeaderClient;
  private final ObjectMapper jsonMapper;

  @Inject
  public HttpIndexingServiceClient(
      ObjectMapper jsonMapper,
      @IndexingService DruidLeaderClient druidLeaderClient
  )
  {
    this.jsonMapper = jsonMapper;
    this.druidLeaderClient = druidLeaderClient;
  }
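
  /**
   * Submits a kill task that deletes unused segments of {@code dataSource} within
   * {@code interval}. The task ID is derived from {@code idPrefix}.
   */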
  @Override
  public void killUnusedSegments(String idPrefix, String dataSource, Interval interval)
  {
    final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
    final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval);
    runTask(taskId, taskQuery);
  }
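
  /**
   * Submits a compaction task for the given segments, all of which must belong to the same
   * datasource. The task priority is passed to the Overlord through the {@code "priority"} key
   * of the task context.
   *
   * @return the ID of the submitted compaction task
   */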
  @Override
  public String compactSegments(
      String idPrefix,
      List<DataSegment> segments,
      int compactionTaskPriority,
      @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
      @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
      @Nullable Boolean dropExisting,
      @Nullable Map<String, Object> context
  )
  {
    Preconditions.checkArgument(!segments.isEmpty(), "Expect non-empty segments to compact");

    final String dataSource = segments.get(0).getDataSource();
    Preconditions.checkArgument(
        segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)),
        "Segments must have the same dataSource"
    );

    context = context == null ? new HashMap<>() : context;
    context.put("priority", compactionTaskPriority);

    final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
    final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
        taskId,
        dataSource,
        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
        tuningConfig,
        granularitySpec,
        context
    );
    return runTask(taskId, taskQuery);
  }
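
  /**
   * Posts the given task object to the Overlord at {@code /druid/indexer/v1/task} and verifies
   * that the Overlord acknowledged the same task ID that was submitted.
   *
   * @return the submitted task ID
   */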
  @Override
  public String runTask(String taskId, Object taskObject)
  {
    try {
      // Warning, magic: here we may serialize ClientTaskQuery objects, but OverlordResource.taskPost() deserializes
      // Task objects from the same data. See the comment for ClientTaskQuery for details.
      final StringFullResponseHolder response = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")
              .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
      );

      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
        if (!Strings.isNullOrEmpty(response.getContent())) {
          throw new ISE(
              "Failed to post task[%s] with error[%s].",
              taskId,
              response.getContent()
          );
        } else {
          throw new ISE("Failed to post task[%s]. Please check overlord log", taskId);
        }
      }

      final Map<String, Object> resultMap = jsonMapper.readValue(
          response.getContent(),
          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
      );
      final String returnedTaskId = (String) resultMap.get("task");
      Preconditions.checkState(
          taskId.equals(returnedTaskId),
          "Got a different taskId[%s]. Expected taskId[%s]",
          returnedTaskId,
          taskId
      );
      return taskId;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
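
  /**
   * Requests shutdown of the given task via {@code /druid/indexer/v1/task/{taskId}/shutdown} and
   * verifies that the Overlord cancelled the expected task.
   *
   * @return the ID of the cancelled task
   */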
  @Override
  public String cancelTask(String taskId)
  {
    try {
      final StringFullResponseHolder response = druidLeaderClient.go(
          druidLeaderClient.makeRequest(
              HttpMethod.POST,
              StringUtils.format("/druid/indexer/v1/task/%s/shutdown", StringUtils.urlEncode(taskId))
          )
      );

      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
        throw new ISE("Failed to cancel task[%s]", taskId);
      }

      final Map<String, Object> resultMap = jsonMapper.readValue(
          response.getContent(),
          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
      );
      final String cancelledTaskId = (String) resultMap.get("task");
      Preconditions.checkNotNull(cancelledTaskId, "Null task id returned for task[%s]", taskId);
      Preconditions.checkState(
          taskId.equals(cancelledTaskId),
          "Requested to cancel task[%s], but another task[%s] was cancelled!",
          taskId,
          cancelledTaskId
      );
      return cancelledTaskId;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
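
  /**
   * Returns the total task slot capacity of the cluster, summed over all workers reported by
   * {@code /druid/indexer/v1/workers}.
   */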
  @Override
  public int getTotalWorkerCapacity()
  {
    try {
      final StringFullResponseHolder response = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/workers")
              .setHeader("Content-Type", MediaType.APPLICATION_JSON)
      );

      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
        throw new ISE(
            "Error while getting available cluster capacity. status[%s] content[%s]",
            response.getStatus(),
            response.getContent()
        );
      }
      final Collection<IndexingWorkerInfo> workers = jsonMapper.readValue(
          response.getContent(),
          new TypeReference<Collection<IndexingWorkerInfo>>() {}
      );
      return workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
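
  /**
   * Returns all active (waiting, pending, or running) tasks, de-duplicated by task ID.
   */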
  @Override
  public List<TaskStatusPlus> getActiveTasks()
  {
    // Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
    // calls then we still catch them. (Tasks always go waiting -> pending -> running.)
    //
    // Consider switching to the new-style /druid/indexer/v1/tasks API in the future.
    final List<TaskStatusPlus> tasks = new ArrayList<>();
    final Set<String> taskIdsSeen = new HashSet<>();

    final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
        getTasks("waitingTasks"),
        getTasks("pendingTasks"),
        getTasks("runningTasks")
    );

    for (TaskStatusPlus task : activeTasks) {
      // Use taskIdsSeen to prevent returning the same task ID more than once (if a task hops from 'pending' to
      // 'running' between calls, for example, we would otherwise see it twice).
      if (taskIdsSeen.add(task.getId())) {
        tasks.add(task);
      }
    }

    return tasks;
  }
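
  /**
   * Fetches a list of task statuses from {@code /druid/indexer/v1/} followed by the given
   * endpoint suffix, e.g. {@code "runningTasks"}.
   */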
  private List<TaskStatusPlus> getTasks(String endpointSuffix)
  {
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/%s", endpointSuffix))
      );

      if (!responseHolder.getStatus().equals(HttpResponseStatus.OK)) {
        throw new ISE("Error while fetching the status of tasks");
      }

      return jsonMapper.readValue(
          responseHolder.getContent(),
          new TypeReference<List<TaskStatusPlus>>()
          {
          }
      );
    }
    catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
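
  /**
   * Fetches the status of a single task from {@code /druid/indexer/v1/task/{taskId}/status}.
   */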
  @Override
  public TaskStatusResponse getTaskStatus(String taskId)
  {
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(
              HttpMethod.GET,
              StringUtils.format("/druid/indexer/v1/task/%s/status", StringUtils.urlEncode(taskId))
          )
      );

      return jsonMapper.readValue(
          responseHolder.getContent(),
          new TypeReference<TaskStatusResponse>()
          {
          }
      );
    }
    catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
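
  /**
   * Fetches the statuses of the given tasks in a single call to
   * {@code /druid/indexer/v1/taskStatus}, keyed by task ID.
   */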
  @Override
  public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException
  {
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
              .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskIds))
      );

      return jsonMapper.readValue(
          responseHolder.getContent(),
          new TypeReference<Map<String, TaskStatus>>()
          {
          }
      );
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
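
  /**
   * Returns the most recently completed task, or null if no task has completed yet.
   */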
  @Override
  @Nullable
  public TaskStatusPlus getLastCompleteTask()
  {
    final List<TaskStatusPlus> completeTaskStatuses = getTasks("completeTasks?n=1");
    return completeTaskStatuses.isEmpty() ? null : completeTaskStatuses.get(0);
  }
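
  /**
   * Fetches the payload of the given task from {@code /druid/indexer/v1/task/{taskId}}.
   */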
  @Override
  public TaskPayloadResponse getTaskPayload(String taskId)
  {
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(
              HttpMethod.GET,
              StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId))
          )
      );

      return jsonMapper.readValue(
          responseHolder.getContent(),
          new TypeReference<TaskPayloadResponse>()
          {
          }
      );
    }
    catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
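
  /**
   * Returns the intervals currently locked by active tasks, keyed by datasource. For each
   * datasource in {@code minTaskPriority}, only locks held by tasks with an equal or higher
   * priority are considered.
   */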
  @Override
  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
  {
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals")
              .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(minTaskPriority))
      );

      final Map<String, List<Interval>> response = jsonMapper.readValue(
          responseHolder.getContent(),
          new TypeReference<Map<String, List<Interval>>>()
          {
          }
      );
      return response == null ? Collections.emptyMap() : response;
    }
    catch (IOException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
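
  /**
   * Runs the given sampler spec on the Overlord via {@code /druid/indexer/v1/sampler} and returns
   * the sampler response.
   */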
  @Override
  public SamplerResponse sample(SamplerSpec samplerSpec)
  {
    try {
      final StringFullResponseHolder response = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/sampler")
              .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(samplerSpec))
      );

      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
        if (!Strings.isNullOrEmpty(response.getContent())) {
          throw new ISE(
              "Failed to sample with sampler spec[%s], response[%s].",
              samplerSpec,
              response.getContent()
          );
        } else {
          throw new ISE("Failed to sample with sampler spec[%s]. Please check overlord log", samplerSpec);
        }
      }
      return jsonMapper.readValue(response.getContent(), SamplerResponse.class);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
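
  /**
   * Deletes all pending segment records of the given datasource that were created up to
   * {@code end}.
   *
   * @return the number of deleted pending segment records, as reported by the Overlord
   */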
  @Override
  public int killPendingSegments(String dataSource, DateTime end)
  {
    final String endPoint = StringUtils.format(
        "/druid/indexer/v1/pendingSegments/%s?interval=%s",
        StringUtils.urlEncode(dataSource),
        new Interval(DateTimes.MIN, end)
    );
    try {
      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
          druidLeaderClient.makeRequest(HttpMethod.DELETE, endPoint)
      );

      if (!responseHolder.getStatus().equals(HttpResponseStatus.OK)) {
        throw new ISE("Error while killing pendingSegments of dataSource[%s] created until [%s]", dataSource, end);
      }

      final Map<String, Object> resultMap = jsonMapper.readValue(
          responseHolder.getContent(),
          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
      );
      final Object numDeletedObject = resultMap.get("numDeleted");
      return (Integer) Preconditions.checkNotNull(numDeletedObject, "numDeletedObject");
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}