Merge pull request #2243 from metamx/backport2240

[BACKPORT 2240]Fix loadRule when one of the tiers had no servers
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index 57a59e9..57076d0 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -65,7 +65,7 @@
       final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
       if (serverQueue == null) {
         log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
-        return stats;
+        continue;
       }
 
       final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
@@ -190,7 +190,7 @@
       MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
       if (serverQueue == null) {
         log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
-        return stats;
+        continue;
       }
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
index 6f6921e..9d393bb 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
@@ -17,13 +17,19 @@
 
 package io.druid.server.coordinator.rules;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.LoggingEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.DruidServer;
+import io.druid.jackson.DefaultObjectMapper;
 import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -49,13 +55,29 @@
  */
 public class LoadRuleTest
 {
+  private static final Logger log = new Logger(LoadRuleTest.class);
+  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  private static final ServiceEmitter emitter = new ServiceEmitter(
+      "service",
+      "host",
+      new LoggingEmitter(
+          log,
+          LoggingEmitter.Level.ERROR,
+          jsonMapper
+      )
+  );
+
   private LoadQueuePeon mockPeon;
   private ReplicationThrottler throttler;
   private DataSegment segment;
 
+
   @Before
   public void setUp() throws Exception
   {
+    EmittingLogger.registerEmitter(emitter);
+    emitter.start();
     mockPeon = EasyMock.createMock(LoadQueuePeon.class);
     throttler = new ReplicationThrottler(2, 1);
     for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
@@ -281,4 +303,182 @@
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
     Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
   }
+
+  @Test
+  public void testLoadWithNonExistentTier() throws Exception
+  {
+    mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverHot",
+                            "hostHot",
+                            1000,
+                            "historical",
+                            "hot",
+                            0
+                        ).toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
+  }
+
+  @Test
+  public void testDropWithNonExistentTier() throws Exception
+  {
+    mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
+    EasyMock.replay(mockPeon);
+
+    LoadRule rule = new LoadRule()
+    {
+      private final Map<String, Integer> tiers = ImmutableMap.of(
+          "nonExistentTier", 1,
+          "hot", 1
+      );
+
+      @Override
+      public Map<String, Integer> getTieredReplicants()
+      {
+        return tiers;
+      }
+
+      @Override
+      public int getNumReplicants(String tier)
+      {
+        return tiers.get(tier);
+      }
+
+      @Override
+      public String getType()
+      {
+        return "test";
+      }
+
+      @Override
+      public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+
+      @Override
+      public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+      {
+        return true;
+      }
+    };
+
+    DruidServer server1 = new DruidServer(
+        "serverHot",
+        "hostHot",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    DruidServer server2 = new DruidServer(
+        "serverHo2t",
+        "hostHot2",
+        1000,
+        "historical",
+        "hot",
+        0
+    );
+    server1.addDataSegment(segment.getIdentifier(), segment);
+    server2.addDataSegment(segment.getIdentifier(), segment);
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        server1.toImmutableDruidServer(),
+                        mockPeon
+                    ),
+                    new ServerHolder(
+                        server2.toImmutableDruidServer(),
+                        mockPeon
+                    )
+                )
+            )
+        )
+    );
+
+    CoordinatorStats stats = rule.run(
+        null,
+        DruidCoordinatorRuntimeParams.newBuilder()
+                                     .withDruidCluster(druidCluster)
+                                     .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
+                                     .withReplicationManager(throttler)
+                                     .withAvailableSegments(Arrays.asList(segment)).build(),
+        segment
+    );
+
+    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
+  }
 }