Merge branch 'master' of github.com:metamx/druid
diff --git a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
index 3b3e90f..8574314 100644
--- a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
+++ b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java
@@ -19,12 +19,13 @@
package com.metamx.druid.master;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.emitter.EmittingLogger;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentHashMap;
/**
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
@@ -61,17 +62,20 @@
int size = holder.getNumProcessing(tier);
if (size != 0) {
log.info(
- "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
+ "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]. Segments %s",
tier,
type,
size,
- holder.getLifetime(tier)
+ holder.getLifetime(tier),
+ holder.getCurrentlyProcessingSegmentsAndHosts(tier)
);
holder.reduceLifetime(tier);
lookup.put(tier, false);
if (holder.getLifetime(tier) < 0) {
- log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
+ log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime)
+ .addData("segments", holder.getCurrentlyProcessingSegmentsAndHosts(tier))
+ .emit();
}
} else {
log.info("[%s]: Replicant %s queue is empty.", tier, type);
@@ -90,49 +94,49 @@
return terminatingLookup.get(tier);
}
- public boolean registerReplicantCreation(String tier, String segmentId)
+ public boolean registerReplicantCreation(String tier, String segmentId, String serverId)
{
- return currentlyReplicating.addSegment(tier, segmentId);
+ return currentlyReplicating.addSegment(tier, segmentId, serverId);
}
- public void unregisterReplicantCreation(String tier, String segmentId)
+ public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
{
- currentlyReplicating.removeSegment(tier, segmentId);
+ currentlyReplicating.removeSegment(tier, segmentId, serverId);
}
- public boolean registerReplicantTermination(String tier, String segmentId)
+ public boolean registerReplicantTermination(String tier, String segmentId, String serverId)
{
- return currentlyTerminating.addSegment(tier, segmentId);
+ return currentlyTerminating.addSegment(tier, segmentId, serverId);
}
- public void unregisterReplicantTermination(String tier, String segmentId)
+ public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
{
- currentlyTerminating.removeSegment(tier, segmentId);
+ currentlyTerminating.removeSegment(tier, segmentId, serverId);
}
private class ReplicatorSegmentHolder
{
- private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
+ private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
private final Map<String, Integer> lifetimes = Maps.newHashMap();
- public boolean addSegment(String tier, String segmentId)
+ public boolean addSegment(String tier, String segmentId, String serverId)
{
- ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
+ ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
- segments = new ConcurrentSkipListSet<String>();
+ segments = new ConcurrentHashMap<String, String>();
currentlyProcessingSegments.put(tier, segments);
}
if (segments.size() < maxReplicants) {
- segments.add(segmentId);
+ segments.put(segmentId, serverId);
return true;
}
return false;
}
- public void removeSegment(String tier, String segmentId)
+ public void removeSegment(String tier, String segmentId, String serverId)
{
- Set<String> segments = currentlyProcessingSegments.get(tier);
+ Map<String, String> segments = currentlyProcessingSegments.get(tier);
if (segments != null) {
segments.remove(segmentId);
}
@@ -140,7 +144,7 @@
public int getNumProcessing(String tier)
{
- Set<String> segments = currentlyProcessingSegments.get(tier);
+ Map<String, String> segments = currentlyProcessingSegments.get(tier);
return (segments == null) ? 0 : segments.size();
}
@@ -168,5 +172,26 @@
{
lifetimes.put(tier, maxLifetime);
}
+
+ /**
+  * Lists the segments currently tracked for a tier, each rendered as
+  * "<segmentId> ON <serverId>" from the entries stored by addSegment.
+  *
+  * @param tier the tier whose tracked segments should be described
+  * @return one string per tracked segment; an empty list when nothing has been registered for the tier
+  */
+ public List<String> getCurrentlyProcessingSegmentsAndHosts(String tier)
+ {
+   Map<String, String> segments = currentlyProcessingSegments.get(tier);
+   List<String> retVal = Lists.newArrayList();
+   if (segments == null) {
+     // No map exists for a tier until addSegment is called for it; mirror getNumProcessing and treat it as empty.
+     return retVal;
+   }
+   for (Map.Entry<String, String> entry : segments.entrySet()) {
+     retVal.add(String.format("%s ON %s", entry.getKey(), entry.getValue()));
+   }
+   return retVal;
+ }
}
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index a7b3b28..2f1819c 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -74,7 +74,7 @@
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
- ServerHolder holder = serverQueue.pollFirst();
+ final ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
"Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]",
@@ -113,7 +113,11 @@
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
- !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
+ !replicationManager.registerReplicantCreation(
+ getTier(),
+ segment.getIdentifier(),
+ holder.getServer().getHost()
+ )) {
serverQueue.add(holder);
break;
}
@@ -126,7 +130,11 @@
@Override
protected void execute()
{
- replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
+ replicationManager.unregisterReplicantCreation(
+ getTier(),
+ segment.getIdentifier(),
+ holder.getServer().getHost()
+ );
}
}
);
@@ -174,21 +182,25 @@
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
- ServerHolder holder = serverQueue.pollLast();
+ final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
- if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
- if (!replicationManager.canDestroyReplicant(getTier()) ||
- !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
- serverQueue.add(holder);
- break;
- }
- }
-
if (holder.isServingSegment(segment)) {
+ if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
+ if (!replicationManager.canDestroyReplicant(getTier()) ||
+ !replicationManager.registerReplicantTermination(
+ getTier(),
+ segment.getIdentifier(),
+ holder.getServer().getHost()
+ )) {
+ serverQueue.add(holder);
+ break;
+ }
+ }
+
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
@@ -196,7 +208,11 @@
@Override
protected void execute()
{
- replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
+ replicationManager.unregisterReplicantTermination(
+ getTier(),
+ segment.getIdentifier(),
+ holder.getServer().getHost()
+ );
}
}
);