blob: e877c42a25551d16e4f3f318405265b55eee7153 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.util;
import com.datatorrent.api.DefaultInputPort;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableInt;
/**
 * This is the base implementation of an operator, which takes key value pairs as inputs.
 * It counts the number of times each key value pair occurs.
 * The counts of each key value pair are compared and the winning key value pair(s) for each key is emitted.
 * Subclasses should implement the method that is used to emit key value pairs, as well as the comparison method used
 * to determine the winning key value pair(s).
 * <p>
 * This is an end of window operator: counts are accumulated over all tuples of a window, and the winners are
 * emitted (and the counts reset) in {@link #endWindow()}.<br>
 * <br>
 * Ports:<br>
 * <b>data</b>: expects {@code Map<K, V>}<br>
 * </p>
 * @displayName Emit Winning Key Value Pair(s)
 * @category Algorithmic
 * @tags count, compare, key value
 * @since 0.3.2
 */
public abstract class AbstractBaseFrequentKeyValueMap<K, V> extends BaseKeyValueOperator<K, V>
{
  /**
   * This is the input port which receives key value pairs.
   */
  public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>()
  {
    /**
     * Counts one occurrence of every (key, value) pair contained in the tuple.
     * <p>
     * A key or value is stored through cloneKey/cloneValue (inherited from the superclass) only the first time it
     * is seen in the window; later occurrences are looked up with the tuple's own objects.
     *
     * @param tuple map of key to value; each entry counts as one occurrence of that pair
     */
    @Override
    public void process(Map<K, V> tuple)
    {
      for (Map.Entry<K, V> e : tuple.entrySet()) {
        HashMap<V, MutableInt> vals = keyvals.get(e.getKey());
        if (vals == null) {
          vals = new HashMap<V, MutableInt>(4);
          keyvals.put(cloneKey(e.getKey()), vals);
        }
        MutableInt count = vals.get(e.getValue());
        if (count == null) {
          count = new MutableInt(0);
          vals.put(cloneValue(e.getValue()), count);
        }
        count.increment();
      }
    }
  };

  /**
   * Occurrence counts for the current window: key -> (value -> number of times the pair was seen).
   * Cleared at the end of every window.
   */
  HashMap<K, HashMap<V, MutableInt>> keyvals = new HashMap<K, HashMap<V, MutableInt>>();

  /**
   * Decides whether a candidate count should replace the current best count for a key, which is how a subclass
   * chooses between e.g. "most frequent" (for example {@code val1 > val2}) and "least frequent" selection.
   * When this returns false but the two counts are equal, the candidate is kept as a tie.
   *
   * @param val1 occurrence count of the candidate value
   * @param val2 current best occurrence count for the same key
   * @return true if {@code val1} should become the new best count, discarding the current winner(s)
   */
  public abstract boolean compareValue(int val1, int val2);

  /**
   * Emits one result tuple; subclasses decide which output port(s) it is sent to.
   * Called from {@link #endWindow()} once for every key that has at least one winning value.
   *
   * @param tuple single-entry map from a key to its winning value(s), each mapped to the winning count
   */
  public abstract void emitTuple(HashMap<K, HashMap<V, Integer>> tuple);

  /**
   * Emits the winning value(s) of every key counted in this window, then resets all counts.
   */
  @Override
  public void endWindow()
  {
    for (Map.Entry<K, HashMap<V, MutableInt>> e : keyvals.entrySet()) {
      HashMap<V, Integer> winners = findWinners(e.getValue());
      // Decide on the winner set itself rather than on the winning value, so a null value can still win.
      if (!winners.isEmpty()) {
        HashMap<K, HashMap<V, Integer>> tuple = new HashMap<K, HashMap<V, Integer>>(1);
        tuple.put(e.getKey(), winners);
        emitTuple(tuple);
      }
    }
    keyvals.clear();
  }

  /**
   * Selects the winning value(s) for a single key using {@link #compareValue(int, int)}; every value whose count
   * ties the best count is kept.
   *
   * @param vals value -> occurrence count for one key
   * @return winning values mapped to the winning count; empty if {@code vals} is empty or the winning count is
   *         not positive
   */
  private HashMap<V, Integer> findWinners(Map<V, MutableInt> vals)
  {
    HashMap<V, Integer> winners = new HashMap<V, Integer>();
    int best = 0;
    for (Map.Entry<V, MutableInt> v : vals.entrySet()) {
      int count = v.getValue().intValue();
      if (winners.isEmpty() || compareValue(count, best)) {
        // First value for this key, or a count the subclass prefers: previous winners are discarded.
        winners.clear();
        best = count;
        winners.put(v.getKey(), best);
      }
      else if (count == best) {
        // Tie with the current best count.
        winners.put(v.getKey(), best);
      }
    }
    if (best <= 0) {
      // Defensive: process() only stores counts >= 1, but never report a non-positive winner.
      winners.clear();
    }
    return winners;
  }
}