Merge tag 'v0.9.2-incubating' into security

[maven-release-plugin]  copy for tag v0.9.2-incubating

Conflicts:
	storm-core/pom.xml
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee2eae5..95024da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
-## 0.9.3-incubating (unreleased)
- * STORM-338: Move towards idiomatic Clojure style 
-
 ## 0.9.2-incubating
+ * STORM-66: send taskid on initial handshake
+ * STORM-342: Contention in Disruptor Queue which may cause out of order or lost messages
+ * STORM-338: Move towards idiomatic Clojure style 
  * STORM-335: add drpc test for removing timed out requests from queue
  * STORM-69: Storm UI Visualizations for Topologies
  * STORM-297: Performance scaling with CPU
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 903c6e7..0e0d920 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
   <parent>
       <artifactId>storm</artifactId>
       <groupId>org.apache.storm</groupId>
-      <version>0.9.3-incubating-SNAPSHOT</version>
+      <version>0.9.2-incubating</version>
       <relativePath>../../pom.xml</relativePath>
   </parent>
 
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 4972619..a9d9a6d 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d868e50..2c32a2c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -214,7 +214,7 @@
 
     public void commit() {
         long lastCompletedOffset = lastCompletedOffset();
-        if (lastCompletedOffset != lastCompletedOffset) {
+        if (_committedTo != lastCompletedOffset) {
             LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
             Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
diff --git a/pom.xml b/pom.xml
index 16d163a..19bfa8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
     <groupId>org.apache.storm</groupId>
     <artifactId>storm</artifactId>
-    <version>0.9.3-incubating-SNAPSHOT</version>
+    <version>0.9.2-incubating</version>
     <packaging>pom</packaging>
     <name>Storm</name>
     <description>Distributed and fault-tolerant realtime computation</description>
@@ -164,7 +164,7 @@
     <scm>
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</developerConnection>
-        <tag>HEAD</tag>
+        <tag>v0.9.2-incubating</tag>
         <url>https://git-wip-us.apache.org/repos/asf/incubator-storm</url>
     </scm>
 
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index a6fbad1..27d0cfe 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 160a1b3..4272c60 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating</version>
     </parent>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
@@ -191,9 +191,21 @@
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.clojars.runa</groupId>
+            <artifactId>conjure</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+        	<groupId>junit</groupId>
+        	<artifactId>junit</artifactId>
+        	<version>4.1</version>
+        	<scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>
+         <testSourceDirectory>test/jvm</testSourceDirectory>
         <resources>
             <resource>
                 <directory>../conf</directory>
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 067932c..10e630c 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -37,6 +37,7 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang.NotImplementedException;
+import org.json.simple.JSONValue;
 
 /**
  * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
@@ -217,6 +218,16 @@
     public Collection<ITaskHook> getHooks() {
         return _hooks;
     }
+    
+    @Override
+    public String toJSONString() {
+        Map obj = new HashMap();
+        obj.put("task->component", this.getTaskToComponent());
+        obj.put("taskid", this.getThisTaskId());
+        // TODO: jsonify StormTopology
+        // at the minimum should send source info
+        return JSONValue.toJSONString(obj);
+    }
 
     /*
      * Register a IMetric instance. 
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 8c5b466..932af16 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -27,13 +27,15 @@
 import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.HashMap;
 import java.util.Map;
 import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
 
 /**
  *
@@ -51,6 +53,11 @@
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    
+    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+    private final Lock readLock  = cacheLock.readLock();
+    private final Lock writeLock = cacheLock.writeLock();
+    
     private static String PREFIX = "disruptor-";
     private String _queueName = "";
     
@@ -62,6 +69,13 @@
         _buffer.setGatingSequences(_consumer);
         if(claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
+        } else {
+            // make sure we flush the pending messages in cache first
+            try {
+                publishDirect(FLUSH_CACHE, true);
+            } catch (InsufficientCapacityException e) {
+                throw new RuntimeException("This code should be unreachable!", e);
+            }
         }
     }
     
@@ -134,33 +148,47 @@
     }
     
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-        if(consumerStartedFlag) {
-            final long id;
-            if(block) {
-                id = _buffer.next();
-            } else {
-                id = _buffer.tryNext(1);
+
+        boolean publishNow = consumerStartedFlag;
+
+        if (!publishNow) {
+            readLock.lock(); 
+            try {
+                publishNow = consumerStartedFlag;
+                if (!publishNow) {
+                    _cache.add(obj);
+                }
+            } finally {
+                readLock.unlock();
             }
-            final MutableObject m = _buffer.get(id);
-            m.setObject(obj);
-            _buffer.publish(id);
-        } else {
-            _cache.add(obj);
-            if(consumerStartedFlag) flushCache();
         }
+        
+        if (publishNow) {
+            publishDirect(obj, block);
+        }
+    }
+    
+    private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+        final long id;
+        if(block) {
+            id = _buffer.next();
+        } else {
+            id = _buffer.tryNext(1);
+        }
+        final MutableObject m = _buffer.get(id);
+        m.setObject(obj);
+        _buffer.publish(id);
     }
     
     public void consumerStarted() {
-        if(!consumerStartedFlag) {
-            consumerStartedFlag = true;
-            flushCache();
-        }
+
+        consumerStartedFlag = true;
+        
+        // Use writeLock to make sure all pending cache add opearation completed
+        writeLock.lock();
+        writeLock.unlock();
     }
     
-    private void flushCache() {
-        publish(FLUSH_CACHE);
-    }
-
     public long  population() { return (writePos() - readPos()); }
     public long  capacity()   { return _buffer.getBufferSize(); }
     public long  writePos()   { return _buffer.getCursor(); }
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
new file mode 100644
index 0000000..653fd33
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class DisruptorQueueTest extends TestCase {
+
+    private final static int TIMEOUT = 5; // MS
+    private final static int PRODUCER_NUM = 4;
+
+    @Test
+    public void testMessageDisorder() throws InterruptedException {
+
+        // Set queue length to bigger enough
+        DisruptorQueue queue = createQueue("messageOrder", 16);
+
+        queue.publish("1");
+
+        Runnable producer = new Producer(queue, "2");
+
+        final Object [] result = new Object[1];
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            private boolean head = true;
+
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                if (head) {
+                    head = false;
+                    result[0] = obj;
+                }
+            }
+        });
+
+        run(producer, consumer);
+        Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
+                "1", result[0]);
+    }
+    
+    @Test 
+    public void testConsumerHang() throws InterruptedException {
+        final AtomicBoolean messageConsumed = new AtomicBoolean(false);
+
+        // Set queue length to 1, so that the RingBuffer can be easily full
+        // to trigger consumer blocking
+        DisruptorQueue queue = createQueue("consumerHang", 1);
+        Runnable producer = new Producer(queue, "msg");
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                messageConsumed.set(true);
+            }
+        });
+
+        run(producer, consumer);
+        Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
+                messageConsumed.get());
+    }
+
+
+    private void run(Runnable producer, Runnable consumer)
+            throws InterruptedException {
+
+        Thread[] producerThreads = new Thread[PRODUCER_NUM];
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i] = new Thread(producer);
+            producerThreads[i].start();
+        }
+        
+        Thread consumerThread = new Thread(consumer);
+        consumerThread.start();
+                
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i].interrupt();
+            producerThreads[i].join(TIMEOUT);
+        }
+        consumerThread.interrupt();
+        consumerThread.join(TIMEOUT);
+    }
+
+    private class Producer implements Runnable {
+        private String msg;
+        private DisruptorQueue queue;
+
+        Producer(DisruptorQueue queue, String msg) {
+            this.msg = msg;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    queue.publish(msg, false);
+                }
+            } catch (InsufficientCapacityException e) {
+                return;
+            }
+        }
+    };
+
+    private class Consumer implements Runnable {
+        private EventHandler handler;
+        private DisruptorQueue queue;
+
+        Consumer(DisruptorQueue queue, EventHandler handler) {
+            this.handler = handler;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            queue.consumerStarted();
+            try {
+                while(true) {
+                    queue.consumeBatchWhenAvailable(handler);
+                }
+            }catch(RuntimeException e) {
+                //break
+            }
+        }
+    };
+
+    private static DisruptorQueue createQueue(String name, int queueSize) {
+        return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
+                queueSize), new BlockingWaitStrategy());
+    }
+}
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 0d97c0b..6ffe62d 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>
diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml
index 4fedefb..2bd815b 100644
--- a/storm-dist/source/pom.xml
+++ b/storm-dist/source/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>