Merge pull request #216 from metamx/announce-fix

Make batch data segment announcer thread safe
diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
index 5911ec2..46aedce 100644
--- a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
@@ -36,6 +36,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
@@ -48,8 +50,11 @@
   private final ObjectMapper jsonMapper;
   private final String liveSegmentLocation;
 
-  private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
-  private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
+  private final Object lock = new Object();
+  private final AtomicLong counter = new AtomicLong(0);
+
+  private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
+  private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
 
   public BatchDataSegmentAnnouncer(
       DruidServerMetadata server,
@@ -74,32 +79,34 @@
       throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
     }
 
-    // create new batch
-    if (availableZNodes.isEmpty()) {
-      SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
-      availableZNode.addSegment(segment);
+    synchronized (lock) {
+      // create new batch
+      if (availableZNodes.isEmpty()) {
+        SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+        availableZNode.addSegment(segment);
 
-      log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
-      announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
-      segmentLookup.put(segment, availableZNode);
-      availableZNodes.add(availableZNode);
-    } else { // update existing batch
-      Iterator<SegmentZNode> iter = availableZNodes.iterator();
-      boolean done = false;
-      while (iter.hasNext() && !done) {
-        SegmentZNode availableZNode = iter.next();
-        if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
-          availableZNode.addSegment(segment);
+        log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+        announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
+        segmentLookup.put(segment, availableZNode);
+        availableZNodes.add(availableZNode);
+      } else { // update existing batch
+        Iterator<SegmentZNode> iter = availableZNodes.iterator();
+        boolean done = false;
+        while (iter.hasNext() && !done) {
+          SegmentZNode availableZNode = iter.next();
+          if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
+            availableZNode.addSegment(segment);
 
-          log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
-          announcer.update(availableZNode.getPath(), availableZNode.getBytes());
-          segmentLookup.put(segment, availableZNode);
+            log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+            announcer.update(availableZNode.getPath(), availableZNode.getBytes());
+            segmentLookup.put(segment, availableZNode);
 
-          if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
-            availableZNodes.remove(availableZNode);
+            if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
+              availableZNodes.remove(availableZNode);
+            }
+
+            done = true;
           }
-
-          done = true;
         }
       }
     }
@@ -109,15 +116,21 @@
   public void unannounceSegment(DataSegment segment) throws IOException
   {
     final SegmentZNode segmentZNode = segmentLookup.remove(segment);
-    segmentZNode.removeSegment(segment);
+    if (segmentZNode == null) {
+      return;
+    }
 
-    log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
-    if (segmentZNode.getCount() == 0) {
-      availableZNodes.remove(segmentZNode);
-      announcer.unannounce(segmentZNode.getPath());
-    } else {
-      announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
-      availableZNodes.add(segmentZNode);
+    synchronized (lock) {
+      segmentZNode.removeSegment(segment);
+
+      log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+      if (segmentZNode.getCount() == 0) {
+        availableZNodes.remove(segmentZNode);
+        announcer.unannounce(segmentZNode.getPath());
+      } else {
+        announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
+        availableZNodes.add(segmentZNode);
+      }
     }
   }
 
@@ -167,10 +180,10 @@
 
   private String makeServedSegmentPath(String zNode)
   {
-    return ZKPaths.makePath(liveSegmentLocation, zNode);
+    return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));
   }
 
-  private class SegmentZNode
+  private class SegmentZNode implements Comparable<SegmentZNode>
   {
     private final String path;
 
@@ -286,5 +299,11 @@
     {
       return path.hashCode();
     }
+
+    @Override
+    public int compareTo(SegmentZNode segmentZNode)
+    {
+      return path.compareTo(segmentZNode.getPath());
+    }
   }
 }