/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceEventBuilder;
import io.druid.client.DruidServer;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.metadata.MetadataRuleManager;
import io.druid.segment.IndexIO;
import io.druid.server.coordination.ServerType;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.IntervalDropRule;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Tests for {@link DruidCoordinatorRuleRunner}.
 */
public class DruidCoordinatorRuleRunnerTest
{
private DruidCoordinator coordinator;
private LoadQueuePeon mockPeon;
private List<DataSegment> availableSegments;
private DruidCoordinatorRuleRunner ruleRunner;
private ServiceEmitter emitter;
private MetadataRuleManager databaseRuleManager;
@Before
public void setUp()
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
DateTime start = DateTimes.of("2012-01-01");
availableSegments = Lists.newArrayList();
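// Create 24 contiguous one-hour segments covering 2012-01-01T00:00Z through 2012-01-02T00:00Z.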
for (int i = 0; i < 24; i++) {
availableSegments.add(
new DataSegment(
"test",
new Interval(start, start.plusHours(1)),
DateTimes.nowUtc().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
)
);
start = start.plusHours(1);
}
ruleRunner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(24, 1), coordinator);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(coordinator);
EasyMock.verify(databaseRuleManager);
}
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("cold", 1)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"cold",
Stream.of(
new ServerHolder(
new DruidServer(
"serverCold",
"hostCold",
null,
1000,
ServerType.HISTORICAL,
"cold",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
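// Rules are evaluated in order and the first matching rule wins: the first 6 hourly segments
// match the hot rule, the next 6 the normal rule, and the remaining 12 the cold rule.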
Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "normal"));
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "cold"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 2 replicants
* cold - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 2)
),
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("cold", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"cold",
Stream.of(
new ServerHolder(
new DruidServer(
"serverCold",
"hostCold",
null,
1000,
ServerType.HISTORICAL,
"cold",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
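// The first 6 segments get 2 replicants each on hot (12 assignments); the remaining 18 get 1 replicant on cold.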
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(18L, stats.getTieredStat("assignedCount", "cold"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - 1 replicant
* normal - 1 replicant
*
* @throws Exception
*/
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer normServer = new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
Stream.of(
new ServerHolder(
normServer.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
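// The normal server already holds every segment, so only the 12 hot-tier assignments are new.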
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(0L, stats.getTieredStat("assignedCount", "normal"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
}
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
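// The "hot" tier does not exist in the cluster, so each of the 12 segments matching the hot rule should raise an alert.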
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(12);
EasyMock.replay(emitter);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
ruleRunner.run(params);
exec.shutdown();
EasyMock.verify(emitter);
EasyMock.verify(mockPeon);
}
@Test
public void testRunRuleDoesNotExist() throws Exception
{
mockCoordinator();
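// No rule covers the available segments (the only load interval is 2012-01-02/2012-01-03),
// so a single alert covering all unmatched segments is expected.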
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(1);
EasyMock.replay(emitter);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.replay(databaseRuleManager, mockPeon);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
ruleRunner.run(params);
EasyMock.verify(emitter, mockPeon);
}
@Test
public void testDropRemove() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
createCoordinatorDynamicConfig()
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(coordinator);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server = new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments) {
server.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
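// The 12 segments outside the load interval match the drop rule and are deleted outright,
// since millisToWaitBeforeDeleting is 0.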
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(coordinator);
}
@Test
public void testDropTooManyInSameTier() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
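// The first segment is held by both servers but the rule requires only 1 replicant, so one copy
// is dropped; the 12 segments matching only the drop rule are deleted.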
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
}
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
);
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
Stream.of(
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
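// The first segment's hot-tier copy satisfies the load rule, so its extra normal-tier copy is
// dropped; the 12 segments matching only the drop rule are deleted.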
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
}
@Test
public void testDontDropInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
);
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
Stream.of(
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
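// The hot-tier replicas are only queued, not yet loaded, so the existing normal-tier copies are not dropped.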
Assert.assertTrue(stats.getTiers("droppedCount").isEmpty());
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
}
@Test
public void testDropServerActuallyServesSegment() throws Exception
{
mockCoordinator();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 0)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server2.addDataSegment(availableSegments.get(1));
DruidServer server3 = new DruidServer(
"serverNorm3",
"hostNorm3",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server3.addDataSegment(availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2));
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.replay(anotherMockPeon);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2.toImmutableDruidServer(),
anotherMockPeon
),
new ServerHolder(
server3.toImmutableDruidServer(),
anotherMockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
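// Only server1 actually serves the segment covered by the zero-replicant rule, so the drop
// must be issued through server1's peon rather than the others.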
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
exec.shutdown();
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
}
/**
* Nodes:
* hot - 2 replicants
*
* @throws Exception
*/
@Test
public void testReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 2)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
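// First run: 24 segments x 2 replicants = 48 assignments on hot.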
Assert.assertEquals(48L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
DataSegment overFlowSegment = new DataSegment(
"test",
Intervals.of("2012-02-01/2012-02-02"),
DateTimes.nowUtc().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
1,
0
);
afterParams = ruleRunner.run(
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withEmitter(emitter)
.withAvailableSegments(Arrays.asList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build()
);
stats = afterParams.getCoordinatorStats();
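// Second run: only one copy of the overflow segment is assigned; its second replicant is held
// back by the replication throttler.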
Assert.assertEquals(1L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
exec.shutdown();
}
/**
* Nodes:
* hot - nothing loaded
* _default_tier - 1 segment loaded
*
* @throws Exception
*/
@Test
public void testReplicantThrottleAcrossTiers() throws Exception
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
CoordinatorDynamicConfig.builder()
.withReplicationThrottleLimit(7)
.withReplicantLifetime(1)
.withMaxSegmentsInNodeLoadingQueue(1000)
.build()
).atLeastOnce();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
ImmutableMap.<String, Integer>of(
"hot", 1,
DruidServer.DEFAULT_TIER, 1
)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
DruidServer.DEFAULT_TIER,
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator);
DruidCoordinatorRuntimeParams afterParams = runner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
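// Every segment gets a copy on hot, but only 7 replicas reach the default tier under the
// replication throttle limit of 7.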
Assert.assertEquals(24L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(7L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
exec.shutdown();
}
@Test
public void testDropReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DataSegment overFlowSegment = new DataSegment(
"test",
Intervals.of("2012-02-01/2012-02-02"),
DateTimes.nowUtc().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
1,
0
);
List<DataSegment> longerAvailableSegments = Lists.newArrayList(availableSegments);
longerAvailableSegments.add(overFlowSegment);
DruidServer server1 = new DruidServer(
"serverNorm1",
"hostNorm1",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment);
}
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
// Drops are not throttled: each of the 25 segments has one excess copy, all dropped in a single run.
Assert.assertEquals(25L, stats.getTieredStat("droppedCount", "normal"));
EasyMock.verify(mockPeon);
exec.shutdown();
}
@Test
public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception
{
Set<DataSegment> availableSegments = new HashSet<>();
DataSegment v1 = new DataSegment(
"test",
Intervals.of("2012-01-01/2012-01-02"),
"1",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
DataSegment v2 = new DataSegment(
"test",
Intervals.of("2012-01-01/2012-01-02"),
"2",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
1
);
availableSegments.add(v1);
availableSegments.add(v2);
mockCoordinator();
mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().once();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new ForeverLoadRule(ImmutableMap.of(DruidServer.DEFAULT_TIER, 1))
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
DruidServer.DEFAULT_TIER,
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
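// Only v2 is loaded: v1 is overshadowed by the higher-version v2 over the same interval, so the rules skip it.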
Assert.assertEquals(1, stats.getTiers("assignedCount").size());
Assert.assertEquals(1, stats.getTieredStat("assignedCount", "_default_tier"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
Assert.assertEquals(2, availableSegments.size());
Assert.assertEquals(availableSegments, params.getAvailableSegments());
Assert.assertEquals(availableSegments, afterParams.getAvailableSegments());
EasyMock.verify(mockPeon);
exec.shutdown();
}
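// Stubs the coordinator with the default dynamic config and a no-op removeSegment.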
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
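// Stubs mockPeon as idle: empty load queue, nothing marked to drop, zero queue size.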
private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
}
private CoordinatorDynamicConfig createCoordinatorDynamicConfig()
{
return CoordinatorDynamicConfig.builder()
.withMillisToWaitBeforeDeleting(0)
.withMergeBytesLimit(0)
.withMergeSegmentsLimit(0)
.withMaxSegmentsToMove(0)
.withReplicantLifetime(1)
.withReplicationThrottleLimit(24)
.withBalancerComputeThreads(0)
.withEmitBalancingStats(false)
.withKillDataSourceWhitelist(null)
.withKillAllDataSources(false)
.withMaxSegmentsInNodeLoadingQueue(1000)
.build();
}
}