blob: 890b7ea792cb8514ef76d1bf14bf1cce2b73a89d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.query.aggregate;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
public class Aggregator {
private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class);
public static final String GROUPBY_ROOT_FIELD_NAME = "site";
public static final String GROUPBY_ROOT_FIELD_VALUE = "xyz";
public static final String UNASSIGNED_GROUPBY_ROOT_FIELD_NAME = "unassigned";
private final AggregateAPIEntityFactory factory;
private final AggregateAPIEntity root;
private final List<String> groupbys;
private final List<String> sumFunctionFields;
private final boolean counting;
public Aggregator(AggregateAPIEntityFactory factory, AggregateAPIEntity root, List<String> groupbys, boolean counting, List<String> sumFunctionFields) {
this.factory = factory;
this.root = root;
this.groupbys = groupbys;
this.sumFunctionFields = sumFunctionFields;
this.counting = counting;
}
/**
* this locate result can be cached? we don't need check if it's TaggedLogAPIEntity each time when iterating entities
* @param groupby
* @param obj
* @return
* @throws Exception
*/
private String locateGroupbyField(String groupby, TaggedLogAPIEntity obj) {
if (groupby.equals(GROUPBY_ROOT_FIELD_NAME)) {
return GROUPBY_ROOT_FIELD_VALUE;
}
// check tag first
String tagv = obj.getTags().get(groupby);
if (tagv != null) {
return tagv;
}
// check against pojo, or qualifierValues
String fn = groupby.substring(0,1).toUpperCase() + groupby.substring(1, groupby.length());
try {
Method getM = obj.getClass().getMethod("get" + fn);
Object value = getM.invoke(obj);
return (String)value;
} catch (Exception ex) {
LOG.warn(groupby + " field is in neither tags nor fields, " + ex.getMessage());
return null;
}
}
/**
* accumulate a list of entities
* @param entities
* @throws Exception
*/
public void accumulateAll(List<TaggedLogAPIEntity> entities) throws Exception {
for (TaggedLogAPIEntity entity : entities) {
accumulate(entity);
}
}
/**
* currently only group by tags
* groupbys' first item always is site, which is a reserved field
*/
public void accumulate(TaggedLogAPIEntity entity) throws Exception {
AggregateAPIEntity current = root;
for (String groupby : groupbys) {
// TODO tagv is empty, so what to do? use a reserved field_name "unassigned" ?
// TODO we should support all Pojo with java bean style object
String tagv = locateGroupbyField(groupby, entity);
if (tagv == null || tagv.isEmpty()) {
tagv = UNASSIGNED_GROUPBY_ROOT_FIELD_NAME;
}
Map<String, AggregateAPIEntity> children = current.getEntityList();
if (children.get(tagv) == null) {
children.put(tagv, factory.create());
current.setNumDirectDescendants(current.getNumDirectDescendants() + 1);
}
AggregateAPIEntity child = children.get(tagv);
// go through all aggregate functions including count, summary etc.
if (counting) {
count(child);
}
for (String sumFunctionField : sumFunctionFields) {
sum(child, entity, sumFunctionField);
}
current = child;
}
}
/**
* use java bean specifications?
* reflection is not efficient, let us find out solutions
*/
private void sum(Object targetObj, TaggedLogAPIEntity srcObj, String fieldName) throws Exception {
try {
String fn = fieldName.substring(0,1).toUpperCase() + fieldName.substring(1, fieldName.length());
Method srcGetMethod = srcObj.getClass().getMethod("get" + fn);
Object srcValue = srcGetMethod.invoke(srcObj);
if (srcValue == null) {
return; // silently don't count this source object
}
Method targetGetMethod = targetObj.getClass().getMethod("get" + fn);
Object targetValue = targetGetMethod.invoke(targetObj);
if (targetValue instanceof Long) {
Method setM = targetObj.getClass().getMethod("set" + fn, long.class);
Long tmp1 = (Long)targetValue;
// TODO, now source object always have type "java.lang.String", later on we should support various type including integer type
Long tmp2 = null;
if (srcValue instanceof String) {
tmp2 = Long.valueOf((String)srcValue);
} else if (srcValue instanceof Long) {
tmp2 = (Long)srcValue;
} else {
throw new IllegalAggregateFieldTypeException(srcValue.getClass().toString() + " type is not support. The source type must be Long or String");
}
setM.invoke(targetObj, tmp1.longValue() + tmp2.longValue());
} else if (targetValue instanceof Double) {
Method setM = targetObj.getClass().getMethod("set" + fn, double.class);
Double tmp1 = (Double)targetValue;
String src = (String) srcValue;
Double tmp2 = Double.valueOf(src);
setM.invoke(targetObj, tmp1.doubleValue() + tmp2.doubleValue());
} else {
throw new IllegalAggregateFieldTypeException(targetValue.getClass().toString() + " type is not support. The target type must be long or double");
}
} catch (Exception ex) {
LOG.error("Cannot do sum aggregation for field " + fieldName, ex);
throw ex;
}
}
/**
* count possible not only count for number of descendants but also count for not-null fields
* @param targetObj
* @throws Exception
*/
private void count(AggregateAPIEntity targetObj) throws Exception {
targetObj.setNumTotalDescendants(targetObj.getNumTotalDescendants() + 1);
}
}