blob: c8dc852febb6983791f77a0cdd31d1577fb5c778 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
/**
 * Factory for {@link IAggregatorDescriptor}s that evaluate a set of nested plans as <em>running</em> aggregates.
 * Every result tuple produced by a nested pipeline is prefixed with the group-by key and decor fields of the current
 * group and appended to an output frame that is flushed to the operator's output writer, rather than being stored in
 * the aggregate state.
 */
public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
    private static final long serialVersionUID = 1L;

    /** Nested plans; one pipeline is assembled per plan for every created aggregator. */
    private final AlgebricksPipeline[] subplans;
    /** Input field indexes of the group-by keys copied into the prefix of every output tuple. */
    private final int[] keyFieldIdx;
    /** Input field indexes of the decor fields copied into the prefix of every output tuple, after the keys. */
    private final int[] decorFieldIdx;

    public NestedPlansRunningAggregatorFactory(AlgebricksPipeline[] subplans, int[] keyFieldIdx, int[] decorFieldIdx) {
        this.subplans = subplans;
        this.keyFieldIdx = keyFieldIdx;
        this.decorFieldIdx = decorFieldIdx;
    }

    /**
     * Creates an aggregator that drives one assembled instance of each nested plan and streams the plans' results to
     * {@code writer}.
     *
     * @param ctx
     *            task context used to create the pipeline runtimes and to allocate the output frame
     * @param inRecordDescriptor
     *            not used by this implementation
     * @param outRecordDescriptor
     *            not used by this implementation
     * @param keyFields
     *            not used by this implementation (the factory's own key indexes are used instead)
     * @param keyFieldsInPartialResults
     *            not used by this implementation
     * @param writer
     *            downstream writer receiving the running-aggregate output frames
     * @return the aggregator descriptor
     * @throws HyracksDataException
     *             if a pipeline cannot be assembled (wrapping the {@link AlgebricksException})
     */
    @Override
    public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
            final IFrameWriter writer) throws HyracksDataException {
        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
                decorFieldIdx.length, writer);
        final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
        for (int i = 0; i < subplans.length; i++) {
            try {
                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
            } catch (AlgebricksException e) {
                throw new HyracksDataException(e);
            }
        }
        // Shared with outputWriter: fields written here become the prefix of every tuple it emits.
        // No output frame is allocated here; outputWriter owns the only frame results are written to.
        final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();

        return new IAggregatorDescriptor() {
            /**
             * Starts a new group: opens all pipelines, captures the group's key and decor fields from the first
             * tuple, then feeds that tuple to every pipeline.
             */
            @Override
            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                for (int i = 0; i < pipelines.length; ++i) {
                    pipelines[i].open();
                }
                gbyTb.reset();
                for (int i = 0; i < keyFieldIdx.length; ++i) {
                    gbyTb.addField(accessor, tIndex, keyFieldIdx[i]);
                }
                for (int i = 0; i < decorFieldIdx.length; ++i) {
                    gbyTb.addField(accessor, tIndex, decorFieldIdx[i]);
                }
                // aggregate the first tuple
                pushTuple(accessor, tIndex);
            }

            /** Feeds one more tuple of the current group to every pipeline. */
            @Override
            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                    int stateTupleIndex, AggregateState state) throws HyracksDataException {
                pushTuple(accessor, tIndex);
            }

            /**
             * Ends the current group by closing every pipeline. Presumably the close reaches outputWriter, which
             * flushes any buffered results (confirm against NestedTupleSourceRuntime).
             *
             * @return always {@code false}: results are streamed through outputWriter, never into tupleBuilder
             */
            @Override
            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                    AggregateState state) throws HyracksDataException {
                for (int i = 0; i < pipelines.length; ++i) {
                    outputWriter.setInputIdx(i);
                    pipelines[i].close();
                }
                return false;
            }

            @Override
            public AggregateState createAggregateStates() {
                return new AggregateState();
            }

            @Override
            public void reset() {
            }

            /** Running aggregation has no partial-result phase. */
            @Override
            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
                    int tIndex, AggregateState state) throws HyracksDataException {
                throw new IllegalStateException("this method should not be called");
            }

            @Override
            public void close() {
            }

            /**
             * Pushes tuple {@code tIndex} of {@code accessor}'s frame into every pipeline, telling outputWriter which
             * pipeline the resulting frames come from.
             */
            private void pushTuple(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                for (int i = 0; i < pipelines.length; i++) {
                    outputWriter.setInputIdx(i);
                    pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                }
            }
        };
    }

    /**
     * Builds the push-runtime chain for one nested plan, wiring the runtimes back to front so that the last one
     * writes into {@code writer}.
     *
     * @return the first runtime of the chain (the one that receives input)
     */
    private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
            throws AlgebricksException, HyracksDataException {
        // plug the operators
        IFrameWriter start = writer;
        IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
        RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
            IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
            if (i > 0) {
                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
            } else {
                // the nested tuple source has the same input and output record descriptor
                newRuntime.setInputRecordDescriptor(0, recordDescriptors[0]);
            }
            start = newRuntime;
        }
        return start;
    }

    /**
     * Frame writer placed at the end of every nested pipeline. For each tuple it receives, it writes the current
     * group's key and decor fields followed by the producing pipeline's output fields into a buffered output frame.
     * The frame is flushed to the downstream writer when full and on {@link #close()}.
     */
    private static class RunningAggregatorOutput implements IFrameWriter {
        private final FrameTupleAccessor[] tAccess;
        private final RecordDescriptor[] inputRecDesc;
        /** Index of the pipeline whose frames are currently being received. */
        private int inputIdx;
        private final ArrayTupleBuilder tb;
        /** Key + decor fields of the current group, filled by the aggregator's init(). */
        private final ArrayTupleBuilder gbyTb;
        private final AlgebricksPipeline[] subplans;
        private final IFrameWriter outputWriter;
        private final ByteBuffer outputFrame;
        private final FrameTupleAppender outputAppender;

        public RunningAggregatorOutput(IHyracksTaskContext ctx, AlgebricksPipeline[] subplans, int numKeys,
                int numDecors, IFrameWriter outputWriter) throws HyracksDataException {
            this.subplans = subplans;
            this.outputWriter = outputWriter;
            int totalAggFields = 0;
            this.inputRecDesc = new RecordDescriptor[subplans.length];
            for (int i = 0; i < subplans.length; i++) {
                RecordDescriptor[] rd = subplans[i].getRecordDescriptors();
                this.inputRecDesc[i] = rd[rd.length - 1];
                totalAggFields += subplans[i].getOutputWidth();
            }
            // NOTE(review): tb is sized for the fields of ALL subplans, but nextFrame() only ever adds the fields of
            // the one subplan that produced the frame. Confirm this is intended when there is more than one subplan.
            tb = new ArrayTupleBuilder(numKeys + numDecors + totalAggFields);
            gbyTb = new ArrayTupleBuilder(numKeys + numDecors);
            this.tAccess = new FrameTupleAccessor[inputRecDesc.length];
            for (int i = 0; i < inputRecDesc.length; i++) {
                tAccess[i] = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc[i]);
            }
            this.outputFrame = ctx.allocateFrame();
            this.outputAppender = new FrameTupleAppender(ctx.getFrameSize());
            this.outputAppender.reset(outputFrame, true);
        }

        @Override
        public void open() throws HyracksDataException {
        }

        /**
         * Emits one output tuple (group prefix + pipeline fields) per tuple in {@code buffer}.
         *
         * @throws HyracksDataException
         *             if a single output tuple does not fit into an empty frame
         */
        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            int w = subplans[inputIdx].getOutputWidth();
            IFrameTupleAccessor accessor = tAccess[inputIdx];
            accessor.reset(buffer);
            // The group prefix cannot change while this frame is processed, so read it once.
            byte[] gbyData = gbyTb.getByteArray();
            int[] gbyFieldEnds = gbyTb.getFieldEndOffsets();
            int tupleCount = accessor.getTupleCount();
            for (int tIndex = 0; tIndex < tupleCount; tIndex++) {
                tb.reset();
                // Copy the group prefix; field i spans [end of field i-1, end of field i).
                int fieldStart = 0;
                for (int i = 0; i < gbyFieldEnds.length; i++) {
                    tb.addField(gbyData, fieldStart, gbyFieldEnds[i] - fieldStart);
                    fieldStart = gbyFieldEnds[i];
                }
                for (int f = 0; f < w; f++) {
                    tb.addField(accessor, tIndex, f);
                }
                if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                    FrameUtils.flushFrame(outputFrame, outputWriter);
                    outputAppender.reset(outputFrame, true);
                    if (!outputAppender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                        throw new HyracksDataException(
                                "Failed to write a running aggregation result into an empty frame: possibly the size of the result is too large.");
                    }
                }
            }
        }

        /** Flushes any buffered output tuples to the downstream writer. */
        @Override
        public void close() throws HyracksDataException {
            if (outputAppender.getTupleCount() > 0) {
                FrameUtils.flushFrame(outputFrame, outputWriter);
                outputAppender.reset(outputFrame, true);
            }
        }

        /** Selects which pipeline's accessor and output width apply to subsequent frames. */
        public void setInputIdx(int inputIdx) {
            this.inputIdx = inputIdx;
        }

        public ArrayTupleBuilder getGroupByTupleBuilder() {
            return gbyTb;
        }

        // NOTE(review): failure is not forwarded to outputWriter; presumably the owning operator fails the
        // downstream writer itself. Confirm.
        @Override
        public void fail() throws HyracksDataException {
        }
    }
}