blob: b0aa79c31c6b3bc50308ba6e50cce900fc94f154 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.query.aggregate.raw;
import org.apache.eagle.log.entity.EntityQualifierUtils;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.meta.*;
import org.apache.eagle.log.expression.ExpressionParser;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
import org.apache.eagle.query.parser.TokenConstant;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class RawGroupbyBucket {
private static final Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
private List<String> aggregatedFields;
private EntityDefinition entityDefinition;
private List<AggregateFunctionType> types;
private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed) {
this.types = types;
this.aggregatedFields = aggregatedFields;
this.entityDefinition = ed;
}
public boolean exists(GroupbyKey key) {
return group2FunctionMap.containsKey(key);
}
public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values) {
// locate groupby bucket
List<Function> functions = group2FunctionMap.get(groupbyKey);
if (functions == null) {
functions = new ArrayList<Function>();
for (AggregateFunctionType type : types) {
FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
if (ff == null) {
LOG.error("FunctionFactory of AggregationFunctionType:" + type + " is null");
} else {
functions.add(ff.createFunction());
}
}
group2FunctionMap.put(groupbyKey, functions);
}
ListIterator<Function> e1 = functions.listIterator();
ListIterator<String> e2 = aggregatedFields.listIterator();
while (e1.hasNext() && e2.hasNext()) {
Function f = e1.next();
String aggregatedField = e2.next();
byte[] v = values.get(aggregatedField);
if (f instanceof Function.Count) { // handle count
if (entityDefinition.getMetricDefinition() == null) {
f.run(1.0);
continue;
} else if (v == null) {
aggregatedField = GenericMetricEntity.VALUE_FIELD;
v = values.get(aggregatedField);
}
}
if (v != null) {
Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
EntitySerDeser<?> serDeser = q.getSerDeser();
// double d = 0.0;
if (serDeser instanceof IntSerDeser) {
double d = (Integer)serDeser.deserialize(v);
f.run(d);
} else if (serDeser instanceof LongSerDeser) {
double d = (Long)serDeser.deserialize(v);
f.run(d);
} else if (serDeser instanceof DoubleSerDeser) {
double d = (Double)serDeser.deserialize(v);
f.run(d);
// TODO: support numeric array type that is not metric
} else if (serDeser instanceof DoubleArraySerDeser) {
double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
if (f instanceof Function.Count) {
f.run(d.length);
} else {
for (double i:d) {
f.run(i);
}
}
} else if (serDeser instanceof IntArraySerDeser) {
int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
if (f instanceof Function.Count) {
f.run(d.length);
} else {
for (int i:d) {
f.run(i);
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("EntitySerDeser of field " + aggregatedField
+ " is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
}
}
} else if (TokenConstant.isExpression(aggregatedField)) {
String expression = TokenConstant.parseExpressionContent(aggregatedField);
try {
Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
if (entityDefinition.getMetricDefinition() == null) {
double value = ExpressionParser.eval(expression,doubleMap);
// LOG.info("DEBUG: Eval "+expression +" = "+value);
f.run(value);
} else {
Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
EntitySerDeser _serDeser = qualifier.getSerDeser();
byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
if ( _serDeser instanceof DoubleArraySerDeser) {
double[] d = (double[]) _serDeser.deserialize(valueBytes);
if (f instanceof Function.Count) {
f.run(d.length);
} else {
for (double i:d) {
doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
f.run(ExpressionParser.eval(expression, doubleMap));
}
}
} else if (_serDeser instanceof IntArraySerDeser) {
int[] d = (int[]) _serDeser.deserialize(valueBytes);
if (f instanceof Function.Count) {
f.run(d.length);
} else {
for (double i : d) {
doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
f.run(ExpressionParser.eval(expression, doubleMap));
}
}
} else {
double value = ExpressionParser.eval(expression,doubleMap);
f.run(value);
}
}
} catch (Exception e) {
LOG.error("Got exception to evaluate expression: " + expression + ", exception: " + e.getMessage(), e);
}
}
}
}
/**
* expensive operation - create objects and format the result
* @return
*/
public List<GroupbyKeyValue> groupbyKeyValues() {
List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) {
GroupbyValue value = new GroupbyValue();
for (Function f : entry.getValue()) {
value.add(new DoubleWritable(f.result()));
value.addMeta(f.count());
}
results.add(new GroupbyKeyValue(entry.getKey(),value));
}
return results;
}
/**
* expensive operation - create objects and format the result
* @return
*/
public Map<List<String>, List<Double>> result() {
Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) {
List<Double> values = new ArrayList<Double>();
for (Function f : entry.getValue()) {
values.add(f.result());
}
GroupbyKey key = entry.getKey();
List<BytesWritable> list1 = key.getValue();
List<String> list2 = new ArrayList<String>();
for (BytesWritable e : list1) {
list2.add(new String(e.copyBytes()));
}
result.put(list2, values);
}
return result;
}
}