Merge branch 'dt-dev' of git://github.com/tushargosavi/Malhar into tushargosavi-dt-dev
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
index 25b387b..a3a65d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
@@ -15,12 +15,12 @@
*/
package com.datatorrent.contrib.kafka;
+import com.datatorrent.common.util.DTThrowable;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class KafkaSinglePortByteArrayInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
{
-
/**
* Implement abstract method of AbstractKafkaSinglePortInputOperator
*
@@ -30,16 +30,17 @@
@Override
public byte[] getTuple(Message message)
{
- byte[] bytes = null;
try {
+ byte[] bytes;
ByteBuffer buffer = message.payload();
bytes = new byte[buffer.remaining()];
buffer.get(bytes);
- }
- catch (Exception ex) {
return bytes;
}
- return bytes;
+ catch (Exception ex) {
+ DTThrowable.rethrow(ex);
+ }
+ return null;
}
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
index 5471a27..fbaeef9 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
@@ -15,6 +15,7 @@
*/
package com.datatorrent.contrib.kafka;
+import com.datatorrent.common.util.DTThrowable;
import kafka.message.Message;
import java.nio.ByteBuffer;
@@ -22,7 +23,7 @@
/**
* Kafka input adapter operator with a single output port, which consumes String data from the Kafka message bus.
* <p></p>
- *
+ * @deprecated Please use KafkaSinglePortByteArrayInputOperator followed by ByteArrayToStringConverterOperator (connected as thread local) instead of KafkaSinglePortStringInputOperator.
* @displayName Kafka Single Port String Input
* @category Messaging
* @tags input operator, string
@@ -38,18 +39,19 @@
@Override
public String getTuple(Message message)
{
- String data = "";
try {
+ String data;
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
data = new String(bytes);
+ return data;
//logger.debug("Consuming {}", data);
}
catch (Exception ex) {
- return data;
+ DTThrowable.rethrow(ex);
}
- return data;
+ return null;
}
}
diff --git a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
new file mode 100644
index 0000000..c2d026d
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.converter;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import java.nio.charset.Charset;
+
+public class ByteArrayToStringConverterOperator extends BaseOperator
+{
+ private Charset characterEncoding; // charset used by the input port to decode byte arrays
+
+ public ByteArrayToStringConverterOperator()
+ {
+ characterEncoding = Charset.forName("UTF-8"); // default encoding until a setter call overrides it
+ }
+
+ public Charset getCharacterEncoding()
+ {
+ return characterEncoding;
+ }
+
+ public void setCharacterEncoding(Charset characterEncoding)
+ {
+ this.characterEncoding = characterEncoding;
+ }
+ /**
+ * Accepts byte arrays, decodes each one with characterEncoding and emits the result on outputString.
+ */
+ public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
+ {
+ @Override
+ public void process(byte[] message)
+ {
+ if (message != null) {
+ outputString.emit(new String(message,characterEncoding));
+ }
+ else {
+ outputString.emit(null); // a null tuple is forwarded as null rather than dropped
+ }
+ }
+
+ };
+
+ /**
+ * Emits the String decoded from each byte array received on input (null when the input tuple was null).
+ */
+ public final transient DefaultOutputPort<String> outputString = new DefaultOutputPort<String>();
+
+}