/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.wordcount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
* Computes word frequencies per file and globally, and writes the top N pairs to an output file
* and to snapshot servers for visualization.
* Currently designed to work with only 1 file at a time; will be enhanced later to support
* multiple files dropped into the monitored directory at the same time.
*
* <p>
* Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
* is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
* and frequencies are computed and emitted to the output ports.
* <p>
* There are 3 output ports: (a) one for the final per-file top N counts, emitted after the file
* has been fully read and destined for the output file; (b) one for the per-window top N counts
* of the current file, sent to a snapshot server; and (c) one for the per-window global top N
* counts, sent to a different snapshot server.
*
* Since the EOF is received by a single operator instance, this operator cannot be partitioned.
*
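* <p>
* A minimal wiring sketch (illustrative only; the upstream {@code wordCounter} and
* {@code lineReader} operators, the downstream {@code wordWriter} operator, and their port
* names are assumptions, not part of this class):
* <pre>{@code
* // wordCounter, lineReader and wordWriter are hypothetical upstream/downstream operators
* FileWordCount wcOper = dag.addOperator("fileWordCount", new FileWordCount());
* wcOper.setTopN(10);
* // per-window (word, frequency) pairs from an upstream counter
* dag.addStream("counts", wordCounter.output, wcOper.input);
* // EOF notification carrying the current file name
* dag.addStream("eof", lineReader.control, wcOper.control);
* // final per-file top N list to a file writer
* dag.addStream("finalCounts", wcOper.fileOutput, wordWriter.input);
* }</pre>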
* @since 3.2.0
*/
public class FileWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
private static final String GLOBAL = "global";
/**
* If {@literal topN > 0}, only data for the topN most frequent words is output; if topN == 0, the
* entire frequency map is output
*/
protected int topN;
/**
* Set to true when an EOF control tuple for the current input file is received; reset to false
* when the corresponding output file has been written.
*/
protected boolean eof = false;
/**
* Last component of the path (just the file name); received as the value of the EOF control tuple
*/
protected String fileName;
/**
* {@literal (word => frequency)} map: current file, all words
*/
protected Map<String, WCPair> wordMapFile = new HashMap<>();
/**
* {@literal (word => frequency)} map: global, all words
*/
protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
/**
* Singleton list with per file data; sent on {@code outputPerFile}
*/
protected transient List<Map<String, Object>> resultPerFile;
/**
* Singleton list with global data; sent on {@code outputGlobal}
*/
protected transient List<Map<String, Object>> resultGlobal;
/**
* Singleton map of {@code fileName} to sorted list of (word, frequency) pairs
*/
protected transient Map<String, Object> resultFileFinal;
/**
* Final list of (word, frequency) pairs written to the output file
*/
protected transient List<WCPair> fileFinalList;
/**
* Input port on which per-window {@literal (word => frequency)} map is received; the map
* is merged into {@code wordMapFile} and {@code wordMapGlobal}.
*/
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
WCPair globalPair = wordMapGlobal.get(word); // cannot be null
filePair.freq += pair.freq;
globalPair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
// check global map
WCPair globalPair = wordMapGlobal.get(word); // may be null
if (null != globalPair) { // word seen previously
globalPair.freq += pair.freq;
continue;
}
// word never seen before
globalPair = new WCPair(word, pair.freq);
wordMapGlobal.put(word, globalPair);
}
}
};
/**
* Control port on which the current file name is received to indicate EOF
*/
@InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
/**
* Output port for per-window top N counts of the current file
*/
public final transient DefaultOutputPort<List<Map<String, Object>>>
outputPerFile = new DefaultOutputPort<>();
/**
* Output port for per-window global top N counts
*/
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<List<Map<String, Object>>>
outputGlobal = new DefaultOutputPort<>();
/**
* Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
* top N pairs for current file and will be written to the output file; emitted in the
* {@code endWindow()} call after an EOF
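* <p>
* For example (illustrative values only), after {@code foo.txt} has been fully read the emitted
* tuple might be the singleton map {@code {"foo.txt" => [(the, 37), (apex, 21), ...]}}, where the
* list holds the sorted {@code WCPair} values for that file.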
*/
public final transient DefaultOutputPort<Map<String, Object>>
fileOutput = new DefaultOutputPort<>();
/**
* Get the number of top (word, frequency) pairs that will be output
*/
public int getTopN()
{
return topN;
}
/**
* Set the number of top (word, frequency) pairs that will be output
* @param n the number of top pairs to output; 0 means output the entire frequency map
*/
public void setTopN(int n)
{
topN = n;
}
/**
* {@inheritDoc}
* Initialize various map and list fields
*/
@Override
public void setup(OperatorContext context)
{
if (null == wordMapFile) {
wordMapFile = new HashMap<>();
}
if (null == wordMapGlobal) {
wordMapGlobal = new HashMap<>();
}
resultPerFile = new ArrayList<>(1);
resultGlobal = new ArrayList<>(1);
// singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
resultFileFinal = new HashMap<>(1);
fileFinalList = new ArrayList<>();
}
/**
* {@inheritDoc}
* This is where we do most of the work:
* <ol>
* <li>Sort the global map and emit the top N pairs.</li>
* <li>Sort the current file map and emit the top N pairs.</li>
* <li>If we've seen EOF, emit the top N pairs on the port connected to the file writer and clear
* all per-file data structures.</li>
* </ol>
*/
@Override
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
if (wordMapFile.isEmpty()) { // no words found
LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
if (eof) { // got EOF, so emit an empty list to the output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);
// get topN global list and emit to global output port
getTopNMap(wordMapGlobal, resultGlobal);
LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
outputGlobal.emit(resultGlobal);
// get topN list for this file and emit to file output port
getTopNMap(wordMapFile, resultPerFile);
LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
outputPerFile.emit(resultPerFile);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// so compute final topN list from wordMapFile into fileFinalList and emit it
getTopNList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
/**
* Get the topN frequencies from {@code map}, convert each pair to a 2-element map with
* "word" and "count" keys, and append it to {@code result}.
* The resulting list is suitable input to AppDataSnapshotServer.
* MUST have {@code map.size() > 0} here.
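* <p>
* For example (illustrative values only), with {@code topN == 2} the appended entries might be
* {@code {"word": "the", "count": 37}} and {@code {"word": "apex", "count": 21}}.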
*/
private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
{
final ArrayList<WCPair> list = new ArrayList<>(map.values());
// sort entries in descending order of frequency
Collections.sort(list, new Comparator<WCPair>()
{
@Override
public int compare(WCPair o1, WCPair o2)
{
return Integer.compare(o2.freq, o1.freq);
}
});
if (topN > 0 && topN < list.size()) {
list.subList(topN, list.size()).clear(); // retain only the first topN entries
}
// convert each pair (word, freq) of list to a map with 2 elements
// {("word": <word>, "count": freq)} and append to result
result.clear();
for (WCPair pair : list) {
Map<String, Object> wmap = new HashMap<>(2);
wmap.put("word", pair.word);
wmap.put("count", pair.freq);
result.add(wmap);
}
LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
list.clear();
}
/**
* Populate {@code fileFinalList} with the topN frequencies from the argument map.
* The resulting list is suitable input to WordCountWriter, which writes it to a file.
* MUST have {@code map.size() > 0} here.
*/
private void getTopNList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>()
{
@Override
public int compare(WCPair o1, WCPair o2)
{
return Integer.compare(o2.freq, o1.freq);
}
});
if (topN > 0 && topN < fileFinalList.size()) {
fileFinalList.subList(topN, fileFinalList.size()).clear(); // retain only the first topN entries
}
LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
}
}