fix kafka spout HeapByteBuffer not serializable problem (#2063)
diff --git a/heron/storm/src/java/org/apache/storm/spout/RawMultiScheme.java b/heron/storm/src/java/org/apache/storm/spout/RawMultiScheme.java
index 620fc87..c1aaa89 100644
--- a/heron/storm/src/java/org/apache/storm/spout/RawMultiScheme.java
+++ b/heron/storm/src/java/org/apache/storm/spout/RawMultiScheme.java
@@ -31,7 +31,9 @@
@Override
public Iterable<List<Object>> deserialize(ByteBuffer ser) {
- return asList(tuple(ser));
+ byte[] bytes = new byte[ser.remaining()];
+ ser.get(bytes);
+ return asList(tuple(bytes));
}
@Override
diff --git a/heron/storm/src/java/org/apache/storm/spout/RawScheme.java b/heron/storm/src/java/org/apache/storm/spout/RawScheme.java
index e36add4..30755b6 100644
--- a/heron/storm/src/java/org/apache/storm/spout/RawScheme.java
+++ b/heron/storm/src/java/org/apache/storm/spout/RawScheme.java
@@ -28,10 +28,14 @@
public class RawScheme implements Scheme {
private static final long serialVersionUID = 6098042939916415521L;
+ @Override
public List<Object> deserialize(ByteBuffer ser) {
- return tuple(ser);
+ byte[] bytes = new byte[ser.remaining()];
+ ser.get(bytes);
+ return tuple(bytes);
}
+ @Override
public Fields getOutputFields() {
return new Fields("bytes");
}