| /* |
| * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.demos.dimensions.generic; |
| |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.DAG.Locality; |
| import com.datatorrent.api.StreamingApplication; |
| import com.datatorrent.api.annotation.ApplicationAnnotation; |
| import com.datatorrent.lib.stream.DevNullCounter; |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * DimensionsDemo run with HDHT |
| * |
| * Example of configuration |
| <pre> |
| {@code |
| <property> |
| <name>dt.application.AdsDimensionsGenericDemo.class</name> |
| <value>com.datatorrent.demos.adsdimension.GenericApplication</value> |
| </property> |
| |
| <property> |
| <name>dt.application.AdsDimensionsGenericDemo.attr.containerMemoryMB</name> |
| <value>8192</value> |
| </property> |
| |
| <property> |
| <name>dt.application.AdsDimensionsGenericDemo.attr.containerJvmOpts</name> |
| <value>-Xmx6g -server -Dlog4j.debug=true -Xloggc:<LOG_DIR>/gc.log -verbose:gc -XX:+PrintGCDateStamps</value> |
| </property> |
| |
| <property> |
| <name>dt.application.AdsDimensionsGenericDemo.port.*.attr.QUEUE_CAPACITY</name> |
| <value>32000</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.InputGenerator.attr.PARTITIONER</name> |
| <value>com.datatorrent.lib.partitioner.StatelessPartitioner:8</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT</name> |
| <value>4</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.DimensionsComputation.port.data.attr.PARTITION_PARALLEL</name> |
| <value>true</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.Store.attr.PARTITIONER</name> |
| <value>com.datatorrent.lib.partitioner.StatelessPartitioner:4</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.Store.fileStore.basePath</name> |
| <value>AdsDimensionWithHDS</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.Query.topic</name> |
| <value>HDSQuery</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.QueryResult.topic</name> |
| <value>HDSQueryResult</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.Query.brokerSet</name> |
| <value>localhost:9092</value> |
| </property> |
| |
| <property> |
| <name>dt.operator.QueryResult.prop.configProperties(metadata.broker.list)</name> |
| <value>localhost:9092</value> |
| </property> |
| |
| } |
| </pre> |
| * |
| */ |
| @ApplicationAnnotation(name="DimensionsComputationBenchmark") |
| public class DimensionsComputationBenchmark implements StreamingApplication |
| { |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| JsonAdInfoGenerator input = dag.addOperator("InputGenerator", JsonAdInfoGenerator.class); |
| JsonToMapConverter converter = dag.addOperator("Converter", JsonToMapConverter.class); |
| |
| String schemaSpec = conf.get(this.getClass().getName() + ".schema"); |
| SchemaConverter map2eventConverter = dag.addOperator("map2eventConverter", SchemaConverter.class); |
| if (schemaSpec != null) { |
| map2eventConverter.setEventSchemaJSON(schemaSpec); |
| } |
| |
| GenericDimensionComputation dimensions = dag.addOperator("DimensionsComputation", new GenericDimensionComputation()); |
| dimensions.setSchema(map2eventConverter.getEventSchema()); |
| |
| DevNullCounter<GenericAggregate> counter = dag.addOperator("Conter", new DevNullCounter<GenericAggregate>()); |
| |
| |
| // Removing setLocality(Locality.CONTAINER_LOCAL) from JSONStream and MapStream to isolate performance bottleneck |
| dag.addStream("JSONStream", input.jsonOutput, converter.input); |
| dag.addStream("MapStream", converter.outputMap, map2eventConverter.input); |
| dag.addStream("EventStream", map2eventConverter.output, dimensions.data).setLocality(Locality.THREAD_LOCAL); |
| dag.addStream("DimensionalData", dimensions.output, counter.data); |
| } |
| |
| } |