bug fix for dropping segments in master
diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
index 541d534..9c73405 100644
--- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
+++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
@@ -43,8 +43,8 @@
public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
{
public static final Interval MY_Y2K_INTERVAL = new Interval(
- new DateTime(Long.MIN_VALUE),
- new DateTime(Long.MAX_VALUE)
+ new DateTime("0000-01-01"),
+ new DateTime("9000-01-01")
);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";
diff --git a/server/src/main/java/com/metamx/druid/master/ServerHolder.java b/server/src/main/java/com/metamx/druid/master/ServerHolder.java
index 6cf4d65..82c31fd 100644
--- a/server/src/main/java/com/metamx/druid/master/ServerHolder.java
+++ b/server/src/main/java/com/metamx/druid/master/ServerHolder.java
@@ -94,9 +94,14 @@
return availableSize;
}
+ public boolean isServingSegment(DataSegment segment)
+ {
+ return (server.getSegment(segment.getIdentifier()) != null);
+ }
+
public boolean containsSegment(DataSegment segment)
{
- return (server.getSegment(segment.getIdentifier()) != null || peon.getSegmentsToLoad().contains(segment));
+ return isServingSegment(segment) || peon.getSegmentsToLoad().contains(segment);
}
@Override
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index b3d256d..f3b5de8 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -163,23 +163,25 @@
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
- log.warn("Wtf, holder was null? Do I have no servers[%s]?", serverQueue);
- continue;
+ log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
+ break;
}
- holder.getPeon().dropSegment(
- segment,
- new LoadPeonCallback()
- {
- @Override
- protected void execute()
+ if (holder.isServingSegment(segment)) {
+ holder.getPeon().dropSegment(
+ segment,
+ new LoadPeonCallback()
{
+ @Override
+ protected void execute()
+ {
+ }
}
- }
- );
+ );
+ --actualNumReplicantsForType;
+ stats.addToTieredStat("droppedCount", tier, 1);
+ }
droppedServers.add(holder);
- --actualNumReplicantsForType;
- stats.addToTieredStat("droppedCount", tier, 1);
}
serverQueue.addAll(droppedServers);
}
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
index c31dc67..acde8a1 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
@@ -31,7 +31,6 @@
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
-import com.metamx.druid.master.rules.RuleMap;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@@ -86,14 +85,6 @@
}
ruleRunner = new DruidMasterRuleRunner(master);
-
- mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
- EasyMock.expectLastCall().anyTimes();
- mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
- EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).anyTimes();
- EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
- EasyMock.replay(mockPeon);
}
@After
@@ -113,6 +104,12 @@
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
@@ -202,6 +199,12 @@
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
@@ -284,6 +287,12 @@
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -356,6 +365,12 @@
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(12);
EasyMock.replay(emitter);
@@ -455,6 +470,12 @@
@Test
public void testDropRemove() throws Exception
{
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
master.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
@@ -513,6 +534,12 @@
@Test
public void testDropTooManyInSameTier() throws Exception
{
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
@@ -581,6 +608,14 @@
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -653,6 +688,14 @@
@Test
public void testDontDropInDifferentTiers() throws Exception
{
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -717,4 +760,92 @@
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
}
+
+ @Test
+ public void testDropServerActuallyServesSegment() throws Exception
+ {
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
+ Lists.<Rule>newArrayList(
+ new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
+ )
+ ).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DruidServer server1 = new DruidServer(
+ "server1",
+ "host1",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
+ DruidServer server2 = new DruidServer(
+ "serverNorm2",
+ "hostNorm2",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
+ DruidServer server3 = new DruidServer(
+ "serverNorm3",
+ "hostNorm3",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
+ server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
+
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
+ LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
+ EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
+ EasyMock.replay(anotherMockPeon);
+
+ DruidCluster druidCluster = new DruidCluster(
+ ImmutableMap.of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ server1,
+ mockPeon
+ ),
+ new ServerHolder(
+ server2,
+ anotherMockPeon
+ ),
+ new ServerHolder(
+ server3,
+ anotherMockPeon
+ )
+ )
+ )
+ )
+ );
+
+ SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
+
+ DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
+ .withDruidCluster(druidCluster)
+ .withMillisToWaitBeforeDeleting(0L)
+ .withAvailableSegments(availableSegments)
+ .withDatabaseRuleManager(databaseRuleManager)
+ .withSegmentReplicantLookup(segmentReplicantLookup)
+ .build();
+
+ DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
+ MasterStats stats = afterParams.getMasterStats();
+
+ Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
+
+ EasyMock.verify(mockPeon);
+ EasyMock.verify(anotherMockPeon);
+ }
}