blob: b873326fb3b994793ca966645d0d650fe697b862 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.logs;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.mutable.MutableDouble;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;
/**
 * This class aggregates the value of given dimension across windows.
 * <p>
 * Incoming tuples map a composite key to a map of metrics. A key is considered only if it matches
 * {@code <timeBucket>|<digits>|<dim>:<value>[|<dim>:<value>...]} for one of the configured dimension
 * combinations (see {@link #setDimensionArray(List)}). For every matching key, the metric named by
 * {@link #setDimensionKeyVal(String)} is added to a running aggregate kept per dimension combination
 * and per comma separated list of dimension values.
 * </p>
 * <p>
 * The operator keeps {@code windowSize} slots of received tuples. At the beginning of a window the
 * tuples held in the slot about to be reused are subtracted from the running aggregates, so the
 * aggregates always reflect the most recent {@code windowSize} windows. At the end of every window
 * one tuple per aggregate is emitted on {@link #output}.
 * </p>
 * @displayName Multi Window Dimension Aggregation
 * @category Logs
 * @tags aggregation
 *
 * @since 0.3.4
 */
public class MultiWindowDimensionAggregation implements Operator
{
  @SuppressWarnings("unused")
  private static final Logger logger = LoggerFactory.getLogger(MultiWindowDimensionAggregation.class);

  /**
   * Kind of value emitted for every aggregate at the end of a window.
   */
  public enum AggregateOperation
  {
    SUM, AVERAGE
  }

  private int windowSize = 2;
  private int currentWindow = 0;
  private String timeBucket = "m";
  private String dimensionKeyVal = "0";
  private List<String> dimensionArrayString;
  @NotNull
  private List<int[]> dimensionArray;
  private AggregateOperation operationType = AggregateOperation.SUM;
  /* dimension combination -> dimension values -> (running sum, number of contributions) */
  private Map<String, Map<String, KeyValPair<MutableDouble, Integer>>> outputMap;
  /* window slot -> every tuple received while that slot was current; replayed to retract the slot */
  private Map<Integer, List<Map<String, Map<String, Number>>>> cacheOject;
  private transient List<Pattern> patternList;
  private transient int applicationWindowSize = 500;

  /**
   * This is the output port which emits aggregated dimensions.
   */
  public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> output = new DefaultOutputPort<Map<String, DimensionObject<String>>>();

  /**
   * This is the input port which receives multi dimensional data.
   */
  public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>()
  {
    @Override
    public void process(Map<String, Map<String, Number>> tuple)
    {
      // Remember every tuple of the current slot (not only the last one), otherwise the earlier
      // tuples of a window could never be subtracted again once the slot is reused.
      List<Map<String, Map<String, Number>>> windowTuples = cacheOject.get(currentWindow);
      if (windowTuples == null) {
        windowTuples = new ArrayList<Map<String, Map<String, Number>>>();
        cacheOject.put(currentWindow, windowTuples);
      }
      windowTuples.add(tuple);
      accumulate(tuple, false);
    }
  };

  /**
   * Adds the contributions of a tuple to the running aggregates, or takes them out again.
   * Entries with a null key, a null metric map or without the configured metric are skipped in both
   * directions, as are keys that match none of the configured patterns.
   *
   * @param tuple the tuple whose entries are applied
   * @param retract false to add the tuple, true to subtract a previously added tuple
   */
  private void accumulate(Map<String, Map<String, Number>> tuple, boolean retract)
  {
    for (Map.Entry<String, Map<String, Number>> tupleEntry : tuple.entrySet()) {
      String tupleKey = tupleEntry.getKey();
      Map<String, Number> tupleValue = tupleEntry.getValue();
      Number metric = (tupleValue == null) ? null : tupleValue.get(dimensionKeyVal);
      if (tupleKey == null || metric == null) {
        continue;
      }
      for (int patternIndex = 0; patternIndex < patternList.size(); patternIndex++) {
        Matcher matcher = patternList.get(patternIndex).matcher(tupleKey);
        if (!matcher.matches()) {
          continue;
        }
        String dimensionName = dimensionArrayString.get(patternIndex);
        String dimensionValues = joinDimensionValues(matcher, dimensionArray.get(patternIndex).length);
        if (retract) {
          subtractContribution(dimensionName, dimensionValues, metric);
        } else {
          addContribution(dimensionName, dimensionValues, metric);
        }
        // only the first matching dimension combination is used for a key
        break;
      }
    }
  }

  /**
   * Joins the dimension values captured by a matched key with commas.
   *
   * @param matcher a matcher that already matched a key; group 1 holds the digits following the
   *        time bucket, the dimension values start at group 2
   * @param dimensionCount number of dimensions in the matched combination
   * @return the captured dimension values separated by ","
   */
  private static String joinDimensionValues(Matcher matcher, int dimensionCount)
  {
    StringBuilder builder = new StringBuilder(matcher.group(2));
    for (int i = 1; i < dimensionCount; i++) {
      builder.append(',').append(matcher.group(i + 2));
    }
    return builder.toString();
  }

  /**
   * Adds one contribution to the aggregate, creating the aggregate if it does not exist yet.
   */
  private void addContribution(String dimensionName, String dimensionValues, Number metric)
  {
    Map<String, KeyValPair<MutableDouble, Integer>> aggregates = outputMap.get(dimensionName);
    if (aggregates == null) {
      aggregates = new HashMap<String, KeyValPair<MutableDouble, Integer>>();
      outputMap.put(dimensionName, aggregates);
    }
    KeyValPair<MutableDouble, Integer> aggregate = aggregates.get(dimensionValues);
    if (aggregate == null) {
      aggregates.put(dimensionValues, new KeyValPair<MutableDouble, Integer>(new MutableDouble(metric), 1));
    } else {
      aggregate.getKey().add(metric);
      aggregate.setValue(aggregate.getValue() + 1);
    }
  }

  /**
   * Takes one contribution out of the aggregate; does nothing if the aggregate is unknown.
   * The aggregate is dropped once its last contribution is gone. The decision is based on the
   * contribution count and not on the sum, because floating point subtraction does not reliably
   * return to exactly 0.0 and a sum of 0.0 does not imply that no contribution is left.
   */
  private void subtractContribution(String dimensionName, String dimensionValues, Number metric)
  {
    Map<String, KeyValPair<MutableDouble, Integer>> aggregates = outputMap.get(dimensionName);
    if (aggregates == null) {
      return;
    }
    KeyValPair<MutableDouble, Integer> aggregate = aggregates.get(dimensionValues);
    if (aggregate == null) {
      return;
    }
    aggregate.getKey().add(0 - metric.doubleValue());
    int remaining = aggregate.getValue() - 1;
    aggregate.setValue(remaining);
    if (remaining <= 0) {
      aggregates.remove(dimensionValues);
    }
  }

  /**
   * @return name of the metric that is read from the value map of every matching tuple entry
   */
  public String getDimensionKeyVal()
  {
    return dimensionKeyVal;
  }

  /**
   * @param dimensionKeyVal name of the metric that is read from the value map of every matching
   *        tuple entry; entries without this metric are ignored
   */
  public void setDimensionKeyVal(String dimensionKeyVal)
  {
    this.dimensionKeyVal = dimensionKeyVal;
  }

  /**
   * @return the prefix a tuple key has to start with in order to be aggregated
   */
  public String getTimeBucket()
  {
    return timeBucket;
  }

  /**
   * @param timeBucket the prefix a tuple key has to start with in order to be aggregated; it is
   *        inserted into the key pattern as is, i.e. it is interpreted as a regular expression
   */
  public void setTimeBucket(String timeBucket)
  {
    this.timeBucket = timeBucket;
  }

  /**
   * @return the configured dimension combinations
   */
  public List<int[]> getDimensionArray()
  {
    return dimensionArray;
  }

  /**
   * Sets the dimension combinations to aggregate. Every array lists the dimension identifiers of
   * one combination in the order in which they appear in the tuple key, and must contain at least
   * one element. The comma separated form of each array is the key of the emitted tuples.
   *
   * @param dimensionArray the dimension combinations
   */
  public void setDimensionArray(List<int[]> dimensionArray)
  {
    this.dimensionArray = dimensionArray;
    dimensionArrayString = new ArrayList<String>();
    for (int[] e : dimensionArray) {
      StringBuilder builder = new StringBuilder().append(e[0]);
      for (int i = 1; i < e.length; i++) {
        builder.append(',').append(e[i]);
      }
      dimensionArrayString.add(builder.toString());
    }
  }

  /**
   * @return number of windows the aggregates span
   */
  public int getWindowSize()
  {
    return windowSize;
  }

  /**
   * @param windowSize number of windows the aggregates span; has to be positive because the
   *        current slot is computed modulo this value at the end of every window
   */
  public void setWindowSize(int windowSize)
  {
    this.windowSize = windowSize;
  }

  /**
   * Reads the application window count from the context (if one is given), creates the state maps
   * unless they already exist and compiles the key patterns.
   *
   * @param arg0 the operator context, may be null
   */
  @Override
  public void setup(OperatorContext arg0)
  {
    if (arg0 != null) {
      applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
    }
    if (cacheOject == null) {
      cacheOject = new HashMap<Integer, List<Map<String, Map<String, Number>>>>(windowSize);
    }
    if (outputMap == null) {
      outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>();
    }
    setUpPatternList();
  }

  /**
   * Compiles one key pattern per dimension combination, in the order of the dimension array.
   */
  private void setUpPatternList()
  {
    patternList = new ArrayList<Pattern>();
    for (int[] e : dimensionArray) {
      StringBuilder builder = new StringBuilder(timeBucket + "\\|(\\d+)");
      for (int i = 0; i < e.length; i++) {
        builder.append("\\|").append(e[i]).append(":([^\\|]+)");
      }
      patternList.add(Pattern.compile(builder.toString()));
    }
  }

  @Override
  public void teardown()
  {
  }

  /**
   * Subtracts everything that was received the last time the current slot was in use, so that the
   * slot can collect the tuples of the new window.
   *
   * @param arg0 the window id, not used
   */
  @Override
  public void beginWindow(long arg0)
  {
    // The patterns are transient; make sure they exist before they are needed for the retraction.
    if (patternList == null || patternList.isEmpty()) {
      setUpPatternList();
    }
    List<Map<String, Map<String, Number>>> expiredTuples = cacheOject.get(currentWindow);
    if (expiredTuples == null) {
      return;
    }
    for (Map<String, Map<String, Number>> expiredTuple : expiredTuples) {
      accumulate(expiredTuple, true);
    }
    // Empty the slot but keep it registered: endWindow counts the registered slots. Only the
    // operator's own list is cleared, the received tuple objects are left untouched.
    expiredTuples.clear();
  }

  /**
   * Emits one single-entry map per aggregate and advances to the next window slot.
   * For SUM the running sum is emitted. For AVERAGE the running sum is divided by
   * {@code (slots that received data so far * applicationWindowSize) / 1000}.
   */
  @Override
  public void endWindow()
  {
    int totalWindowsOccupied = cacheOject.size();
    for (Map.Entry<String, Map<String, KeyValPair<MutableDouble, Integer>>> e : outputMap.entrySet()) {
      for (Map.Entry<String, KeyValPair<MutableDouble, Integer>> dimensionValObj : e.getValue().entrySet()) {
        KeyValPair<MutableDouble, Integer> keyVal = dimensionValObj.getValue();
        double sum = keyVal.getKey().doubleValue();
        MutableDouble emittedValue = null;
        if (operationType == AggregateOperation.SUM) {
          // Emit a snapshot: the internal accumulator keeps changing in the following windows.
          emittedValue = new MutableDouble(sum);
        } else if (operationType == AggregateOperation.AVERAGE) {
          if (keyVal.getValue() != 0) {
            // NOTE(review): applicationWindowSize is read from APPLICATION_WINDOW_COUNT in setup()
            // while its default (500) and the division by 1000 look like milliseconds -> seconds;
            // confirm the intended unit of the denominator.
            double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000;
            emittedValue = new MutableDouble(sum / totalCount);
          }
        }
        if (emittedValue != null) {
          Map<String, DimensionObject<String>> outputData = new HashMap<String, DimensionObject<String>>();
          outputData.put(e.getKey(), new DimensionObject<String>(emittedValue, dimensionValObj.getKey()));
          output.emit(outputData);
        }
      }
    }
    currentWindow = (currentWindow + 1) % windowSize;
  }

  /**
   * @return the kind of value emitted for every aggregate
   */
  public AggregateOperation getOperationType()
  {
    return operationType;
  }

  /**
   * @param operationType the kind of value emitted for every aggregate
   */
  public void setOperationType(AggregateOperation operationType)
  {
    this.operationType = operationType;
  }
}