Revert "ByteArrayToStringConverter"
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
index a3a65d0..25b387b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortByteArrayInputOperator.java
@@ -15,12 +15,12 @@
*/
package com.datatorrent.contrib.kafka;
-import com.datatorrent.common.util.DTThrowable;
import java.nio.ByteBuffer;
import kafka.message.Message;
public class KafkaSinglePortByteArrayInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
{
+
/**
* Implement abstract method of AbstractKafkaSinglePortInputOperator
*
@@ -30,17 +30,16 @@
@Override
public byte[] getTuple(Message message)
{
+ byte[] bytes = null;
try {
- byte[] bytes;
ByteBuffer buffer = message.payload();
bytes = new byte[buffer.remaining()];
buffer.get(bytes);
- return bytes;
}
catch (Exception ex) {
- DTThrowable.rethrow(ex);
+ return bytes;
}
- return null;
+ return bytes;
}
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
index fbaeef9..5471a27 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortStringInputOperator.java
@@ -15,7 +15,6 @@
*/
package com.datatorrent.contrib.kafka;
-import com.datatorrent.common.util.DTThrowable;
import kafka.message.Message;
import java.nio.ByteBuffer;
@@ -23,7 +22,7 @@
/**
* Kafka input adapter operator with a single output port, which consumes String data from the Kafka message bus.
* <p></p>
- * @deprecated Please use KafkaSinglePortByteArrayInputOperator and ByteArrayToStringConverter as Thread Local instead of KafkaSinglePortStringInputOperator.
+ *
* @displayName Kafka Single Port String Input
* @category Messaging
* @tags input operator, string
@@ -39,19 +38,18 @@
@Override
public String getTuple(Message message)
{
+ String data = "";
try {
- String data;
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
data = new String(bytes);
- return data;
//logger.debug("Consuming {}", data);
}
catch (Exception ex) {
- DTThrowable.rethrow(ex);
+ return data;
}
- return null;
+ return data;
}
}
diff --git a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java b/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
deleted file mode 100644
index c2d026d..0000000
--- a/library/src/main/java/com/datatorrent/lib/converter/ByteArrayToStringConverterOperator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.lib.converter;
-
-import com.datatorrent.api.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import java.nio.charset.Charset;
-
-public class ByteArrayToStringConverterOperator extends BaseOperator
-{
- private Charset characterEncoding;
-
- public ByteArrayToStringConverterOperator()
- {
- characterEncoding = Charset.forName("UTF-8");
- }
-
- public Charset getCharacterEncoding()
- {
- return characterEncoding;
- }
-
- public void setCharacterEncoding(Charset characterEncoding)
- {
- this.characterEncoding = characterEncoding;
- }
- /**
- * Accepts byte arrays
- */
- public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
- {
- @Override
- public void process(byte[] message)
- {
- if (message != null) {
- outputString.emit(new String(message,characterEncoding));
- }
- else {
- outputString.emit(null);
- }
- }
-
- };
-
- /**
- * Output byte array converted to String
- */
- public final transient DefaultOutputPort<String> outputString = new DefaultOutputPort<String>();
-
-}