/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* <code>BackupStore</code> is a utility class that is used to support
* the mark-reset functionality of the values iterator.
*
* <p>It has two caches - a memory cache and a file cache - where values are
* stored as they are iterated, after a mark. On reset, values are retrieved
* from these caches. The framework moves from the memory cache to the
* file cache when the memory cache becomes full.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BackupStore<K,V> {
private static final Log LOG = LogFactory.getLog(BackupStore.class.getName());
private static final int MAX_VINT_SIZE = 9;
private static final int EOF_MARKER_SIZE = 2 * MAX_VINT_SIZE;
private final TaskAttemptID tid;
private MemoryCache memCache;
private FileCache fileCache;
List<Segment<K,V>> segmentList = new LinkedList<Segment<K,V>>();
private int readSegmentIndex = 0;
private int firstSegmentOffset = 0;
private int currentKVOffset = 0;
private int nextKVOffset = -1;
private DataInputBuffer currentKey = null;
private DataInputBuffer currentValue = new DataInputBuffer();
private DataInputBuffer currentDiskValue = new DataInputBuffer();
private boolean hasMore = false;
private boolean inReset = false;
private boolean clearMarkFlag = false;
private boolean lastSegmentEOF = false;
public BackupStore(Configuration conf, TaskAttemptID taskid)
throws IOException {
final float bufferPercent =
conf.getFloat(JobContext.REDUCE_MARKRESET_BUFFER_PERCENT, 0f);
if (bufferPercent > 1.0 || bufferPercent < 0.0) {
throw new IOException("Invalid value for " +
JobContext.REDUCE_MARKRESET_BUFFER_PERCENT + ": " + bufferPercent);
}
int maxSize = (int)Math.min(
Runtime.getRuntime().maxMemory() * bufferPercent, Integer.MAX_VALUE);
// Support an absolute size also.
int tmp = conf.getInt(JobContext.REDUCE_MARKRESET_BUFFER_SIZE, 0);
if (tmp > 0) {
maxSize = tmp;
}
memCache = new MemoryCache(maxSize);
fileCache = new FileCache(conf);
tid = taskid;
LOG.info("Created a new BackupStore with a memory of " + maxSize);
}
/**
* Write the given K,V pair to the cache.
* Writes to the memory cache if space is available, else to the file cache.
* @param key the key to write
* @param value the value to write
* @throws IOException
*/
public void write(DataInputBuffer key, DataInputBuffer value)
throws IOException {
assert (key != null && value != null);
if (fileCache.isActive()) {
fileCache.write(key, value);
return;
}
if (memCache.reserveSpace(key, value)) {
memCache.write(key, value);
} else {
fileCache.activate();
fileCache.write(key, value);
}
}
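/**
* Mark the current position in the iteration. Segments that precede the
* current read position are closed and dropped, and the offset within the
* current segment is remembered so that a later reset() resumes from here.
* @throws IOException
*/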
public void mark() throws IOException {
// We read one KV pair in advance in hasNext.
// If hasNext has read the next KV pair from a new segment, but the
// user has not called next() for that KV, then reset the readSegmentIndex
// to the previous segment
if (nextKVOffset == 0) {
assert (readSegmentIndex != 0);
assert (currentKVOffset != 0);
readSegmentIndex --;
}
// just drop segments before the current active segment
int i = 0;
Iterator<Segment<K,V>> itr = segmentList.iterator();
while (itr.hasNext()) {
Segment<K,V> s = itr.next();
if (i == readSegmentIndex) {
break;
}
s.close();
itr.remove();
i++;
LOG.debug("Dropping a segment");
}
// FirstSegmentOffset is the offset in the current segment from where we
// need to start reading on the next reset
firstSegmentOffset = currentKVOffset;
readSegmentIndex = 0;
LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
}
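/**
* Reset the iteration to the last marked position. If we are not already
* in reset mode, the records written since the mark are first sealed into
* a segment; all segments are then repositioned so that reads resume from
* the marked offset.
* @throws IOException
*/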
public void reset() throws IOException {
// Create a new segment for the previously written records only if we
// are not already in the reset mode
if (!inReset) {
if (fileCache.isActive) {
fileCache.createInDiskSegment();
} else {
memCache.createInMemorySegment();
}
}
inReset = true;
// Reset the segments to the correct position from where the next read
// should begin.
for (int i = 0; i < segmentList.size(); i++) {
Segment<K,V> s = segmentList.get(i);
if (s.inMemory()) {
int offset = (i == 0) ? firstSegmentOffset : 0;
s.getReader().reset(offset);
} else {
s.closeReader();
if (i == 0) {
s.reinitReader(firstSegmentOffset);
s.getReader().disableChecksumValidation();
}
}
}
currentKVOffset = firstSegmentOffset;
nextKVOffset = -1;
readSegmentIndex = 0;
hasMore = false;
lastSegmentEOF = false;
LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
" Segment List Size is " + segmentList.size());
}
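/**
* Check whether another K,V pair is available by reading one pair ahead
* from the current segment (or the next one) and caching it for next().
* @return true if another K,V pair is available
* @throws IOException
*/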
public boolean hasNext() throws IOException {
if (lastSegmentEOF) {
return false;
}
// We read the next KV from the cache to decide if there is any left.
// Since hasNext can be called several times before the actual call to
// next(), we use hasMore to avoid extra reads. hasMore is set to false
// when the user actually consumes this record in next()
if (hasMore) {
return true;
}
Segment<K,V> seg = segmentList.get(readSegmentIndex);
// Mark the current position. This would be set to currentKVOffset
// when the user consumes this record in next().
nextKVOffset = (int) seg.getActualPosition();
if (seg.nextRawKey()) {
currentKey = seg.getKey();
seg.getValue(currentValue);
hasMore = true;
return true;
} else {
if (!seg.inMemory()) {
seg.closeReader();
}
}
// If this is the last segment, mark the lastSegmentEOF flag and return
if (readSegmentIndex == segmentList.size() - 1) {
nextKVOffset = -1;
lastSegmentEOF = true;
return false;
}
nextKVOffset = 0;
readSegmentIndex ++;
Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
// We possibly are moving from a memory segment to a disk segment.
// Reset so that we do not corrupt the in-memory segment buffer.
// See HADOOP-5494
if (!nextSegment.inMemory()) {
currentValue.reset(currentDiskValue.getData(),
currentDiskValue.getLength());
nextSegment.init(null);
}
if (nextSegment.nextRawKey()) {
currentKey = nextSegment.getKey();
nextSegment.getValue(currentValue);
hasMore = true;
return true;
} else {
throw new IOException("New segment did not have even one K/V");
}
}
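/**
* Advance to the K,V pair read ahead by hasNext().
* @throws NoSuchElementException if no K,V pairs are left
* @throws IOException
*/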
public void next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("iterate past last value");
}
// Reset hasMore. See comment in hasNext()
hasMore = false;
currentKVOffset = nextKVOffset;
nextKVOffset = -1;
}
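/** @return the value of the current K,V pair */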
public DataInputBuffer nextValue() {
return currentValue;
}
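/** @return the key of the current K,V pair */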
public DataInputBuffer nextKey() {
return currentKey;
}
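/**
* Discard all segments, re-initialize both caches, and reset the
* iteration state, effectively clearing any mark.
* @throws IOException
*/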
public void reinitialize() throws IOException {
if (segmentList.size() != 0) {
clearSegmentList();
}
memCache.reinitialize(true);
fileCache.reinitialize();
readSegmentIndex = firstSegmentOffset = 0;
currentKVOffset = 0;
nextKVOffset = -1;
hasMore = inReset = clearMarkFlag = false;
}
/**
* This function is called by the ValuesIterator when a mark is called
* outside of a reset zone.
*/
public void exitResetMode() throws IOException {
inReset = false;
if (clearMarkFlag ) {
// If a flag was set to clear mark, do the reinit now.
// See clearMark()
reinitialize();
return;
}
if (!fileCache.isActive) {
memCache.reinitialize(false);
}
}
/** Return the current underlying output stream so that the first key
* and value bytes can be written directly from the value iterators.
* @param length the length of the impending write
*/
public DataOutputStream getOutputStream(int length) throws IOException {
if (memCache.reserveSpace(length)) {
return memCache.dataOut;
} else {
fileCache.activate();
return fileCache.writer.getOutputStream();
}
}
/** This method is called by the value iterators after writing the first
* key and value bytes to the BackupStore
* @param length the number of bytes written
*/
public void updateCounters(int length) {
if (fileCache.isActive) {
fileCache.writer.updateCountersForExternalAppend(length);
} else {
memCache.usedSize += length;
}
}
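/**
* Clear a previously set mark. If we are currently in reset mode, only a
* flag is set here; the actual re-initialization is deferred until
* exitResetMode().
* @throws IOException
*/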
public void clearMark() throws IOException {
if (inReset) {
// If we are in the reset mode, we just mark a flag and come out
// The actual re initialization would be done when we exit the reset
// mode
clearMarkFlag = true;
} else {
reinitialize();
}
}
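// Close all segments, release any memory they reserved, and empty the list.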
private void clearSegmentList() throws IOException {
for (Segment<K,V> segment: segmentList) {
long len = segment.getLength();
segment.close();
if (segment.inMemory()) {
memCache.unreserve(len);
}
}
segmentList.clear();
}
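/**
* In-memory cache backed by a block of memory accounted for by the
* BackupRamManager. K,V pairs are appended in the IFile format; when a
* block fills up it is sealed into an in-memory Segment.
*/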
class MemoryCache {
private DataOutputBuffer dataOut;
private int blockSize;
private int usedSize;
private final BackupRamManager ramManager;
// Memory cache is made up of blocks.
private int defaultBlockSize = 1024 * 1024;
public MemoryCache(int maxSize) {
ramManager = new BackupRamManager(maxSize);
if (maxSize < defaultBlockSize) {
defaultBlockSize = maxSize;
}
}
public void unreserve(long len) {
ramManager.unreserve((int)len);
}
/**
* Re-initialize the memory cache.
*
* @param clearAll If true, re-initialize the ramManager also.
*/
void reinitialize(boolean clearAll) {
if (clearAll) {
ramManager.reinitialize();
}
int allocatedSize = createNewMemoryBlock(defaultBlockSize,
defaultBlockSize);
assert(allocatedSize == defaultBlockSize || allocatedSize == 0);
LOG.debug("Created a new mem block of " + allocatedSize);
}
private int createNewMemoryBlock(int requestedSize, int minSize) {
int allocatedSize = ramManager.reserve(requestedSize, minSize);
usedSize = 0;
if (allocatedSize == 0) {
dataOut = null;
blockSize = 0;
} else {
dataOut = new DataOutputBuffer(allocatedSize);
blockSize = allocatedSize;
}
return allocatedSize;
}
/**
* This method determines if there is enough space left in the
* memory cache to write the requested length plus space for the
* subsequent EOF markers.
* @param length the length of the impending write
* @return true if enough space is available
*/
boolean reserveSpace(int length) throws IOException {
int availableSize = blockSize - usedSize;
if (availableSize >= length + EOF_MARKER_SIZE) {
return true;
}
// Not enough available. Close this block
assert (!inReset);
createInMemorySegment();
// Create a new block
int tmp = Math.max(length + EOF_MARKER_SIZE, defaultBlockSize);
availableSize = createNewMemoryBlock(tmp,
(length + EOF_MARKER_SIZE));
return availableSize != 0;
}
boolean reserveSpace(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
int valueLength = value.getLength() - value.getPosition();
int requestedSize = keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
return reserveSpace(requestedSize);
}
/**
* Write the key and value to the cache in the IFile format
* @param key the key to write
* @param value the value to write
* @throws IOException
*/
public void write(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
int valueLength = value.getLength() - value.getPosition();
WritableUtils.writeVInt(dataOut, keyLength);
WritableUtils.writeVInt(dataOut, valueLength);
dataOut.write(key.getData(), key.getPosition(), keyLength);
dataOut.write(value.getData(), value.getPosition(), valueLength);
usedSize += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
LOG.debug("ID: " + segmentList.size() + " WRITE TO MEM");
}
/**
* This method creates a memory segment from the existing buffer
* @throws IOException
*/
void createInMemorySegment() throws IOException {
// If nothing was written in this block because the record size
// was greater than the allocated block size, just return.
if (usedSize == 0) {
ramManager.unreserve(blockSize);
return;
}
// reserveSpace() would have ensured that there is enough space
// left for the EOF markers.
assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
usedSize += EOF_MARKER_SIZE;
ramManager.unreserve(blockSize - usedSize);
Reader<K, V> reader =
new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
(org.apache.hadoop.mapred.TaskAttemptID) tid,
dataOut.getData(), 0, usedSize);
Segment<K, V> segment = new Segment<K, V>(reader, false);
segmentList.add(segment);
LOG.debug("Added Memory Segment to List. List Size is " +
segmentList.size());
}
}
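/**
* On-disk cache that spills K,V pairs to local spill files through an
* IFile writer once the memory cache can no longer hold them.
*/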
class FileCache {
private LocalDirAllocator lDirAlloc;
private final Configuration conf;
private final FileSystem fs;
private boolean isActive = false;
private Path file = null;
private IFile.Writer<K,V> writer = null;
private int spillNumber = 0;
public FileCache(Configuration conf)
throws IOException {
this.conf = conf;
this.fs = FileSystem.getLocal(conf);
this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
}
void write(DataInputBuffer key, DataInputBuffer value)
throws IOException {
if (writer == null) {
// If spillNumber is 0, we should have called activate and not
// come here at all
assert (spillNumber != 0);
writer = createSpillFile();
}
writer.append(key, value);
LOG.debug("ID: " + segmentList.size() + " WRITE TO DISK");
}
void reinitialize() {
spillNumber = 0;
writer = null;
isActive = false;
}
void activate() throws IOException {
isActive = true;
writer = createSpillFile();
}
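// Close the current spill file and add it to the segment list as a
// disk Segment.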
void createInDiskSegment() throws IOException {
assert (writer != null);
writer.close();
Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
writer = null;
segmentList.add(s);
LOG.debug("Disk Segment added to List. Size is " + segmentList.size());
}
boolean isActive() { return isActive; }
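// Allocate a new local spill file for this task attempt and return an
// IFile writer over it.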
private Writer<K,V> createSpillFile() throws IOException {
Path tmp =
new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+ (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
-1, conf);
return new Writer<K, V>(conf, fs, file);
}
}
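/**
* A simple RamManager that tracks how much of a fixed memory budget
* has been reserved by the memory cache.
*/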
static class BackupRamManager implements RamManager {
private int availableSize = 0;
private final int maxSize;
public BackupRamManager(int size) {
availableSize = maxSize = size;
}
public boolean reserve(int requestedSize, InputStream in) {
// Not used
LOG.warn("Reserve(int, InputStream) not supported by BackupRamManager");
return false;
}
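// Reserve up to requestedSize bytes and return the amount actually reserved.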
int reserve(int requestedSize) {
if (availableSize == 0) {
return 0;
}
int reservedSize = Math.min(requestedSize, availableSize);
availableSize -= reservedSize;
LOG.debug("Reserving: " + reservedSize + " Requested: " + requestedSize);
return reservedSize;
}
int reserve(int requestedSize, int minSize) {
if (availableSize < minSize) {
LOG.debug("No Space available. Available: " + availableSize +
" MinSize: " + minSize);
return 0;
} else {
return reserve(requestedSize);
}
}
public void unreserve(int requestedSize) {
availableSize += requestedSize;
LOG.debug("Unreserving: " + requestedSize +
". Available: " + availableSize);
}
void reinitialize() {
availableSize = maxSize;
}
}
}