Merge branch 'master' of github.com:metamx/druid
diff --git a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
index 3b3e90f..8574314 100644
--- a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
+++ b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
@@ -19,12 +19,13 @@
 
 package com.metamx.druid.master;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.metamx.emitter.EmittingLogger;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
@@ -61,17 +62,20 @@
     int size = holder.getNumProcessing(tier);
     if (size != 0) {
       log.info(
-          "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
+          "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]. Segments %s",
           tier,
           type,
           size,
-          holder.getLifetime(tier)
+          holder.getLifetime(tier),
+          holder.getCurrentlyProcessingSegmentsAndHosts(tier)
       );
       holder.reduceLifetime(tier);
       lookup.put(tier, false);
 
       if (holder.getLifetime(tier) < 0) {
-        log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
+        log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime)
+           .addData("segments", holder.getCurrentlyProcessingSegmentsAndHosts(tier))
+           .emit();
       }
     } else {
       log.info("[%s]: Replicant %s queue is empty.", tier, type);
@@ -90,49 +94,49 @@
     return terminatingLookup.get(tier);
   }
 
-  public boolean registerReplicantCreation(String tier, String segmentId)
+  public boolean registerReplicantCreation(String tier, String segmentId, String serverId)
   {
-    return currentlyReplicating.addSegment(tier, segmentId);
+    return currentlyReplicating.addSegment(tier, segmentId, serverId);
   }
 
-  public void unregisterReplicantCreation(String tier, String segmentId)
+  public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
   {
-    currentlyReplicating.removeSegment(tier, segmentId);
+    currentlyReplicating.removeSegment(tier, segmentId, serverId);
   }
 
-  public boolean registerReplicantTermination(String tier, String segmentId)
+  public boolean registerReplicantTermination(String tier, String segmentId, String serverId)
   {
-    return currentlyTerminating.addSegment(tier, segmentId);
+    return currentlyTerminating.addSegment(tier, segmentId, serverId);
   }
 
-  public void unregisterReplicantTermination(String tier, String segmentId)
+  public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
   {
-    currentlyTerminating.removeSegment(tier, segmentId);
+    currentlyTerminating.removeSegment(tier, segmentId, serverId);
   }
 
   private class ReplicatorSegmentHolder
   {
-    private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
+    private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
     private final Map<String, Integer> lifetimes = Maps.newHashMap();
 
-    public boolean addSegment(String tier, String segmentId)
+    public boolean addSegment(String tier, String segmentId, String serverId)
     {
-      ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
+      ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
       if (segments == null) {
-        segments = new ConcurrentSkipListSet<String>();
+        segments = new ConcurrentHashMap<String, String>();
         currentlyProcessingSegments.put(tier, segments);
       }
       if (segments.size() < maxReplicants) {
-        segments.add(segmentId);
+        segments.put(segmentId, serverId); // one entry per segment: re-registering replaces the server
         return true;
       }
 
       return false;
     }
 
-    public void removeSegment(String tier, String segmentId)
+    public void removeSegment(String tier, String segmentId, String serverId)
     {
-      Set<String> segments = currentlyProcessingSegments.get(tier);
+      ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
       if (segments != null) {
-        segments.remove(segmentId);
+        segments.remove(segmentId, serverId); // no-op if the segment was since re-registered for another server
       }
@@ -140,7 +144,7 @@
 
     public int getNumProcessing(String tier)
     {
-      Set<String> segments = currentlyProcessingSegments.get(tier);
+      Map<String, String> segments = currentlyProcessingSegments.get(tier);
       return (segments == null) ? 0 : segments.size();
     }
 
@@ -168,5 +172,24 @@
     {
       lifetimes.put(tier, maxLifetime);
     }
+
+    /**
+     * Lists the segments currently tracked for a tier as "segmentId ON serverId" strings.
+     * Returns an empty list when nothing is being tracked for the tier.
+     */
+    public List<String> getCurrentlyProcessingSegmentsAndHosts(String tier)
+    {
+      Map<String, String> segments = currentlyProcessingSegments.get(tier);
+      List<String> retVal = Lists.newArrayList();
+      if (segments == null) {
+        return retVal;
+      }
+      for (Map.Entry<String, String> entry : segments.entrySet()) {
+        retVal.add(
+            String.format("%s ON %s", entry.getKey(), entry.getValue())
+        );
+      }
+      return retVal;
+    }
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index a7b3b28..2f1819c 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -74,7 +74,7 @@
 
     List<ServerHolder> assignedServers = Lists.newArrayList();
     while (totalReplicants < expectedReplicants) {
-      ServerHolder holder = serverQueue.pollFirst();
+      final ServerHolder holder = serverQueue.pollFirst(); // final: captured by the anonymous callback below
       if (holder == null) {
         log.warn(
             "Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]",
@@ -113,7 +113,11 @@
 
       if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
         if (!replicationManager.canAddReplicant(getTier()) ||
-            !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
+            !replicationManager.registerReplicantCreation(
+                getTier(),
+                segment.getIdentifier(),
+                holder.getServer().getHost()
+            )) {
           serverQueue.add(holder);
           break;
         }
@@ -126,7 +130,11 @@
             @Override
             protected void execute()
             {
-              replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
+              replicationManager.unregisterReplicantCreation(
+                  getTier(),
+                  segment.getIdentifier(),
+                  holder.getServer().getHost()
+              );
             }
           }
       );
@@ -174,21 +182,25 @@
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
       while (actualNumReplicantsForType > expectedNumReplicantsForType) {
-        ServerHolder holder = serverQueue.pollLast();
+        final ServerHolder holder = serverQueue.pollLast(); // final: captured by the LoadPeonCallback below
         if (holder == null) {
           log.warn("Wtf, holder was null?  I have no servers serving [%s]?", segment.getIdentifier());
           break;
         }
 
-        if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
-          if (!replicationManager.canDestroyReplicant(getTier()) ||
-              !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
-            serverQueue.add(holder);
-            break;
-          }
-        }
-
         if (holder.isServingSegment(segment)) {
+          if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
+            if (!replicationManager.canDestroyReplicant(getTier()) ||
+                !replicationManager.registerReplicantTermination(
+                    getTier(),
+                    segment.getIdentifier(),
+                    holder.getServer().getHost()
+                )) {
+              serverQueue.add(holder);
+              break;
+            }
+          }
+          // Issue the drop; the callback below unregisters the termination registered above, if any.
           holder.getPeon().dropSegment(
               segment,
               new LoadPeonCallback()
@@ -196,7 +208,11 @@
                 @Override
                 protected void execute()
                 {
-                  replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
+                  replicationManager.unregisterReplicantTermination(
+                      getTier(),
+                      segment.getIdentifier(),
+                      holder.getServer().getHost()
+                  );
                 }
               }
           );