/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.curator.CuratorTestBase;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.segment.TestHelper;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tests for the coordinator's curator-based load queue peon.
 *
 * <p>Verifies that segment load/drop requests queued on the peon are written as
 * change-request znodes under the load queue path, that load requests are issued
 * in the expected priority order (segment with the latest interval first — see
 * {@code expectedLoadOrder} below), and that a load request which is never
 * acknowledged by a historical is failed once the configured load timeout expires.
 */
public class LoadQueuePeonTest extends CuratorTestBase
{
  // ZK path under which the peon creates one znode per pending segment change request.
  private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";

  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

  private LoadQueuePeon loadQueuePeon;
  // Watches LOAD_QUEUE_PATH so the tests can observe the znodes the peon writes.
  private PathChildrenCache loadQueueCache;

  @Before
  public void setUp() throws Exception
  {
    setupServerAndCurator();
    curator.start();
    curator.blockUntilConnected();
    // Pre-create the queue path so the child cache and the peon have a parent to work under.
    curator.create().creatingParentsIfNeeded().forPath(LOAD_QUEUE_PATH);
    loadQueueCache = new PathChildrenCache(
        curator,
        LOAD_QUEUE_PATH,
        true,
        // NOTE(review): second flag presumably means node data is compressed — consistent
        // with the decompressed() reads in the tests below; confirm against Curator docs.
        true,
        Execs.singleThreaded("load_queue_cache-%d")
    );
  }

  @Test
  public void testMultipleLoadDropSegments() throws Exception
  {
    // Shared cursors into the latch arrays below. requestSignalIdx is advanced by the
    // main thread but read by the cache listener thread; segmentSignalIdx is advanced
    // by the main thread but read by the peon's callback thread. Both wrap back to 0
    // at the end of the drop phase so the load phase reuses the arrays from index 0.
    final AtomicInteger requestSignalIdx = new AtomicInteger(0);
    final AtomicInteger segmentSignalIdx = new AtomicInteger(0);

    loadQueuePeon = new CuratorLoadQueuePeon(
        curator,
        LOAD_QUEUE_PATH,
        jsonMapper,
        Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
        Execs.singleThreaded("test_load_queue_peon-%d"),
        // NOTE(review): positional config — the 4th arg (null here) is the load timeout
        // per testFailAssign below; meaning of the `10` and trailing Duration args is
        // not visible from this file — confirm against TestDruidCoordinatorConfig.
        new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO)
    );
    loadQueuePeon.start();

    // One latch per segment, per phase: request latches fire when the change-request
    // znode appears; segment latches fire when the peon invokes the completion callback.
    final CountDownLatch[] loadRequestSignal = new CountDownLatch[5];
    final CountDownLatch[] dropRequestSignal = new CountDownLatch[5];
    final CountDownLatch[] segmentLoadedSignal = new CountDownLatch[5];
    final CountDownLatch[] segmentDroppedSignal = new CountDownLatch[5];
    for (int i = 0; i < 5; ++i) {
      loadRequestSignal[i] = new CountDownLatch(1);
      dropRequestSignal[i] = new CountDownLatch(1);
      segmentLoadedSignal[i] = new CountDownLatch(1);
      segmentDroppedSignal[i] = new CountDownLatch(1);
    }

    // Invoked (via request.go below) for each change-request znode the cache observes;
    // counts down the latch at the current request cursor.
    final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
    {
      @Override
      public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
      {
        loadRequestSignal[requestSignalIdx.get()].countDown();
      }

      @Override
      public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
      {
        dropRequestSignal[requestSignalIdx.get()].countDown();
      }
    };

    final List<DataSegment> segmentToDrop = Lists.transform(
        ImmutableList.<String>of(
            "2014-10-26T00:00:00Z/P1D",
            "2014-10-25T00:00:00Z/P1D",
            "2014-10-24T00:00:00Z/P1D",
            "2014-10-23T00:00:00Z/P1D",
            "2014-10-22T00:00:00Z/P1D"
        ), new Function<String, DataSegment>()
        {
          @Override
          public DataSegment apply(String intervalStr)
          {
            return dataSegmentWithInterval(intervalStr);
          }
        }
    );

    // Deliberately queued out of interval order to exercise the peon's prioritization.
    final List<DataSegment> segmentToLoad = Lists.transform(
        ImmutableList.<String>of(
            "2014-10-27T00:00:00Z/P1D",
            "2014-10-29T00:00:00Z/P1M",
            "2014-10-31T00:00:00Z/P1D",
            "2014-10-30T00:00:00Z/P1D",
            "2014-10-28T00:00:00Z/P1D"
        ), new Function<String, DataSegment>()
        {
          @Override
          public DataSegment apply(String intervalStr)
          {
            return dataSegmentWithInterval(intervalStr);
          }
        }
    );

    // segment with latest interval should be loaded first
    final List<DataSegment> expectedLoadOrder = Lists.transform(
        ImmutableList.<String>of(
            "2014-10-29T00:00:00Z/P1M",
            "2014-10-31T00:00:00Z/P1D",
            "2014-10-30T00:00:00Z/P1D",
            "2014-10-28T00:00:00Z/P1D",
            "2014-10-27T00:00:00Z/P1D"
        ), new Function<String, DataSegment>()
        {
          @Override
          public DataSegment apply(String intervalStr)
          {
            return dataSegmentWithInterval(intervalStr);
          }
        }
    );

    // Route every newly created change-request znode through the handler above.
    loadQueueCache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(
              CuratorFramework client, PathChildrenCacheEvent event
          ) throws Exception
          {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
              DataSegmentChangeRequest request = jsonMapper.readValue(
                  event.getData().getData(),
                  DataSegmentChangeRequest.class
              );
              request.go(handler, null);
            }
          }
        }
    );
    loadQueueCache.start();

    // Queue all drops, then all loads; callbacks fire only when the peon sees the
    // corresponding znode disappear (simulated below by deleting it).
    for (DataSegment segment : segmentToDrop) {
      loadQueuePeon.dropSegment(
          segment, new LoadPeonCallback()
          {
            @Override
            public void execute()
            {
              segmentDroppedSignal[segmentSignalIdx.get()].countDown();
            }
          }
      );
    }

    for (DataSegment segment : segmentToLoad) {
      loadQueuePeon.loadSegment(
          segment, new LoadPeonCallback()
          {
            @Override
            public void execute()
            {
              segmentLoadedSignal[segmentSignalIdx.get()].countDown();
            }
          }
      );
    }

    // 6000 bytes = 5 queued load segments x 1200 bytes each (see dataSegmentWithInterval).
    // Drops do not count toward the load queue size.
    Assert.assertEquals(6000, loadQueuePeon.getLoadQueueSize());
    Assert.assertEquals(5, loadQueuePeon.getSegmentsToLoad().size());
    Assert.assertEquals(5, loadQueuePeon.getSegmentsToDrop().size());

    // Drop phase: for each segment, wait for its request znode, verify its payload,
    // then delete the znode to simulate the historical acknowledging the drop.
    for (DataSegment segment : segmentToDrop) {
      String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
      Assert.assertTrue(timing.forWaiting().awaitLatch(dropRequestSignal[requestSignalIdx.get()]));
      Assert.assertNotNull(curator.checkExists().forPath(dropRequestPath));
      Assert.assertEquals(
          segment,
          ((SegmentChangeRequestDrop) jsonMapper.readValue(
              curator.getData()
                     .decompressed()
                     .forPath(dropRequestPath), DataSegmentChangeRequest.class
          )).getSegment()
      );
      // Wrap the request cursor back to 0 after the 5th drop so the load phase
      // starts reading loadRequestSignal from index 0.
      if (requestSignalIdx.get() == 4) {
        requestSignalIdx.set(0);
      } else {
        requestSignalIdx.incrementAndGet();
      }

      // simulate completion of drop request by historical
      curator.delete().guaranteed().forPath(dropRequestPath);
      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignal[segmentSignalIdx.get()]));
      // After each acknowledged drop, one fewer segment should remain queued.
      int expectedNumSegmentToDrop = 5 - segmentSignalIdx.get() - 1;
      Assert.assertEquals(expectedNumSegmentToDrop, loadQueuePeon.getSegmentsToDrop().size());
      // Same wrap-around for the callback cursor.
      if (segmentSignalIdx.get() == 4) {
        segmentSignalIdx.set(0);
      } else {
        segmentSignalIdx.incrementAndGet();
      }
    }

    // Load phase: iterate in the expected priority order; each iteration verifies the
    // znode payload, simulates the historical's ack, and checks queue-size bookkeeping.
    for (DataSegment segment : expectedLoadOrder) {
      String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
      Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
      Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
      Assert.assertEquals(
          segment, ((SegmentChangeRequestLoad) jsonMapper.readValue(
              curator.getData()
                     .decompressed()
                     .forPath(loadRequestPath),
              DataSegmentChangeRequest.class
          )).getSegment()
      );
      requestSignalIdx.incrementAndGet();

      // simulate completion of load request by historical
      curator.delete().guaranteed().forPath(loadRequestPath);
      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal[segmentSignalIdx.get()]));
      int expectedNumSegmentToLoad = 5 - segmentSignalIdx.get() - 1;
      // Queue size shrinks by 1200 bytes (one segment) per completed load.
      Assert.assertEquals(1200 * expectedNumSegmentToLoad, loadQueuePeon.getLoadQueueSize());
      Assert.assertEquals(expectedNumSegmentToLoad, loadQueuePeon.getSegmentsToLoad().size());
      segmentSignalIdx.incrementAndGet();
    }
  }

  /**
   * Verifies that a load request that is never acknowledged (its znode is never
   * deleted) is failed by the peon once the 1 ms load timeout fires: the callback
   * still runs and the queue is left empty.
   */
  @Test
  public void testFailAssign() throws Exception
  {
    final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
    final CountDownLatch loadRequestSignal = new CountDownLatch(1);
    final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);

    loadQueuePeon = new CuratorLoadQueuePeon(
        curator,
        LOAD_QUEUE_PATH,
        jsonMapper,
        Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
        Execs.singleThreaded("test_load_queue_peon-%d"),
        // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
        new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false, new Duration("PT1s"))
    );
    loadQueuePeon.start();

    // Only need to know that the request znode appeared; no payload handler here.
    loadQueueCache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(
              CuratorFramework client, PathChildrenCacheEvent event
          ) throws Exception
          {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
              loadRequestSignal.countDown();
            }
          }
        }
    );
    loadQueueCache.start();

    loadQueuePeon.loadSegment(
        segment, new LoadPeonCallback()
        {
          @Override
          public void execute()
          {
            segmentLoadedSignal.countDown();
          }
        }
    );

    String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
    Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
    Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
    Assert.assertEquals(
        segment, ((SegmentChangeRequestLoad) jsonMapper.readValue(
            curator.getData()
                   .decompressed()
                   .forPath(loadRequestPath),
            DataSegmentChangeRequest.class
        )).getSegment()
    );

    // don't simulate completion of load request here
    // The callback must still fire (via the timeout failure path), leaving nothing queued.
    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
    Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
    Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
  }

  /**
   * Builds a test segment over the given interval. The fixed 1200-byte size is
   * what the load-queue-size assertions in the tests above are computed from.
   */
  private DataSegment dataSegmentWithInterval(String intervalStr)
  {
    return DataSegment.builder()
                      .dataSource("test_load_queue_peon")
                      .interval(Intervals.of(intervalStr))
                      .loadSpec(ImmutableMap.<String, Object>of())
                      .version("2015-05-27T03:38:35.683Z")
                      .dimensions(ImmutableList.<String>of())
                      .metrics(ImmutableList.<String>of())
                      .shardSpec(NoneShardSpec.instance())
                      .binaryVersion(9)
                      .size(1200)
                      .build();
  }

  @After
  public void tearDown() throws Exception
  {
    // Close the watcher before stopping the peon, then tear down the embedded ZK/curator.
    loadQueueCache.close();
    loadQueuePeon.stop();
    tearDownServerAndCurator();
  }
}