/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;

import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;
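
/**
 * Reads the shuffled map output for one reducer: frames arriving over the
 * channels of a {@link NonDeterministicChannelReader} are split into
 * per-block run files, which are then merge-sorted into a single managed
 * run file whose frames are returned by {@link #nextFrame(ByteBuffer)}.
 */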
public class ShuffleFrameReader implements IFrameReader {
private final IHyracksTaskContext ctx;
private final NonDeterministicChannelReader channelReader;
private final HadoopHelper helper;
private final RecordDescriptor recordDescriptor;
private List<RunFileWriter> runFileWriters;
private RunFileReader reader;

    public ShuffleFrameReader(IHyracksTaskContext ctx, NonDeterministicChannelReader channelReader,
MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
this.ctx = ctx;
this.channelReader = channelReader;
helper = new HadoopHelper(mConfig);
this.recordDescriptor = helper.getMapOutputRecordDescriptor();
}
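
    /**
     * Drains every sender channel, grouping each sender's tuples into run files
     * by block id, then merges all runs with an {@link ExternalSortRunMerger}
     * into the single sorted run served by {@link #nextFrame(ByteBuffer)}.
     */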
@Override
public void open() throws HyracksDataException {
channelReader.open();
int nSenders = channelReader.getSenderPartitionCount();
runFileWriters = new ArrayList<RunFileWriter>();
RunInfo[] infos = new RunInfo[nSenders];
FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
IInputChannel[] channels = channelReader.getChannels();
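        // Drain the channels: findNextSender() returns the index of a sender
        // with an available frame, or a negative value once all senders are done.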
while (true) {
int entry = channelReader.findNextSender();
if (entry < 0) {
break;
}
RunInfo info = infos[entry];
IInputChannel channel = channels[entry];
ByteBuffer netBuffer = channel.getNextBuffer();
accessor.reset(netBuffer);
int nTuples = accessor.getTupleCount();
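            // Route each tuple to the run for its block id; when the block id
            // changes for this sender, close the current run and start a new one.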
for (int i = 0; i < nTuples; ++i) {
int tBlockId = IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
FrameUtils.getAbsoluteFieldStartOffset(accessor, i, HadoopHelper.BLOCKID_FIELD_INDEX));
if (info == null) {
info = new RunInfo();
info.reset(tBlockId);
infos[entry] = info;
} else if (info.blockId != tBlockId) {
info.close();
info.reset(tBlockId);
}
info.write(accessor, i);
}
channel.recycleBuffer(netBuffer);
}
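        // Flush and close any runs that are still open.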
for (int i = 0; i < infos.length; ++i) {
RunInfo info = infos[i];
if (info != null) {
info.close();
}
}
infos = null;
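        // Merge phase: sort-merge every spilled run on the first sort field using
        // the comparators from the job configuration, producing one output run.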
FileReference outFile = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
int framesLimit = helper.getSortFrameLimit(ctx);
IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
List<IFrameReader> runs = new LinkedList<IFrameReader>();
for (RunFileWriter rfw : runFileWriters) {
runs.add(rfw.createReader());
}
RunFileWriter rfw = new RunFileWriter(outFile, ctx.getIOManager());
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, null, runs, new int[] { 0 }, comparators,
recordDescriptor, framesLimit, rfw);
merger.process();
reader = rfw.createReader();
reader.open();
}
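
    // Frames are served straight from the merged, sorted run file.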
@Override
public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
return reader.nextFrame(buffer);
}

    @Override
public void close() throws HyracksDataException {
reader.close();
}
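
    /**
     * Collects the tuples of one block id arriving from one sender and spills
     * full frames to a dedicated run file; close() registers the finished run
     * in runFileWriters for the merge phase.
     */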
private class RunInfo {
private final ByteBuffer buffer;
private final FrameTupleAppender fta;
private FileReference file;
private RunFileWriter rfw;
private int blockId;

        public RunInfo() {
buffer = ctx.allocateFrame();
fta = new FrameTupleAppender(ctx.getFrameSize());
}

        public void reset(int blockId) throws HyracksDataException {
this.blockId = blockId;
fta.reset(buffer, true);
try {
file = ctx.createManagedWorkspaceFile(ShuffleFrameReader.class.getName() + ".run");
rfw = new RunFileWriter(file, ctx.getIOManager());
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
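
        // Appends the tuple, spilling the current frame first when it is full.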
public void write(FrameTupleAccessor accessor, int tIdx) throws HyracksDataException {
if (!fta.append(accessor, tIdx)) {
flush();
if (!fta.append(accessor, tIdx)) {
                    throw new IllegalStateException("Tuple is too large to fit into an empty frame");
}
}
}

        public void close() throws HyracksDataException {
flush();
rfw.close();
runFileWriters.add(rfw);
}

        private void flush() throws HyracksDataException {
if (fta.getTupleCount() <= 0) {
return;
}
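            // Expose the full frame to the writer, then clear the appender so the
            // buffer can be reused.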
buffer.limit(buffer.capacity());
buffer.position(0);
rfw.nextFrame(buffer);
fta.reset(buffer, true);
}
}
}