blob: 92ae916679391c9a05053d4ab196fb2063d795a1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl.dflt;
import java.io.IOException;
import java.io.InputStream;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
public class InMemoryShuffleSorter extends DefaultSorter {
private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
static final int IFILE_EOF_LENGTH =
2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
private List<Integer> spillIndices = new ArrayList<Integer>();
private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
ShuffleHandler shuffleHandler = new ShuffleHandler(this);
byte[] kvbuffer;
IntBuffer kvmeta;
@Override
public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
super.initialize(outputContext, conf, numOutputs);
shuffleHandler.initialize(outputContext, conf);
}
@Override
protected void spill(int mstart, int mend)
throws IOException, InterruptedException {
// Start the shuffleHandler
shuffleHandler.start();
// Don't spill!
// Make a copy
this.kvbuffer = super.kvbuffer;
this.kvmeta = super.kvmeta;
// Just save spill-indices for serving later
int spindex = mstart;
for (int i = 0; i < partitions; ++i) {
spillIndices.add(spindex);
int length = 0;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
final int kvoff = offsetFor(spindex);
int keyLen =
kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
int valLen = getInMemVBytesLength(kvoff);
length +=
(keyLen + WritableUtils.getVIntSize(keyLen)) +
(valLen + WritableUtils.getVIntSize(valLen));
++spindex;
}
length += IFILE_EOF_LENGTH;
shuffleHeaders.add(
new ShuffleHeader(
outputContext.getUniqueIdentifier(), // TODO Verify that this is correct.
length + IFILE_CHECKSUM_LENGTH, length, i)
);
LOG.info("shuffleHeader[" + i + "]:" +
" rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) +
" spillIndex=" + spillIndices.get(i));
}
LOG.info("Saved " + spillIndices.size() + " spill-indices and " +
shuffleHeaders.size() + " shuffle headers");
}
@Override
public InputStream getSortedStream(int partition) {
return new SortBufferInputStream(this, partition);
}
@Override
public void close() throws IOException {
// FIXME
//shuffleHandler.stop();
}
@Override
public ShuffleHeader getShuffleHeader(int reduce) {
return shuffleHeaders.get(reduce);
}
public int getSpillIndex(int partition) {
return spillIndices.get(partition);
}
}