blob: 743325677bccb37163830e32b48982afaf9f67c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.util.DirectBufferPool;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* DFSStripedInputStream reads from striped block groups
*/
@InterfaceAudience.Private
public class DFSStripedInputStream extends DFSInputStream {
private static class ReaderRetryPolicy {
private int fetchEncryptionKeyTimes = 1;
private int fetchTokenTimes = 1;
void refetchEncryptionKey() {
fetchEncryptionKeyTimes--;
}
void refetchToken() {
fetchTokenTimes--;
}
boolean shouldRefetchEncryptionKey() {
return fetchEncryptionKeyTimes > 0;
}
boolean shouldRefetchToken() {
return fetchTokenTimes > 0;
}
}
/** Used to indicate the buffered data's range in the block group */
private static class StripeRange {
/** start offset in the block group (inclusive) */
final long offsetInBlock;
/** length of the stripe range */
final long length;
StripeRange(long offsetInBlock, long length) {
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
this.offsetInBlock = offsetInBlock;
this.length = length;
}
boolean include(long pos) {
return pos >= offsetInBlock && pos < offsetInBlock + length;
}
}
private static class BlockReaderInfo {
final BlockReader reader;
final DatanodeInfo datanode;
/**
* when initializing block readers, their starting offsets are set to the same
* number: the smallest internal block offsets among all the readers. This is
* because it is possible that for some internal blocks we have to read
* "backwards" for decoding purpose. We thus use this offset array to track
* offsets for all the block readers so that we can skip data if necessary.
*/
long blockReaderOffset;
/**
* We use this field to indicate whether we should use this reader. In case
* we hit any issue with this reader, we set this field to true and avoid
* using it for the next stripe.
*/
boolean shouldSkip = false;
BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
this.reader = reader;
this.datanode = dn;
this.blockReaderOffset = offset;
}
void setOffset(long offset) {
this.blockReaderOffset = offset;
}
void skip() {
this.shouldSkip = true;
}
}
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private final BlockReaderInfo[] blockReaders;
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
private final int groupSize;
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
private ByteBuffer parityBuf;
private final ErasureCodingPolicy ecPolicy;
private final RawErasureDecoder decoder;
/**
* indicate the start/end offset of the current buffered stripe in the
* block group
*/
private StripeRange curStripeRange;
private final CompletionService<Void> readingService;
/**
* When warning the user of a lost block in striping mode, we remember the
* dead nodes we've logged. All other striping blocks on these nodes can be
* considered lost too, and we don't want to log a warning for each of them.
* This is to prevent the log from being too verbose. Refer to HDFS-8920.
*
* To minimize the overhead, we only store the datanodeUuid in this set
*/
private final Set<String> warnedNodes = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
DFSStripedInputStream(DFSClient dfsClient, String src,
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
LocatedBlocks locatedBlocks) throws IOException {
super(dfsClient, src, verifyChecksum, locatedBlocks);
assert ecPolicy != null;
this.ecPolicy = ecPolicy;
this.cellSize = ecPolicy.getCellSize();
dataBlkNum = (short) ecPolicy.getNumDataUnits();
parityBlkNum = (short) ecPolicy.getNumParityUnits();
groupSize = dataBlkNum + parityBlkNum;
blockReaders = new BlockReaderInfo[groupSize];
curStripeRange = new StripeRange(0, 0);
readingService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
dataBlkNum, parityBlkNum);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
}
private void resetCurStripeBuffer() {
if (curStripeBuf == null) {
curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
}
curStripeBuf.clear();
curStripeRange = new StripeRange(0, 0);
}
private ByteBuffer getParityBuffer() {
if (parityBuf == null) {
parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
}
parityBuf.clear();
return parityBuf;
}
/**
* When seeking into a new block group, create blockReader for each internal
* block in the group.
*/
private synchronized void blockSeekTo(long target) throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
// Compute desired striped block group
LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
// Update current position
this.pos = target;
this.blockEnd = targetBlockGroup.getStartOffset() +
targetBlockGroup.getBlockSize() - 1;
currentLocatedBlock = targetBlockGroup;
}
@Override
public synchronized void close() throws IOException {
super.close();
if (curStripeBuf != null) {
bufferPool.returnBuffer(curStripeBuf);
curStripeBuf = null;
}
if (parityBuf != null) {
bufferPool.returnBuffer(parityBuf);
parityBuf = null;
}
}
/**
* Extend the super method with the logic of switching between cells.
* When reaching the end of a cell, proceed to the next cell and read it
* with the next blockReader.
*/
@Override
protected void closeCurrentBlockReaders() {
resetCurStripeBuffer();
if (blockReaders == null || blockReaders.length == 0) {
return;
}
for (int i = 0; i < groupSize; i++) {
closeReader(blockReaders[i]);
blockReaders[i] = null;
}
blockEnd = -1;
}
private void closeReader(BlockReaderInfo readerInfo) {
if (readerInfo != null) {
// IOUtils.cleanup(null, readerInfo.reader);
readerInfo.skip();
}
}
private long getOffsetInBlockGroup() {
return getOffsetInBlockGroup(pos);
}
private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset();
}
/**
* Read a new stripe covering the current position, and store the data in the
* {@link #curStripeBuf}.
*/
private void readOneStripe(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
resetCurStripeBuffer();
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
final long stripeLen = cellSize * dataBlkNum;
final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
- (stripeIndex * stripeLen), stripeLen);
StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
stripeLimit - stripeBufOffset);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize,
blockGroup, offsetInBlockGroup,
offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
// read the whole stripe
for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location
StripeReader sreader = new StatefulStripeReader(readingService, stripe,
blks, blockReaders, corruptedBlockMap);
sreader.readStripe();
}
curStripeBuf.position(stripeBufOffset);
curStripeBuf.limit(stripeLimit);
curStripeRange = stripeRange;
}
private Callable<Void> readCells(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
final ExtendedBlock currentBlock,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
// reader can be null if getBlockReaderWithRetry failed or
// the reader hit exception before
if (reader == null) {
throw new IOException("The BlockReader is null. " +
"The BlockReader creation failed or the reader hit exception.");
}
Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
if (currentReaderOffset < targetReaderOffset) {
long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
Preconditions.checkState(
skipped == targetReaderOffset - currentReaderOffset);
}
int result = 0;
for (ByteBufferStrategy strategy : strategies) {
result += readToBuffer(reader, datanode, strategy, currentBlock,
corruptedBlockMap);
}
return null;
}
};
}
private int readToBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy strategy,
ExtendedBlock currentBlock,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int targetLength = strategy.buf.remaining();
int length = 0;
try {
while (length < targetLength) {
int ret = strategy.doRead(blockReader, 0, 0);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
}
length += ret;
}
return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
// we want to remember which block replicas we have tried
addIntoCorruptedBlockMap(currentBlock, currentNode,
corruptedBlockMap);
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + src + " from "
+ currentNode, e);
throw e;
}
}
/**
* Seek to a new arbitrary location
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos > getFileLength()) {
throw new EOFException("Cannot seek after EOF");
}
if (targetPos < 0) {
throw new EOFException("Cannot seek to negative offset");
}
if (closed.get()) {
throw new IOException("Stream is closed!");
}
if (targetPos <= blockEnd) {
final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
if (curStripeRange.include(targetOffsetInBlk)) {
int bufOffset = getStripedBufOffset(targetOffsetInBlk);
curStripeBuf.position(bufOffset);
pos = targetPos;
return;
}
}
pos = targetPos;
blockEnd = -1;
}
private int getStripedBufOffset(long offsetInBlockGroup) {
final long stripeLen = cellSize * dataBlkNum;
// compute the position in the curStripeBuf based on "pos"
return (int) (offsetInBlockGroup % stripeLen);
}
@Override
public synchronized boolean seekToNewSource(long targetPos)
throws IOException {
return false;
}
@Override
protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
}
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
new ConcurrentHashMap<>();
if (pos < getFileLength()) {
try {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
synchronized (infoLock) {
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen,
locatedBlocks.getFileLength() - pos);
}
}
/** Number of bytes already read into buffer */
int result = 0;
while (result < realLen) {
if (!curStripeRange.include(getOffsetInBlockGroup())) {
readOneStripe(corruptedBlockMap);
}
int ret = copyToTargetBuf(strategy, off + result, realLen - result);
result += ret;
pos += ret;
}
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
return result;
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occured.
reportCheckSumFailure(corruptedBlockMap,
currentLocatedBlock.getLocations().length);
}
}
return -1;
}
/**
* Copy the data from {@link #curStripeBuf} into the given buffer
* @param strategy the ReaderStrategy containing the given buffer
* @param offset the offset of the given buffer. Used only when strategy is
* a ByteArrayStrategy
* @param length target length
* @return number of bytes copied
*/
private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
final long offsetInBlk = getOffsetInBlockGroup();
int bufOffset = getStripedBufOffset(offsetInBlk);
curStripeBuf.position(bufOffset);
return strategy.copyFrom(curStripeBuf, offset,
Math.min(length, curStripeBuf.remaining()));
}
/**
* The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
* cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
* This method extends the logic by first remembering the index of the
* internal block, and re-parsing the refreshed block group with the same
* index.
*/
@Override
protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
throws IOException {
int idx = StripedBlockUtil.getBlockIndex(block.getBlock().getLocalBlock());
LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
int i = 0;
for (; i < lsb.getBlockIndices().length; i++) {
if (lsb.getBlockIndices()[i] == idx) {
break;
}
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
+ block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
}
return StripedBlockUtil.constructInternalBlock(
lsb, i, cellSize, dataBlkNum, idx);
}
private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
LocatedBlock lb = super.getBlockAt(offset);
assert lb instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
return (LocatedStripedBlock)lb;
}
/**
* Real implementation of pread.
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
ecPolicy, cellSize, blockGroup, start, end, buf, offset);
CompletionService<Void> readService = new ExecutorCompletionService<>(
dfsClient.getStripedReadsThreadPool());
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
try {
for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location
StripeReader preader = new PositionStripeReader(readService, stripe,
blks, preaderInfos, corruptedBlockMap);
preader.readStripe();
}
} finally {
for (BlockReaderInfo preaderInfo : preaderInfos) {
closeReader(preaderInfo);
}
}
}
@Override
protected void reportLostBlock(LocatedBlock lostBlock,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = lostBlock.getLocations();
if (nodes != null && nodes.length > 0) {
List<String> dnUUIDs = new ArrayList<>();
for (DatanodeInfo node : nodes) {
dnUUIDs.add(node.getDatanodeUuid());
}
if (!warnedNodes.containsAll(dnUUIDs)) {
DFSClient.LOG.warn(Arrays.toString(nodes) + " are unavailable and " +
"all striping blocks on them are lost. " +
"IgnoredNodes = " + ignoredNodes);
warnedNodes.addAll(dnUUIDs);
}
} else {
super.reportLostBlock(lostBlock, ignoredNodes);
}
}
/**
* The reader for reading a complete {@link AlignedStripe}. Note that an
* {@link AlignedStripe} may cross multiple stripes with cellSize width.
*/
private abstract class StripeReader {
final Map<Future<Void>, Integer> futures = new HashMap<>();
final AlignedStripe alignedStripe;
final CompletionService<Void> service;
final LocatedBlock[] targetBlocks;
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
final BlockReaderInfo[] readerInfos;
StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
this.service = service;
this.alignedStripe = alignedStripe;
this.targetBlocks = targetBlocks;
this.readerInfos = readerInfos;
this.corruptedBlockMap = corruptedBlockMap;
}
/** prepare all the data chunks */
abstract void prepareDecodeInputs();
/** prepare the parity chunk and block reader if necessary */
abstract boolean prepareParityChunk(int index);
abstract void decode();
void updateState4SuccessRead(StripingChunkReadResult result) {
Preconditions.checkArgument(
result.state == StripingChunkReadResult.SUCCESSFUL);
readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
+ alignedStripe.getSpanInBlock());
}
private void checkMissingBlocks() throws IOException {
if (alignedStripe.missingChunksNum > parityBlkNum) {
clearFutures(futures.keySet());
throw new IOException(alignedStripe.missingChunksNum
+ " missing blocks, the stripe is: " + alignedStripe);
}
}
/**
* We need decoding. Thus go through all the data chunks and make sure we
* submit read requests for all of them.
*/
private void readDataForDecoding() throws IOException {
prepareDecodeInputs();
for (int i = 0; i < dataBlkNum; i++) {
Preconditions.checkNotNull(alignedStripe.chunks[i]);
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
}
checkMissingBlocks();
}
void readParityChunks(int num) throws IOException {
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
i++) {
if (alignedStripe.chunks[i] == null) {
if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
j++;
} else {
alignedStripe.missingChunksNum++;
}
}
}
checkMissingBlocks();
}
boolean createBlockReader(LocatedBlock block, int chunkIndex)
throws IOException {
BlockReader reader = null;
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
DNAddrPair dnInfo = new DNAddrPair(null, null, null);
while(true) {
try {
// the cached block location might have been re-fetched, so always
// get it from cache.
block = refreshLocatedBlock(block);
targetBlocks[chunkIndex] = block;
// internal block has one location, just rule out the deadNodes
dnInfo = getBestNodeDNAddrPair(block, null);
if (dnInfo == null) {
break;
}
reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
block.getBlockSize() - alignedStripe.getOffsetInBlock(),
dnInfo.addr, dnInfo.storageType, dnInfo.info);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + dnInfo.addr
+ " : " + e);
dfsClient.clearDataEncryptionKey();
retry.refetchEncryptionKey();
} else if (retry.shouldRefetchToken() &&
tokenRefetchNeeded(e, dnInfo.addr)) {
fetchBlockAt(block.getStartOffset());
retry.refetchToken();
} else {
//TODO: handles connection issues
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
addToDeadNodes(dnInfo.info);
}
}
if (reader != null) {
readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
alignedStripe.getOffsetInBlock());
return true;
}
}
return false;
}
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
if (chunk.byteBuffer != null) {
ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
return new ByteBufferStrategy[]{strategy};
} else {
ByteBufferStrategy[] strategies =
new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
for (int i = 0; i < strategies.length; i++) {
ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
strategies[i] = new ByteBufferStrategy(buffer);
}
return strategies;
}
}
boolean readChunk(final LocatedBlock block, int chunkIndex)
throws IOException {
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
if (block == null) {
chunk.state = StripingChunk.MISSING;
return false;
}
if (readerInfos[chunkIndex] == null) {
if (!createBlockReader(block, chunkIndex)) {
chunk.state = StripingChunk.MISSING;
return false;
}
} else if (readerInfos[chunkIndex].shouldSkip) {
chunk.state = StripingChunk.MISSING;
return false;
}
chunk.state = StripingChunk.PENDING;
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset,
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
block.getBlock(), corruptedBlockMap);
Future<Void> request = service.submit(readCallable);
futures.put(request, chunkIndex);
return true;
}
/** read the whole stripe. do decoding if necessary */
void readStripe() throws IOException {
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
if (!readChunk(targetBlocks[i], i)) {
alignedStripe.missingChunksNum++;
}
}
}
// There are missing block locations at this stage. Thus we need to read
// the full stripe and one more parity block.
if (alignedStripe.missingChunksNum > 0) {
checkMissingBlocks();
readDataForDecoding();
// read parity chunks
readParityChunks(alignedStripe.missingChunksNum);
}
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
// Input buffers for potential decode operation, which remains null until
// first read failure
while (!futures.isEmpty()) {
try {
StripingChunkReadResult r = StripedBlockUtil
.getNextCompletedStripedRead(service, futures, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+ alignedStripe);
}
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
Preconditions.checkNotNull(returnedChunk);
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
returnedChunk.state = StripingChunk.FETCHED;
alignedStripe.fetchedChunksNum++;
updateState4SuccessRead(r);
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
clearFutures(futures.keySet());
break;
}
} else {
returnedChunk.state = StripingChunk.MISSING;
// close the corresponding reader
closeReader(readerInfos[r.index]);
final int missing = alignedStripe.missingChunksNum;
alignedStripe.missingChunksNum++;
checkMissingBlocks();
readDataForDecoding();
readParityChunks(alignedStripe.missingChunksNum - missing);
}
} catch (InterruptedException ie) {
String err = "Read request interrupted";
DFSClient.LOG.error(err);
clearFutures(futures.keySet());
// Don't decode if read interrupted
throw new InterruptedIOException(err);
}
}
if (alignedStripe.missingChunksNum > 0) {
decode();
}
}
}
class PositionStripeReader extends StripeReader {
private byte[][] decodeInputs = null;
PositionStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
super(service, alignedStripe, targetBlocks, readerInfos,
corruptedBlockMap);
}
@Override
void prepareDecodeInputs() {
if (decodeInputs == null) {
decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
dataBlkNum, parityBlkNum);
}
}
@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
alignedStripe.chunks[index].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
return true;
}
@Override
void decode() {
StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
dataBlkNum, parityBlkNum, decoder);
}
}
class StatefulStripeReader extends StripeReader {
ByteBuffer[] decodeInputs;
StatefulStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
BlockReaderInfo[] readerInfos,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
super(service, alignedStripe, targetBlocks, readerInfos,
corruptedBlockMap);
}
@Override
void prepareDecodeInputs() {
if (decodeInputs == null) {
decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
final ByteBuffer cur;
synchronized (DFSStripedInputStream.this) {
cur = curStripeBuf.duplicate();
}
StripedBlockUtil.VerticalRange range = alignedStripe.range;
for (int i = 0; i < dataBlkNum; i++) {
cur.limit(cur.capacity());
int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
cur.position(pos);
cur.limit((int) (pos + range.spanInBlock));
decodeInputs[i] = cur.slice();
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
}
}
}
}
@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum
&& alignedStripe.chunks[index] == null);
if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
// we have failed the block reader before
return false;
}
final int parityIndex = index - dataBlkNum;
ByteBuffer buf = getParityBuffer().duplicate();
buf.position(cellSize * parityIndex);
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
decodeInputs[index] = buf.slice();
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
return true;
}
@Override
void decode() {
final int span = (int) alignedStripe.getSpanInBlock();
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
for (int j = 0; j < span; j++) {
decodeInputs[i].put((byte) 0);
}
decodeInputs[i].flip();
} else if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
decodeInputs[i].position(0);
decodeInputs[i].limit(span);
}
}
int[] decodeIndices = new int[parityBlkNum];
int pos = 0;
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
if (i < dataBlkNum) {
decodeIndices[pos++] = i;
} else {
decodeInputs[i] = null;
}
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
final int decodeChunkNum = decodeIndices.length;
ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
for (int i = 0; i < decodeChunkNum; i++) {
outputs[i] = decodeInputs[decodeIndices[i]];
outputs[i].position(0);
outputs[i].limit((int) alignedStripe.range.spanInBlock);
decodeInputs[decodeIndices[i]] = null;
}
decoder.decode(decodeInputs, decodeIndices, outputs);
}
}
/**
* May need online read recovery, zero-copy read doesn't make
* sense, so don't support it.
*/
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
/** A variation to {@link DFSInputStream#cancelAll} */
private void clearFutures(Collection<Future<Void>> futures) {
for (Future<Void> future : futures) {
future.cancel(false);
}
futures.clear();
}
}