/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.group.external;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.ISpillableTable;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
/**
 * Source-side pushable that merges the sorted run files produced by an external
 * (spilling) group-by into a single aggregated output stream.
 * <p>
 * It fetches the {@link ExternalGroupState} registered under {@code stateId}: if no
 * runs were spilled, the in-memory spillable table is flushed directly to the
 * downstream writer; otherwise the runs are merged in one or more passes. Each pass
 * merges up to {@code framesLimit - 2} runs via a {@link ReferencedPriorityQueue};
 * a non-final pass writes a new run file that is pushed back onto the run list.
 * <p>
 * NOTE(review): this class assumes partial-aggregate run files share the layout of
 * {@code outRecordDescriptor} (it is passed as both input and output descriptor to
 * the merger factory) — confirm against the producing operator.
 */
class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
    private final IHyracksTaskContext ctx;
    // Key under which the producing activity stashed the ExternalGroupState.
    private final Object stateId;
    private final int[] keyFields;
    private final IBinaryComparator[] comparators;
    private final AggregateState aggregateState;
    private final ArrayTupleBuilder tupleBuilder;
    // Positions of the group keys inside stored (partial-result) records: 0..k-1.
    private final int[] storedKeys;
    // Merger: combines partial aggregates from runs into (partial or final) results.
    private final IAggregatorDescriptor aggregator;
    private final boolean isOutputSorted;
    // Total frame budget for a merge pass (input frames + output + spare).
    private final int framesLimit;
    private final RecordDescriptor outRecordDescriptor;
    /**
     * Input frames, one for each run file.
     */
    private List<ByteBuffer> inFrames;
    /**
     * Output frame.
     */
    private ByteBuffer outFrame, writerFrame;
    private final FrameTupleAppender outAppender;
    private FrameTupleAppender writerAppender;
    private LinkedList<RunFileReader> runs;
    private ExternalGroupState aggState;
    private ArrayTupleBuilder finalTupleBuilder;
    /**
     * how many frames to be read ahead once
     */
    private int runFrameLimit = 1;
    // Per-run cursor: index (into inFrames) of the frame currently being consumed.
    private int[] currentFrameIndexInRun;
    // Per-run count of frames loaded by the last batched read.
    private int[] currentRunFrames;
    private final FrameTupleAccessor outFrameAccessor;

    /**
     * @param ctx                 task context used for frame allocation and state lookup
     * @param stateId             id of the ExternalGroupState produced by the build phase
     * @param comparatorFactories one comparator per group key, used for merge ordering
     * @param keyFields           key field positions (in the partial-result layout)
     * @param mergerFactory       factory for the merging aggregator
     * @param isOutputSorted      if true, sort the in-memory table before flushing
     *                            (only relevant when nothing was spilled)
     * @param framesLimit         frame budget for each merge pass
     * @param outRecordDescriptor layout of both the run records and the output
     * @throws HyracksDataException from aggregator creation
     */
    ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
            IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
        this.stateId = stateId;
        this.keyFields = keyFields;
        comparators = new IBinaryComparator[comparatorFactories.length];
        for (int i = 0; i < comparatorFactories.length; ++i) {
            comparators[i] = comparatorFactories[i].createBinaryComparator();
        }
        // In the partial-result records the keys occupy the leading fields 0..k-1.
        int[] keyFieldsInPartialResults = new int[keyFields.length];
        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
            keyFieldsInPartialResults[i] = i;
        }
        // Input and output layouts are identical here: runs hold partial results.
        aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
                keyFieldsInPartialResults);
        aggregateState = aggregator.createAggregateStates();
        storedKeys = new int[keyFields.length];
        /**
         * Get the list of the fields in the stored records.
         */
        for (int i = 0; i < keyFields.length; ++i) {
            storedKeys[i] = i;
        }
        tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
        this.ctx = ctx;
        outAppender = new FrameTupleAppender(ctx.getFrameSize());
        outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
        this.isOutputSorted = isOutputSorted;
        this.framesLimit = framesLimit;
        this.outRecordDescriptor = outRecordDescriptor;
    }

    /**
     * Drives the whole merge: flushes the in-memory table directly when no runs were
     * spilled, otherwise repeatedly runs {@link #doPass(LinkedList)} until the run
     * list is empty. Opens, and always closes, the downstream writer; on any failure
     * it calls {@code writer.fail()} before rethrowing.
     */
    public void initialize() throws HyracksDataException {
        aggState = (ExternalGroupState) ctx.getStateObject(stateId);
        runs = aggState.getRuns();
        writer.open();
        try {
            if (runs.size() <= 0) {
                // Nothing spilled: the whole aggregation fit in memory.
                ISpillableTable gTable = aggState.getSpillableTable();
                if (gTable != null) {
                    if (isOutputSorted)
                        gTable.sortFrames();
                    gTable.flushFrames(writer, false);
                }
                // Drop references so the state can be garbage collected early.
                gTable = null;
                aggState = null;
            } else {
                aggState = null;
                // Work on a copy; doPass mutates the list (removes merged runs,
                // prepends newly written ones).
                runs = new LinkedList<RunFileReader>(runs);
                inFrames = new ArrayList<ByteBuffer>();
                outFrame = ctx.allocateFrame();
                outAppender.reset(outFrame, true);
                outFrameAccessor.reset(outFrame);
                while (runs.size() > 0) {
                    try {
                        doPass(runs);
                    } catch (Exception e) {
                        throw new HyracksDataException(e);
                    }
                }
                inFrames.clear();
            }
        } catch (Exception e) {
            writer.fail();
            throw new HyracksDataException(e);
        } finally {
            aggregateState.close();
            writer.close();
        }
    }

    /**
     * Performs one merge pass over (a prefix of) the run list.
     * <p>
     * If all remaining runs plus the output/spare frames fit in {@code framesLimit},
     * this is the final pass and the merged result goes to the downstream writer,
     * with {@code runFrameLimit} frames of read-ahead per run. Otherwise only
     * {@code framesLimit - 2} runs are merged into a new run file, which is inserted
     * at the head of {@code runs} for a later pass.
     *
     * @param runs run files still to be merged; mutated in place
     * @throws HyracksDataException on frame, file, or aggregation errors
     */
    private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
        FileReference newRun = null;
        // Shadows the field on purpose: a non-final pass redirects output to a run file.
        IFrameWriter writer = this.writer;
        boolean finalPass = false;
        // Top up the input-frame pool to framesLimit - 2 (one output frame plus one
        // spare are reserved out of the budget).
        while (inFrames.size() + 2 < framesLimit) {
            inFrames.add(ctx.allocateFrame());
        }
        int runNumber;
        if (runs.size() + 2 <= framesLimit) {
            finalPass = true;
            // Spread the input frames across the runs as read-ahead.
            runFrameLimit = (framesLimit - 2) / runs.size();
            runNumber = runs.size();
        } else {
            // Too many runs: merge only as many as we have frames for, into a new run.
            runNumber = framesLimit - 2;
            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
                    ExternalGroupOperatorDescriptor.class.getSimpleName());
            writer = new RunFileWriter(newRun, ctx.getIOManager());
            writer.open();
        }
        try {
            currentFrameIndexInRun = new int[runNumber];
            currentRunFrames = new int[runNumber];
            /**
             * Create file readers for each input run file, only for
             * the ones fit into the inFrames
             */
            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
                    runNumber, comparator);
            /**
             * current tuple index in each run
             */
            int[] tupleIndices = new int[runNumber];
            for (int i = 0; i < runNumber; i++) {
                // NOTE(review): peek() here is presumed to yield each not-yet-initialized
                // run id in turn while the queue is being seeded — semantics live in
                // ReferencedPriorityQueue; confirm there.
                int runIndex = topTuples.peek().getRunid();
                tupleIndices[runIndex] = 0;
                // Load the run file
                runFileReaders[runIndex] = runs.get(runIndex);
                runFileReaders[runIndex].open();
                currentRunFrames[runIndex] = 0;
                // Each run owns a contiguous window of runFrameLimit frames in inFrames.
                currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
                for (int j = 0; j < runFrameLimit; j++) {
                    int frameIndex = currentFrameIndexInRun[runIndex] + j;
                    if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
                        currentRunFrames[runIndex]++;
                        // Only the first frame's head tuple enters the queue now.
                        if (j == 0)
                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
                    } else {
                        break;
                    }
                }
            }
            /**
             * Start merging
             */
            while (!topTuples.areRunsExhausted()) {
                /**
                 * Get the top record
                 */
                ReferenceEntry top = topTuples.peek();
                int tupleIndex = top.getTupleIndex();
                int runIndex = topTuples.peek().getRunid();
                FrameTupleAccessor fta = top.getAccessor();
                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
                if (currentTupleInOutFrame < 0
                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
                    /**
                     * Initialize the first output record Reset the
                     * tuple builder
                     */
                    tupleBuilder.reset();
                    for (int k = 0; k < storedKeys.length; k++) {
                        tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
                    }
                    aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                        // Output frame full: flush, then retry once into the empty frame.
                        flushOutFrame(writer, finalPass);
                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
                            throw new HyracksDataException(
                                    "The partial result is too large to be initialized in a frame.");
                        }
                    }
                } else {
                    /**
                     * if new tuple is in the same group of the
                     * current aggregator do merge and output to the
                     * outFrame
                     */
                    aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
                }
                tupleIndices[runIndex]++;
                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
            }
            // Flush whatever remains buffered in the output frame.
            if (outAppender.getTupleCount() > 0) {
                flushOutFrame(writer, finalPass);
                outAppender.reset(outFrame, true);
            }
            aggregator.close();
            // Remove the runs consumed by this pass.
            runs.subList(0, runNumber).clear();
            /**
             * insert the new run file into the beginning of the run
             * file list
             */
            if (!finalPass) {
                runs.add(0, ((RunFileWriter) writer).createReader());
            }
        } finally {
            if (!finalPass) {
                // The run-file writer is local to this pass; the downstream writer
                // is closed by initialize().
                writer.close();
            }
        }
    }

    /**
     * Converts the buffered partial results in {@code outFrame} into their output
     * form (final results on the final pass, partial results otherwise) and pushes
     * them to {@code writer} via the lazily created {@code writerFrame}/
     * {@code writerAppender}. Resets {@code outAppender} when done.
     *
     * @param writer  destination (downstream writer or the pass's run file)
     * @param isFinal true on the final pass — emit final instead of partial results
     * @throws HyracksDataException if a single result tuple exceeds a frame
     */
    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
        if (finalTupleBuilder == null) {
            finalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
        }
        if (writerFrame == null) {
            writerFrame = ctx.allocateFrame();
        }
        if (writerAppender == null) {
            writerAppender = new FrameTupleAppender(ctx.getFrameSize());
            writerAppender.reset(writerFrame, true);
        }
        outFrameAccessor.reset(outFrame);
        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
            finalTupleBuilder.reset();
            // Copy the group keys, then let the aggregator append the value fields.
            for (int k = 0; k < storedKeys.length; k++) {
                finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
            }
            if (isFinal) {
                aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
            } else {
                aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
            }
            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                // Writer frame full: flush and retry once into the empty frame.
                FrameUtils.flushFrame(writerFrame, writer);
                writerAppender.reset(writerFrame, true);
                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
                }
            }
        }
        if (writerAppender.getTupleCount() > 0) {
            FrameUtils.flushFrame(writerFrame, writer);
            writerAppender.reset(writerFrame, true);
        }
        outAppender.reset(outFrame, true);
    }

    /**
     * Advances run {@code runIndex} to its next tuple and updates the priority
     * queue: moves to the next buffered frame when the current one is exhausted,
     * refills the run's whole frame window from disk when the buffered batch is
     * used up, and pops/closes the run when it has no more tuples.
     *
     * @param runIndex       run whose cursor is advanced
     * @param tupleIndices   per-run index of the next tuple in the current frame
     * @param runCursors     per-run file readers ({@code null} once closed)
     * @param tupleAccessors accessors over {@code inFrames} ({@code null} once closed)
     * @param topTuples      the merge priority queue to update
     * @throws HyracksDataException on read errors
     */
    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
        int runStart = runIndex * runFrameLimit;
        boolean existNext = false;
        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
            /**
             * run already closed
             */
            existNext = false;
        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
            /**
             * not the last frame for this run
             */
            existNext = true;
            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
                // Current frame drained: step to the next buffered frame.
                tupleIndices[runIndex] = 0;
                currentFrameIndexInRun[runIndex]++;
            }
        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
            /**
             * the last frame has expired
             */
            existNext = true;
        } else {
            /**
             * If all tuples in the targeting frame have been
             * checked.
             */
            tupleIndices[runIndex] = 0;
            currentFrameIndexInRun[runIndex] = runStart;
            /**
             * read in batch
             */
            currentRunFrames[runIndex] = 0;
            for (int j = 0; j < runFrameLimit; j++) {
                int frameIndex = currentFrameIndexInRun[runIndex] + j;
                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
                    existNext = true;
                    currentRunFrames[runIndex]++;
                } else {
                    break;
                }
            }
        }
        if (existNext) {
            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
        } else {
            topTuples.pop();
            closeRun(runIndex, runCursors, tupleAccessors);
        }
    }

    /**
     * Close the run file, and also the corresponding readers and
     * input frame.
     *
     * @param index         run to close
     * @param runCursors    per-run readers; slot {@code index} is nulled out
     * @param tupleAccessor per-frame accessors; the run's window is nulled out
     * @throws HyracksDataException on close errors
     */
    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
            throws HyracksDataException {
        if (runCursors[index] != null) {
            runCursors[index].close();
            runCursors[index] = null;
            // Null the run's whole frame window so setNextTopTuple sees it as closed.
            int frameOffset = index * runFrameLimit;
            for (int j = 0; j < runFrameLimit; j++) {
                tupleAccessor[frameOffset + j] = null;
            }
        }
    }

    /**
     * Compares the group keys of tuple {@code j1} in {@code fta1} with tuple
     * {@code j2} in {@code fta2}, field by field with the configured binary
     * comparators. Returns the first non-zero comparison result, or 0 if all
     * key fields are equal (i.e. same group).
     */
    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
        byte[] b1 = fta1.getBuffer().array();
        byte[] b2 = fta2.getBuffer().array();
        for (int f = 0; f < keyFields.length; ++f) {
            int fIdx = f;
            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
            int l1 = fta1.getFieldLength(j1, fIdx);
            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
            // Same quantity as getFieldLength, spelled out via start/end offsets.
            int l2_start = fta2.getFieldStartOffset(j2, fIdx);
            int l2_end = fta2.getFieldEndOffset(j2, fIdx);
            int l2 = l2_end - l2_start;
            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /**
     * Builds the comparator that orders priority-queue entries by their tuples'
     * group-key fields, using the given binary comparators in key order.
     */
    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
        return new Comparator<ReferenceEntry>() {
            @Override
            public int compare(ReferenceEntry o1, ReferenceEntry o2) {
                FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
                FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
                int j1 = o1.getTupleIndex();
                int j2 = o2.getTupleIndex();
                byte[] b1 = fta1.getBuffer().array();
                byte[] b2 = fta2.getBuffer().array();
                for (int f = 0; f < keyFields.length; ++f) {
                    int fIdx = f;
                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
                            + fta1.getFieldStartOffset(j1, fIdx);
                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
                            + fta2.getFieldStartOffset(j2, fIdx);
                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
                    if (c != 0) {
                        return c;
                    }
                }
                return 0;
            }
        };
    }
}