Merge tag 'v0.9.2-incubating' into security
[maven-release-plugin] copy for tag v0.9.2-incubating
Conflicts:
storm-core/pom.xml
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee2eae5..95024da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
-## 0.9.3-incubating (unreleased)
- * STORM-338: Move towards idiomatic Clojure style
-
## 0.9.2-incubating
+ * STORM-66: send taskid on initial handshake
+ * STORM-342: Contention in Disruptor Queue which may cause out of order or lost messages
+ * STORM-338: Move towards idiomatic Clojure style
* STORM-335: add drpc test for removing timed out requests from queue
* STORM-69: Storm UI Visualizations for Topologies
* STORM-297: Performance scaling with CPU
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 903c6e7..0e0d920 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 4972619..a9d9a6d 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d868e50..2c32a2c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -214,7 +214,7 @@
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
- if (lastCompletedOffset != lastCompletedOffset) {
+ if (_committedTo != lastCompletedOffset) {
LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
diff --git a/pom.xml b/pom.xml
index 16d163a..19bfa8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
@@ -164,7 +164,7 @@
<scm>
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-storm.git</developerConnection>
- <tag>HEAD</tag>
+ <tag>v0.9.2-incubating</tag>
<url>https://git-wip-us.apache.org/repos/asf/incubator-storm</url>
</scm>
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index a6fbad1..27d0cfe 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 160a1b3..4272c60 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
@@ -191,9 +191,21 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.clojars.runa</groupId>
+ <artifactId>conjure</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>test/jvm</testSourceDirectory>
<resources>
<resource>
<directory>../conf</directory>
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index 067932c..10e630c 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
+import org.json.simple.JSONValue;
/**
* A TopologyContext is given to bolts and spouts in their "prepare" and "open"
@@ -217,6 +218,16 @@
public Collection<ITaskHook> getHooks() {
return _hooks;
}
+
+ @Override
+ public String toJSONString() {
+ Map obj = new HashMap();
+ obj.put("task->component", this.getTaskToComponent());
+ obj.put("taskid", this.getThisTaskId());
+ // TODO: jsonify StormTopology
+ // at the minimum should send source info
+ return JSONValue.toJSONString(obj);
+ }
/*
* Register a IMetric instance.
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 8c5b466..932af16 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -27,13 +27,15 @@
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
/**
*
@@ -51,6 +53,11 @@
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+
+ private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+ private final Lock readLock = cacheLock.readLock();
+ private final Lock writeLock = cacheLock.writeLock();
+
private static String PREFIX = "disruptor-";
private String _queueName = "";
@@ -62,6 +69,13 @@
_buffer.setGatingSequences(_consumer);
if(claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
+ } else {
+ // make sure we flush the pending messages in cache first
+ try {
+ publishDirect(FLUSH_CACHE, true);
+ } catch (InsufficientCapacityException e) {
+ throw new RuntimeException("This code should be unreachable!", e);
+ }
}
}
@@ -134,33 +148,47 @@
}
public void publish(Object obj, boolean block) throws InsufficientCapacityException {
- if(consumerStartedFlag) {
- final long id;
- if(block) {
- id = _buffer.next();
- } else {
- id = _buffer.tryNext(1);
+
+ boolean publishNow = consumerStartedFlag;
+
+ if (!publishNow) {
+ readLock.lock();
+ try {
+ publishNow = consumerStartedFlag;
+ if (!publishNow) {
+ _cache.add(obj);
+ }
+ } finally {
+ readLock.unlock();
}
- final MutableObject m = _buffer.get(id);
- m.setObject(obj);
- _buffer.publish(id);
- } else {
- _cache.add(obj);
- if(consumerStartedFlag) flushCache();
}
+
+ if (publishNow) {
+ publishDirect(obj, block);
+ }
+ }
+
+ private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+ final long id;
+ if(block) {
+ id = _buffer.next();
+ } else {
+ id = _buffer.tryNext(1);
+ }
+ final MutableObject m = _buffer.get(id);
+ m.setObject(obj);
+ _buffer.publish(id);
}
public void consumerStarted() {
- if(!consumerStartedFlag) {
- consumerStartedFlag = true;
- flushCache();
- }
+
+ consumerStartedFlag = true;
+
+            // Use writeLock to make sure all pending cache add operations have completed
+ writeLock.lock();
+ writeLock.unlock();
}
- private void flushCache() {
- publish(FLUSH_CACHE);
- }
-
public long population() { return (writePos() - readPos()); }
public long capacity() { return _buffer.getBufferSize(); }
public long writePos() { return _buffer.getCursor(); }
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
new file mode 100644
index 0000000..653fd33
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class DisruptorQueueTest extends TestCase {
+
+ private final static int TIMEOUT = 5; // MS
+ private final static int PRODUCER_NUM = 4;
+
+ @Test
+ public void testMessageDisorder() throws InterruptedException {
+
+        // Set queue length big enough
+ DisruptorQueue queue = createQueue("messageOrder", 16);
+
+ queue.publish("1");
+
+ Runnable producer = new Producer(queue, "2");
+
+ final Object [] result = new Object[1];
+ Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+ private boolean head = true;
+
+ @Override
+ public void onEvent(Object obj, long sequence, boolean endOfBatch)
+ throws Exception {
+ if (head) {
+ head = false;
+ result[0] = obj;
+ }
+ }
+ });
+
+ run(producer, consumer);
+ Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
+ "1", result[0]);
+ }
+
+ @Test
+ public void testConsumerHang() throws InterruptedException {
+ final AtomicBoolean messageConsumed = new AtomicBoolean(false);
+
+ // Set queue length to 1, so that the RingBuffer can be easily full
+ // to trigger consumer blocking
+ DisruptorQueue queue = createQueue("consumerHang", 1);
+ Runnable producer = new Producer(queue, "msg");
+ Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+ @Override
+ public void onEvent(Object obj, long sequence, boolean endOfBatch)
+ throws Exception {
+ messageConsumed.set(true);
+ }
+ });
+
+ run(producer, consumer);
+ Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
+ messageConsumed.get());
+ }
+
+
+ private void run(Runnable producer, Runnable consumer)
+ throws InterruptedException {
+
+ Thread[] producerThreads = new Thread[PRODUCER_NUM];
+ for (int i = 0; i < PRODUCER_NUM; i++) {
+ producerThreads[i] = new Thread(producer);
+ producerThreads[i].start();
+ }
+
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+
+ for (int i = 0; i < PRODUCER_NUM; i++) {
+ producerThreads[i].interrupt();
+ producerThreads[i].join(TIMEOUT);
+ }
+ consumerThread.interrupt();
+ consumerThread.join(TIMEOUT);
+ }
+
+ private class Producer implements Runnable {
+ private String msg;
+ private DisruptorQueue queue;
+
+ Producer(DisruptorQueue queue, String msg) {
+ this.msg = msg;
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ queue.publish(msg, false);
+ }
+ } catch (InsufficientCapacityException e) {
+ return;
+ }
+ }
+ };
+
+ private class Consumer implements Runnable {
+ private EventHandler handler;
+ private DisruptorQueue queue;
+
+ Consumer(DisruptorQueue queue, EventHandler handler) {
+ this.handler = handler;
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ queue.consumerStarted();
+ try {
+ while(true) {
+ queue.consumeBatchWhenAvailable(handler);
+ }
+ }catch(RuntimeException e) {
+ //break
+ }
+ }
+ };
+
+ private static DisruptorQueue createQueue(String name, int queueSize) {
+ return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
+ queueSize), new BlockingWaitStrategy());
+ }
+}
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 0d97c0b..6ffe62d 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml
index 4fedefb..2bd815b 100644
--- a/storm-dist/source/pom.xml
+++ b/storm-dist/source/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.2-incubating</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>