changes.
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java
index 8fc7660..664b1f3 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserApplicationWithFieldMappingFileInput.java
@@ -19,14 +19,11 @@
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;
 import com.datatorrent.lib.stream.DevNull;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import java.nio.ByteBuffer;
 import java.util.Map;
-import kafka.message.Message;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,7 +38,7 @@
   {
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
 
-    KafkaSinglePortStringInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortStringInputOperator());
+    KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator());
     CsvToMapParser parser = dag.addOperator("CsvParser", CsvToMapParser.class);
     String filepath = conf.get("dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile");
     parser.setFieldmappingFile(filepath);
@@ -116,32 +113,6 @@
     logger.debug("Written data to HDFS file.");
   }
 
-  public static class KafkaSinglePortStringInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
-  {
-
-    /**
-     * Implement abstract method of AbstractKafkaSinglePortInputOperator
-     *
-     * @param message
-     * @return byte Array
-     */
-    @Override
-    public byte[] getTuple(Message message)
-    {
-      byte[] bytes = null;
-      try {
-        ByteBuffer buffer = message.payload();
-        bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-      }
-      catch (Exception ex) {
-        return bytes;
-      }
-      return bytes;
-    }
-
-  }
-
   private static final Logger logger = LoggerFactory.getLogger(CsvToMapParserApplicationWithFieldMappingFileInput.class);
 
 }
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java
index cb2b367..946078d 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/CsvToMapParserTestApplication.java
@@ -18,15 +18,12 @@
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;
 import com.datatorrent.contrib.parser.AbstractCsvParser.Field;
 import com.datatorrent.lib.stream.DevNull;
 
 import org.apache.hadoop.conf.Configuration;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Map;
-import kafka.message.Message;
 
 @ApplicationAnnotation(name="CsvToMapParserTestApplication")
 public class CsvToMapParserTestApplication implements StreamingApplication
@@ -37,7 +34,7 @@
   {
     dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000);
 
-    KafkaSinglePortStringInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortStringInputOperator());
+    KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator());
     CsvToMapParser parser = dag.addOperator("CsvToMapParser", CsvToMapParser.class);
 
     ArrayList<CsvToMapParser.Field> fields= new ArrayList<CsvToMapParser.Field>();
@@ -85,29 +82,4 @@
     dag.addStream("Parser2DevNull",parser.output,devNull.data);
   }
 
-  public static class KafkaSinglePortStringInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
-  {
-
-    /**
-     * Implement abstract method of AbstractKafkaSinglePortInputOperator
-     * @param message
-     * @return byte Array
-     */
-    @Override
-    public byte[] getTuple(Message message)
-    {
-      byte[] bytes = null;
-      try {
-        ByteBuffer buffer = message.payload();
-        bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-      }
-      catch (Exception ex) {
-        return bytes;
-      }
-      return bytes;
-    }
-
-  }
-
 }
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java
index 8053dc8..dd0f998 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsWithCsvMapParser.java
@@ -107,7 +107,7 @@
   public void populateDAG(DAG dag, Configuration conf)
   {
     CsvToMapParser parser = dag.addOperator("Parser", CsvToMapParser.class);
-    String filepath = conf.get("dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile");
+    String filepath = conf.get("dt.application.GenericDimensionsWithCsvMapParser.operator.Parser.fieldmappingFile");
     parser.setFieldmappingFile(filepath);
     createFieldMappingFile(filepath);
     parser.setIsHeader(false);
@@ -117,6 +117,9 @@
     DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
     KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
     KafkaSinglePortOutputOperator<Object, Object> queryResult = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());
+    KafkaSinglePortByteArrayInputOperator kafkaStringInput = dag.addOperator("KafkaStringInput", new KafkaSinglePortByteArrayInputOperator());
+
+    dag.addStream("Kafka2Parser", kafkaStringInput.outputPort, parser.input);
 
     dag.setInputPortAttribute(parser.input, Context.PortContext.PARTITION_PARALLEL, true);
     dag.setInputPortAttribute(dimensions.data, Context.PortContext.PARTITION_PARALLEL, true);
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java
new file mode 100644
index 0000000..b47ad9a
--- /dev/null
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/KafkaSinglePortByteArrayInputOperator.java
@@ -0,0 +1,35 @@
+/*
+ *  Copyright (c) 2012-2015 Malhar, Inc.
+ *  All Rights Reserved.
+ */
+package com.datatorrent.demos.dimensions.generic;
+
+import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;
+import java.nio.ByteBuffer;
+import kafka.message.Message;
+
+  public class KafkaSinglePortByteArrayInputOperator extends AbstractKafkaSinglePortInputOperator<byte[]>
+  {
+    // Kafka input operator used by the CSV-parser demo applications; each message payload becomes a byte[] tuple.
+    /**
+     * Implements the abstract method of AbstractKafkaSinglePortInputOperator by copying the payload's remaining bytes.
+     * NOTE(review): every exception is swallowed; on failure the current value of {@code bytes} is returned as-is.
+     * @param message Kafka message whose payload is copied into a new array
+     * @return byte array filled from the payload, or {@code null} if the failure happened before the array was allocated
+     */
+    @Override
+    public byte[] getTuple(Message message)
+    {
+      byte[] bytes = null; // stays null unless the payload buffer is obtained and sized below
+      try {
+        ByteBuffer buffer = message.payload();
+        bytes = new byte[buffer.remaining()]; // sized to the bytes left between position and limit
+        buffer.get(bytes);
+      }
+      catch (Exception ex) { // NOTE(review): failure is neither logged nor rethrown - confirm this is intended
+        return bytes;
+      }
+      return bytes;
+    }
+
+  }
diff --git a/demos/dimensions/src/main/resources/META-INF/properties.xml b/demos/dimensions/src/main/resources/META-INF/properties.xml
index 32a2957..a14aab6 100644
--- a/demos/dimensions/src/main/resources/META-INF/properties.xml
+++ b/demos/dimensions/src/main/resources/META-INF/properties.xml
@@ -97,4 +97,8 @@
     <name>dt.application.GenericDimensionsWithCsvMapParser.operator.Parser.fieldmappingFile</name>
     <value>/tmp/test.txt</value>
   </property>
+  <property>
+    <name>dt.application.CsvParserFileMappingInputApplication.operator.CsvParser.fieldmappingFile</name>
+    <value>/tmp/test.txt</value>
+  </property>
 </configuration>