blob: 744cd1cecf864c95665ecb0fd36c31d048ada1df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.dimensions.aggregator;
import java.util.List;
import java.util.Map;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.gpo.Serde;
import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor;
import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable;
import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
@Name("CUM_SUM")
/**
* @since 3.1.0
*/
public class AggregatorCumSum extends AggregatorSum
{
private static final long serialVersionUID = 201506280518L;
public static final int KEY_FD_INDEX = 0;
public static final int AGGREGATE_FD_INDEX = 1;
public static final int KEYS_INDEX = 2;
public static final int AGGREGATES_INDEX = 3;
public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR;
static {
Map<String, Type> fieldToType = Maps.newHashMap();
fieldToType.put("fdkeys", Type.OBJECT);
fieldToType.put("fdvalues", Type.OBJECT);
fieldToType.put("keys", Type.OBJECT);
fieldToType.put("values", Type.OBJECT);
Map<String, Serde> fieldToSerde = Maps.newHashMap();
fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE);
fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE);
fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE);
fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE);
META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType,
fieldToSerde,
new PayloadFix());
}
public AggregatorCumSum()
{
}
@Override
public Aggregate getGroup(InputEvent src, int aggregatorIndex)
{
src.used = true;
Aggregate agg = createAggregate(src,
context,
aggregatorIndex);
GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
GPOMutable metaData = new GPOMutable(getMetaDataDescriptor());
GPOMutable fullKey = new GPOMutable(src.getKeys());
if (context.inputTimestampIndex >= 0) {
fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L;
}
List<GPOMutable> keys = Lists.newArrayList(fullKey);
GPOMutable value = new GPOMutable(agg.getAggregates());
List<GPOMutable> values = Lists.newArrayList(value);
metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor();
metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor();
metaData.getFieldsObject()[KEYS_INDEX] = keys;
metaData.getFieldsObject()[AGGREGATES_INDEX] = values;
agg.setMetaData(metaData);
return agg;
}
@Override
public void aggregate(Aggregate dest, InputEvent src)
{
@SuppressWarnings("unchecked")
List<GPOMutable> destKeys =
(List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> destAggregates =
(List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
long timestamp = 0L;
if (context.inputTimestampIndex >= 0) {
timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex];
src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L;
}
if (!contains(destKeys, src.getKeys())) {
destKeys.add(new GPOMutable(src.getKeys()));
GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates);
destAggregates.add(aggregates);
this.aggregateAggs(dest.getAggregates(), aggregates);
}
if (context.inputTimestampIndex >= 0) {
src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp;
}
}
@Override
public void aggregate(Aggregate dest, Aggregate src)
{
dest.getMetaData().applyObjectPayloadFix();
src.getMetaData().applyObjectPayloadFix();
@SuppressWarnings("unchecked")
List<GPOMutable> destKeys =
(List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> srcKeys =
(List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> destAggregates =
(List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> srcAggregates =
(List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX];
List<GPOMutable> newKeys = Lists.newArrayList();
List<GPOMutable> newAggs = Lists.newArrayList();
for (int index = 0;
index < srcKeys.size();
index++) {
GPOMutable currentSrcKey = srcKeys.get(index);
GPOMutable currentSrcAgg = srcAggregates.get(index);
if (!contains(destKeys, currentSrcKey)) {
newKeys.add(currentSrcKey);
newAggs.add(currentSrcAgg);
this.aggregateAggs(dest.getAggregates(), currentSrcAgg);
}
}
destKeys.addAll(newKeys);
destAggregates.addAll(newAggs);
}
private boolean contains(List<GPOMutable> mutables, GPOMutable mutable)
{
for (int index = 0;
index < mutables.size();
index++) {
GPOMutable mutableFromList = mutables.get(index);
if (GPOUtils.equals(mutableFromList, mutable)) {
return true;
}
}
return false;
}
@Override
public FieldsDescriptor getMetaDataDescriptor()
{
return META_DATA_FIELDS_DESCRIPTOR;
}
public static class PayloadFix implements SerdeObjectPayloadFix
{
@Override
public void fix(Object[] objects)
{
FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX];
FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX];
@SuppressWarnings("unchecked")
List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX];
fix(keyfd, keyMutables);
fix(valuefd, aggregateMutables);
}
private void fix(FieldsDescriptor fd, List<GPOMutable> mutables)
{
for (int index = 0;
index < mutables.size();
index++) {
mutables.get(index).setFieldDescriptor(fd);
}
}
}
}