/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.apps.logstream;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang.mutable.MutableDouble;

import com.datatorrent.lib.logs.DimensionObject;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator.Unifier;
import com.datatorrent.apps.logstream.LogstreamUtil.AggregateOperation;

/**
 * Unifies the output of the dimension operator for every application window by
 * merging the partial SUM, COUNT and AVERAGE aggregates computed by its
 * upstream partitions.
 *
 * @since 0.9.4
 */
public class DimensionOperatorUnifier implements Unifier<Map<String, DimensionObject<String>>>
{
  public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> aggregationsOutput = new DefaultOutputPort<Map<String, DimensionObject<String>>>();
  Map<String, DimensionObject<String>> unifiedOutput;
  // per-window cache: aggregation key -> dimension value -> operation -> running aggregate
  private Map<String, Map<String, Map<AggregateOperation, Number>>> unifiedCache = new HashMap<String, Map<String, Map<AggregateOperation, Number>>>();
  private transient boolean firstTuple = true;
  // metadata recorded from the first tuple; currently only the expected filter id
  private HashMap<String, Number> recordType = new HashMap<String, Number>();
  private static final Logger logger = LoggerFactory.getLogger(DimensionOperatorUnifier.class);
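
  /**
   * Merges one partial-aggregate tuple from an upstream partition into the
   * per-window cache. A sketch of a possible input tuple is shown below; the
   * exact key layout is an assumption, since this unifier only relies on the
   * '.'-separated operation suffix and on the filter id in the fourth
   * '|'-separated field:
   * <pre>
   *   "m|201402121900|0|1|url.SUM"     -> DimensionObject(370.0, "home.php")
   *   "m|201402121900|0|1|url.COUNT"   -> DimensionObject(10.0,  "home.php")
   *   "m|201402121900|0|1|url.AVERAGE" -> DimensionObject(37.0,  "home.php")
   * </pre>
   */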
  @Override
  public void process(Map<String, DimensionObject<String>> tuple)
  {
    // The tuple carries one record per metric type; currently three records,
    // one each for SUM, COUNT and AVERAGE.
    Iterator<Entry<String, DimensionObject<String>>> iterator = tuple.entrySet().iterator();
    if (!iterator.hasNext()) {
      return; // nothing to unify
    }

    if (firstTuple) {
      extractType(tuple);
      firstTuple = false;
    }

    String randomKey = iterator.next().getKey();
    // everything before the first '.' is the aggregation key; the operation
    // name (SUM, COUNT or AVERAGE) follows the '.'
    String key = randomKey.split("\\.")[0];

    // sanity check: every tuple should carry the filter id recorded from the
    // first tuple (the fourth '|'-separated field of the key)
    Number receivedFilter = Integer.valueOf(randomKey.split("\\|")[3]);
    Number expectedFilter = recordType.get(LogstreamUtil.FILTER);
    if (!receivedFilter.equals(expectedFilter)) {
      logger.error("Unexpected tuple: expected filter = {} received = {}", expectedFilter, receivedFilter);
    }

    // COUNT must be unified before AVERAGE: computeAverage() reads the updated
    // total count from the cache to weight the partial averages.
    computeAddition(tuple, AggregateOperation.SUM, key);
    computeAddition(tuple, AggregateOperation.COUNT, key);
    computeAverage(tuple, AggregateOperation.AVERAGE, key);
  }
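
  /**
   * Adds the partial SUM or COUNT value carried by the tuple for the given key
   * into the per-window cache, creating the nested maps on first sight of a
   * key or dimension value.
   */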
  private void computeAddition(Map<String, DimensionObject<String>> tuple, AggregateOperation opType, String key)
  {
    String finalKey = key + "." + opType.name();
    if (tuple.containsKey(finalKey)) {
      DimensionObject<String> dimObj = tuple.get(finalKey);
      if (unifiedCache.containsKey(key)) {
        Map<String, Map<AggregateOperation, Number>> cacheAggrs = unifiedCache.get(key);
        if (cacheAggrs.containsKey(dimObj.getVal())) {
          Map<AggregateOperation, Number> cacheAggr = cacheAggrs.get(dimObj.getVal());
          if (cacheAggr.containsKey(opType)) {
            double cacheVal = cacheAggr.get(opType).doubleValue();
            double newVal = dimObj.getCount().doubleValue();
            double finalVal = cacheVal + newVal;
            cacheAggr.put(opType, finalVal);
          }
          else {
            cacheAggr.put(opType, dimObj.getCount().doubleValue());
          }
        }
        else {
          Map<AggregateOperation, Number> newAggrs = new HashMap<AggregateOperation, Number>();
          newAggrs.put(opType, dimObj.getCount().doubleValue());
          cacheAggrs.put(dimObj.getVal(), newAggrs);
        }
      }
      else {
        Map<AggregateOperation, Number> newAggrs = new HashMap<AggregateOperation, Number>();
        Map<String, Map<AggregateOperation, Number>> cacheAggrs = new HashMap<String, Map<AggregateOperation, Number>>();
        newAggrs.put(opType, dimObj.getCount().doubleValue());
        cacheAggrs.put(dimObj.getVal(), newAggrs);
        unifiedCache.put(key, cacheAggrs);
      }
    }
  }
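
  /**
   * Merges a partial AVERAGE using the running COUNT as the weight. The cached
   * COUNT already includes the incoming partition's count (COUNT is unified
   * first in process()), so the previously accumulated count is
   * cacheCount - newCount and the merged average is
   * (cacheAvg * (cacheCount - newCount) + newAvg * newCount) / cacheCount.
   */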
  private void computeAverage(Map<String, DimensionObject<String>> tuple, AggregateOperation opType, String key)
  {
    String finalKey = key + "." + opType.name();
    if (tuple.containsKey(finalKey)) {
      DimensionObject<String> dimObj = tuple.get(finalKey);
      if (unifiedCache.containsKey(key)) {
        Map<String, Map<AggregateOperation, Number>> cacheAggrs = unifiedCache.get(key);
        if (cacheAggrs.containsKey(dimObj.getVal())) {
          Map<AggregateOperation, Number> cacheAggr = cacheAggrs.get(dimObj.getVal());
          if (cacheAggr.containsKey(opType)) {
            double cacheAvg = cacheAggr.get(opType).doubleValue();
            // total count including the incoming partition; COUNT must have been unified before this call
            double cacheCount = cacheAggr.get(AggregateOperation.COUNT).doubleValue();
            double newAvg = dimObj.getCount().doubleValue();
            double newCount = tuple.get(key + "." + AggregateOperation.COUNT.name()).getCount().doubleValue();
            // count-weighted merge of the two partial averages
            double finalVal = (cacheAvg * (cacheCount - newCount) + newAvg * newCount) / cacheCount;
            cacheAggr.put(opType, finalVal);
          }
          else {
            cacheAggr.put(opType, dimObj.getCount().doubleValue());
          }
        }
        else {
          Map<AggregateOperation, Number> newAggrs = new HashMap<AggregateOperation, Number>();
          newAggrs.put(opType, dimObj.getCount().doubleValue());
          cacheAggrs.put(dimObj.getVal(), newAggrs);
        }
      }
      else {
        Map<AggregateOperation, Number> newAggrs = new HashMap<AggregateOperation, Number>();
        Map<String, Map<AggregateOperation, Number>> cacheAggrs = new HashMap<String, Map<AggregateOperation, Number>>();
        newAggrs.put(opType, dimObj.getCount().doubleValue());
        cacheAggrs.put(dimObj.getVal(), newAggrs);
        unifiedCache.put(key, cacheAggrs);
      }
    }
  }

  @Override
  public void beginWindow(long l)
  {
    // start each window with a fresh cache
    unifiedCache = new HashMap<String, Map<String, Map<AggregateOperation, Number>>>();
  }
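
  /**
   * Emits one map per (key, dimension value) pair containing all of its
   * unified aggregates for the window that just ended.
   */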
  @Override
  public void endWindow()
  {
    Map<String, DimensionObject<String>> outputAggregationsObject;
    for (Entry<String, Map<String, Map<AggregateOperation, Number>>> keys : unifiedCache.entrySet()) {
      String key = keys.getKey();
      Map<String, Map<AggregateOperation, Number>> dimValues = keys.getValue();
      for (Entry<String, Map<AggregateOperation, Number>> dimValue : dimValues.entrySet()) {
        String dimValueName = dimValue.getKey();
        Map<AggregateOperation, Number> operations = dimValue.getValue();
        outputAggregationsObject = new HashMap<String, DimensionObject<String>>();
        for (Entry<AggregateOperation, Number> operation : operations.entrySet()) {
          AggregateOperation aggrOperationType = operation.getKey();
          Number aggr = operation.getValue();
          String outKey = key + "." + aggrOperationType.name();
          DimensionObject<String> outDimObj = new DimensionObject<String>(new MutableDouble(aggr), dimValueName);
          outputAggregationsObject.put(outKey, outDimObj);
        }
        aggregationsOutput.emit(outputAggregationsObject);
      }
    }
  }

  @Override
  public void setup(OperatorContext context)
  {
  }

  @Override
  public void teardown()
  {
  }
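
  /**
   * Records metadata from the first tuple of the stream; currently only the
   * filter id, which process() uses to validate every subsequent tuple.
   */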
  private void extractType(Map<String, DimensionObject<String>> tuple)
  {
    Iterator<Entry<String, DimensionObject<String>>> iterator = tuple.entrySet().iterator();
    if (!iterator.hasNext()) {
      return; // empty tuple, nothing to record
    }
    String randomKey = iterator.next().getKey();
    Number filterId = Integer.valueOf(randomKey.split("\\|")[3]);
    recordType.put(LogstreamUtil.FILTER, filterId);
  }
}