Merge branch 'dt-dev' of git://github.com/tushargosavi/Malhar into tushargosavi-dt-dev
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
index 25b387b..a3a65d0 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
@@ -15,12 +15,12 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.common.util.DTThrowable;
 import java.nio.ByteBuffer;
 import kafka.message.Message;
 
   public class KafkaSinglePortByteArrayInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
   {
-
     /**
      * Implement abstract method of AbstractKafkaSinglePortInputOperator
      *
@@ -30,16 +30,17 @@
     @Override
     public byte[] getTuple(Message message)
     {
-      byte[] bytes = null;
       try {
+        byte[] bytes; // sized from the payload's remaining bytes below
         ByteBuffer buffer = message.payload();
         bytes = new byte[buffer.remaining()];
         buffer.get(bytes);
-      }
-      catch (Exception ex) {
         return bytes;
       }
-      return bytes;
+      catch (Exception ex) {
+        DTThrowable.rethrow(ex); // NOTE(review): presumably always throws - confirm against DTThrowable
+      }
+      return null; // reached only if the catch block completes normally, i.e. rethrow(ex) returned
     }
 
   }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
index 5471a27..fbaeef9 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
@@ -15,6 +15,7 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.common.util.DTThrowable;
 import kafka.message.Message;
 
 import java.nio.ByteBuffer;
@@ -22,7 +23,7 @@
 /**
  * Kafka input adapter operator with a single output port, which consumes String data from the Kafka message bus.
  * <p></p>
- *
+ * @deprecated Please use KafkaSinglePortByteArrayInputOperator and ByteArrayToStringConverterOperator as Thread Local instead of KafkaSinglePortStringInputOperator.
  * @displayName Kafka Single Port String Input
  * @category Messaging
  * @tags input operator, string
@@ -38,18 +39,19 @@
   @Override
   public String getTuple(Message message)
   {
-    String data = "";
     try {
+      String data; // assigned from the copied payload bytes below
       ByteBuffer buffer = message.payload();
       byte[] bytes = new byte[buffer.remaining()];
       buffer.get(bytes);
       data = new String(bytes);
+      return data; // new String(bytes) names no charset, so the platform default decoding applies
       //logger.debug("Consuming {}", data);
     }
     catch (Exception ex) {
-      return data;
+      DTThrowable.rethrow(ex); // NOTE(review): presumably always throws - confirm against DTThrowable
     }
-    return data;
+    return null; // reached only if the catch block completes normally, i.e. rethrow(ex) returned
   }
 
 }
diff --git a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
new file mode 100644
index 0000000..c2d026d
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.converter;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import java.nio.charset.Charset;
+
+/**
+ * Converts each byte array received on the input port into a String and emits it.
+ */
+public class ByteArrayToStringConverterOperator extends BaseOperator
+{
+  /** Character set used to decode incoming byte arrays; the constructor sets it to UTF-8. */
+  private Charset characterEncoding;
+
+  public ByteArrayToStringConverterOperator()
+  {
+    this.characterEncoding = Charset.forName("UTF-8");
+  }
+
+  public Charset getCharacterEncoding()
+  {
+    return this.characterEncoding;
+  }
+
+  public void setCharacterEncoding(Charset characterEncoding)
+  {
+    this.characterEncoding = characterEncoding;
+  }
+
+  /**
+   * Input port for byte arrays. A null tuple is forwarded as null; any other tuple is
+   * decoded with the configured character set before being emitted on outputString.
+   */
+  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
+  {
+    @Override
+    public void process(byte[] message)
+    {
+      final String decoded = (message == null) ? null : new String(message, characterEncoding);
+      outputString.emit(decoded);
+    }
+  };
+
+  /**
+   * Output port carrying the String decoded from each input byte array.
+   */
+  public final transient DefaultOutputPort<String> outputString = new DefaultOutputPort<String>();
+}