changes.
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java index 8fc7660..664b1f3 100644 --- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java +++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java
@@ -19,14 +19,11 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.common.util.DTThrowable; -import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator; import com.datatorrent.lib.stream.DevNull; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import java.nio.ByteBuffer; import java.util.Map; -import kafka.message.Message; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,7 +38,7 @@ { dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); - KafkaSinglePortStringInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortStringInputOperator()); + KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator()); CsvToMapParser parser = dag.addOperator("CsvParser", CsvToMapParser.class); String filepath = conf.get("dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile"); parser.setFieldmappingFile(filepath); @@ -116,32 +113,6 @@ logger.debug("Written data to HDFS file."); } - public static class KafkaSinglePortStringInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]> - { - - /** - * Implement abstract method of AbstractKafkaSinglePortInputOperator - * - * @param message - * @return byte Array - */ - @Override - public byte[] getTuple(Message message) - { - byte[] bytes = null; - try { - ByteBuffer buffer = message.payload(); - bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - } - catch (Exception ex) { - return bytes; - } - return bytes; - } - - } - private static final Logger logger = LoggerFactory.getLogger(CsvToMapParserApplicationWithFieldMappingFileInput.class); }
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java index cb2b367..946078d 100644 --- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java +++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java
@@ -18,15 +18,12 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator; import com.datatorrent.contrib.parser.AbstractCsvParser.Field; import com.datatorrent.lib.stream.DevNull; import org.apache.hadoop.conf.Configuration; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Map; -import kafka.message.Message; @ApplicationAnnotation(name="CsvToMapParserTestApplication") public class CsvToMapParserTestApplication implements StreamingApplication @@ -37,7 +34,7 @@ { dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); - KafkaSinglePortStringInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortStringInputOperator()); + KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator()); CsvToMapParser parser = dag.addOperator("CsvToMapParser", CsvToMapParser.class); ArrayList<CsvToMapParser.Field> fields= new ArrayList<CsvToMapParser.Field>(); @@ -85,29 +82,4 @@ dag.addStream("Parser2DevNull",parser.output,devNull.data); } - public static class KafkaSinglePortStringInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]> - { - - /** - * Implement abstract method of AbstractKafkaSinglePortInputOperator - * @param message - * @return byte Array - */ - @Override - public byte[] getTuple(Message message) - { - byte[] bytes = null; - try { - ByteBuffer buffer = message.payload(); - bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - } - catch (Exception ex) { - return bytes; - } - return bytes; - } - - } - }
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java index 8053dc8..dd0f998 100644 --- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java +++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java
@@ -107,7 +107,7 @@ public void populateDAG(DAG dag, Configuration conf) { CsvToMapParser parser = dag.addOperator("Parser", CsvToMapParser.class); - String filepath = conf.get("dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile"); + String filepath = conf.get("dt.application.GenericDimensionsWithCsvMapParser.operator.Parser.fieldmappingFile"); parser.setFieldmappingFile(filepath); createFieldMappingFile(filepath); parser.setIsHeader(false); @@ -117,6 +117,9 @@ DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class); KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator()); KafkaSinglePortOutputOperator<Object, Object> queryResult = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>()); + KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator()); + + dag.addStream("Kafka2Parser", kafkaStringInput.outputPort, parser.input); dag.setInputPortAttribute(parser.input, Context.PortContext.PARTITION_PARALLEL, true); dag.setInputPortAttribute(dimensions.data, Context.PortContext.PARTITION_PARALLEL, true);
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java new file mode 100644 index 0000000..b47ad9a --- /dev/null +++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java
@@ -0,0 +1,35 @@ +/* + * Copyright (c) 2012-2015 Malhar, Inc. + * All Rights Reserved. + */ +package com.datatorrent.demos.dimensions.generic; + +import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator; +import java.nio.ByteBuffer; +import kafka.message.Message; +/** Kafka single-port input operator whose tuples are the raw message payload as a byte array. */ + public class KafkaSinglePortByteArrayInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]> + { + + /** + * Implements getTuple of AbstractKafkaSinglePortInputOperator by copying the remaining bytes of the message payload into a newly allocated array. + * + * @param message Kafka message whose payload buffer is read via {@code message.payload()} + * @return the payload bytes; any Exception raised while reading is swallowed and the current value of {@code bytes} is returned (null if the failure happened before the array was allocated) + */ + @Override + public byte[] getTuple(Message message) + { + byte[] bytes = null; + try { + ByteBuffer buffer = message.payload(); + bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + } + catch (Exception ex) { + return bytes; + } + return bytes; + } + + }
diff --git a/demos/dimensions/src/main/resources/META-INF/properties.xml b/demos/dimensions/src/main/resources/META-INF/properties.xml index 32a2957..a14aab6 100644 --- a/demos/dimensions/src/main/resources/META-INF/properties.xml +++ b/demos/dimensions/src/main/resources/META-INF/properties.xml
@@ -97,4 +97,8 @@ <name>dt.application.GenericDimensionsWithCsvMapParser.operator.Parser.fieldmappingFile</name> <value>/tmp/test.txt</value> </property> + <property> + <name>dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile</name> + <value>/tmp/test.txt</value> + </property> </configuration>