/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.BlockReader;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

/**
 * BlockReaderLocal enables local, short-circuit reads. If the DFS client is
 * on the same machine as the datanode, then the client can read files
 * directly from the local file system rather than going through the
 * datanode, for better performance. <br>
 * {@link BlockReaderLocal} works as follows:
 * <ul>
 * <li>The client performing short-circuit reads must be configured at the
 * datanode.</li>
 * <li>The client gets the path to the file where the block is stored using
 * the {@link ClientDatanodeProtocol#getBlockLocalPathInfo(Block, Token)} RPC
 * call.</li>
 * <li>The client uses Kerberos authentication to connect to the datanode
 * over RPC, if security is enabled.</li>
 * </ul>
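 * <p>
 * A minimal usage sketch (illustrative only; {@code blk}, {@code token},
 * {@code localDatanode} and {@code socketTimeout} are assumed to come from
 * the surrounding DFSClient plumbing and are not defined by this class):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * BlockReaderLocal reader = BlockReaderLocal.newBlockReader(
 *     conf, "/user/foo/bar.txt", blk, token, localDatanode,
 *     socketTimeout, 0L, blk.getNumBytes());
 * byte[] buf = new byte[4096];
 * int nRead = reader.read(buf, 0, buf.length);
 * reader.close();
 * }</pre>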
 */
class BlockReaderLocal extends BlockReader {
  public static final Log LOG = LogFactory.getLog(DFSClient.class);

  // Stores the cache and proxy for a local datanode.
  private static class LocalDatanodeInfo {
    private ClientDatanodeProtocol proxy = null;
    private final Map<Block, BlockLocalPathInfo> cache;

    LocalDatanodeInfo() {
      final int cacheSize = 10000;
      final float hashTableLoadFactor = 0.75f;
      // Size the hash table so it never needs to rehash before the LRU
      // limit of cacheSize entries is reached.
      int hashTableCapacity =
          (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
      // An access-ordered LinkedHashMap acts as an LRU cache: the eldest
      // (least recently accessed) entry is evicted once the map grows
      // beyond cacheSize.
      cache = Collections
          .synchronizedMap(new LinkedHashMap<Block, BlockLocalPathInfo>(
              hashTableCapacity, hashTableLoadFactor, true) {
            private static final long serialVersionUID = 1;

            @Override
            protected boolean removeEldestEntry(
                Map.Entry<Block, BlockLocalPathInfo> eldest) {
              return size() > cacheSize;
            }
          });
    }

    private synchronized ClientDatanodeProtocol getDatanodeProxy(
        DatanodeInfo node, Configuration conf, int socketTimeout)
        throws IOException {
      if (proxy == null) {
        proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
            socketTimeout);
      }
      return proxy;
    }

    private synchronized void resetDatanodeProxy() {
      if (null != proxy) {
        RPC.stopProxy(proxy);
        proxy = null;
      }
    }

    private BlockLocalPathInfo getBlockLocalPathInfo(Block b) {
      return cache.get(b);
    }

    private void setBlockLocalPathInfo(Block b, BlockLocalPathInfo info) {
      cache.put(b, info);
    }

    private void removeBlockLocalPathInfo(Block b) {
      cache.remove(b);
    }
  }

  // Multiple datanodes could be running on the local machine. Store proxies
  // in a map keyed by the ipc port of the datanode.
  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap =
      new HashMap<Integer, LocalDatanodeInfo>();

  private FileInputStream dataIn;     // reader for the data file
  private FileInputStream checksumIn; // reader for the checksum file

  /**
   * The only way this object can be instantiated.
   */
  static BlockReaderLocal newBlockReader(Configuration conf,
      String file, Block blk, Token<BlockTokenIdentifier> token,
      DatanodeInfo node, int socketTimeout, long startOffset, long length)
      throws IOException {
    LocalDatanodeInfo localDatanodeInfo =
        getLocalDatanodeInfo(node.getIpcPort());
    // check the cache first
    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
    if (pathinfo == null) {
      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
    }

    // Check to see if the file exists. It may happen that the HDFS file has
    // been deleted and this block lookup is occurring on behalf of a new
    // HDFS file. This time, the block file could be residing in a different
    // portion of the dfs.data.dir directory. In this case, we remove this
    // entry from the cache. The next call to this method will repopulate
    // the cache.
    FileInputStream dataIn = null;
    FileInputStream checksumIn = null;
    BlockReaderLocal localBlockReader = null;
    boolean skipChecksum = shortCircuitChecksum(conf);
    try {
      // get a local file system
      File blkfile = new File(pathinfo.getBlockPath());
      dataIn = new FileInputStream(blkfile);

      if (LOG.isDebugEnabled()) {
        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
            + blkfile.length() + " startOffset " + startOffset + " length "
            + length + " short circuit checksum " + skipChecksum);
      }

      if (!skipChecksum) {
        // get the metadata file
        File metafile = new File(pathinfo.getMetaPath());
        checksumIn = new FileInputStream(metafile);

        // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader
            .readHeader(new DataInputStream(checksumIn));
        short version = header.getVersion();
        if (version != FSDataset.METADATA_VERSION) {
          LOG.warn("Wrong version (" + version + ") for metadata file for "
              + blk + " ignoring ...");
        }
        DataChecksum checksum = header.getChecksum();
        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
            startOffset, length, pathinfo, checksum, true, dataIn,
            checksumIn);
      } else {
        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
            startOffset, length, pathinfo, dataIn);
      }
    } catch (IOException e) {
      // remove from cache
      localDatanodeInfo.removeBlockLocalPathInfo(blk);
      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
          + " from cache because local file " + pathinfo.getBlockPath()
          + " could not be opened.");
      throw e;
    } finally {
      if (localBlockReader == null) {
        if (dataIn != null) {
          dataIn.close();
        }
        if (checksumIn != null) {
          checksumIn.close();
        }
      }
    }
    return localBlockReader;
  }
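
  /**
   * Returns the {@link LocalDatanodeInfo} for the datanode listening on the
   * given ipc port, creating (and caching) one on first use.
   */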
  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(
      int port) {
    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
    if (ldInfo == null) {
      ldInfo = new LocalDatanodeInfo();
      localDatanodeInfoMap.put(port, ldInfo);
    }
    return ldInfo;
  }
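
  /**
   * Asks the local datanode, over RPC, for the local paths of the block's
   * data and metadata files, and caches the answer on success.
   */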
  private static BlockLocalPathInfo getBlockPathInfo(Block blk,
      DatanodeInfo node, Configuration conf, int timeout,
      Token<BlockTokenIdentifier> token) throws IOException {
    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
    BlockLocalPathInfo pathinfo = null;
    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
        conf, timeout);
    try {
      // make RPC to local datanode to find local pathnames of blocks
      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
      if (pathinfo != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
        }
        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
      }
    } catch (IOException e) {
      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
      throw e;
    }
    return pathinfo;
  }
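
  /**
   * Returns true if checksum verification should be skipped for
   * short-circuit reads, as configured by
   * {@link DFSConfigKeys#DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY}.
   */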
  private static boolean shortCircuitChecksum(Configuration conf) {
    return conf.getBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
  }

  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
      Token<BlockTokenIdentifier> token, long startOffset, long length,
      BlockLocalPathInfo pathinfo, FileInputStream dataIn)
      throws IOException {
    super(
        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
        1);
    this.startOffset = startOffset;
    this.dataIn = dataIn;

    // Skip ahead to startOffset; FileInputStream.skip() may skip fewer
    // bytes than requested, so loop until the full offset is consumed.
    long toSkip = startOffset;
    while (toSkip > 0) {
      long skipped = dataIn.skip(toSkip);
      if (skipped == 0) {
        throw new IOException("Couldn't initialize input stream");
      }
      toSkip -= skipped;
    }
  }

  private BlockReaderLocal(Configuration conf, String hdfsfile, Block block,
      Token<BlockTokenIdentifier> token, long startOffset, long length,
      BlockLocalPathInfo pathinfo, DataChecksum checksum,
      boolean verifyChecksum, FileInputStream dataIn,
      FileInputStream checksumIn) throws IOException {
    super(
        new Path("/blk_" + block.getBlockId() + ":of:" + hdfsfile) /*too non path-like?*/,
        1,
        checksum,
        verifyChecksum);
    this.startOffset = startOffset;
    this.dataIn = dataIn;
    this.checksumIn = checksumIn;
    this.checksum = checksum;

    long blockLength = pathinfo.getNumBytes();

    /* If bytesPerChecksum is very large, then the metadata file is most
     * likely corrupted. For now just truncate bytesPerChecksum to
     * blockLength.
     */
    bytesPerChecksum = checksum.getBytesPerChecksum();
    if (bytesPerChecksum > 10 * 1024 * 1024
        && bytesPerChecksum > blockLength) {
      checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
          Math.max((int) blockLength, 10 * 1024 * 1024));
      bytesPerChecksum = checksum.getBytesPerChecksum();
    }
    checksumSize = checksum.getChecksumSize();

    long endOffset = blockLength;
    if (startOffset < 0 || startOffset > endOffset
        || (length + startOffset) > endOffset) {
      String msg = " Offset " + startOffset + " and length " + length
          + " don't match block " + block + " ( blockLen " + endOffset + " )";
      LOG.warn("BlockReaderLocal requested with incorrect offset: " + msg);
      throw new IOException(msg);
    }
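
    // Reads must start and end on checksum-chunk boundaries so that each
    // chunk can be verified against its stored checksum. For example, with
    // the (assumed) default bytesPerChecksum of 512, a request for
    // startOffset = 1000 and length = 100 is widened to the chunk-aligned
    // range [512, 1536): firstChunkOffset = 1000 - (1000 % 512) = 512, and
    // endOffset = 1100 rounded up to 1536.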
    firstChunkOffset = (startOffset - (startOffset % bytesPerChecksum));

    if (length >= 0) {
      // Make sure endOffset points to the end of a checksummed chunk.
      long tmpLen = startOffset + length;
      if (tmpLen % bytesPerChecksum != 0) {
        tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
      }
      if (tmpLen < endOffset) {
        endOffset = tmpLen;
      }
    }

    // seek to the right offsets
    if (firstChunkOffset > 0) {
      dataIn.getChannel().position(firstChunkOffset);

      long checksumSkip = (firstChunkOffset / bytesPerChecksum) * checksumSize;
      // note blockInStream is seeked when created below
      if (checksumSkip > 0) {
        checksumIn.skip(checksumSkip);
      }
    }

    lastChunkOffset = firstChunkOffset;
    lastChunkLen = -1;
  }

  @Override
  public synchronized int read(byte[] buf, int off, int len)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("read off " + off + " len " + len);
    }
    if (checksum == null) {
      // Checksums are skipped; read straight from the block file.
      return dataIn.read(buf, off, len);
    } else {
      return super.read(buf, off, len);
    }
  }

  @Override
  public synchronized long skip(long n) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("skip " + n);
    }
    if (checksum == null) {
      return dataIn.skip(n);
    } else {
      return super.skip(n);
    }
  }

  @Override
  public synchronized void seek(long n) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("seek " + n);
    }
    throw new IOException("Seek() is not supported in BlockReaderLocal");
  }
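
  /**
   * Reads a single checksum chunk ({@code bytesPerChecksum} bytes, or fewer
   * at the end of the block) from the block file, and the matching checksum
   * bytes from the meta file when checksum verification is enabled. The
   * superclass drives this method once per chunk and performs the actual
   * verification.
   */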
  @Override
  protected synchronized int readChunk(long pos, byte[] buf, int offset,
      int len, byte[] checksumBuf) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading chunk from position " + pos + " at offset " + offset
          + " with length " + len);
    }

    if (gotEOS) {
      startOffset = -1;
      return -1;
    }

    if (checksumBuf.length != checksumSize) {
      throw new IOException("Cannot read checksum into buffer. "
          + "The buffer must be exactly '" + checksumSize
          + "' bytes long to hold the checksum bytes.");
    }

    if ((pos + firstChunkOffset) != lastChunkOffset) {
      throw new IOException("Mismatch in pos : " + pos + " + "
          + firstChunkOffset + " != " + lastChunkOffset);
    }

    int nRead = dataIn.read(buf, offset, bytesPerChecksum);
    if (nRead < bytesPerChecksum) {
      gotEOS = true;
    }
    lastChunkOffset += nRead;
    lastChunkLen = nRead;

    // If verifyChecksum is false, we omit reading the checksum.
    if (checksumIn != null) {
      int nChecksumRead = checksumIn.read(checksumBuf);
      if (nChecksumRead != checksumSize) {
        throw new IOException("Could not read checksum at offset "
            + checksumIn.getChannel().position() + " from the meta file.");
      }
    }
    return nRead;
  }

  @Override
  public synchronized void close() throws IOException {
    if (dataIn != null) {
      dataIn.close();
      dataIn = null;
    }
    if (checksumIn != null) {
      checksumIn.close();
      checksumIn = null;
    }
  }
}