blob: 9e0299a3e44dd49f4ba7fd17a493295d8145d64d [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.Context;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
/**
* Generic dimensions demo application backed by HDHT. Queries are read from Kafka and query results are written back to Kafka.
*
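* The following settings are provided by default in properties.xml and can be overridden in a local dt-site.xml.
* NOTE(review): several operator names used below (InputGenerator, DimensionsComputation, DimensionsStore) do not match
* the operator names registered in populateDAG (Input, Compute, Store). Such properties presumably will not take effect
* until the names agree; confirm against properties.xml.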
<pre>
{@code
<property>
<name>dt.application.GenericDimensionsApplication.operator.*.attr.MEMORY_MB</name>
<value>8192</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.port.*.attr.QUEUE_CAPACITY</name>
<value>32000</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.InputGenerator.attr.PARTITIONER</name>
<value>com.datatorrent.lib.partitioner.StatelessPartitioner:2</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.InputGenerator.maxTuplesPerWindow</name>
<value>40000</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT</name>
<value>4</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.DimensionsStore.attr.PARTITIONER</name>
<value>com.datatorrent.lib.partitioner.StatelessPartitioner:4</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.DimensionsStore.fileStore.basePath</name>
<value>GenericDimensionsApplication</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.DimensionsStore.prop.maxCacheSize</name>
<value>5</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.Query.topic</name>
<value>GenericDimensionsQuery</value>
</property>
<property>
<name>dt.application.GenericDimensionsApplication.operator.QueryResult.topic</name>
<value>GenericDimensionsQueryResult</value>
</property>
}
</pre>
*
*
*
* The following settings must be supplied by the user and adjusted to match the local Kafka setup:
*
*
<pre>
{@code
<property>
<name>dt.operator.Query.brokerSet</name>
<value>localhost:9092</value>
</property>
<property>
<name>dt.operator.QueryResult.prop.configProperties(metadata.broker.list)</name>
<value>localhost:9092</value>
</property>
}
</pre>
*
*/
@ApplicationAnnotation(name="GenericDimensionsApplication")
public class GenericDimensionsApplication implements StreamingApplication
{
  /**
   * Assembles the generic dimensions DAG.
   *
   * <p>Pipeline: {@code Input} (JSON sales generator) -&gt; {@code Parse} (JSON to map) -&gt;
   * {@code Map2EventConverter} (map to event) -&gt; {@code Compute} (dimension computation) -&gt;
   * {@code Store} (dimension store). Queries arrive on the Kafka input operator {@code Query}.
   * The store's query results are published through the Kafka output operator {@code QueryResult}.
   *
   * @param dag  the DAG to populate with operators and streams
   * @param conf the Hadoop configuration passed in by the platform (not read by this method)
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    JsonSalesGenerator input = dag.addOperator("Input", JsonSalesGenerator.class);
    input.setAddProductCategory(true);
    JsonToMapConverter converter = dag.addOperator("Parse", JsonToMapConverter.class);
    SchemaConverter map2eventConverter = dag.addOperator("Map2EventConverter", SchemaConverter.class);
    GenericDimensionComputation dimensions = dag.addOperator("Compute", new GenericDimensionComputation());
    DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
    KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
    KafkaSinglePortOutputOperator<Object, Object> queryResult = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());

    // Parse is partitioned in parallel with Input, and Compute in parallel with its upstream operator.
    // NOTE(review): Map2EventConverter.input is not marked PARTITION_PARALLEL even though it sits between
    // two partition-parallel ports; confirm this asymmetry is intended.
    dag.setInputPortAttribute(converter.input, Context.PortContext.PARTITION_PARALLEL, true);
    dag.setInputPortAttribute(dimensions.data, Context.PortContext.PARTITION_PARALLEL, true);

    // JSONStream and MapStream request CONTAINER_LOCAL; EventStream requests THREAD_LOCAL.
    // NOTE(review): a previous comment here stated that CONTAINER_LOCAL was being removed from JSONStream and
    // MapStream to isolate a performance bottleneck, but both streams still set it. Confirm which locality is intended.
    dag.addStream("JSONStream", input.jsonBytes, converter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
    dag.addStream("MapStream", converter.outputMap, map2eventConverter.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
    dag.addStream("EventStream", map2eventConverter.output, dimensions.data).setLocality(DAG.Locality.THREAD_LOCAL);

    // Computed dimensions feed the store; the store answers queries arriving from Kafka and emits results to Kafka.
    dag.addStream("DimensionalData", dimensions.output, store.input);
    dag.addStream("Query", queries.outputPort, store.query);
    dag.addStream("QueryResult", store.queryResult, queryResult.inputPort);
  }
}