blob: 8ea6321d1bdb6a143a03db58c552dbdad26244b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
public abstract class SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> extends IndexTaskClient
{
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClient.class);
public SeekableStreamIndexTaskClient(
HttpClient httpClient,
ObjectMapper jsonMapper,
TaskInfoProvider taskInfoProvider,
String dataSource,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
super(httpClient, jsonMapper, taskInfoProvider, httpTimeout, dataSource, numThreads, numRetries);
}
public boolean stop(final String id, final boolean publish)
{
log.debug("Stop task[%s] publish[%s]", id, publish);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(
id,
HttpMethod.POST,
"stop",
publish ? "publish=true" : null,
true
);
return isSuccess(response);
}
catch (NoTaskLocationException e) {
return false;
}
catch (TaskNotRunnableException e) {
log.info("Task [%s] couldn't be stopped because it is no longer running", id);
return true;
}
catch (Exception e) {
log.warn(e, "Exception while stopping task [%s]", id);
return false;
}
}
public boolean resume(final String id)
{
log.debug("Resume task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.POST, "resume", null, true);
return isSuccess(response);
}
catch (NoTaskLocationException | IOException e) {
log.warn(e, "Exception while stopping task [%s]", id);
return false;
}
}
public Map<PartitionIdType, SequenceOffsetType> pause(final String id)
{
log.debug("Pause task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(
id,
HttpMethod.POST,
"pause",
null,
true
);
final HttpResponseStatus responseStatus = response.getStatus();
final String responseContent = response.getContent();
if (responseStatus.equals(HttpResponseStatus.OK)) {
log.info("Task [%s] paused successfully", id);
return deserializeMap(responseContent, Map.class, getPartitionType(), getSequenceType());
} else if (responseStatus.equals(HttpResponseStatus.ACCEPTED)) {
// The task received the pause request, but its status hasn't been changed yet.
while (true) {
final SeekableStreamIndexTaskRunner.Status status = getStatus(id);
if (status == SeekableStreamIndexTaskRunner.Status.PAUSED) {
return getCurrentOffsets(id, true);
}
final Duration delay = newRetryPolicy().getAndIncrementRetryDelay();
if (delay == null) {
throw new ISE(
"Task [%s] failed to change its status from [%s] to [%s], aborting",
id,
status,
SeekableStreamIndexTaskRunner.Status.PAUSED
);
} else {
final long sleepTime = delay.getMillis();
log.info(
"Still waiting for task [%s] to change its status to [%s]; will try again in [%s]",
id,
SeekableStreamIndexTaskRunner.Status.PAUSED,
new Duration(sleepTime).toString()
);
Thread.sleep(sleepTime);
}
}
} else {
throw new ISE(
"Pause request for task [%s] failed with response [%s] : [%s]",
id,
responseStatus,
responseContent
);
}
}
catch (NoTaskLocationException e) {
log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), id);
return ImmutableMap.of();
}
catch (IOException | InterruptedException e) {
throw new RE(e, "Exception [%s] while pausing Task [%s]", e.getMessage(), id);
}
}
public SeekableStreamIndexTaskRunner.Status getStatus(final String id)
{
log.debug("GetStatus task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.GET, "status", null, true);
return deserialize(response.getContent(), SeekableStreamIndexTaskRunner.Status.class);
}
catch (NoTaskLocationException e) {
return SeekableStreamIndexTaskRunner.Status.NOT_STARTED;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Nullable
public DateTime getStartTime(final String id)
{
log.debug("GetStartTime task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.GET, "time/start", null, true);
return response.getContent() == null || response.getContent().isEmpty()
? null
: deserialize(response.getContent(), DateTime.class);
}
catch (NoTaskLocationException e) {
return null;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public Map<String, Object> getMovingAverages(final String id)
{
log.debug("GetMovingAverages task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(
id,
HttpMethod.GET,
"rowStats",
null,
true
);
return response.getContent() == null || response.getContent().isEmpty()
? Collections.emptyMap()
: deserialize(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
}
catch (NoTaskLocationException e) {
return Collections.emptyMap();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(final String id, final boolean retry)
{
log.debug("GetCurrentOffsets task[%s] retry[%s]", id, retry);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(
id,
HttpMethod.GET,
"offsets/current",
null,
retry
);
return deserializeMap(response.getContent(), Map.class, getPartitionType(), getSequenceType());
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpoints(final String id, final boolean retry)
{
log.debug("GetCheckpoints task[%s] retry[%s]", id, retry);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.GET, "checkpoints", null, retry);
return deserializeNestedValueMap(
response.getContent(),
TreeMap.class,
Integer.class,
Map.class,
getPartitionType(),
getSequenceType()
);
}
catch (NoTaskLocationException e) {
return new TreeMap<>();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> getCheckpointsAsync(
final String id,
final boolean retry
)
{
return doAsync(() -> getCheckpoints(id, retry));
}
public Map<PartitionIdType, SequenceOffsetType> getEndOffsets(final String id)
{
log.debug("GetEndOffsets task[%s]", id);
try {
final StringFullResponseHolder response = submitRequestWithEmptyContent(id, HttpMethod.GET, "offsets/end", null, true);
return deserializeMap(response.getContent(), Map.class, getPartitionType(), getSequenceType());
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public boolean setEndOffsets(
final String id,
final Map<PartitionIdType, SequenceOffsetType> endOffsets,
final boolean finalize
) throws IOException
{
log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize);
try {
final StringFullResponseHolder response = submitJsonRequest(
id,
HttpMethod.POST,
"offsets/end",
StringUtils.format("finish=%s", finalize),
serialize(endOffsets),
true
);
return isSuccess(response);
}
catch (NoTaskLocationException e) {
return false;
}
}
public ListenableFuture<Boolean> stopAsync(final String id, final boolean publish)
{
return doAsync(() -> stop(id, publish));
}
public ListenableFuture<Boolean> resumeAsync(final String id)
{
return doAsync(() -> resume(id));
}
public ListenableFuture<DateTime> getStartTimeAsync(final String id)
{
return doAsync(() -> getStartTime(id));
}
public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
{
return doAsync(() -> pause(id));
}
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id,
final Map<PartitionIdType, SequenceOffsetType> endOffsets,
final boolean finalize
)
{
return doAsync(() -> setEndOffsets(id, endOffsets, finalize));
}
public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getCurrentOffsetsAsync(
final String id,
final boolean retry
)
{
return doAsync(() -> getCurrentOffsets(id, retry));
}
public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(final String id)
{
return doAsync(() -> getEndOffsets(id));
}
public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(final String id)
{
return doAsync(() -> getMovingAverages(id));
}
public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(final String id)
{
return doAsync(() -> getStatus(id));
}
protected abstract Class<PartitionIdType> getPartitionType();
protected abstract Class<SequenceOffsetType> getSequenceType();
}