/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred.lib;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* The DynamicRecordReader is used in conjunction with the DynamicInputFormat
* to implement the "Worker pattern" for DistCp.
* The DynamicRecordReader is responsible for:
* 1. Presenting the contents of each chunk to DistCp's mapper.
* 2. Acquiring a new chunk when the current chunk has been completely consumed,
* transparently.
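*
* <p>An illustrative sketch (not normative) of the chunk life-cycle, as
* driven by this reader:
* <pre>
*   initialize()   -> DynamicInputChunk.acquire(context)   // first chunk
*   nextKeyValue() -> chunk.getReader().nextKeyValue()     // drain chunk
*                  -> chunk.release(); acquire(context)    // next chunk
*   close()        -> chunk.close()
* </pre>
* Faster mappers thus consume more chunks than slower ones, which is how
* the "Worker pattern" balances the copy-load across map tasks.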
*/
public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
private static final Log LOG = LogFactory.getLog(DynamicRecordReader.class);
private TaskAttemptContext taskAttemptContext;
private Configuration configuration;
private DynamicInputChunk<K, V> chunk;
private TaskID taskId;
// Data required for progress indication.
private int numRecordsPerChunk; // Constant per job.
private int totalNumRecords; // Constant per job.
private int numRecordsProcessedByThisMap = 0;
private long timeOfLastChunkDirScan = 0;
private boolean isChunkDirAlreadyScanned = false;
private static final long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
/**
 * Implementation of RecordReader::initialize(). Initializes the internal
 * RecordReader to read from chunks.
 * @param inputSplit The InputSplit for the map. Ignored entirely.
 * @param taskAttemptContext The AttemptContext.
 * @throws IOException on failure.
 * @throws InterruptedException if the task is interrupted.
 */
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
numRecordsPerChunk = DynamicInputFormat.getNumEntriesPerChunk(
taskAttemptContext.getConfiguration());
this.taskAttemptContext = taskAttemptContext;
configuration = taskAttemptContext.getConfiguration();
taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
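// Acquire the first chunk for this task; subsequent chunks are picked up
// transparently in nextKeyValue() as each one is exhausted.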
chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
timeOfLastChunkDirScan = System.currentTimeMillis();
isChunkDirAlreadyScanned = false;
totalNumRecords = getTotalNumRecords();
}
private int getTotalNumRecords() {
return DistCpUtils.getInt(configuration,
DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS);
}
/**
 * Implementation of RecordReader::nextKeyValue().
 * Reads the contents of the current chunk. When the current chunk has
 * been completely exhausted, a new chunk is acquired and read,
 * transparently.
 * @return True if a new record was read; false if no records remain.
 * @throws IOException on failure.
 * @throws InterruptedException if the task is interrupted.
 */
@Override
public boolean nextKeyValue()
throws IOException, InterruptedException {
if (chunk == null) {
  if (LOG.isDebugEnabled()) {
    LOG.debug(taskId + ": No chunk available. No records to be read.");
  }
  return false;
}
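// First, try to advance within the current chunk.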
if (chunk.getReader().nextKeyValue()) {
++numRecordsProcessedByThisMap;
return true;
}
if (LOG.isDebugEnabled()) {
  LOG.debug(taskId + ": Current chunk exhausted. " +
      "Attempting to pick up a new one.");
}
chunk.release();
timeOfLastChunkDirScan = System.currentTimeMillis();
isChunkDirAlreadyScanned = false;
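// The current chunk is done; try to acquire the next one. A null return
// means no chunks remain, i.e. this mapper has no more work.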
chunk = DynamicInputChunk.acquire(taskAttemptContext);
if (chunk == null) {
  return false;
}
if (chunk.getReader().nextKeyValue()) {
  ++numRecordsProcessedByThisMap;
  return true;
}
return false;
}
/**
* Implementation of RecordReader::getCurrentKey().
* @return The key of the current record. (i.e. the source-path.)
* @throws IOException on failure.
* @throws InterruptedException if the task is interrupted.
*/
@Override
public K getCurrentKey()
throws IOException, InterruptedException {
return chunk.getReader().getCurrentKey();
}
/**
* Implementation of RecordReader::getCurrentValue().
* @return The value of the current record. (i.e. the target-path.)
* @throws IOException on failure.
* @throws InterruptedException if the task is interrupted.
*/
@Override
public V getCurrentValue()
throws IOException, InterruptedException {
return chunk.getReader().getCurrentValue();
}
/**
* Implementation of RecordReader::getProgress().
* @return A fraction [0.0,1.0] indicating the progress of a DistCp mapper.
* @throws IOException on failure.
* @throws InterruptedException if the task is interrupted.
*/
@Override
public float getProgress()
throws IOException, InterruptedException {
final int numChunksLeft = getNumChunksLeft();
if (numChunksLeft < 0) { // Uninitialized, i.e. before the first dir-scan.
  assert numRecordsProcessedByThisMap <= numRecordsPerChunk
      : "numRecordsProcessedByThisMap:" + numRecordsProcessedByThisMap +
        " exceeds numRecordsPerChunk:" + numRecordsPerChunk;
  // Conservative estimate, till the first directory scan.
  return ((float) numRecordsProcessedByThisMap) / totalNumRecords;
}
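// Illustrative arithmetic (assumed numbers): with numRecordsPerChunk = 5,
// 7 records already processed and 4 chunks still unclaimed, progress is
// 7 / (7 + 5*4) ~= 0.26. The estimate is conservative: it assumes this
// map alone will consume every remaining chunk.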
return ((float) numRecordsProcessedByThisMap)
    / (numRecordsProcessedByThisMap + numRecordsPerChunk * numChunksLeft);
}
private int getNumChunksLeft() throws IOException {
long now = System.currentTimeMillis();
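// Re-scan the chunk directory (via getListOfChunkFiles()) if the last
// scan is stale (older than TIME_THRESHOLD_FOR_DIR_SCANS), or if this map
// is past the halfway point of its current chunk and hasn't scanned since
// acquiring it.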
boolean tooLongSinceLastDirScan
    = now - timeOfLastChunkDirScan > TIME_THRESHOLD_FOR_DIR_SCANS;
if (tooLongSinceLastDirScan
    || (!isChunkDirAlreadyScanned &&
        numRecordsProcessedByThisMap % numRecordsPerChunk
            > numRecordsPerChunk / 2)) {
  DynamicInputChunk.getListOfChunkFiles();
  isChunkDirAlreadyScanned = true;
  timeOfLastChunkDirScan = now;
}
return DynamicInputChunk.getNumChunksLeft();
}
/**
* Implementation of RecordReader::close().
* Closes the RecordReader.
* @throws IOException on failure.
*/
@Override
public void close()
throws IOException {
if (chunk != null) {
  chunk.close();
}
}
}