blob: 4d5d3d6cd7ab245c74206cc531abed931d536ede [file] [log] [blame]
package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
import java.io.DataOutput;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private ICopySerializableAggregateFunctionFactory[] aggFactories;
public SerializableAggregatorDescriptorFactory(ICopySerializableAggregateFunctionFactory[] aggFactories) {
this.aggFactories = aggFactories;
}
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults)
throws HyracksDataException {
final int[] keys = keyFields;
/**
* one IAggregatorDescriptor instance per Gby operator
*/
return new IAggregatorDescriptor() {
private FrameTupleReference ftr = new FrameTupleReference();
private ICopySerializableAggregateFunction[] aggs = new ICopySerializableAggregateFunction[aggFactories.length];
private int offsetFieldIndex = keys.length;
private int stateFieldLength[] = new int[aggFactories.length];
@Override
public AggregateState createAggregateStates() {
return new AggregateState();
}
@Override
public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException {
DataOutput output = tb.getDataOutput();
ftr.reset(accessor, tIndex);
for (int i = 0; i < aggs.length; i++) {
try {
int begin = tb.getSize();
if (aggs[i] == null) {
aggs[i] = aggFactories[i].createAggregateFunction();
}
aggs[i].init(output);
tb.addFieldEndOffset();
stateFieldLength[i] = tb.getSize() - begin;
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}
// doing initial aggregate
ftr.reset(accessor, tIndex);
for (int i = 0; i < aggs.length; i++) {
try {
byte[] data = tb.getByteArray();
int prevFieldPos = i + keys.length - 1;
int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
aggs[i].step(ftr, data, start, stateFieldLength[i]);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
ftr.reset(accessor, tIndex);
int stateTupleStart = stateAccessor.getTupleStartOffset(stateTupleIndex);
int fieldSlotLength = stateAccessor.getFieldSlotsLength();
for (int i = 0; i < aggs.length; i++) {
try {
byte[] data = stateAccessor.getBuffer().array();
int start = stateAccessor.getFieldStartOffset(stateTupleIndex, i + keys.length)
+ stateTupleStart + fieldSlotLength;
aggs[i].step(ftr, data, start, stateFieldLength[i]);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}
}
@Override
public void outputPartialResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
int start = refOffset;
for (int i = 0; i < aggs.length; i++) {
try {
aggs[i].finishPartial(data, start, stateFieldLength[i], tb.getDataOutput());
start += stateFieldLength[i];
tb.addFieldEndOffset();
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}
}
@Override
public void outputFinalResult(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
byte[] data = accessor.getBuffer().array();
int startOffset = accessor.getTupleStartOffset(tIndex);
int aggFieldOffset = accessor.getFieldStartOffset(tIndex, offsetFieldIndex);
int refOffset = startOffset + accessor.getFieldSlotsLength() + aggFieldOffset;
int start = refOffset;
for (int i = 0; i < aggs.length; i++) {
try {
aggs[i].finish(data, start, stateFieldLength[i], tb.getDataOutput());
start += stateFieldLength[i];
tb.addFieldEndOffset();
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}
}
@Override
public void reset() {
}
@Override
public void close() {
reset();
}
};
}
}