Merge branch 'STORM-519' of github.com:Parth-Brahmbhatt/incubator-storm
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index f976cea..a492164 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.3-incubating-SNAPSHOT</version>
+ <version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index 12263a6..c6838be 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -66,7 +66,7 @@
try {
Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
- for(Values values : rowToTupleMapper.toValues(result)) {
+ for(Values values : rowToTupleMapper.toValues(tuple, result)) {
this.collector.emit(values);
}
this.collector.ack(tuple);
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
index 39ce47a..bc38b83 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
@@ -18,6 +18,7 @@
package org.apache.storm.hbase.bolt.mapper;
import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.hadoop.hbase.client.Result;
@@ -27,11 +28,12 @@
public interface HBaseValueMapper extends Serializable {
/**
*
+ * @param input the tuple for which the HBase lookup was performed.
* @param result HBase lookup result instance.
* @return list of values that should be emitted by the lookup bolt.
* @throws Exception
*/
- public List<Values> toValues(Result result) throws Exception;
+ public List<Values> toValues(ITuple input, Result result) throws Exception;
/**
* declares the output fields for the lookup bolt.
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 66decf2..7b31fad 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -149,8 +149,10 @@
try {
Result[] results = hBaseClient.batchGet(gets);
- for(Result result : results) {
- List<Values> values = options.rowToStormValueMapper.toValues(result);
+ for(int i = 0; i < results.length; i++) {
+ Result result = results[i];
+ TridentTuple tuple = tridentTuples.get(i);
+ List<Values> values = options.rowToStormValueMapper.toValues(tuple, result);
batchRetrieveResult.add(values);
}
} catch (Exception e) {
diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
index dd2ae20..2463085 100644
--- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
+++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java
@@ -20,6 +20,7 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -51,7 +52,7 @@
public class WordCountValueMapper implements HBaseValueMapper {
@Override
- public List<Values> toValues(Result result) throws Exception {
+ public List<Values> toValues(ITuple tuple, Result result) throws Exception {
List<Values> values = new ArrayList<Values>();
Cell[] cells = result.rawCells();
for(Cell cell : cells) {