blob: 2ae6f72ed4c46cd6448cd083ce1cfcf7b9a84bd5 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
/**
 * An {@link IAggregatorDescriptorFactory} that evaluates nested aggregation
 * subplans. For each group, the group's tuples are pushed through every
 * subplan pipeline; each subplan is assumed to produce exactly one result
 * tuple on close. The final output tuple is the group's key fields, followed
 * by the decoration fields, followed by all subplan output fields.
 *
 * NOTE(review): this aggregator only works when the combined output of all
 * subplans for one group fits into a single frame (see {@code aggregate}).
 */
public class NestedPlansAccumulatingAggregatorFactory implements IAggregatorDescriptorFactory {

    private static final long serialVersionUID = 1L;

    /** Nested aggregation pipelines, one per subplan. */
    private final AlgebricksPipeline[] subplans;
    /** Indexes of the grouping-key fields in the input record. */
    private final int[] keyFieldIdx;
    /** Indexes of the decoration fields carried through unchanged. */
    private final int[] decorFieldIdx;

    public NestedPlansAccumulatingAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx,
            int[] decorFieldIdx) {
        this.subplans = subplans;
        this.keyFieldIdx = keyFieldIdx;
        this.decorFieldIdx = decorFieldIdx;
    }

    /**
     * Creates an aggregator that feeds each group's tuples into all subplan
     * pipelines and collects their single-tuple outputs via a shared
     * {@link AggregatorOutput} sink.
     *
     * @throws HyracksDataException if a subplan pipeline cannot be assembled
     */
    @Override
    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
            RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {

        final AggregatorOutput outputWriter = new AggregatorOutput(ctx.getFrameSize(), subplans, keyFieldIdx.length,
                decorFieldIdx.length);
        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
        for (int i = 0; i < subplans.length; i++) {
            try {
                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
        }

        return new IAggregatorDescriptor() {

            @Override
            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                // Start the group's output with its key and decoration fields.
                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
                tb.reset();
                for (int i = 0; i < keyFieldIdx.length; ++i) {
                    tb.addField(accessor, tIndex, keyFieldIdx[i]);
                }
                for (int i = 0; i < decorFieldIdx.length; ++i) {
                    tb.addField(accessor, tIndex, decorFieldIdx[i]);
                }
                // Open all subplan pipelines, then push the group's first tuple.
                for (int i = 0; i < pipelines.length; ++i) {
                    pipelines[i].open();
                }
                for (int i = 0; i < pipelines.length; i++) {
                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                }
            }

            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                // It only works if the output of the aggregator fits in one
                // frame.
                for (int i = 0; i < pipelines.length; i++) {
                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                }
            }

            @Override
            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                // Closing a pipeline makes it emit its single result tuple
                // into outputWriter's tuple builder (see AggregatorOutput).
                for (int i = 0; i < pipelines.length; i++) {
                    outputWriter.setInputIdx(i);
                    pipelines[i].close();
                }
                // Copy the accumulated fields (keys + decors + aggregates)
                // into the caller's tuple builder.
                tupleBuilder.reset();
                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
                byte[] data = tb.getByteArray();
                int[] fieldEnds = tb.getFieldEndOffsets();
                int start = 0;
                for (int i = 0; i < fieldEnds.length; i++) {
                    tupleBuilder.addField(data, start, fieldEnds[i] - start);
                    start = fieldEnds[i];
                }
            }

            @Override
            public AggregateState createAggregateStates() {
                return new AggregateState();
            }

            @Override
            public void reset() {
            }

            @Override
            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                throw new IllegalStateException("this method should not be called");
            }

            @Override
            public void close() {
            }
        };
    }

    /**
     * Wires a subplan's runtime factories into a push pipeline that ends at
     * {@code writer}, working backwards from the last operator. Returns the
     * head of the pipeline (expected to be a nested-tuple-source runtime).
     */
    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
            throws AlgebricksException {
        // plug the operators
        IFrameWriter start = writer;
        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
            if (i > 0) {
                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
            } else {
                // the nts has the same input and output rec. desc.
                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
            }
            start = newRuntime;
        }
        return start;
    }

    /**
     * Sink for the subplan pipelines. We suppose for now, that each subplan
     * only produces one tuple; its fields are appended to a shared tuple
     * builder that already holds the group's key and decoration fields.
     */
    private static class AggregatorOutput implements IFrameWriter {

        /** One accessor per subplan, typed by that subplan's output descriptor. */
        private final FrameTupleAccessor[] tAccess;
        private final RecordDescriptor[] inputRecDesc;
        /** Index of the subplan currently writing into this sink. */
        private int inputIdx;
        /** Accumulates keys + decors + all subplan output fields for one group. */
        private final ArrayTupleBuilder tb;
        private final AlgebricksPipeline[] subplans;

        public AggregatorOutput(int frameSize, AlgebricksPipeline[] subplans, int numKeys, int numDecors) {
            this.subplans = subplans;
            int totalAggFields = 0;
            this.inputRecDesc = new RecordDescriptor[subplans.length];
            for (int i = 0; i < subplans.length; i++) {
                // A subplan's output descriptor is the last one in its chain.
                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
                this.inputRecDesc[i] = rd[rd.length - 1];
                totalAggFields += subplans[i].getOutputWidth();
            }
            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
            for (int i = 0; i < inputRecDesc.length; i++) {
                tAccess[i] = new FrameTupleAccessor(frameSize, inputRecDesc[i]);
            }
        }

        @Override
        public void open() throws HyracksDataException {
        }

        /**
         * Since each pipeline only produces one tuple, this method is only
         * called by the close method of the pipelines.
         */
        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            int tIndex = 0;
            int w = subplans[inputIdx].getOutputWidth();
            IFrameTupleAccessor accessor = tAccess[inputIdx];
            accessor.reset(buffer);
            for (int f = 0; f < w; f++) {
                tb.addField(accessor, tIndex, f);
            }
        }

        @Override
        public void close() throws HyracksDataException {
        }

        public void setInputIdx(int inputIdx) {
            this.inputIdx = inputIdx;
        }

        public ArrayTupleBuilder getTupleBuilder() {
            return tb;
        }

        @Override
        public void fail() throws HyracksDataException {
        }
    }
}