blob: 7e55b63f3c52822dd73b70ccd7ed5345de3c4b62 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.stream.DevNullCounter;
import org.apache.hadoop.conf.Configuration;
/**
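* Benchmark application for generic dimensions computation. Generated JSON ad
* info is converted to maps, then to schema-based events, passed through the
* GenericDimensionComputation operator, and finally sent to a DevNullCounter
* sink; this DAG does not include an HDHT store.
*
* Example of configuration. NOTE: the sample below was written for the
* AdsDimensionsGenericDemo application. This benchmark only declares the
* operators InputGenerator, Converter, map2eventConverter,
* DimensionsComputation and Conter, so the Store, Query and QueryResult
* entries have no matching operator in this DAG.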
<pre>
{@code
<property>
<name>dt.application.AdsDimensionsGenericDemo.class</name>
<value>com.datatorrent.demos.adsdimension.GenericApplication</value>
</property>
<property>
<name>dt.application.AdsDimensionsGenericDemo.attr.containerMemoryMB</name>
<value>8192</value>
</property>
<property>
<name>dt.application.AdsDimensionsGenericDemo.attr.containerJvmOpts</name>
<value>-Xmx6g -server -Dlog4j.debug=true -Xloggc:&lt;LOG_DIR&gt;/gc.log -verbose:gc -XX:+PrintGCDateStamps</value>
</property>
<property>
<name>dt.application.AdsDimensionsGenericDemo.port.*.attr.QUEUE_CAPACITY</name>
<value>32000</value>
</property>
<property>
<name>dt.operator.InputGenerator.attr.PARTITIONER</name>
<value>com.datatorrent.lib.partitioner.StatelessPartitioner:8</value>
</property>
<property>
<name>dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT</name>
<value>4</value>
</property>
<property>
<name>dt.operator.DimensionsComputation.port.data.attr.PARTITION_PARALLEL</name>
<value>true</value>
</property>
<property>
<name>dt.operator.Store.attr.PARTITIONER</name>
<value>com.datatorrent.lib.partitioner.StatelessPartitioner:4</value>
</property>
<property>
<name>dt.operator.Store.fileStore.basePath</name>
<value>AdsDimensionWithHDS</value>
</property>
<property>
<name>dt.operator.Query.topic</name>
<value>HDSQuery</value>
</property>
<property>
<name>dt.operator.QueryResult.topic</name>
<value>HDSQueryResult</value>
</property>
<property>
<name>dt.operator.Query.brokerSet</name>
<value>localhost:9092</value>
</property>
<property>
<name>dt.operator.QueryResult.prop.configProperties(metadata.broker.list)</name>
<value>localhost:9092</value>
</property>
}
</pre>
*
*/
@ApplicationAnnotation(name = "DimensionsComputationBenchmark")
public class DimensionsComputationBenchmark implements StreamingApplication
{
  /**
   * Wires the benchmark pipeline into the supplied DAG:
   * InputGenerator -> Converter -> map2eventConverter -> DimensionsComputation -> Conter.
   *
   * @param dag  the DAG that receives the operators and streams
   * @param conf application configuration; the optional property
   *             {@code <runtime class name>.schema} supplies the event schema JSON
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JsonAdInfoGenerator generator = dag.addOperator("InputGenerator", JsonAdInfoGenerator.class);
    JsonToMapConverter jsonToMap = dag.addOperator("Converter", JsonToMapConverter.class);

    // The schema is optional: it is only pushed to the converter when the property is present.
    final String schemaKey = getClass().getName() + ".schema";
    final String eventSchemaJson = conf.get(schemaKey);
    SchemaConverter schemaConverter = dag.addOperator("map2eventConverter", SchemaConverter.class);
    if (eventSchemaJson != null) {
      schemaConverter.setEventSchemaJSON(eventSchemaJson);
    }

    // The dimensions operator is given the same event schema the converter reports.
    GenericDimensionComputation dimensionComputation =
        dag.addOperator("DimensionsComputation", new GenericDimensionComputation());
    dimensionComputation.setSchema(schemaConverter.getEventSchema());

    // NOTE(review): "Conter" looks like a typo for "Counter"; it is left unchanged because the
    // operator name may be referenced by existing configuration keys -- confirm before renaming.
    DevNullCounter<GenericAggregate> sink =
        dag.addOperator("Conter", new DevNullCounter<GenericAggregate>());

    // JSONStream and MapStream deliberately carry no setLocality(Locality.CONTAINER_LOCAL):
    // it was removed to isolate the performance bottleneck.
    dag.addStream("JSONStream", generator.jsonOutput, jsonToMap.input);
    dag.addStream("MapStream", jsonToMap.outputMap, schemaConverter.input);

    // Only the converter -> dimensions hop is pinned to the same thread.
    dag.addStream("EventStream", schemaConverter.output, dimensionComputation.data)
        .setLocality(Locality.THREAD_LOCAL);
    dag.addStream("DimensionalData", dimensionComputation.output, sink.data);
  }
}