blob: eb8b08aa2169ae753b47a2d7d04117bf11efc8c3 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.lib.statistics.DimensionsComputation;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
class GenericAggregate implements DimensionsComputation.AggregateEvent
{
public Object[] keys;
public Object[] aggregates;
long timestamp;
private int aggregatorIndex = 0;
public GenericAggregate() {}
public GenericAggregate(GenericEvent ge) {
this.keys = ge.keys;
this.aggregates = ge.values;
this.timestamp = ge.timestamp;
}
public GenericAggregate(int aggregatorIndex) {
this.aggregatorIndex = aggregatorIndex;
}
@Override
public int getAggregatorIndex()
{
return aggregatorIndex;
}
public long getTimestamp()
{
return timestamp;
}
public void setTimestamp(long timestamp)
{
this.timestamp = timestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GenericAggregate)) {
return false;
}
GenericAggregate that = (GenericAggregate) o;
for (int i = 0; i < keys.length; i++) {
Object thisValue = keys[i];
Object thatValue = that.keys[i];
if (thisValue != null ? !thisValue.equals(thatValue) : thatValue != null) {
return false;
}
}
return true;
}
@Override
public int hashCode()
{
int result = Arrays.hashCode(keys);
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
return result;
}
@Override
public String toString()
{
return "GenericAggregate{" +
"timestamp = " + timestamp +
", keys=" + Arrays.toString(keys) +
", values=" + Arrays.toString(aggregates) +
", aggregatorIndex=" + aggregatorIndex +
'}';
}
}
public class GenericAggregator implements DimensionsComputation.Aggregator<GenericEvent, GenericAggregate>
{
private static final long serialVersionUID = 7636266873750826291L;
private EventSchema eventSchema;
private String dimension;
private TimeUnit time;
private final List<String> keys = Lists.newArrayList();
private final List<Integer> keyIndexes = Lists.newArrayList();
public GenericAggregator() {}
public GenericAggregator(EventSchema eventSchema)
{
this.eventSchema = eventSchema;
}
public void init(String dimension)
{
String[] attributes = dimension.split(":");
for (String attribute : attributes) {
String[] keyval = attribute.split("=", 2);
String key = keyval[0];
if (key.equals("time")) {
time = TimeUnit.valueOf(keyval[1]);
continue;
}
keys.add(key);
}
this.dimension = dimension;
for(int i = 0; i < eventSchema.genericEventKeys.size(); i++)
if (keys.contains(eventSchema.genericEventKeys.get(i)))
keyIndexes.add(i);
}
@Override
public GenericAggregate getGroup(GenericEvent src, int aggregatorIndex)
{
GenericAggregate aggr = new GenericAggregate();
aggr.keys = new Object[src.keys.length];
for(int i : keyIndexes)
{
aggr.keys[i] = src.keys[i];
}
aggr.aggregates = new Object[src.values.length];
// put converted timestamp as per unit specified in aggregation.
aggr.timestamp = TimeUnit.MILLISECONDS.convert(time.convert(src.timestamp, TimeUnit.MILLISECONDS), time);
eventSchema.defaultMissingFields(aggr);
return aggr;
}
@Override
public int computeHashCode(GenericEvent object)
{
int hashCode = 31;
for(int i : keyIndexes)
{
hashCode = hashCode * 31 + object.keys[i].hashCode();
}
long ltime = time.convert(object.timestamp, TimeUnit.MILLISECONDS);
hashCode = hashCode * 31 + (int)ltime;
return hashCode;
}
@Override public boolean equals(GenericEvent o1, GenericEvent o2)
{
if (o1 == o2)
return true;
if (o1 == null || o2 == null)
return false;
long t1 = time.convert(o1.timestamp, TimeUnit.MILLISECONDS);
long t2 = time.convert(o2.timestamp, TimeUnit.MILLISECONDS);
if (t1 != t2)
return false;
for(int i : keyIndexes)
{
Object i1 = o1.keys[i];
Object i2 = o2.keys[i];
if (i1 == i2)
continue;
if (i1 == null || i2 == null)
return false;
if (i1.equals(i2))
continue;
return false;
}
return true;
}
@Override
public void aggregate(GenericAggregate dest, GenericEvent src)
{
for(int i = 0; i < eventSchema.genericEventValues.size(); i++) {
dest.aggregates[i] = apply(eventSchema.genericEventValues.get(i), dest.aggregates[i], src.values[i]);
}
}
@Override
public void aggregate(GenericAggregate dest, GenericAggregate src)
{
for(int i = 0; i < eventSchema.genericEventValues.size(); i++) {
dest.aggregates[i] = apply(eventSchema.genericEventValues.get(i), dest.aggregates[i], src.aggregates[i]);
}
}
/* Apply operator between multiple objects */
private Object apply(String metric, Object o, Object o1)
{
//TODO define a class for each type of aggregation and
// avoid if/else.
if (eventSchema.aggregates.get(metric).equals("sum"))
{
if (eventSchema.fields.get(metric).equals(Integer.class)) {
int val1 = (o != null) ? ((Number)o).intValue() : 0;
int val2 = (o1 != null) ? ((Number)o1).intValue() : 0;
return val1 + val2;
} else if (eventSchema.fields.get(metric).equals(Long.class)) {
long val1 = (o != null) ? ((Number)o).longValue() : 0L;
long val2 = (o1 != null) ? ((Number)o1).longValue() : 0L;
return val1 + val2;
} else if (eventSchema.fields.get(metric).equals(Float.class)) {
float val1 = (o != null) ? ((Number)o).floatValue() : 0f;
float val2 = (o1 != null) ? ((Number)o1).floatValue() : 0f;
return val1 + val2;
} else if (eventSchema.fields.get(metric).equals(Double.class)) {
double val1 = (o != null) ? ((Number)o).doubleValue() : 0d;
double val2 = (o1 != null) ? ((Number)o1).doubleValue() : 0d;
return val1 + val2;
} else if (eventSchema.fields.get(metric).equals(String.class)) {
String val1 = (o != null) ? (String)o : "";
String val2 = (o1 != null) ? (String)o1 : "";
return val1 + val2;
}
}
return null;
}
public String getDimension()
{
return dimension;
}
public TimeUnit getTime()
{
return time;
}
public List<String> getKeys()
{
return keys;
}
}