Merge pull request #216 from metamx/announce-fix
Make the batch data segment announcer thread-safe
diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
index 5911ec2..46aedce 100644
--- a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
@@ -36,6 +36,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
/**
*/
@@ -48,8 +50,11 @@
private final ObjectMapper jsonMapper;
private final String liveSegmentLocation;
- private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
- private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
+ private final Object lock = new Object();
+ private final AtomicLong counter = new AtomicLong(0);
+
+ private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
+ private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
public BatchDataSegmentAnnouncer(
DruidServerMetadata server,
@@ -74,32 +79,34 @@
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
}
- // create new batch
- if (availableZNodes.isEmpty()) {
- SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
- availableZNode.addSegment(segment);
+ synchronized (lock) {
+ // create new batch
+ if (availableZNodes.isEmpty()) {
+ SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+ availableZNode.addSegment(segment);
- log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
- announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
- segmentLookup.put(segment, availableZNode);
- availableZNodes.add(availableZNode);
- } else { // update existing batch
- Iterator<SegmentZNode> iter = availableZNodes.iterator();
- boolean done = false;
- while (iter.hasNext() && !done) {
- SegmentZNode availableZNode = iter.next();
- if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
- availableZNode.addSegment(segment);
+ log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+ announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
+ segmentLookup.put(segment, availableZNode);
+ availableZNodes.add(availableZNode);
+ } else { // update existing batch
+ Iterator<SegmentZNode> iter = availableZNodes.iterator();
+ boolean done = false;
+ while (iter.hasNext() && !done) {
+ SegmentZNode availableZNode = iter.next();
+ if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
+ availableZNode.addSegment(segment);
- log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
- announcer.update(availableZNode.getPath(), availableZNode.getBytes());
- segmentLookup.put(segment, availableZNode);
+ log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+ announcer.update(availableZNode.getPath(), availableZNode.getBytes());
+ segmentLookup.put(segment, availableZNode);
- if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
- availableZNodes.remove(availableZNode);
+ if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
+ availableZNodes.remove(availableZNode);
+ }
+
+ done = true;
}
-
- done = true;
}
}
}
@@ -109,15 +116,21 @@
public void unannounceSegment(DataSegment segment) throws IOException
{
final SegmentZNode segmentZNode = segmentLookup.remove(segment);
- segmentZNode.removeSegment(segment);
+ if (segmentZNode == null) {
+ return;
+ }
- log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
- if (segmentZNode.getCount() == 0) {
- availableZNodes.remove(segmentZNode);
- announcer.unannounce(segmentZNode.getPath());
- } else {
- announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
- availableZNodes.add(segmentZNode);
+ synchronized (lock) {
+ segmentZNode.removeSegment(segment);
+
+ log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+ if (segmentZNode.getCount() == 0) {
+ availableZNodes.remove(segmentZNode);
+ announcer.unannounce(segmentZNode.getPath());
+ } else {
+ announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
+ availableZNodes.add(segmentZNode);
+ }
}
}
@@ -167,10 +180,10 @@
private String makeServedSegmentPath(String zNode)
{
- return ZKPaths.makePath(liveSegmentLocation, zNode);
+ return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));
}
- private class SegmentZNode
+ private class SegmentZNode implements Comparable<SegmentZNode>
{
private final String path;
@@ -286,5 +299,11 @@
{
return path.hashCode();
}
+
+ @Override
+ public int compareTo(SegmentZNode segmentZNode)
+ {
+ return path.compareTo(segmentZNode.getPath());
+ }
}
}