Merge pull request #2243 from metamx/backport2240
[BACKPORT 2240] Fix LoadRule when one of the tiers had no servers
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index 57a59e9..57076d0 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -65,7 +65,7 @@
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
- return stats;
+ continue;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
@@ -190,7 +190,7 @@
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
- return stats;
+ continue;
}
List<ServerHolder> droppedServers = Lists.newArrayList();
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
index 6f6921e..9d393bb 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
@@ -17,13 +17,19 @@
package io.druid.server.coordinator.rules;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.LoggingEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
+import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -49,13 +55,29 @@
*/
public class LoadRuleTest
{
+ private static final Logger log = new Logger(LoadRuleTest.class); // logger handed to the LoggingEmitter below
+ private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); // JSON mapper handed to the LoggingEmitter below
+
+ private static final ServiceEmitter emitter = new ServiceEmitter( // shared by all tests; setUp() registers it with EmittingLogger and starts it
+ "service",
+ "host",
+ new LoggingEmitter( // built from the test logger, Level.ERROR and the JSON mapper
+ log,
+ LoggingEmitter.Level.ERROR,
+ jsonMapper
+ )
+ );
+
private LoadQueuePeon mockPeon;
private ReplicationThrottler throttler;
private DataSegment segment;
+
@Before
public void setUp() throws Exception
{
+ EmittingLogger.registerEmitter(emitter);
+ emitter.start();
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
@@ -281,4 +303,182 @@
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
}
+
+ @Test // Regression test: a rule naming a tier that is absent from the cluster must still assign the segment in the tier that exists.
+ public void testLoadWithNonExistentTier() throws Exception
+ {
+ mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject()); // record phase: a load request on the peon ...
+ EasyMock.expectLastCall().atLeastOnce(); // ... is expected at least once
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon); // end of recording; the mock now replays the expectations above
+
+ LoadRule rule = new LoadRule() // anonymous rule wanting one replica in each of two tiers and applying to everything
+ {
+ private final Map<String, Integer> tiers = ImmutableMap.of(
+ "nonExistentTier", 1, // deliberately missing from the DruidCluster built below
+ "hot", 1 // the only tier that has a server in this test
+ );
+
+ @Override
+ public Map<String, Integer> getTieredReplicants()
+ {
+ return tiers;
+ }
+
+ @Override
+ public int getNumReplicants(String tier)
+ {
+ return tiers.get(tier);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "test";
+ }
+
+ @Override
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+ {
+ return true;
+ }
+ };
+
+ DruidCluster druidCluster = new DruidCluster( // cluster map has a single key, "hot", holding one server
+ ImmutableMap.of(
+ "hot",
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ new DruidServer(
+ "serverHot",
+ "hostHot",
+ 1000,
+ "historical",
+ "hot",
+ 0
+ ).toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ )
+ )
+ );
+
+ CoordinatorStats stats = rule.run(
+ null,
+ DruidCoordinatorRuntimeParams.newBuilder()
+ .withDruidCluster(druidCluster)
+ .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) // lookup built from a separate no-arg DruidCluster, presumably empty (no existing replicas) - confirm
+ .withReplicationManager(throttler)
+ .withAvailableSegments(Arrays.asList(segment)).build(),
+ segment
+ );
+
+ Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); // one assignment counted for "hot" despite the missing tier
+ }
+
+ @Test // Regression test: with a rule tier absent from the cluster, the surplus replica in the existing tier must still be dropped.
+ public void testDropWithNonExistentTier() throws Exception
+ {
+ mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject()); // record phase: a drop request on the peon ...
+ EasyMock.expectLastCall().atLeastOnce(); // ... is expected at least once
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
+ EasyMock.replay(mockPeon); // end of recording; the mock now replays the expectations above
+
+ LoadRule rule = new LoadRule() // anonymous rule wanting one replica in each of two tiers and applying to everything
+ {
+ private final Map<String, Integer> tiers = ImmutableMap.of(
+ "nonExistentTier", 1, // deliberately missing from the DruidCluster built below
+ "hot", 1 // one replica wanted in "hot"; two servers holding the segment are set up below
+ );
+
+ @Override
+ public Map<String, Integer> getTieredReplicants()
+ {
+ return tiers;
+ }
+
+ @Override
+ public int getNumReplicants(String tier)
+ {
+ return tiers.get(tier);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "test";
+ }
+
+ @Override
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+ {
+ return true;
+ }
+ };
+
+ DruidServer server1 = new DruidServer(
+ "serverHot",
+ "hostHot",
+ 1000,
+ "historical",
+ "hot",
+ 0
+ );
+ DruidServer server2 = new DruidServer(
+ "serverHo2t", // NOTE(review): looks like a typo for "serverHot2"; runtime string, left untouched
+ "hostHot2",
+ 1000,
+ "historical",
+ "hot",
+ 0
+ );
+ server1.addDataSegment(segment.getIdentifier(), segment); // both "hot" servers are given the segment, ...
+ server2.addDataSegment(segment.getIdentifier(), segment); // ... i.e. one more copy than the rule asks for
+
+ DruidCluster druidCluster = new DruidCluster( // cluster map has a single key, "hot", holding both servers
+ ImmutableMap.of(
+ "hot",
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ server1.toImmutableDruidServer(),
+ mockPeon
+ ),
+ new ServerHolder(
+ server2.toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ )
+ )
+ );
+
+ CoordinatorStats stats = rule.run(
+ null,
+ DruidCoordinatorRuntimeParams.newBuilder()
+ .withDruidCluster(druidCluster)
+ .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) // lookup built from the same cluster that holds the two copies
+ .withReplicationManager(throttler)
+ .withAvailableSegments(Arrays.asList(segment)).build(),
+ segment
+ );
+
+ Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); // exactly one drop counted for "hot" despite the missing tier
+ }
}