blob: cc61697edea36dc6b64228f969500e6c50c1aa05 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly
* from the local file system rather than going through the datanode for better
* performance. <br>
* {@link BlockReaderLocal} works as follows:
* <ul>
* <li>The client performing short circuit reads must be configured at the
* datanode.</li>
* <li>The client gets the path to the file where block is stored using
* {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
* RPC call</li>
* <li>Client uses kerberos authentication to connect to the datanode over RPC,
* if security is enabled.</li>
* </ul>
*/
class BlockReaderLocal implements BlockReader {
private static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {
private ClientDatanodeProtocol proxy = null;
private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
LocalDatanodeInfo() {
final int cacheSize = 10000;
final float hashTableLoadFactor = 0.75f;
int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
cache = Collections
.synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
hashTableCapacity, hashTableLoadFactor, true) {
private static final long serialVersionUID = 1;
@Override
protected boolean removeEldestEntry(
Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
return size() > cacheSize;
}
});
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
DatanodeInfo node, Configuration conf, int socketTimeout)
throws IOException {
if (proxy == null) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
socketTimeout);
}
return proxy;
}
private synchronized void resetDatanodeProxy() {
if (null != proxy) {
RPC.stopProxy(proxy);
proxy = null;
}
}
private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
return cache.get(b);
}
private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
cache.put(b, info);
}
private void removeBlockLocalPathInfo(ExtendedBlock b) {
cache.remove(b);
}
}
// Multiple datanodes could be running on the local machine. Store proxies in
// a map keyed by the ipc port of the datanode.
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file
private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary;
private byte[] skipBuf = null;
private ByteBuffer dataBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
private int bytesPerChecksum;
private int checksumSize;
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
/**
* The only way this object can be instantiated.
*/
static BlockReaderLocal newBlockReader(Configuration conf, String file,
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
int socketTimeout, long startOffset, long length) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
}
// check to see if the file exists. It may so happen that the
// HDFS file has been deleted and this block-lookup is occurring
// on behalf of a new HDFS file. This time, the block file could
// be residing in a different portion of the fs.data.dir directory.
// In this case, we remove this entry from the cache. The next
// call to this method will re-populate the cache.
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocal localBlockReader = null;
boolean skipChecksumCheck = skipChecksumCheck(conf);
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);
if (LOG.isDebugEnabled()) {
LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+ blkfile.length() + " startOffset " + startOffset + " length "
+ length + " short circuit checksum " + skipChecksumCheck);
}
if (!skipChecksumCheck) {
// get the metadata file
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader
.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocal(conf, file, blk, token,
startOffset, length, pathinfo, checksum, true, dataIn,
firstChunkOffset, checksumIn);
} else {
localBlockReader = new BlockReaderLocal(conf, file, blk, token,
startOffset, length, pathinfo, dataIn);
}
} catch (IOException e) {
// remove from cache
localDatanodeInfo.removeBlockLocalPathInfo(blk);
DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
+ " from cache because local file " + pathinfo.getBlockPath()
+ " could not be opened.");
throw e;
} finally {
if (localBlockReader == null) {
if (dataIn != null) {
dataIn.close();
}
if (checksumIn != null) {
checksumIn.close();
}
}
}
return localBlockReader;
}
private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
if (ldInfo == null) {
ldInfo = new LocalDatanodeInfo();
localDatanodeInfoMap.put(port, ldInfo);
}
return ldInfo;
}
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
conf, timeout);
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
if (pathinfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
}
localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
}
} catch (IOException e) {
localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
throw e;
}
return pathinfo;
}
private static boolean skipChecksumCheck(Configuration conf) {
return conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
}
private BlockReaderLocal(Configuration conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException {
this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
dataIn, startOffset, null);
}
private BlockReaderLocal(Configuration conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException {
this.filename = hdfsfile;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.dataIn = dataIn;
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
checksumBuff = bufferPool.getBuffer(checksumSize*64);
//Initially the buffers have nothing to read.
dataBuff.flip();
checksumBuff.flip();
long toSkip = firstChunkOffset;
while (toSkip > 0) {
long skipped = dataIn.skip(toSkip);
if (skipped == 0) {
throw new IOException("Couldn't initialize input stream");
}
toSkip -= skipped;
}
if (checksumIn != null) {
long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
* checksumSize;
while (checkSumOffset > 0) {
long skipped = checksumIn.skip(checkSumOffset);
if (skipped == 0) {
throw new IOException("Couldn't initialize checksum input stream");
}
checkSumOffset -= skipped;
}
}
}
private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
throws IOException {
int bytesRead = stream.getChannel().read(buf);
if (bytesRead < 0) {
//EOF
return bytesRead;
}
while (buf.remaining() > 0) {
int n = stream.getChannel().read(buf);
if (n < 0) {
//EOF
return bytesRead;
}
bytesRead += n;
}
return bytesRead;
}
@Override
public synchronized int read(byte[] buf, int off, int len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.info("read off " + off + " len " + len);
}
if (!verifyChecksum) {
return dataIn.read(buf, off, len);
} else {
int dataRead = -1;
if (dataBuff.remaining() == 0) {
dataBuff.clear();
checksumBuff.clear();
dataRead = readIntoBuffer(dataIn, dataBuff);
readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip();
dataBuff.flip();
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
this.startOffset);
} else {
dataRead = dataBuff.remaining();
}
if (dataRead > 0) {
int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
if (offsetFromChunkBoundary > 0) {
dataBuff.position(offsetFromChunkBoundary);
// Its either end of file or dataRead is greater than the
// offsetFromChunkBoundary
offsetFromChunkBoundary = 0;
}
if (nRead > 0) {
dataBuff.get(buf, off, nRead);
return nRead;
} else {
return 0;
}
} else {
return -1;
}
}
}
@Override
public synchronized long skip(long n) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("skip " + n);
}
if (n <= 0) {
return 0;
}
if (!verifyChecksum) {
return dataIn.skip(n);
}
// caller made sure newPosition is not beyond EOF.
int remaining = dataBuff.remaining();
int position = dataBuff.position();
int newPosition = position + (int)n;
// if the new offset is already read into dataBuff, just reposition
if (n <= remaining) {
assert offsetFromChunkBoundary == 0;
dataBuff.position(newPosition);
return n;
}
// for small gap, read through to keep the data/checksum in sync
if (n - remaining <= bytesPerChecksum) {
dataBuff.position(position + remaining);
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
int ret = read(skipBuf, 0, (int)(n - remaining));
return ret;
}
// optimize for big gap: discard the current buffer, skip to
// the beginning of the appropriate checksum chunk and then
// read to the middle of that chunk to be in sync with checksums.
this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
long toskip = n - remaining - this.offsetFromChunkBoundary;
dataBuff.clear();
checksumBuff.clear();
long dataSkipped = dataIn.skip(toskip);
if (dataSkipped != toskip) {
throw new IOException("skip error in data input stream");
}
long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
if (checkSumOffset > 0) {
long skipped = checksumIn.skip(checkSumOffset);
if (skipped != checkSumOffset) {
throw new IOException("skip error in checksum input stream");
}
}
// read into the middle of the chunk
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
assert skipBuf.length == bytesPerChecksum;
assert this.offsetFromChunkBoundary < bytesPerChecksum;
int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
if (ret == -1) { // EOS
return toskip;
} else {
return (toskip + ret);
}
}
@Override
public synchronized void close() throws IOException {
dataIn.close();
if (checksumIn != null) {
checksumIn.close();
}
if (dataBuff != null) {
bufferPool.returnBuffer(dataBuff);
dataBuff = null;
}
if (checksumBuff != null) {
bufferPool.returnBuffer(checksumBuff);
checksumBuff = null;
}
startOffset = -1;
checksum = null;
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
@Override
public Socket takeSocket() {
return null;
}
@Override
public boolean hasSentStatusCode() {
return false;
}
}