separated the convert functionality from DimensionsCOmputation
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
index 864bbf5..7e55b63 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
@@ -16,6 +16,7 @@
 package com.datatorrent.demos.dimensions.generic;
 
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNullCounter;
@@ -105,12 +106,23 @@
   {
     JsonAdInfoGenerator input = dag.addOperator("InputGenerator", JsonAdInfoGenerator.class);
     JsonToMapConverter converter = dag.addOperator("Converter", JsonToMapConverter.class);
+
+    String schemaSpec = conf.get(this.getClass().getName() + ".schema");
+    SchemaConverter map2eventConverter = dag.addOperator("map2eventConverter", SchemaConverter.class);
+    if (schemaSpec != null) {
+      map2eventConverter.setEventSchemaJSON(schemaSpec);
+    }
+
     GenericDimensionComputation dimensions = dag.addOperator("DimensionsComputation", new GenericDimensionComputation());
-    DevNullCounter counter = dag.addOperator("Conter", new DevNullCounter());
+    dimensions.setSchema(map2eventConverter.getEventSchema());
+
+    DevNullCounter<GenericAggregate> counter = dag.addOperator("Conter", new DevNullCounter<GenericAggregate>());
+
 
     // Removing setLocality(Locality.CONTAINER_LOCAL) from JSONStream and MapStream to isolate performance bottleneck
     dag.addStream("JSONStream", input.jsonOutput, converter.input);
-    dag.addStream("MapStream", converter.outputMap, dimensions.data);
+    dag.addStream("MapStream", converter.outputMap, map2eventConverter.input);
+    dag.addStream("EventStream", map2eventConverter.output, dimensions.data).setLocality(Locality.THREAD_LOCAL);
     dag.addStream("DimensionalData", dimensions.output, counter.data);
   }
 
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
index 0233438..a135cb2 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
@@ -109,17 +109,11 @@
 {
   private static final long serialVersionUID = 4586481500190519858L;
 
-  /* Map of all the fields and their types in an event tuple */
-  public Map<String, Class<?>> fields = Maps.newHashMap();
+  public Map<String, Class<?>> fields;
 
-  /* Names of the fields which make up the keys */
-  public List<String> keys = Lists.newArrayList();
-
-  // Fields to aggregate mapped to aggregate operations and data types
-  public Map<String, String> aggregates = Maps.newHashMap();
-
-  // List of dimensional combinations to compute
-  public List<String> dimensions = Lists.newArrayList();
+  public List<String> keys;
+  public Map<String, String> aggregates;
+  public List<String> dimensions;
 
   public String timestamp = "timestamp";
 
@@ -195,7 +189,9 @@
     ObjectMapper mapper = new ObjectMapper();
     EventSchema eventSchema = mapper.readValue(json, EventSchema.class);
 
-    if ( eventSchema.dimensions.size() == 0 ) throw new IllegalArgumentException("EventSchema JSON must specify dimensions list");
+    if ( eventSchema.dimensions.isEmpty() ) {
+      throw new IllegalArgumentException("EventSchema JSON must specify dimensions list");
+    }
 
     // Generate list of keys from dimensions specified
     Set<String> uniqueKeys = Sets.newHashSet();
@@ -219,6 +215,14 @@
     return eventSchema;
   }
 
+  public EventSchema()
+  {
+    this.dimensions = Lists.newArrayList();
+    this.aggregates = Maps.newHashMap();
+    this.keys = Lists.newArrayList();
+    this.fields = Maps.newHashMap();
+  }
+
   public String getTimestamp() {
     return timestamp;
   }
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
index 5be3676..eb8b08a 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
@@ -101,12 +101,6 @@
   }
 }
 
-class GenericEvent {
-  Object[] keys;
-  Object[] values;
-  long timestamp;
-}
-
 public class GenericAggregator implements DimensionsComputation.Aggregator<GenericEvent, GenericAggregate>
 {
   private static final long serialVersionUID = 7636266873750826291L;
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
index f3b3359..48b37b1 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
@@ -1,9 +1,9 @@
 package com.datatorrent.demos.dimensions.generic;
 
-import com.datatorrent.api.Context;
 import com.datatorrent.lib.statistics.DimensionsComputation;
 
-import java.util.Map;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
 
 /**
  * Performs dimensional computations given an event schema.
@@ -33,60 +33,31 @@
  * @tags dimension, aggregation
  *
  */
-public class GenericDimensionComputation extends DimensionsComputation<Object, GenericAggregate>
+public class GenericDimensionComputation extends DimensionsComputation<GenericEvent, GenericAggregate>
 {
-  private String eventSchemaJSON = EventSchema.DEFAULT_SCHEMA_SALES;
-  private transient EventSchema eventSchema;
-
-  // Initialize aggregators when this class is instantiated
+  public void setSchema(EventSchema schema)
   {
-    initAggregators();
+    DimensionsGenerator gen = new DimensionsGenerator(schema);
+    setAggregators(gen.generateAggregators());
   }
 
-  public String getEventSchemaJSON()
-  {
-    return eventSchemaJSON;
-  }
-
-  private void initAggregators(){
-    DimensionsGenerator gen = new DimensionsGenerator(getEventSchema());
-    Aggregator[] aggregators = gen.generateAggregators();
-    setAggregators(aggregators);
-  }
-
-  public void setEventSchemaJSON(String eventSchemaJSON)
-  {
-    this.eventSchemaJSON = eventSchemaJSON;
-    try {
-      eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
-    }
-    initAggregators();
-  }
-
-  public EventSchema getEventSchema() {
-    if (eventSchema == null ) {
-      try {
-        eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
-      } catch (Exception e) {
-        throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
-      }
-    }
-    return eventSchema;
-  }
-
-
-  @Override public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-    initAggregators();
-  }
 
   @Override
-  public void processInputTuple(Object tuple)
+  public void setup(OperatorContext context)
   {
-    GenericEvent ae = getEventSchema().convertMapToGenericEvent((Map<String, Object>) tuple);
-    super.processInputTuple(ae);
+    // hack begin!
+    // this hack should be removed when we have application level properties - talk to Sasha/Chetan.
+    try {
+      getAggregators();
+    }
+    catch (NullPointerException npe) {
+      /* means that it's not properly initialized; so initialize it for app builder demo */
+      setSchema(new SchemaConverter().getEventSchema());
+    }
+    // hack end!
+
+
+    super.setup(context); //To change body of generated methods, choose Tools | Templates.
   }
+
 }
\ No newline at end of file
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
index ec5aa81..9e0299a 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
@@ -105,6 +105,7 @@
     JsonSalesGenerator input = dag.addOperator("Input", JsonSalesGenerator.class);
     input.setAddProductCategory(true);
     JsonToMapConverter converter = dag.addOperator("Parse", JsonToMapConverter.class);
+    SchemaConverter map2eventConverter = dag.addOperator("Map2EventConverter", SchemaConverter.class);
     GenericDimensionComputation dimensions = dag.addOperator("Compute", new GenericDimensionComputation());
     DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
     KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
@@ -115,7 +116,8 @@
 
     // Removing setLocality(Locality.CONTAINER_LOCAL) from JSONStream and MapStream to isolate performance bottleneck
     dag.addStream("JSONStream", input.jsonBytes, converter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
-    dag.addStream("MapStream", converter.outputMap, dimensions.data).setLocality(DAG.Locality.CONTAINER_LOCAL);
+    dag.addStream("MapStream", converter.outputMap, map2eventConverter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+    dag.addStream("EventStream", map2eventConverter.output, dimensions.data).setLocality(DAG.Locality.THREAD_LOCAL);
     dag.addStream("DimensionalData", dimensions.output, store.input);
     dag.addStream("Query", queries.outputPort, store.query);
     dag.addStream("QueryResult", store.queryResult, queryResult.inputPort);
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java
new file mode 100644
index 0000000..4dfc7e7
--- /dev/null
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.dimensions.generic;
+
+public class GenericEvent
+{
+  Object[] keys;
+  Object[] values;
+  long timestamp;
+}
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java
new file mode 100644
index 0000000..b41b694
--- /dev/null
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.dimensions.generic;
+
+import java.util.Map;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ *
+ */
+public class SchemaConverter extends BaseOperator
+{
+  public final DefaultOutputPort<GenericEvent> output = new DefaultOutputPort<GenericEvent>();
+  public final DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      output.emit(eventSchema.convertMapToGenericEvent(tuple));
+    }
+  };
+
+  private String eventSchemaJSON;
+  private transient EventSchema eventSchema;
+
+  public SchemaConverter()
+  {
+    this.eventSchemaJSON = EventSchema.DEFAULT_SCHEMA_SALES;
+    getEventSchema();
+  }
+
+  public String getEventSchemaJSON()
+  {
+    return eventSchemaJSON;
+  }
+
+  public final EventSchema getEventSchema()
+  {
+    if (eventSchema == null) {
+      try {
+        eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
+      }
+      catch (Exception e) {
+        throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
+      }
+    }
+    return eventSchema;
+  }
+
+  public void setEventSchemaJSON(String eventSchemaJSON)
+  {
+    this.eventSchemaJSON = eventSchemaJSON;
+    try {
+      eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
+    }
+    catch (Exception e) {
+      throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    setEventSchemaJSON(eventSchemaJSON);
+  }
+
+}
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
index 2fb975b..8a801ed 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
@@ -42,8 +42,10 @@
         "  \"timestamp\": \"timestamp\"\n" +
         "}";
 
+    SchemaConverter converter = new SchemaConverter();
+    converter.setEventSchemaJSON(TEST_SCHEMA_JSON);
     GenericDimensionComputation dimensions = new GenericDimensionComputation();
-    dimensions.setEventSchemaJSON(TEST_SCHEMA_JSON);
+    dimensions.setSchema(converter.getEventSchema());
     dimensions.setup(null);
 
     long start_time = System.currentTimeMillis();
@@ -57,7 +59,7 @@
       tuple.put("clicks", info.getClicks());
       tuple.put("timestamp", info.getTimestamp());
 
-      dimensions.data.process(tuple);
+      dimensions.data.process(converter.getEventSchema().convertMapToGenericEvent(tuple));
     }
     return System.currentTimeMillis() - start_time;
   }
@@ -72,8 +74,10 @@
         "  \"timestamp\": \"timestamp\"\n" +
         "}";
 
+    SchemaConverter converter = new SchemaConverter();
+    converter.setEventSchemaJSON(TEST_SCHEMA_JSON);
     GenericDimensionComputation dimensions = new GenericDimensionComputation();
-    dimensions.setEventSchemaJSON(TEST_SCHEMA_JSON);
+    dimensions.setSchema(converter.getEventSchema());
     dimensions.setup(null);
     KryoSerializableStreamCodec<Object> codec = new KryoSerializableStreamCodec<Object>();
     codec.register(HashMap.class);
@@ -89,9 +93,9 @@
       tuple.put("timestamp", info.getTimestamp());
 
       Slice slice = codec.toByteArray(tuple);
-      Object o = codec.fromByteArray(slice);
-
-      dimensions.data.process(o);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> o = (Map<String, Object>)codec.fromByteArray(slice);
+      dimensions.data.process(converter.getEventSchema().convertMapToGenericEvent(o));
     }
     return System.currentTimeMillis() - start_time;
   }
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
index 5688e68..db35ac2 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
@@ -288,8 +288,10 @@
   @Test
   public void dimensionComputationTest()
   {
+    SchemaConverter converter = new SchemaConverter();
+    converter.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
     GenericDimensionComputation dimensions = new GenericDimensionComputation();
-    dimensions.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
+    dimensions.setSchema(converter.getEventSchema());
     dimensions.setup(null);
 
     for(int i = 0; i < 10; i++) {
@@ -300,7 +302,7 @@
       event.put("adId", 3);
       event.put("clicks", 10L);
 
-      dimensions.data.process(event);
+      dimensions.data.process(converter.getEventSchema().convertMapToGenericEvent(event));
     }
     System.out.println("Something needs to be done");
   }
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
index 803e8e5..2ab2573 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
@@ -8,8 +8,10 @@
   @Test
   public void test()
   {
+    SchemaConverter converter = new SchemaConverter();
+    converter.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
     GenericDimensionComputation dc = new GenericDimensionComputation();
-    dc.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
+    dc.setSchema(converter.getEventSchema());
     dc.setup(null);
 
     Assert.assertEquals("Total number of aggregators ", 8, dc.getAggregators().length);
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
index 15f3e5a..7541303 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
@@ -40,11 +40,12 @@
 /**
  * <p>An implementation of an operator that computes dimensions of events. </p>
  * <p>
+ * @param <EVENT> - Type of the tuple whose attributes are used to define dimensions.
+ * @param <AGGREGATE> - Type of the aggregate event which gets emitted as a result of aggregation.
  * @displayName Dimension Computation
  * @category Statistics
  * @tags event, dimension, aggregation, computation
  *
- * @param <EVENT> - Type of the tuple whose attributes are used to define dimensions.
  * @since 1.0.2
  */
 public class DimensionsComputation<EVENT, AGGREGATE extends DimensionsComputation.AggregateEvent> implements Operator
@@ -73,13 +74,6 @@
     }
   };
 
-  protected void processInputTuple(EVENT tuple)
-  {
-    for (int i = 0; i < aggregatorMaps.length; i++) {
-      aggregatorMaps[i].add(tuple, i);
-    }
-  }
-
   /**
    * Input data port that takes an event.
    */
@@ -88,7 +82,9 @@
     @Override
     public void process(EVENT tuple)
     {
-      processInputTuple(tuple);
+      for (int i = 0; i < aggregatorMaps.length; i++) {
+        aggregatorMaps[i].add(tuple, i);
+      }
     }
   };
 
@@ -384,11 +380,7 @@
 
       AggregatorMap<?, ?> that = (AggregatorMap<?, ?>) o;
 
-      if (aggregator != null ? !aggregator.equals(that.aggregator) : that.aggregator != null) {
-        return false;
-      }
-
-      return true;
+      return !(aggregator != null ? !aggregator.equals(that.aggregator) : that.aggregator != null);
     }
 
     @Override