blob: 6c436a96f8600ba16b546fa9e0a17607d1796d62 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.group.aggregators;
import java.io.DataOutput;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
/**
 * Factory for an {@link IAggregatorDescriptor} that delegates every operation to an array of per-field
 * aggregators, one created from each configured {@link IFieldAggregateDescriptorFactory}.
 * <p>
 * The group-by key fields may be fixed at construction time; when they are not, the {@code keyFields} passed
 * to {@link #createAggregator} are used for that call. The factory itself is never modified after
 * construction, so a single instance can serve several {@code createAggregator} calls with different key
 * fields.
 * <p>
 * State layout as established by this class: {@code init} appends one field to the tuple builder for each
 * aggregator whose {@code needsBinaryState()} is true, in aggregator order, and nothing for the others. All
 * methods that read binary state back therefore look it up at field index
 * {@code keys.length + (number of preceding binary-state aggregators)}.
 * NOTE(review): this presumes the accessor handed to the output/aggregate methods exposes the key fields
 * followed by the fields written by {@code init} - confirm against the calling operator.
 */
public class MultiFieldsAggregatorFactory implements IAggregatorDescriptorFactory {

    private static final long serialVersionUID = 1L;

    /** Factories for the per-field aggregators, in output-field order. */
    private final IFieldAggregateDescriptorFactory[] aggregatorFactories;

    /** Key fields fixed at construction time, or {@code null} to use the ones given to createAggregator. */
    private final int[] keys;

    /**
     * @param keys
     *            key field indexes to use for every aggregator created by this factory; may be {@code null},
     *            in which case the {@code keyFields} argument of {@link #createAggregator} is used instead
     * @param aggregatorFactories
     *            one factory per aggregated field
     */
    public MultiFieldsAggregatorFactory(int[] keys, IFieldAggregateDescriptorFactory[] aggregatorFactories) {
        this.keys = keys;
        this.aggregatorFactories = aggregatorFactories;
    }

    /**
     * Creates a factory without preset key fields; each {@link #createAggregator} call uses the
     * {@code keyFields} it is given.
     *
     * @param aggregatorFactories
     *            one factory per aggregated field
     */
    public MultiFieldsAggregatorFactory(IFieldAggregateDescriptorFactory[] aggregatorFactories) {
        this(null, aggregatorFactories);
    }

    /**
     * Builds the composite aggregator.
     *
     * @param ctx
     *            task context, forwarded to every field aggregator factory
     * @param inRecordDescriptor
     *            input record descriptor, forwarded to every field aggregator factory
     * @param outRecordDescriptor
     *            output record descriptor, forwarded to every field aggregator factory
     * @param keyFields
     *            key field indexes, used only when this factory was constructed without keys
     * @param keyFieldsInPartialResults
     *            not used by this implementation
     * @return an aggregator descriptor that fans each call out to the per-field aggregators
     * @throws HyracksDataException
     *             if a field aggregator cannot be created
     */
    @Override
    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults)
            throws HyracksDataException {

        final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length];
        for (int i = 0; i < aggregators.length; i++) {
            aggregators[i] = aggregatorFactories[i].createAggregator(ctx, inRecordDescriptor, outRecordDescriptor);
        }

        // Resolve the keys into a per-call local instead of storing them on the factory: the factory is
        // shared, and writing the first caller's keyFields into it would leak them into every later call.
        final int[] groupKeys = (keys != null) ? keys : keyFields;

        return new IAggregatorDescriptor() {

            @Override
            public void reset() {
                for (int i = 0; i < aggregators.length; i++) {
                    aggregators[i].reset();
                }
            }

            /**
             * Appends one partial-result field per aggregator to {@code tupleBuilder}. Aggregators with
             * binary state receive the frame bytes and the position of their state field; the others
             * receive {@code null, 0} and must work from their {@link AggregateState} alone.
             */
            @Override
            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                // Loop-invariant part of the position: tuple start offset plus field-slots length.
                int base = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                // Counts binary-state aggregators seen so far; mirrors the fields written by init().
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    if (aggregators[i].needsBinaryState()) {
                        int fieldOffset = accessor.getFieldStartOffset(tIndex, groupKeys.length + binaryFieldIndex);
                        aggregators[i].outputPartialResult(dos, accessor.getBuffer().array(), base + fieldOffset,
                                stateAt(state, i));
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].outputPartialResult(dos, null, 0, stateAt(state, i));
                    }
                    tupleBuilder.addFieldEndOffset();
                }
            }

            /**
             * Appends one final-result field per aggregator to {@code tupleBuilder}, locating binary state
             * the same way as {@link #outputPartialResult}.
             */
            @Override
            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                int base = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    if (aggregators[i].needsBinaryState()) {
                        int fieldOffset = accessor.getFieldStartOffset(tIndex, groupKeys.length + binaryFieldIndex);
                        aggregators[i].outputFinalResult(dos, accessor.getBuffer().array(), base + fieldOffset,
                                stateAt(state, i));
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].outputFinalResult(dos, null, 0, stateAt(state, i));
                    }
                    tupleBuilder.addFieldEndOffset();
                }
            }

            /**
             * Initializes every field aggregator from the input tuple. Only aggregators with binary state
             * get a field closed in {@code tupleBuilder}; this is what defines the compacted state layout
             * the other methods read back.
             */
            @Override
            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                DataOutput dos = tupleBuilder.getDataOutput();
                for (int i = 0; i < aggregators.length; i++) {
                    aggregators[i].init(accessor, tIndex, dos, stateAt(state, i));
                    if (aggregators[i].needsBinaryState()) {
                        tupleBuilder.addFieldEndOffset();
                    }
                }
            }

            /** Wraps one freshly created state per field aggregator into a single composite state. */
            @Override
            public AggregateState createAggregateStates() {
                AggregateState[] states = new AggregateState[aggregators.length];
                for (int i = 0; i < states.length; i++) {
                    states[i] = aggregators[i].createState();
                }
                return new AggregateState(states);
            }

            @Override
            public void close() {
                for (int i = 0; i < aggregators.length; i++) {
                    aggregators[i].close();
                }
            }

            /**
             * Folds the input tuple into every field aggregator. When {@code stateAccessor} is
             * {@code null}, or an aggregator keeps no binary state, that aggregator is called with
             * {@code null, 0} and only its {@link AggregateState}.
             */
            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                boolean hasStateTuple = stateAccessor != null;
                int stateTupleOffset = hasStateTuple ? stateAccessor.getTupleStartOffset(stateTupleIndex) : 0;
                int binaryFieldIndex = 0;
                for (int i = 0; i < aggregators.length; i++) {
                    // Short-circuit keeps needsBinaryState() from being consulted when there is no state tuple.
                    if (hasStateTuple && aggregators[i].needsBinaryState()) {
                        int stateFieldOffset = stateAccessor.getFieldStartOffset(stateTupleIndex, groupKeys.length
                                + binaryFieldIndex);
                        aggregators[i].aggregate(accessor, tIndex, stateAccessor.getBuffer().array(),
                                stateTupleOffset + stateAccessor.getFieldSlotsLength() + stateFieldOffset,
                                stateAt(state, i));
                        binaryFieldIndex++;
                    } else {
                        aggregators[i].aggregate(accessor, tIndex, null, 0, stateAt(state, i));
                    }
                }
            }
        };
    }

    /**
     * Returns the state of the {@code index}-th field aggregator from the composite state built by
     * {@code createAggregateStates()}.
     */
    private static AggregateState stateAt(AggregateState composite, int index) {
        return ((AggregateState[]) composite.state)[index];
    }
}