blob: 9b27033bccdcb5b698d86d2e082d54fa54ef3d39 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo;
import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
/**
* DFSStripedInputStream reads from striped block groups.
*/
@InterfaceAudience.Private
public class DFSStripedInputStream extends DFSInputStream {
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
private final BlockReaderInfo[] blockReaders;
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
private final int groupSize;
/** the buffer for a complete stripe. */
private ByteBuffer curStripeBuf;
@VisibleForTesting
protected ByteBuffer parityBuf;
private final ErasureCodingPolicy ecPolicy;
private RawErasureDecoder decoder;
/**
* Indicate the start/end offset of the current buffered stripe in the
* block group.
*/
private StripeRange curStripeRange;
/**
* When warning the user of a lost block in striping mode, we remember the
* dead nodes we've logged. All other striping blocks on these nodes can be
* considered lost too, and we don't want to log a warning for each of them.
* This is to prevent the log from being too verbose. Refer to HDFS-8920.
*
* To minimize the overhead, we only store the datanodeUuid in this set
*/
private final Set<String> warnedNodes =
Collections.newSetFromMap(new ConcurrentHashMap<>());
DFSStripedInputStream(DFSClient dfsClient, String src,
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
LocatedBlocks locatedBlocks) throws IOException {
super(dfsClient, src, verifyChecksum, locatedBlocks);
this.readStatistics.setBlockType(BlockType.STRIPED);
assert ecPolicy != null;
this.ecPolicy = ecPolicy;
this.cellSize = ecPolicy.getCellSize();
dataBlkNum = (short) ecPolicy.getNumDataUnits();
parityBlkNum = (short) ecPolicy.getNumParityUnits();
groupSize = dataBlkNum + parityBlkNum;
blockReaders = new BlockReaderInfo[groupSize];
curStripeRange = new StripeRange(0, 0);
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
dataBlkNum, parityBlkNum);
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
DFSClient.LOG.debug("Creating an striped input stream for file {}", src);
}
private boolean useDirectBuffer() {
return decoder.preferDirectBuffer();
}
private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
if (shouldAllocateBuf && curStripeBuf == null) {
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
if (curStripeBuf != null) {
curStripeBuf.clear();
}
curStripeRange = new StripeRange(0, 0);
}
protected synchronized ByteBuffer getParityBuffer() {
if (parityBuf == null) {
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * parityBlkNum);
}
parityBuf.clear();
return parityBuf;
}
protected ByteBuffer getCurStripeBuf() {
return curStripeBuf;
}
protected String getSrc() {
return src;
}
protected LocatedBlocks getLocatedBlocks() {
return locatedBlocks;
}
protected ByteBufferPool getBufferPool() {
return BUFFER_POOL;
}
protected ThreadPoolExecutor getStripedReadsThreadPool(){
return dfsClient.getStripedReadsThreadPool();
}
/**
* When seeking into a new block group, create blockReader for each internal
* block in the group.
*/
@VisibleForTesting
synchronized void blockSeekTo(long target) throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
// Compute desired striped block group
LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
// Update current position
this.pos = target;
this.blockEnd = targetBlockGroup.getStartOffset() +
targetBlockGroup.getBlockSize() - 1;
currentLocatedBlock = targetBlockGroup;
}
@Override
public synchronized void close() throws IOException {
try {
super.close();
} finally {
if (curStripeBuf != null) {
BUFFER_POOL.putBuffer(curStripeBuf);
curStripeBuf = null;
}
if (parityBuf != null) {
BUFFER_POOL.putBuffer(parityBuf);
parityBuf = null;
}
if (decoder != null) {
decoder.release();
decoder = null;
}
}
}
/**
* Extend the super method with the logic of switching between cells.
* When reaching the end of a cell, proceed to the next cell and read it
* with the next blockReader.
*/
@Override
protected void closeCurrentBlockReaders() {
resetCurStripeBuffer(false);
if (blockReaders == null || blockReaders.length == 0) {
return;
}
for (int i = 0; i < groupSize; i++) {
closeReader(blockReaders[i]);
blockReaders[i] = null;
}
blockEnd = -1;
}
protected void closeReader(BlockReaderInfo readerInfo) {
if (readerInfo != null) {
if (readerInfo.reader != null) {
try {
readerInfo.reader.close();
} catch (Throwable ignored) {
}
}
readerInfo.skip();
}
}
private long getOffsetInBlockGroup() {
return getOffsetInBlockGroup(pos);
}
private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset();
}
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
int chunkIndex) throws IOException {
BlockReader reader = null;
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
DFSInputStream.DNAddrPair dnInfo =
new DFSInputStream.DNAddrPair(null, null, null, null);
while (true) {
try {
// the cached block location might have been re-fetched, so always
// get it from cache.
block = refreshLocatedBlock(block);
targetBlocks[chunkIndex] = block;
// internal block has one location, just rule out the deadNodes
dnInfo = getBestNodeDNAddrPair(block, null);
if (dnInfo == null) {
break;
}
reader = getBlockReader(block, offsetInBlock,
block.getBlockSize() - offsetInBlock,
dnInfo.addr, dnInfo.storageType, dnInfo.info);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to " + dnInfo.addr
+ " : " + e);
dfsClient.clearDataEncryptionKey();
retry.refetchEncryptionKey();
} else if (retry.shouldRefetchToken() &&
tokenRefetchNeeded(e, dnInfo.addr)) {
fetchBlockAt(block.getStartOffset());
retry.refetchToken();
} else {
//TODO: handles connection issues
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
addToLocalDeadNodes(dnInfo.info);
}
}
if (reader != null) {
readerInfos[chunkIndex] =
new BlockReaderInfo(reader, dnInfo.info, offsetInBlock);
return true;
}
}
return false;
}
/**
* Read a new stripe covering the current position, and store the data in the
* {@link #curStripeBuf}.
*/
private void readOneStripe(CorruptedBlocks corruptedBlocks)
throws IOException {
resetCurStripeBuffer(true);
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
final long stripeLen = cellSize * dataBlkNum;
final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
- (stripeIndex * stripeLen), stripeLen);
StripeRange stripeRange =
new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
cellSize, blockGroup, offsetInBlockGroup,
offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
// read the whole stripe
for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location
StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks,
blockReaders, corruptedBlocks, decoder, this);
sreader.readStripe();
}
curStripeBuf.position(stripeBufOffset);
curStripeBuf.limit(stripeLimit);
curStripeRange = stripeRange;
}
/**
* Update read statistics. Note that this has to be done on the thread that
* initiates the read, rather than inside each async thread, for
* {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
* its ThreadLocal.
*
* @param stats striped read stats
*/
void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
if (stats == null) {
return;
}
updateReadStatistics(readStatistics, stats.getBytesRead(),
stats.isShortCircuit(), stats.getNetworkDistance());
dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
stats.getBytesRead());
assert readStatistics.getBlockType() == BlockType.STRIPED;
dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
}
/**
* Seek to a new arbitrary location.
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos > getFileLength()) {
throw new EOFException("Cannot seek after EOF");
}
if (targetPos < 0) {
throw new EOFException("Cannot seek to negative offset");
}
if (closed.get()) {
throw new IOException("Stream is closed!");
}
if (targetPos <= blockEnd) {
final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
if (curStripeRange.include(targetOffsetInBlk)) {
int bufOffset = getStripedBufOffset(targetOffsetInBlk);
curStripeBuf.position(bufOffset);
pos = targetPos;
return;
}
}
pos = targetPos;
blockEnd = -1;
}
private int getStripedBufOffset(long offsetInBlockGroup) {
final long stripeLen = cellSize * dataBlkNum;
// compute the position in the curStripeBuf based on "pos"
return (int) (offsetInBlockGroup % stripeLen);
}
@Override
public synchronized boolean seekToNewSource(long targetPos)
throws IOException {
return false;
}
@Override
protected synchronized int readWithStrategy(ReaderStrategy strategy)
throws IOException {
dfsClient.checkOpen();
if (closed.get()) {
throw new IOException("Stream closed");
}
int len = strategy.getTargetLength();
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
if (pos < getFileLength()) {
try {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
synchronized (infoLock) {
if (locatedBlocks.isLastBlockComplete()) {
realLen = (int) Math.min(realLen,
locatedBlocks.getFileLength() - pos);
}
}
/** Number of bytes already read into buffer */
int result = 0;
while (result < realLen) {
if (!curStripeRange.include(getOffsetInBlockGroup())) {
readOneStripe(corruptedBlocks);
}
int ret = copyToTargetBuf(strategy, realLen - result);
result += ret;
pos += ret;
}
return result;
} finally {
// Check if need to report block replicas corruption either read
// was successful or ChecksumException occurred.
reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
true);
}
}
return -1;
}
/**
* Copy the data from {@link #curStripeBuf} into the given buffer.
* @param strategy the ReaderStrategy containing the given buffer
* @param length target length
* @return number of bytes copied
*/
private int copyToTargetBuf(ReaderStrategy strategy, int length) {
final long offsetInBlk = getOffsetInBlockGroup();
int bufOffset = getStripedBufOffset(offsetInBlk);
curStripeBuf.position(bufOffset);
return strategy.readFromBuffer(curStripeBuf,
Math.min(length, curStripeBuf.remaining()));
}
/**
* The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
* cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
* This method extends the logic by first remembering the index of the
* internal block, and re-parsing the refreshed block group with the same
* index.
*/
@Override
protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
throws IOException {
int idx = StripedBlockUtil.getBlockIndex(block.getBlock().getLocalBlock());
LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
int i = 0;
for (; i < lsb.getBlockIndices().length; i++) {
if (lsb.getBlockIndices()[i] == idx) {
break;
}
}
DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset={}." +
" Obtained block {}, idx={}", block.getStartOffset(), lb, idx);
return StripedBlockUtil.constructInternalBlock(
lsb, i, cellSize, dataBlkNum, idx);
}
private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
LocatedBlock lb = super.getBlockAt(offset);
assert lb instanceof LocatedStripedBlock : "NameNode" +
" should return a LocatedStripedBlock for a striped file";
return (LocatedStripedBlock)lb;
}
/**
* Real implementation of pread.
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
ecPolicy, cellSize, blockGroup, start, end, buf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
try {
for (AlignedStripe stripe : stripes) {
// Parse group to get chosen DN location
StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
preaderInfos, corruptedBlocks, decoder, this);
try {
preader.readStripe();
} finally {
preader.close();
}
}
buf.position(buf.position() + (int)(end - start + 1));
} finally {
for (BlockReaderInfo preaderInfo : preaderInfos) {
closeReader(preaderInfo);
}
}
}
@Override
protected void reportLostBlock(LocatedBlock lostBlock,
Collection<DatanodeInfo> ignoredNodes) {
DatanodeInfo[] nodes = lostBlock.getLocations();
if (nodes != null && nodes.length > 0) {
List<String> dnUUIDs = new ArrayList<>();
for (DatanodeInfo node : nodes) {
dnUUIDs.add(node.getDatanodeUuid());
}
if (!warnedNodes.containsAll(dnUUIDs)) {
DFSClient.LOG.warn(Arrays.toString(nodes) + " are unavailable and " +
"all striping blocks on them are lost. " +
"IgnoredNodes = {}", ignoredNodes);
warnedNodes.addAll(dnUUIDs);
}
} else {
super.reportLostBlock(lostBlock, ignoredNodes);
}
}
/**
* May need online read recovery, zero-copy read doesn't make
* sense, so don't support it.
*/
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
@Override
public synchronized void unbuffer() {
super.unbuffer();
if (curStripeBuf != null) {
BUFFER_POOL.putBuffer(curStripeBuf);
curStripeBuf = null;
}
if (parityBuf != null) {
BUFFER_POOL.putBuffer(parityBuf);
parityBuf = null;
}
}
}