/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.testbench;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
/**
* Filters incoming tuples according to the configured pass-through ratio and emits the ones that pass.
* <p>
* The aim is to create another stream representing a subsection of the incoming load<p>
* <br>
* Incoming keys are expected to be comma separated pairs, for example<br>
* publisher,advertiser<br>
* automobile,model<br>
* <br>
* <b>Ports</b>:<br>
* <b>data</b>: expects HashMap&lt;String, T&gt;<br>
* <b>filter</b>: emits HashMap&lt;String, T&gt;<br>
* The keys to be inserted are given by the key map (see <b>setKeyMap</b>); their mapped values
* replace the incoming values, falling back to the incoming value when no value is configured.<br>
* For each incoming key, users can provide weights for the insert keys (see <b>setKeyWeights</b>),
* which randomizes the choice of the insert key<br><br>
* <br>
* Benchmarks: This node has been benchmarked at over 22 million tuples/second in local/inline mode<br>
*
* <b>Tuple Schema</b>: Each tuple is a HashMap&lt;String, T&gt; on both ports. Other schemas are not currently supported<br>
* <br>
* <b>Properties</b>:<br>
* <b>passFilter</b>: number of tuples, out of every <b>totalFilter</b>, that are passed through<br>
* <b>totalFilter</b>: the denominator of the pass-through ratio<br>
* <b>keyMap</b>: the insert keys and their replacement values<br>
* <b>keyWeights</b>: per incoming key, the weights used to choose the insert key<br>
* <br>
* Compile time checks are: none<br>
* Run time checks are: <b>passFilter</b> must not exceed <b>totalFilter</b><br>
* </p>
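* <p>
* A minimal configuration sketch (the Double value type, key names, and weights below are
* illustrative only):</p>
* <pre>
* FilterClassifier&lt;Double&gt; node = new FilterClassifier&lt;Double&gt;();
* node.setPassFilter(25);   // pass roughly 25 out of every 100 tuples
* node.setTotalFilter(100);
* HashMap&lt;String, Double&gt; kmap = new HashMap&lt;String, Double&gt;();
* kmap.put("a", 1.0);
* kmap.put("b", 4.0);
* kmap.put("c", 5.0);
* node.setKeyMap(kmap);     // insert keys and their replacement values
* HashMap&lt;String, ArrayList&lt;Integer&gt;&gt; wmap = new HashMap&lt;String, ArrayList&lt;Integer&gt;&gt;();
* wmap.put("x", new ArrayList&lt;Integer&gt;(Arrays.asList(60, 10, 30)));
* node.setKeyWeights(wmap); // weights follow the key map's iteration order
* </pre>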
* @displayName Filter Classifier
* @category Test Bench
* @tags filter
* @since 0.3.2
*/
public class FilterClassifier<T> extends BaseOperator
{
/**
* The input port on which tuples are received.
*/
public final transient DefaultInputPort<HashMap<String, T>> data = new DefaultInputPort<HashMap<String, T>>()
{
@Override
public void process(HashMap<String, T> tuple)
{
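// Pass only pass_filter out of every total_filter tuples; drop the rest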
int fval = random.nextInt(total_filter);
if (fval >= pass_filter) {
return;
}
// The tuple passed the filter; replace the first half of each composite key with a weighted random insert key
for (Map.Entry<String, T> e: tuple.entrySet()) {
String[] twokeys = e.getKey().split(",");
if (twokeys.length != 2) {
continue;
}
String inkey = twokeys[1];
ArrayList<Integer> alist;
if (inkeys != null) {
alist = inkeys.get(inkey);
if (alist == null) {
alist = noweight; // no weights configured for this incoming key; use the even default
}
} else {
alist = noweight;
}
// alist holds one weight per insert key, with the total appended as the last element;
// draw a random value below the total and walk the weights to find the chosen index
int rval = random.nextInt(alist.get(alist.size() - 1));
int j = 0;
int wval = 0;
for (Integer ew: alist) {
wval += ew.intValue();
if (wval > rval) {
break;
}
j++;
}
HashMap<String, T> otuple = new HashMap<String, T>(1);
String key = wtostr_index.get(j); // the chosen insert key
T keyval = keys.get(key);
if (keyval == null) {
keyval = e.getValue(); // no value configured for this insert key; keep the incoming value
}
otuple.put(key + "," + inkey, keyval);
filter.emit(otuple);
}
}
};
/**
* The output port which emits filtered tuples.
*/
public final transient DefaultOutputPort<HashMap<String, T>> filter = new DefaultOutputPort<HashMap<String, T>>();
HashMap<String, T> keys = new HashMap<String, T>(); // insert keys and their replacement values
HashMap<Integer, String> wtostr_index = new HashMap<Integer, String>(); // index to insert key, used for the weighted draw
HashMap<String, ArrayList<Integer>> inkeys = null; // per incoming key, one weight per insert key
ArrayList<Integer> noweight = null; // default even weights, built in setup()
int total_weight = 0; // sum of the default weights
int pass_filter = 0; // number of tuples passed out of every total_filter
int total_filter = 0; // denominator of the pass-through ratio
private Random random = new Random();
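/**
* Sets the number of tuples, out of every totalFilter, that are passed through.
* @param i pass count
*/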
public void setPassFilter(int i)
{
pass_filter = i;
}
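/**
* Sets the total count against which passFilter is measured.
* @param i total count
*/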
public void setTotalFilter(int i)
{
total_filter = i;
}
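/**
* Sets the insert keys and their replacement values, and builds the index used for the weighted draw.
* @param map insert key to value map
*/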
public void setKeyMap(HashMap<String, T> map)
{
int i = 0;
// First load up the keys and the index hash (wtostr_index) for randomization to work
for (Map.Entry<String, T> e: map.entrySet()) {
keys.put(e.getKey(), e.getValue());
wtostr_index.put(i, e.getKey());
i += 1;
}
}
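/**
* Sets the per incoming key weights for choosing the insert key. The sum of each weight list is
* appended to the list and later used as the bound for the random draw.
* @param map incoming key to weight list map
*/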
public void setKeyWeights(HashMap<String, ArrayList<Integer>> map)
{
if (inkeys == null) {
inkeys = new HashMap<String, ArrayList<Integer>>();
}
for (Map.Entry<String, ArrayList<Integer>> e: map.entrySet()) {
inkeys.put(e.getKey(), e.getValue());
}
for (Map.Entry<String, ArrayList<Integer>> e: inkeys.entrySet()) {
ArrayList<Integer> list = e.getValue();
int total = 0;
for (Integer i: list) {
total += i.intValue();
}
list.add(total);
}
}
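/**
* Builds the default even weight list and validates that passFilter does not exceed totalFilter.
*/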
@Override
public void setup(OperatorContext context)
{
noweight = new ArrayList<Integer>();
for (int i = 0; i < keys.size(); i++) {
noweight.add(100); // Even distribution
total_weight += 100;
}
noweight.add(total_weight); // append the total as the last element, matching the convention used in setKeyWeights
if (pass_filter > total_filter) {
throw new IllegalArgumentException(String.format("Pass filter (%d) cannot be greater than total filter (%d)", pass_filter, total_filter));
}
}
}