blob: 6652b7907d295ecd9eb696e224ba80f9cdb6885e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.wordcount;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.io.ConsoleOutputOperator;
/**
* Simple Word Count Example : <br>
* This application counts the total occurrences of each word from a file or any
* stream. <br>
* <br>
*
* Functional Description : <br>
* This example declares a custom input operator to read the data file set by the user. <br>
* This input operator can be replaced by any stream input operator. <br>
* <br>
*
* Custom Attribute(s) : None <br>
* <br>
*
* Input Adapter : <br>
* Word input operator opens user specified data file and streams each line to
* application. <br>
* <br>
*
* Output Adapter : <br>
* Output values are written to the console through ConsoleOutputOperator.<br>
* If needed you can use other output adapters<br>
* <br>
* <p>
* Running Java Test or Main app in IDE:
*
* <pre>
* LocalMode.runApp(new Application(), 600000); // 10 min run
* </pre>
*
* Run Success : <br>
* For successful deployment and run, user should see following output on console: <br>
* <pre>
* {developed=1} {bush’s=2} {roster=1} {council=1} {mankiw=1} {academia=1}
* {of=6} {help=1} {are=1} {presidential=1}
* </pre> <br> <br>
*
* Scaling Options : <br>
* This application cannot be scaled; please look at the implementation of {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> <br>
*
* Application DAG : <br>
* <img src="doc-files/UniqueWordCounter.jpg" width=600px > <br>
*
* Streaming Window Size : 500ms <br>
* Operator Details : <br>
* <ul>
* <li>
* <p><b> The operator wordinput : </b> This operator opens a local file, reads each line and sends each word to the application.
* This can be replaced by any input stream by the user. <br>
* Class : {@link WordCountInputOperator} <br>
* Operator Application Window Count : 1 <br>
* Stateful : No
* </li>
* <li>
* <p><b> The operator count : </b> This operator aggregates unique key count over one window count(app). <br>
* Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br>
* Operator Application Window Count : 1 <br>
* Stateful : No
* </li>
* <li>
* <p><b>The operator Console: </b> This operator just outputs the input tuples to the console (or stdout).
* In this case it emits the unique count of each word over 500ms.
* </li>
* </ul>
*
* @since 0.3.2
*/
@ApplicationAnnotation(name = "WordCountExample")
public class Application implements StreamingApplication
{
  /**
   * Assembles the word count pipeline on the supplied DAG: the "wordinput"
   * operator feeds the "count" operator, whose count port in turn feeds the
   * "console" operator.
   *
   * @param dag  the DAG to which the operators and streams are added
   * @param conf the configuration handed in by the caller; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Register the three operators in pipeline order.
    WordCountInputOperator wordSource = dag.addOperator("wordinput", new WordCountInputOperator());
    UniqueCounter<String> counter = dag.addOperator("count", new UniqueCounter<String>());
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());

    // Wire them together: word source -> counter -> console.
    dag.addStream("wordinput-count", wordSource.outputPort, counter.data);
    dag.addStream("count-console", counter.count, console.input);
  }
}