blob: de1801d7ad7a5f8ce4c1efe854ee14c2f8c87c92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.loading;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
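/**
 * Tests for {@link HttpLoadQueuePeon}: the order in which queued load and drop
 * requests are sent to the server, the behaviour of requests queued after the
 * peon has been stopped, and the cancellation of queued operations.
 */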
public class HttpLoadQueuePeonTest
{
// Segments of datasource "test" for the single day interval starting at 2022-01-01,
// split into 4 partitions of 100 MB each; the tests index them from 0 to 3.
private final List<DataSegment> segments =
CreateDataSegments.ofDatasource("test")
.forIntervals(1, Granularities.DAY)
.startingAt("2022-01-01")
.withNumPartitions(4)
.eachOfSizeInMb(100);
// Stub HTTP client handed to the peon; records the segments of the change requests it receives.
private TestHttpClient httpClient;
// Peon under test; re-created and started in setUp() before every test.
private HttpLoadQueuePeon httpLoadQueuePeon;
// Executor handed to the peon (wrapped in a WrappingScheduledExecutorService).
// The tests run its pending tasks explicitly to send requests and handle responses.
private BlockingExecutorService processingExecutor;
// Executor handed to the peon as its last constructor argument.
// The tests run its pending tasks explicitly to execute callbacks.
private BlockingExecutorService callbackExecutor;
// Segments whose markSegmentProcessed() callback has been invoked, in invocation order.
private final List<DataSegment> processedSegments = new ArrayList<>();
/**
 * Builds a fresh stub client, fresh blocking executors and a started peon
 * before every test, and forgets the segments processed by earlier tests.
 */
@Before
public void setUp()
{
  httpClient = new TestHttpClient();
  processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s");
  callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb");
  processedSegments.clear();

  // NOTE(review): the third config argument is presumably the batch size - confirm
  final HttpLoadQueuePeonConfig peonConfig = new HttpLoadQueuePeonConfig(null, null, 10);
  final WrappingScheduledExecutorService scheduledProcessingExecutor =
      new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", processingExecutor, true);

  httpLoadQueuePeon = new HttpLoadQueuePeon(
      "http://dummy:4000",
      ServerTestHelper.MAPPER,
      httpClient,
      peonConfig,
      scheduledProcessingExecutor,
      callbackExecutor
  );
  httpLoadQueuePeon.start();
}
/**
 * Stops the peon after every test.
 */
@After
public void tearDown()
{
  // JUnit runs @After methods even when a @Before method has thrown, in which
  // case the peon may never have been created. Guard against that so that the
  // original setUp() failure is not accompanied by a misleading NPE.
  if (httpLoadQueuePeon != null) {
    httpLoadQueuePeon.stop();
  }
}
/**
 * Queues one drop and three kinds of load, then verifies that every segment
 * is sent to the server and that every callback is executed.
 */
@Test
public void testSimple()
{
  final DataSegment droppedSegment = segments.get(0);
  final DataSegment loadedSegment = segments.get(1);
  final DataSegment replicatedSegment = segments.get(2);
  final DataSegment movedSegment = segments.get(3);

  httpLoadQueuePeon.dropSegment(droppedSegment, markSegmentProcessed(droppedSegment));
  httpLoadQueuePeon.loadSegment(loadedSegment, SegmentAction.LOAD, markSegmentProcessed(loadedSegment));
  httpLoadQueuePeon.loadSegment(replicatedSegment, SegmentAction.REPLICATE, markSegmentProcessed(replicatedSegment));
  httpLoadQueuePeon.loadSegment(movedSegment, SegmentAction.MOVE_TO, markSegmentProcessed(movedSegment));

  // Running the processing tasks sends the requests to the (stub) server
  processingExecutor.finishAllPendingTasks();
  Assert.assertEquals(segments, httpClient.segmentsSentToServer);

  // Running the callback tasks marks every segment as processed
  callbackExecutor.finishAllPendingTasks();
  Assert.assertEquals(segments, processedSegments);
}
/**
 * Verifies that a drop and a load requested after the peon has been stopped
 * fail immediately, i.e. their callbacks are invoked with {@code success == false}
 * without any executor task having to run.
 */
@Test
public void testLoadDropAfterStop()
{
  httpLoadQueuePeon.stop();

  final Set<DataSegment> segmentsWithFailedCallback = new HashSet<>();

  final DataSegment segmentToDrop = segments.get(0);
  httpLoadQueuePeon.dropSegment(
      segmentToDrop,
      success -> {
        if (success) {
          return;
        }
        segmentsWithFailedCallback.add(segmentToDrop);
      }
  );

  final DataSegment segmentToMove = segments.get(1);
  httpLoadQueuePeon.loadSegment(
      segmentToMove,
      SegmentAction.MOVE_TO,
      success -> {
        if (success) {
          return;
        }
        segmentsWithFailedCallback.add(segmentToMove);
      }
  );

  Assert.assertTrue(segmentsWithFailedCallback.contains(segmentToDrop));
  Assert.assertTrue(segmentsWithFailedCallback.contains(segmentToMove));
}
/**
 * Verifies that segments of the same interval are sent to the server ordered
 * by the priority of their action (drop, load, replicate, move), whatever the
 * order in which the actions were queued.
 */
@Test
public void testPriorityOfSegmentAction()
{
  // Randomize which segment gets which action
  final List<DataSegment> shuffledSegments = new ArrayList<>(segments);
  Collections.shuffle(shuffledSegments);

  // Segment i is given the action with the i-th highest priority
  final QueueAction drop =
      QueueAction.of(shuffledSegments.get(0), s -> httpLoadQueuePeon.dropSegment(s, null));
  final QueueAction load =
      QueueAction.of(shuffledSegments.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null));
  final QueueAction replicate =
      QueueAction.of(shuffledSegments.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null));
  final QueueAction move =
      QueueAction.of(shuffledSegments.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null));

  // Queue the actions on the peon in a random order
  final List<QueueAction> queueActions = Arrays.asList(drop, load, replicate, move);
  Collections.shuffle(queueActions);
  for (QueueAction queueAction : queueActions) {
    queueAction.invoke();
  }

  // Run all pending processing tasks so that the requests reach the server
  processingExecutor.finishAllPendingTasks();

  // The server must have received the segments in priority order
  Assert.assertEquals(shuffledSegments, httpClient.segmentsSentToServer);
}
/**
 * Verifies that segments are sent to the server ordered first by the priority
 * of their action and then by interval (newer interval before older one),
 * whatever the order in which the actions were queued.
 */
@Test
public void testPriorityOfSegmentInterval()
{
  // Two days worth of segments, each day shuffled independently
  final List<DataSegment> olderSegments = new ArrayList<>(segments);
  Collections.shuffle(olderSegments);

  final List<DataSegment> newerSegments = new ArrayList<>(
      CreateDataSegments.ofDatasource("test")
                        .forIntervals(1, Granularities.DAY)
                        .startingAt("2022-01-02")
                        .withNumPartitions(4)
                        .eachOfSizeInMb(100)
  );
  Collections.shuffle(newerSegments);

  // Listed in expected send order: by action (drop, load, replicate, move),
  // and within the same action the newer interval first
  final List<QueueAction> queueActions = Arrays.asList(
      QueueAction.of(newerSegments.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)),
      QueueAction.of(olderSegments.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)),
      QueueAction.of(newerSegments.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
      QueueAction.of(olderSegments.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
      QueueAction.of(newerSegments.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
      QueueAction.of(olderSegments.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
      QueueAction.of(newerSegments.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null)),
      QueueAction.of(olderSegments.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null))
  );

  // Capture the expected order before the actions get shuffled
  final List<DataSegment> expectedSegmentOrder = new ArrayList<>(queueActions.size());
  for (QueueAction queueAction : queueActions) {
    expectedSegmentOrder.add(queueAction.segment);
  }

  // Queue the actions on the peon in a random order
  Collections.shuffle(queueActions);
  for (QueueAction queueAction : queueActions) {
    queueAction.invoke();
  }

  // Send one batch of requests to the server
  processingExecutor.finishNextPendingTask();

  // The server must have received the segments in the expected order
  Assert.assertEquals(expectedSegmentOrder, httpClient.segmentsSentToServer);
}
/**
 * Verifies that a queued load can be cancelled before it is sent to the
 * server: the segment leaves the load queue and its callback is not invoked.
 */
@Test
public void testCancelLoad()
{
  final DataSegment queuedSegment = segments.get(0);
  httpLoadQueuePeon.loadSegment(queuedSegment, SegmentAction.REPLICATE, markSegmentProcessed(queuedSegment));
  Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToLoad().size());

  Assert.assertTrue(httpLoadQueuePeon.cancelOperation(queuedSegment));

  Assert.assertEquals(0, httpLoadQueuePeon.getSegmentsToLoad().size());
  Assert.assertTrue(processedSegments.isEmpty());
}
/**
 * Verifies that a queued drop can be cancelled before it is sent to the
 * server: the segment leaves the drop queue and its callback is not invoked.
 */
@Test
public void testCancelDrop()
{
  final DataSegment queuedSegment = segments.get(0);
  httpLoadQueuePeon.dropSegment(queuedSegment, markSegmentProcessed(queuedSegment));
  Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToDrop().size());

  Assert.assertTrue(httpLoadQueuePeon.cancelOperation(queuedSegment));

  Assert.assertTrue(httpLoadQueuePeon.getSegmentsToDrop().isEmpty());
  Assert.assertTrue(processedSegments.isEmpty());
}
/**
 * Verifies that a load can no longer be cancelled once its request has been
 * sent to the server, neither while the response is pending nor after the
 * response has been handled.
 */
@Test
public void testCannotCancelRequestSentToServer()
{
  final DataSegment sentSegment = segments.get(0);
  httpLoadQueuePeon.loadSegment(sentSegment, SegmentAction.REPLICATE, markSegmentProcessed(sentSegment));
  Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(sentSegment));

  // First pending task: the request goes out to the server
  processingExecutor.finishNextPendingTask();
  Assert.assertTrue(httpClient.segmentsSentToServer.contains(sentSegment));

  // The segment is still queued, but cancelling is refused
  Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(sentSegment));
  Assert.assertFalse(httpLoadQueuePeon.cancelOperation(sentSegment));

  // Second pending task: the response from the server is handled
  processingExecutor.finishNextPendingTask();

  // The segment has left the queue, so there is nothing left to cancel
  Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().isEmpty());
  Assert.assertFalse(httpLoadQueuePeon.cancelOperation(sentSegment));

  // Once the callbacks have run, the segment is marked as processed
  callbackExecutor.finishAllPendingTasks();
  Assert.assertTrue(processedSegments.contains(sentSegment));
}
/**
 * Verifies that cancelling a queued load succeeds only once: the second
 * attempt for the same segment is rejected.
 */
@Test
public void testCannotCancelOperationMultipleTimes()
{
  final DataSegment queuedSegment = segments.get(0);
  httpLoadQueuePeon.loadSegment(queuedSegment, SegmentAction.REPLICATE, markSegmentProcessed(queuedSegment));
  Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(queuedSegment));

  final boolean firstAttemptCancelled = httpLoadQueuePeon.cancelOperation(queuedSegment);
  Assert.assertTrue(firstAttemptCancelled);

  final boolean secondAttemptCancelled = httpLoadQueuePeon.cancelOperation(queuedSegment);
  Assert.assertFalse(secondAttemptCancelled);
}
/**
 * Creates a callback that appends the given segment to {@code processedSegments}
 * when invoked. The success flag passed to the callback is ignored.
 */
private LoadPeonCallback markSegmentProcessed(DataSegment segment)
{
  return success -> {
    processedSegments.add(segment);
  };
}
/**
 * Stub {@link HttpClient} that stands in for the server. Every change request
 * received through {@link #go(Request, HttpResponseHandler, Duration)} is applied
 * to this object as a {@link DataSegmentChangeHandler}, which records the segment
 * in {@link #segmentsSentToServer}, and is answered with a SUCCESS status.
 */
private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler
{
  /** Segments of all load and drop requests received so far, in arrival order. */
  private final List<DataSegment> segmentsSentToServer = new ArrayList<>();

  /**
   * Not supported by this stub.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public <Intermediate, Final> ListenableFuture<Final> go(
      Request request,
      HttpResponseHandler<Intermediate, Final> httpResponseHandler
  )
  {
    throw new UnsupportedOperationException("Not Implemented.");
  }

  /**
   * Handles a batch of change requests: notifies the handler of an empty
   * 200 OK response, deserializes the request body into change requests,
   * applies each of them to this object and returns an already completed
   * future holding the serialized list of SUCCESS statuses.
   *
   * @param request             request whose content is a JSON list of change requests
   * @param httpResponseHandler handler that is notified of the 200 OK response
   * @param duration            ignored by this stub
   * @return completed future carrying the serialized statuses as an input stream
   * @throws RE if the request cannot be deserialized, applied or answered
   */
  @Override
  public <Intermediate, Final> ListenableFuture<Final> go(
      Request request,
      HttpResponseHandler<Intermediate, Final> httpResponseHandler,
      Duration duration
  )
  {
    HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    httpResponse.setContent(ChannelBuffers.buffer(0));
    httpResponseHandler.handleResponse(httpResponse, null);
    try {
      List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
          request.getContent().array(),
          new TypeReference<List<DataSegmentChangeRequest>>()
          {
          }
      );

      List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
      for (DataSegmentChangeRequest cr : changeRequests) {
        // Dispatches to addSegment() or removeSegment() below
        cr.go(this, null);
        statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
            cr,
            SegmentLoadDropHandler.Status.SUCCESS
        ));
      }

      // The future always carries an InputStream whatever Final is, so the cast
      // cannot be checked. Go through a wildcard instead of a raw type and keep
      // the suppression on this single declaration.
      // NOTE(review): assumes the handler used by the peon declares a Final type
      // that an InputStream can be assigned to - confirm against HttpLoadQueuePeon.
      @SuppressWarnings("unchecked")
      final ListenableFuture<Final> responseFuture = (ListenableFuture<Final>) (ListenableFuture<?>) Futures.immediateFuture(
          new ByteArrayInputStream(
              ServerTestHelper.MAPPER
                  .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
                  .writeValueAsBytes(statuses)
          )
      );
      return responseFuture;
    }
    catch (Exception ex) {
      throw new RE(ex, "Unexpected exception.");
    }
  }

  /** Records the segment of a load request; the callback is not used. */
  @Override
  public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    segmentsSentToServer.add(segment);
  }

  /** Records the segment of a drop request; the callback is not used. */
  @Override
  public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    segmentsSentToServer.add(segment);
  }
}
/**
 * Pairs a segment with an operation to perform on it. The operation is run on
 * the segment by calling {@link #invoke()}.
 */
private static class QueueAction
{
  /** Segment that the action operates on. */
  final DataSegment segment;

  /** Operation applied to {@link #segment} by {@link #invoke()}. */
  final Consumer<DataSegment> action;

  QueueAction(DataSegment segment, Consumer<DataSegment> action)
  {
    this.action = action;
    this.segment = segment;
  }

  /**
   * Factory shorthand for the constructor.
   */
  static QueueAction of(DataSegment segment, Consumer<DataSegment> action)
  {
    return new QueueAction(segment, action);
  }

  /**
   * Applies the action to the segment.
   */
  void invoke()
  {
    this.action.accept(this.segment);
  }
}
}