blob: 3ac6951abdef14de892881ba5d6940587667624b [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.twitter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import java.util.*;
/**
 * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size.
 * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of
 * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive tuples
 * of type {@code Map<String, Integer>} on its input port, and at the end of each window it emits one object of type
 * {@code Map<String, Integer>} holding at most a predetermined number of entries. The emitted object contains the
 * most frequently occurring keys together with their counts over the sliding window.
 *
 * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be
 *            HashMap friendly (consistent {@code equals}/{@code hashCode}).
 * @since 0.3.2
 */
public class WindowedTopCounter<T> extends BaseOperator
{
  private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class);

  /**
   * Input port on which map objects containing keys with their respective frequency as values will be accepted.
   * Each entry's count is added to the per-key sliding container, which is created on first sight of a key.
   */
  public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>()
  {
    @Override
    public void process(Map<T, Integer> map)
    {
      for (Map.Entry<T, Integer> e : map.entrySet()) {
        final T key = e.getKey();
        SlidingContainer<T> holder = objects.get(key);
        if (holder == null) {
          // NOTE(review): `windows` is 0 unless setSlidingWindowWidth() was called first -- confirm how
          // SlidingContainer treats a zero window count.
          holder = new SlidingContainer<T>(key, windows);
          objects.put(key, holder);
        }
        holder.adjustCount(e.getValue());
      }
    }
  };

  /**
   * Output port on which a map object containing most frequently occurring keys with their frequency will be emitted.
   */
  public final transient DefaultOutputPort<Map<T, Integer>> output = new DefaultOutputPort<Map<T, Integer>>();

  /**
   * Per-window scratch heap ordered by ascending count, so its head is the smallest of the current top entries.
   * It is rebuilt in setup() and emptied after every window, so it does not need to be checkpointed.
   */
  private transient PriorityQueue<SlidingContainer<T>> topCounter;
  /** Number of DAG windows tracked by each sliding container; computed by setSlidingWindowWidth(). */
  private int windows;
  /** Maximum number of keys emitted per window. */
  private int topCount = 10;
  /** Sliding counters for every key that still has a non-zero count in the sliding window. */
  private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>();

  /**
   * Set the width of the sliding window.
   *
   * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently
   * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms,
   * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000.
   *
   * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG
   *                           window. Must not be negative.
   * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the
   *                       application level. Must be positive.
   * @throws IllegalArgumentException if dagWindowWidth is not positive, slidingWindowWidth is negative, or the
   *                                  resulting number of windows does not fit in an int.
   */
  public void setSlidingWindowWidth(long slidingWindowWidth, int dagWindowWidth)
  {
    if (dagWindowWidth <= 0) {
      throw new IllegalArgumentException("dagWindowWidth must be positive: " + dagWindowWidth);
    }
    if (slidingWindowWidth < 0) {
      throw new IllegalArgumentException("slidingWindowWidth must not be negative: " + slidingWindowWidth);
    }
    long quotient = slidingWindowWidth / dagWindowWidth;
    if (quotient >= Integer.MAX_VALUE) {
      // The (int) cast below would otherwise silently truncate.
      throw new IllegalArgumentException("slidingWindowWidth(" + slidingWindowWidth
          + ") / dagWindowWidth(" + dagWindowWidth + ") is too large");
    }
    // One slot more than the ratio; NOTE(review): presumably for the window currently being filled --
    // confirm against SlidingContainer.
    windows = (int)quotient + 1;
    if (slidingWindowWidth % dagWindowWidth != 0) {
      logger.warn("slidingWindowWidth({}) is not exact multiple of dagWindowWidth({})", slidingWindowWidth, dagWindowWidth);
    }
  }

  @Override
  public void setup(OperatorContext context)
  {
    topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator());
  }

  @Override
  public void beginWindow(long windowId)
  {
    topCounter.clear();
  }

  /**
   * Slides every key's counter by one window and drops keys whose count fell to zero.
   * Then emits the (at most) topCount keys with the highest counts.
   * Among equal counts, keys met earlier in map iteration order win.
   */
  @Override
  public void endWindow()
  {
    Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = objects.entrySet().iterator();
    while (iterator.hasNext()) {
      SlidingContainer<T> holder = iterator.next().getValue();
      holder.slide();
      if (holder.totalCount == 0) {
        // Nothing left for this key inside the sliding window; forget it.
        iterator.remove();
      }
      else if (topCounter.size() < topCount) {
        // Still filling the heap with the first topCount live keys.
        topCounter.add(holder);
      }
      else if (holder.totalCount > topCounter.peek().totalCount) {
        // Heap is full; replace its smallest entry only when this key beats it.
        topCounter.poll();
        topCounter.add(holder);
      }
    }
    logger.debug("objects.size(): {}", objects.size());

    // Emit the top keys without caring for order.
    HashMap<T, Integer> map = new HashMap<T, Integer>();
    for (SlidingContainer<T> top : topCounter) {
      map.put(top.identifier, top.totalCount);
    }
    output.emit(map);
    topCounter.clear();
  }

  /**
   * Releases the scratch heap and the per-key counters. The operator is not usable afterwards unless
   * these fields are re-initialized.
   */
  @Override
  public void teardown()
  {
    topCounter = null;
    objects = null;
  }

  /**
   * Set the count of most frequently occurring keys to emit per map object.
   *
   * @param count count of the objects in the map emitted at the output port; must be at least 1.
   * @throws IllegalArgumentException if count is less than 1.
   */
  public void setTopCount(int count)
  {
    if (count < 1) {
      // PriorityQueue in setup() would reject this anyway; fail at configuration time instead.
      throw new IllegalArgumentException("topCount must be at least 1: " + count);
    }
    topCount = count;
  }

  /**
   * Orders sliding containers by ascending totalCount, which makes the PriorityQueue head the smallest count.
   */
  static class TopSpotComparator implements Comparator<SlidingContainer<?>>
  {
    @Override
    public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2)
    {
      if (o1.totalCount > o2.totalCount) {
        return 1;
      }
      else if (o1.totalCount < o2.totalCount) {
        return -1;
      }
      return 0;
    }
  }
}