blob: 7be9b08f11a3da62fe3c8e3fa1e5ba8e75e1cd6c [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.common.comm;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileHandle;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
public class SortMergeFrameReader implements IFrameReader {
private static final Logger LOGGER = Logger.getLogger(SortMergeFrameReader.class.getName());
private final IHyracksStageletContext ctx;
private final IConnectionDemultiplexer demux;
private final FrameTuplePairComparator tpc;
private final FrameTupleAppender appender;
private final RecordDescriptor recordDescriptor;
private Run[] runs;
private ByteBuffer[] frames;
private PriorityQueue<Integer> pQueue;
private int lastReadSender;
private boolean first;
public SortMergeFrameReader(IHyracksStageletContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
IBinaryComparator[] comparators, RecordDescriptor recordDescriptor) {
this.ctx = ctx;
this.demux = demux;
tpc = new FrameTuplePairComparator(sortFields, sortFields, comparators);
appender = new FrameTupleAppender(ctx.getFrameSize());
this.recordDescriptor = recordDescriptor;
}
@Override
public void open() throws HyracksDataException {
int nSenders = demux.getSenderCount();
runs = new Run[nSenders];
frames = new ByteBuffer[nSenders];
for (int i = 0; i < runs.length; ++i) {
runs[i] = new Run(i);
frames[i] = ctx.allocateFrame();
}
pQueue = new PriorityQueue<Integer>(nSenders, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
int i1 = o1.intValue();
int i2 = o2.intValue();
Run r1 = runs[i1];
Run r2 = runs[i2];
int c = tpc.compare(r1.accessor, r1.tIndex, r2.accessor, r2.tIndex);
return c == 0 ? (i1 < i2 ? -1 : 1) : c;
}
});
lastReadSender = 0;
first = true;
}
@Override
public void close() throws HyracksDataException {
for (Run r : runs) {
r.close();
}
}
@Override
public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
buffer.clear();
buffer.position(buffer.capacity());
appender.reset(buffer, true);
if (first) {
for (int i = 0; i < runs.length; ++i) {
if (runs[i].next()) {
pQueue.add(Integer.valueOf(i));
}
}
}
first = false;
while (true) {
if (pQueue.isEmpty()) {
return appender.getTupleCount() > 0;
}
Integer top = pQueue.peek();
Run run = runs[top.intValue()];
if (!appender.append(run.accessor, run.tIndex)) {
return true;
}
pQueue.remove();
if (run.next()) {
pQueue.add(top);
}
}
}
private class Run {
private final int runId;
private final FileReference fRef;
private final FileHandle fHandle;
private final ByteBuffer frame;
private final FrameTupleAccessor accessor;
private int tIndex;
private long readFP;
private long writeFP;
private boolean eof;
public Run(int runId) throws HyracksDataException {
this.runId = runId;
fRef = ctx.createWorkspaceFile(SortMergeFrameReader.class.getName());
fHandle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
frame = ctx.allocateFrame();
accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
readFP = 0;
writeFP = 0;
eof = false;
}
public void close() throws HyracksDataException {
ctx.getIOManager().close(fHandle);
}
private void write(ByteBuffer frame) throws HyracksDataException {
int sz = ctx.getIOManager().syncWrite(fHandle, writeFP, frame);
writeFP += sz;
}
private boolean next() throws HyracksDataException {
++tIndex;
while (readFP == 0 || tIndex >= accessor.getTupleCount()) {
if (!read(frame)) {
return false;
}
accessor.reset(frame);
tIndex = 0;
}
return true;
}
private boolean read(ByteBuffer frame) throws HyracksDataException {
frame.clear();
while (!eof && readFP >= writeFP) {
spoolRuns(runId);
}
if (eof && readFP >= writeFP) {
return false;
}
int sz = ctx.getIOManager().syncRead(fHandle, readFP, frame);
readFP += sz;
return true;
}
private void eof() {
eof = true;
}
}
private void spoolRuns(int interestingRun) throws HyracksDataException {
while (true) {
IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
lastReadSender = (Integer) entry.getAttachment();
ByteBuffer netBuffer = entry.getReadBuffer();
int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()));
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Frame Tuple Count: " + tupleCount);
}
if (tupleCount == 0) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Empty Frame received: Closing " + lastReadSender);
}
int openEntries = demux.closeEntry(lastReadSender);
runs[lastReadSender].eof();
netBuffer.clear();
demux.unreadyEntry(lastReadSender);
if (openEntries == 0) {
return;
}
} else {
runs[lastReadSender].write(netBuffer);
netBuffer.clear();
demux.unreadyEntry(lastReadSender);
}
if (lastReadSender == interestingRun) {
return;
}
}
}
}