Separate the map-to-event conversion from DimensionsComputation into a new SchemaConverter operator
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
index 864bbf5..7e55b63 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/DimensionsComputationBenchmark.java
@@ -16,6 +16,7 @@
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNullCounter;
@@ -105,12 +106,23 @@
{
JsonAdInfoGenerator input = dag.addOperator("InputGenerator", JsonAdInfoGenerator.class);
JsonToMapConverter converter = dag.addOperator("Converter", JsonToMapConverter.class);
+ // An optional "<class name>.schema" configuration entry overrides the converter's default schema.
+ String schemaJson = conf.get(getClass().getName() + ".schema");
+ SchemaConverter eventConverter = dag.addOperator("map2eventConverter", SchemaConverter.class);
+ if (schemaJson != null) {
+ eventConverter.setEventSchemaJSON(schemaJson);
+ }
+
GenericDimensionComputation dimensions = dag.addOperator("DimensionsComputation", new GenericDimensionComputation());
- DevNullCounter counter = dag.addOperator("Conter", new DevNullCounter());
+ // Derive the aggregators from the schema the converter builds events with.
+ dimensions.setSchema(eventConverter.getEventSchema());
+ DevNullCounter<GenericAggregate> counter = dag.addOperator("Conter", new DevNullCounter<GenericAggregate>());
+
// Removing setLocality(Locality.CONTAINER_LOCAL) from JSONStream and MapStream to isolate performance bottleneck
dag.addStream("JSONStream", input.jsonOutput, converter.input);
- dag.addStream("MapStream", converter.outputMap, dimensions.data);
+ dag.addStream("MapStream", converter.outputMap, eventConverter.input);
+ dag.addStream("EventStream", eventConverter.output, dimensions.data).setLocality(Locality.THREAD_LOCAL);
dag.addStream("DimensionalData", dimensions.output, counter.data);
}
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
index 0233438..a135cb2 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/EventSchema.java
@@ -109,17 +109,17 @@
{
private static final long serialVersionUID = 4586481500190519858L;
/* Map of all the fields and their types in an event tuple */
- public Map<String, Class<?>> fields = Maps.newHashMap();
+ public Map<String, Class<?>> fields;
/* Names of the fields which make up the keys */
- public List<String> keys = Lists.newArrayList();
+ public List<String> keys;

// Fields to aggregate mapped to aggregate operations and data types
- public Map<String, String> aggregates = Maps.newHashMap();
+ public Map<String, String> aggregates;

// List of dimensional combinations to compute
- public List<String> dimensions = Lists.newArrayList();
+ public List<String> dimensions;
public String timestamp = "timestamp";
@@ -195,7 +195,9 @@
ObjectMapper mapper = new ObjectMapper();
EventSchema eventSchema = mapper.readValue(json, EventSchema.class);
- if ( eventSchema.dimensions.size() == 0 ) throw new IllegalArgumentException("EventSchema JSON must specify dimensions list");
+ if (eventSchema.dimensions == null || eventSchema.dimensions.isEmpty()) {
+ throw new IllegalArgumentException("EventSchema JSON must specify dimensions list");
+ }
// Generate list of keys from dimensions specified
Set<String> uniqueKeys = Sets.newHashSet();
@@ -219,6 +221,18 @@
return eventSchema;
}
+ /**
+ * Creates a schema whose collections start out empty rather than null, so that
+ * collections omitted from the JSON given to {@link #createFromJSON} stay empty.
+ */
+ public EventSchema()
+ {
+ this.dimensions = Lists.newArrayList();
+ this.aggregates = Maps.newHashMap();
+ this.keys = Lists.newArrayList();
+ this.fields = Maps.newHashMap();
+ }
+
public String getTimestamp() {
return timestamp;
}
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
index 5be3676..eb8b08a 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericAggregator.java
@@ -101,12 +101,6 @@
}
}
-class GenericEvent {
- Object[] keys;
- Object[] values;
- long timestamp;
-}
-
public class GenericAggregator implements DimensionsComputation.Aggregator<GenericEvent, GenericAggregate>
{
private static final long serialVersionUID = 7636266873750826291L;
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
index f3b3359..48b37b1 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputation.java
@@ -1,9 +1,8 @@
package com.datatorrent.demos.dimensions.generic;
-import com.datatorrent.api.Context;
import com.datatorrent.lib.statistics.DimensionsComputation;
-import java.util.Map;
+import com.datatorrent.api.Context.OperatorContext;
/**
* Performs dimensional computations given an event schema.
@@ -33,60 +32,39 @@
* @tags dimension, aggregation
*
*/
-public class GenericDimensionComputation extends DimensionsComputation<Object, GenericAggregate>
+public class GenericDimensionComputation extends DimensionsComputation<GenericEvent, GenericAggregate>
{
- private String eventSchemaJSON = EventSchema.DEFAULT_SCHEMA_SALES;
- private transient EventSchema eventSchema;
-
- // Initialize aggregators when this class is instantiated
+ /**
+ * Replaces this operator's aggregators with the ones {@link DimensionsGenerator}
+ * generates for the given schema.
+ *
+ * @param schema the event schema, e.g. {@link SchemaConverter#getEventSchema()} of the upstream converter
+ */
+ public void setSchema(EventSchema schema)
{
- initAggregators();
+ DimensionsGenerator gen = new DimensionsGenerator(schema);
+ setAggregators(gen.generateAggregators());
}
- public String getEventSchemaJSON()
- {
- return eventSchemaJSON;
- }
-
- private void initAggregators(){
- DimensionsGenerator gen = new DimensionsGenerator(getEventSchema());
- Aggregator[] aggregators = gen.generateAggregators();
- setAggregators(aggregators);
- }
-
- public void setEventSchemaJSON(String eventSchemaJSON)
- {
- this.eventSchemaJSON = eventSchemaJSON;
- try {
- eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
- } catch (Exception e) {
- throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
- }
- initAggregators();
- }
-
- public EventSchema getEventSchema() {
- if (eventSchema == null ) {
- try {
- eventSchema = EventSchema.createFromJSON(eventSchemaJSON);
- } catch (Exception e) {
- throw new IllegalArgumentException("Failed to parse JSON input: " + eventSchemaJSON, e);
- }
- }
- return eventSchema;
- }
-
-
- @Override public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- initAggregators();
- }
+ /**
+ * Falls back to the default schema when no aggregators have been configured,
+ * then delegates to {@link DimensionsComputation#setup}.
+ */
@Override
- public void processInputTuple(Object tuple)
+ public void setup(OperatorContext context)
{
- GenericEvent ae = getEventSchema().convertMapToGenericEvent((Map<String, Object>) tuple);
- super.processInputTuple(ae);
+ // Workaround for the app builder, which creates this operator without calling
+ // setSchema(): getAggregators() then throws NullPointerException, so fall back to
+ // the default sales schema of a fresh SchemaConverter. Remove this once
+ // application-level properties are available (talk to Sasha/Chetan).
+ try {
+ getAggregators();
+ }
+ catch (NullPointerException npe) {
+ setSchema(new SchemaConverter().getEventSchema());
+ }
+
+ super.setup(context);
+ }
}
\ No newline at end of file
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
index ec5aa81..9e0299a 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
@@ -105,6 +105,9 @@
JsonSalesGenerator input = dag.addOperator("Input", JsonSalesGenerator.class);
input.setAddProductCategory(true);
JsonToMapConverter converter = dag.addOperator("Parse", JsonToMapConverter.class);
+ SchemaConverter map2eventConverter = dag.addOperator("Map2EventConverter", SchemaConverter.class);
GenericDimensionComputation dimensions = dag.addOperator("Compute", new GenericDimensionComputation());
+ // Derive the aggregators from the converter's schema explicitly rather than relying on setup()'s fallback.
+ dimensions.setSchema(map2eventConverter.getEventSchema());
DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
@@ -115,7 +118,8 @@
// Removing setLocality(Locality.CONTAINER_LOCAL) from JSONStream and MapStream to isolate performance bottleneck
dag.addStream("JSONStream", input.jsonBytes, converter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
- dag.addStream("MapStream", converter.outputMap, dimensions.data).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ dag.addStream("MapStream", converter.outputMap, map2eventConverter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ dag.addStream("EventStream", map2eventConverter.output, dimensions.data).setLocality(DAG.Locality.THREAD_LOCAL);
dag.addStream("DimensionalData", dimensions.output, store.input);
dag.addStream("Query", queries.outputPort, store.query);
dag.addStream("QueryResult", store.queryResult, queryResult.inputPort);
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java
new file mode 100644
index 0000000..4dfc7e7
--- /dev/null
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.dimensions.generic;
+
+/**
+ * Event tuple laid out by an {@link EventSchema}: created by
+ * {@link EventSchema#convertMapToGenericEvent} and aggregated by {@link GenericAggregator}.
+ */
+public class GenericEvent
+{
+ /** Values of the schema's key fields (presumably in {@link EventSchema#keys} order; TODO confirm). */
+ Object[] keys;
+ /** Values of the fields to aggregate (presumably in schema aggregate order; TODO confirm). */
+ Object[] values;
+ /** Event time (presumably taken from the schema's timestamp field; TODO confirm units). */
+ long timestamp;
+}
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java
new file mode 100644
index 0000000..b41b694
--- /dev/null
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/SchemaConverter.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.dimensions.generic;
+
+import java.util.Map;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * Operator that converts {@code Map<String, Object>} tuples (field name to value) into
+ * {@link GenericEvent}s according to an {@link EventSchema} supplied as JSON.
+ * <p>
+ * The schema defaults to {@link EventSchema#DEFAULT_SCHEMA_SALES}. The parsed schema is kept
+ * in a transient field and is rebuilt from the JSON in {@link #setup}.
+ */
+public class SchemaConverter extends BaseOperator
+{
+ /** Emits one {@link GenericEvent} per input tuple. */
+ public final DefaultOutputPort<GenericEvent> output = new DefaultOutputPort<GenericEvent>();
+ /** Receives tuples and converts them with the current schema. */
+ public final DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
+ {
+ @Override
+ public void process(Map<String, Object> tuple)
+ {
+ output.emit(eventSchema.convertMapToGenericEvent(tuple));
+ }
+ };
+
+ private String eventSchemaJSON;
+ private transient EventSchema eventSchema;
+
+ /**
+ * Creates a converter that uses {@link EventSchema#DEFAULT_SCHEMA_SALES}.
+ */
+ public SchemaConverter()
+ {
+ this.eventSchemaJSON = EventSchema.DEFAULT_SCHEMA_SALES;
+ this.eventSchema = parseSchema(eventSchemaJSON);
+ }
+
+ /** Returns the JSON text of the schema currently in use. */
+ public String getEventSchemaJSON()
+ {
+ return eventSchemaJSON;
+ }
+
+ /**
+ * Returns the parsed schema, parsing the configured JSON first if no parsed
+ * schema is present (e.g. after deserialization, as the field is transient).
+ *
+ * @return the parsed event schema
+ * @throws IllegalArgumentException if the schema JSON cannot be parsed
+ */
+ public final EventSchema getEventSchema()
+ {
+ if (eventSchema == null) {
+ eventSchema = parseSchema(eventSchemaJSON);
+ }
+ return eventSchema;
+ }
+
+ /**
+ * Sets and immediately parses the schema JSON. If parsing fails, the previously
+ * configured JSON and parsed schema are left unchanged.
+ *
+ * @param eventSchemaJSON the schema definition as JSON
+ * @throws IllegalArgumentException if {@code eventSchemaJSON} cannot be parsed
+ */
+ public void setEventSchemaJSON(String eventSchemaJSON)
+ {
+ EventSchema parsed = parseSchema(eventSchemaJSON);
+ this.eventSchemaJSON = eventSchemaJSON;
+ this.eventSchema = parsed;
+ }
+
+ /**
+ * Rebuilds the transient parsed schema from the configured JSON.
+ */
+ @Override
+ public void setup(OperatorContext context)
+ {
+ setEventSchemaJSON(eventSchemaJSON);
+ }
+
+ /**
+ * Parses schema JSON, wrapping any failure in an {@link IllegalArgumentException}
+ * whose message includes the offending input.
+ */
+ private static EventSchema parseSchema(String json)
+ {
+ try {
+ return EventSchema.createFromJSON(json);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Failed to parse JSON input: " + json, e);
+ }
+ }
+
+}
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
index 2fb975b..8a801ed 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/DimensionComputationPerformanceTest.java
@@ -42,8 +42,10 @@
" \"timestamp\": \"timestamp\"\n" +
"}";
+ SchemaConverter schemaConverter = new SchemaConverter();
+ schemaConverter.setEventSchemaJSON(TEST_SCHEMA_JSON);
GenericDimensionComputation dimensions = new GenericDimensionComputation();
- dimensions.setEventSchemaJSON(TEST_SCHEMA_JSON);
+ dimensions.setSchema(schemaConverter.getEventSchema());
dimensions.setup(null);
long start_time = System.currentTimeMillis();
@@ -57,7 +59,7 @@
tuple.put("clicks", info.getClicks());
tuple.put("timestamp", info.getTimestamp());
- dimensions.data.process(tuple);
+ dimensions.data.process(schemaConverter.getEventSchema().convertMapToGenericEvent(tuple));
}
return System.currentTimeMillis() - start_time;
}
@@ -72,8 +74,10 @@
" \"timestamp\": \"timestamp\"\n" +
"}";
+ SchemaConverter schemaConverter = new SchemaConverter();
+ schemaConverter.setEventSchemaJSON(TEST_SCHEMA_JSON);
GenericDimensionComputation dimensions = new GenericDimensionComputation();
- dimensions.setEventSchemaJSON(TEST_SCHEMA_JSON);
+ dimensions.setSchema(schemaConverter.getEventSchema());
dimensions.setup(null);
KryoSerializableStreamCodec<Object> codec = new KryoSerializableStreamCodec<Object>();
codec.register(HashMap.class);
@@ -89,9 +93,9 @@
tuple.put("timestamp", info.getTimestamp());
Slice slice = codec.toByteArray(tuple);
- Object o = codec.fromByteArray(slice);
-
- dimensions.data.process(o);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> decoded = (Map<String, Object>) codec.fromByteArray(slice);
+ dimensions.data.process(schemaConverter.getEventSchema().convertMapToGenericEvent(decoded));
}
return System.currentTimeMillis() - start_time;
}
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
index 5688e68..db35ac2 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAggregatorTest.java
@@ -288,8 +288,10 @@
@Test
public void dimensionComputationTest()
{
+ SchemaConverter schemaConverter = new SchemaConverter();
+ schemaConverter.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
GenericDimensionComputation dimensions = new GenericDimensionComputation();
- dimensions.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
+ dimensions.setSchema(schemaConverter.getEventSchema());
dimensions.setup(null);
for(int i = 0; i < 10; i++) {
@@ -300,7 +302,7 @@
event.put("adId", 3);
event.put("clicks", 10L);
- dimensions.data.process(event);
+ dimensions.data.process(schemaConverter.getEventSchema().convertMapToGenericEvent(event));
}
System.out.println("Something needs to be done");
}
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
index 803e8e5..2ab2573 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericDimensionComputationTest.java
@@ -8,8 +8,10 @@
@Test
public void test()
{
+ SchemaConverter schemaConverter = new SchemaConverter();
+ schemaConverter.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
GenericDimensionComputation dc = new GenericDimensionComputation();
- dc.setEventSchemaJSON(GenericAggregateSerializerTest.TEST_SCHEMA_JSON);
+ dc.setSchema(schemaConverter.getEventSchema());
dc.setup(null);
Assert.assertEquals("Total number of aggregators ", 8, dc.getAggregators().length);
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
index 15f3e5a..7541303 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
@@ -40,11 +40,12 @@
/**
* <p>An implementation of an operator that computes dimensions of events. </p>
* <p>
+ * @param <EVENT> - Type of the tuple whose attributes are used to define dimensions.
+ * @param <AGGREGATE> - Type of the aggregate event which gets emitted as a result of aggregation.
* @displayName Dimension Computation
* @category Statistics
* @tags event, dimension, aggregation, computation
*
- * @param <EVENT> - Type of the tuple whose attributes are used to define dimensions.
* @since 1.0.2
*/
public class DimensionsComputation<EVENT, AGGREGATE extends DimensionsComputation.AggregateEvent> implements Operator
@@ -73,13 +74,6 @@
}
};
- protected void processInputTuple(EVENT tuple)
- {
- for (int i = 0; i < aggregatorMaps.length; i++) {
- aggregatorMaps[i].add(tuple, i);
- }
- }
-
/**
* Input data port that takes an event.
*/
@@ -88,7 +82,9 @@
@Override
public void process(EVENT tuple)
{
- processInputTuple(tuple);
+ for (int i = 0; i < aggregatorMaps.length; i++) {
+ aggregatorMaps[i].add(tuple, i);
+ }
}
};
@@ -384,11 +380,7 @@
AggregatorMap<?, ?> that = (AggregatorMap<?, ?>) o;
- if (aggregator != null ? !aggregator.equals(that.aggregator) : that.aggregator != null) {
- return false;
- }
-
- return true;
+ return aggregator != null ? aggregator.equals(that.aggregator) : that.aggregator == null;
}
@Override