HDFS-941. The DFS client should cache and reuse open sockets to datanodes while performing reads. Contributed by bc Wong and Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134023 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index b626062..535d5b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -503,6 +503,9 @@
HDFS-1826. NameNode should save image to name directories in parallel
during upgrade. (Matt Foley via hairong)
+ HDFS-941. The DFS client should cache and reuse open sockets to datanodes
+ while performing reads. (bc Wong and Todd Lipcon via todd)
+
BUG FIXES
HDFS-1449. Fix test failures - ExtendedBlock must return
diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 5ff696d..c7578eb 100644
--- a/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -46,12 +46,29 @@
import org.apache.hadoop.util.DataChecksum;
/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ * <dd>The hdfs block, typically large (~64MB).
+ * </dd>
+ * <dt>chunk</dt>
+ * <dd>A block is divided into chunks, each of which comes with
+ *     a checksum. We want transfers to be chunk-aligned so that
+ *     checksums can be verified.
+ * </dd>
+ * <dt>packet</dt>
+ * <dd>A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ * </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
*/
@InterfaceAudience.Private
public class BlockReader extends FSInputChecker {
- Socket dnSock; //for now just sending checksumOk.
+  Socket dnSock; //for now just sending the status code (e.g. CHECKSUM_OK) after the read.
private DataInputStream in;
private DataChecksum checksum;
@@ -77,10 +94,12 @@
*/
private final long bytesNeededToFinish;
- private boolean gotEOS = false;
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
int dataLeft = 0;
/* FSInputChecker interface */
@@ -99,7 +118,7 @@
// This has to be set here, *before* the skip, since we can
// hit EOS during the skip, in the case that our entire read
// is smaller than the checksum chunk.
- boolean eosBefore = gotEOS;
+ boolean eosBefore = eos;
//for the first read, skip the extra bytes at the front.
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
@@ -115,11 +134,14 @@
}
int nRead = super.read(buf, off, len);
-
- // if gotEOS was set in the previous read and checksum is enabled :
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
- //checksum is verified and there are no errors.
- checksumOk(dnSock);
+
+ // if eos was set in the previous read, send a status code to the DN
+ if (eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(dnSock, CHECKSUM_OK);
+ } else {
+ sendReadResult(dnSock, SUCCESS);
+ }
}
return nRead;
}
@@ -191,7 +213,7 @@
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk.
- if ( gotEOS ) {
+ if (eos) {
// Already hit EOF
return -1;
}
@@ -246,7 +268,7 @@
if (checksumSize > 0) {
- // How many chunks left in our stream - this is a ceiling
+ // How many chunks left in our packet - this is a ceiling
// since we may have a partial chunk at the end of the file
int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
@@ -307,7 +329,7 @@
", dataLen : " + dataLen);
}
- gotEOS = true;
+ eos = true;
}
if ( bytesToRead == 0 ) {
@@ -335,7 +357,7 @@
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read ends mid-chunk.
+ // to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
this.firstChunkOffset = firstChunkOffset;
@@ -364,6 +386,21 @@
len, bufferSize, verifyChecksum, "");
}
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+   * @param sock  An established Socket to the DN. The BlockReader owns it: close() closes it unless it is first reclaimed via takeSocket()
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+   * @return New BlockReader instance
+   * @throws IOException If the connection to the DN fails or the read request is rejected
+   */
public static BlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
@@ -423,6 +460,10 @@
public synchronized void close() throws IOException {
startOffset = -1;
checksum = null;
+ if (dnSock != null) {
+ dnSock.close();
+ }
+
// in will be closed when its Socket is closed.
}
@@ -432,22 +473,43 @@
public int readAll(byte[] buf, int offset, int len) throws IOException {
return readFully(this, buf, offset, len);
}
-
- /* When the reader reaches end of the read and there are no checksum
- * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
- * checksum was verified and there was no error.
- */
- void checksumOk(Socket sock) {
+
+ /**
+ * Take the socket used to talk to the DN.
+ */
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ /**
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
+ */
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
+
+ /**
+   * When the reader reaches the end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- CHECKSUM_OK.writeOutputStream(out);
+ statusCode.writeOutputStream(out);
out.flush();
+ sentStatusCode = true;
} catch (IOException e) {
- // its ok not to be able to send this.
- if(LOG.isDebugEnabled()) {
- LOG.debug("Could not write to datanode " + sock.getInetAddress() +
- ": " + e.getMessage());
- }
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ sock.getInetAddress() + ": " + e.getMessage());
}
}
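The lifecycle implied by the reworked BlockReader is: read to EOS, let the reader send its status code, then hand the still-clean socket back for reuse. A minimal sketch of that sequence (not part of the patch; socketCache stands in for the client-side cache introduced below, and the surrounding variables are assumed to be set up as in DFSInputStream):

    // Sketch: drive a read to EOS, then recycle the socket.
    BlockReader reader = BlockReader.newBlockReader(
        sock, file, block, blockToken,
        startOffset, len, bufferSize, true /* verifyChecksum */, clientName);
    byte[] buf = new byte[(int) len];
    reader.readAll(buf, 0, buf.length);      // hitting EOS triggers sendReadResult()
    if (reader.hasSentStatusCode()) {
      socketCache.put(reader.takeSocket());  // socket is clean; keep it for reuse
    }
    reader.close();                          // closes dnSock only if it wasn't taken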
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 17a9829..9ae968b 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -136,6 +136,8 @@
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
+ final SocketCache socketCache;
+
/**
* A map from file names to {@link DFSOutputStream} objects
* that are currently being written by this client.
@@ -279,6 +281,10 @@
defaultReplication = (short)
conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+ this.socketCache = new SocketCache(
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8d51fab..94fbd9a 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -44,6 +44,8 @@
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+ public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
+ public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
@@ -83,6 +85,8 @@
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+ public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
+ public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
public static final String DFS_NAMENODE_ACCESSTIME_PRECISION_KEY = "dfs.namenode.accesstime.precision";
public static final long DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT = 3600000;
public static final String DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY = "dfs.namenode.replication.considerLoad";
@@ -112,6 +116,8 @@
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
+ public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
+ public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
//Delegation token related keys
public static final String DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
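The three new keys cooperate: the client caches up to dfs.client.socketcache.capacity idle sockets and will retry cached connections up to dfs.client.cached.conn.retry times, while the DN keeps an idle connection open for dfs.datanode.socket.reuse.keepalive milliseconds. A sketch of setting them programmatically (the values are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Cache more idle DN sockets than the default of 16.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
    // Give up on cached connections after one failed attempt (default 3).
    conf.setInt(DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY, 1);
    // A keepalive of 0 disables DN-side connection reuse entirely, as
    // TestDFSClientRetries does below.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 0);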
diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0051cc8..49707ae 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,7 +43,6 @@
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@@ -56,8 +55,9 @@
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream {
+ private final SocketCache socketCache;
+
private final DFSClient dfsClient;
- private Socket s = null;
private boolean closed = false;
private final String src;
@@ -92,7 +92,9 @@
private int buffersize = 1;
private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-
+
+ private int nCachedConnRetry;
+
void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
}
@@ -103,9 +105,14 @@
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
+ this.socketCache = dfsClient.socketCache;
prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * dfsClient.defaultBlockSize);
- timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+ timeWindow = this.dfsClient.conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+ nCachedConnRetry = this.dfsClient.conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
openInfo();
}
@@ -371,15 +378,11 @@
throw new IOException("Attempted to read past end of file");
}
- if ( blockReader != null ) {
- blockReader.close();
+ // Will be getting a new BlockReader.
+ if (blockReader != null) {
+ closeBlockReader(blockReader);
blockReader = null;
}
-
- if (s != null) {
- s.close();
- s = null;
- }
//
// Connect to best DataNode for desired Block, with potential offset
@@ -400,14 +403,12 @@
InetSocketAddress targetAddr = retval.addr;
try {
- s = dfsClient.socketFactory.createSocket();
- NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
- s.setSoTimeout(dfsClient.socketTimeout);
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- blockReader = BlockReader.newBlockReader(s, src, blk,
- accessToken,
+ blockReader = getBlockReader(
+ targetAddr, src, blk,
+ accessToken,
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName);
return chosenNode;
@@ -437,13 +438,6 @@
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
- if (s != null) {
- try {
- s.close();
- } catch (IOException iex) {
- }
- }
- s = null;
}
}
}
@@ -457,16 +451,11 @@
return;
}
dfsClient.checkOpen();
-
- if ( blockReader != null ) {
- blockReader.close();
+
+ if (blockReader != null) {
+ closeBlockReader(blockReader);
blockReader = null;
}
-
- if (s != null) {
- s.close();
- s = null;
- }
super.close();
closed = true;
}
@@ -479,7 +468,7 @@
/* This is a used by regular read() and handles ChecksumExceptions.
* name readBuffer() is chosen to imply similarity to readBuffer() in
- * ChecksuFileSystem
+ * ChecksumFileSystem
*/
private synchronized int readBuffer(byte buf[], int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
@@ -659,7 +648,6 @@
//
// Connect to best DataNode for desired Block, with potential offset
//
- Socket dn = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
@@ -673,18 +661,15 @@
BlockReader reader = null;
try {
- dn = dfsClient.socketFactory.createSocket();
- NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
- dn.setSoTimeout(dfsClient.socketTimeout);
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1);
-
- reader = BlockReader.newBlockReader(dn, src,
- block.getBlock(),
- blockToken,
- start, len, buffersize,
- verifyChecksum, dfsClient.clientName);
+
+ reader = getBlockReader(targetAddr, src,
+ block.getBlock(),
+ blockToken,
+ start, len, buffersize,
+ verifyChecksum, dfsClient.clientName);
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -713,8 +698,9 @@
}
}
} finally {
- IOUtils.closeStream(reader);
- IOUtils.closeSocket(dn);
+ if (reader != null) {
+ closeBlockReader(reader);
+ }
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
@@ -722,6 +708,95 @@
}
/**
+   * Close the given BlockReader, caching its socket for reuse if possible.
+ */
+ private void closeBlockReader(BlockReader reader) throws IOException {
+ if (reader.hasSentStatusCode()) {
+ Socket oldSock = reader.takeSocket();
+ socketCache.put(oldSock);
+ }
+ reader.close();
+ }
+
+ /**
+ * Retrieve a BlockReader suitable for reading.
+ * This method will reuse the cached connection to the DN if appropriate.
+ * Otherwise, it will create a new connection.
+ *
+ * @param dnAddr Address of the datanode
+ * @param file File location
+ * @param block The Block object
+ * @param blockToken The access token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance
+ */
+ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+ String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset,
+ long len,
+ int bufferSize,
+ boolean verifyChecksum,
+ String clientName)
+ throws IOException {
+ IOException err = null;
+ boolean fromCache = true;
+
+ // Allow retry since there is no way of knowing whether the cached socket
+ // is good until we actually use it.
+ for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+ Socket sock = socketCache.get(dnAddr);
+ if (sock == null) {
+ fromCache = false;
+
+ sock = dfsClient.socketFactory.createSocket();
+
+ // TCP_NODELAY is crucial here because of bad interactions between
+ // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+ // between the client and DN, the conversation looks like:
+ // 1. Client -> DN: Read block X
+ // 2. DN -> Client: data for block X
+ // 3. Client -> DN: Status OK (successful read)
+ // 4. Client -> DN: Read block Y
+ // The fact that step #3 and #4 are both in the client->DN direction
+ // triggers Nagling. If the DN is using delayed ACKs, this results
+ // in a delay of 40ms or more.
+ //
+ // TCP_NODELAY disables nagling and thus avoids this performance
+ // disaster.
+ sock.setTcpNoDelay(true);
+
+ NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
+ sock.setSoTimeout(dfsClient.socketTimeout);
+ }
+
+ try {
+ // The OP_READ_BLOCK request is sent as we make the BlockReader
+ BlockReader reader =
+ BlockReader.newBlockReader(sock, file, block,
+ blockToken,
+ startOffset, len,
+ bufferSize, verifyChecksum,
+ clientName);
+ return reader;
+ } catch (IOException ex) {
+ // Our socket is no good.
+ DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+ sock.close();
+ err = ex;
+ }
+ }
+
+ throw err;
+ }
+
+ /**
* Read bytes starting from the specified position.
*
* @param position start read from this position
diff --git a/src/java/org/apache/hadoop/hdfs/SocketCache.java b/src/java/org/apache/hadoop/hdfs/SocketCache.java
new file mode 100644
index 0000000..508ec61
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/SocketCache.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A cache of open sockets to datanodes, keyed by remote address.
+ */
+class SocketCache {
+ static final Log LOG = LogFactory.getLog(SocketCache.class);
+
+ private final LinkedListMultimap<SocketAddress, Socket> multimap;
+ private final int capacity;
+
+ /**
+ * Create a SocketCache with the given capacity.
+ * @param capacity Max cache size.
+ */
+ public SocketCache(int capacity) {
+ multimap = LinkedListMultimap.create();
+ this.capacity = capacity;
+ }
+
+ /**
+ * Get a cached socket to the given address.
+ * @param remote Remote address the socket is connected to.
+   * @return A socket whose state is unknown (it may already be closed underneath), or null if none is cached.
+ */
+ public synchronized Socket get(SocketAddress remote) {
+ List<Socket> socklist = multimap.get(remote);
+ if (socklist == null) {
+ return null;
+ }
+
+ Iterator<Socket> iter = socklist.iterator();
+ while (iter.hasNext()) {
+ Socket candidate = iter.next();
+ iter.remove();
+ if (!candidate.isClosed()) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Give an unused socket to the cache.
+   * @param sock A socket that is no longer in use by any reader.
+ */
+ public synchronized void put(Socket sock) {
+ Preconditions.checkNotNull(sock);
+
+ SocketAddress remoteAddr = sock.getRemoteSocketAddress();
+ if (remoteAddr == null) {
+ LOG.warn("Cannot cache (unconnected) socket with no remote address: " +
+ sock);
+ IOUtils.closeSocket(sock);
+ return;
+ }
+
+ if (capacity == multimap.size()) {
+ evictOldest();
+ }
+ multimap.put(remoteAddr, sock);
+ }
+
+ public synchronized int size() {
+ return multimap.size();
+ }
+
+ /**
+ * Evict the oldest entry in the cache.
+ */
+ private synchronized void evictOldest() {
+ Iterator<Entry<SocketAddress, Socket>> iter =
+ multimap.entries().iterator();
+ if (!iter.hasNext()) {
+ throw new IllegalStateException("Cannot evict from empty cache!");
+ }
+ Entry<SocketAddress, Socket> entry = iter.next();
+ iter.remove();
+ Socket sock = entry.getValue();
+ IOUtils.closeSocket(sock);
+ }
+
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ public synchronized void clear() {
+ for (Socket sock : multimap.values()) {
+ IOUtils.closeSocket(sock);
+ }
+ multimap.clear();
+ }
+
+ protected void finalize() {
+ clear();
+ }
+
+}
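SocketCache does not validate liveness beyond isClosed(), so correctness depends on callers following one discipline: only put() a socket whose conversation finished cleanly, and close() anything else. A sketch of that contract, mirroring closeBlockReader()/getBlockReader() above (connectToDn() and readToEos() are hypothetical helpers, not patch code):

    // Sketch: the reuse discipline SocketCache assumes.
    Socket sock = cache.get(dnAddr);    // may be null, or stale underneath
    if (sock == null) {
      sock = connectToDn(dnAddr);       // hypothetical dial helper
    }
    try {
      readToEos(sock);                  // hypothetical: full read, status code sent
      cache.put(sock);                  // conversation is clean; cache for reuse
    } catch (IOException e) {
      sock.close();                     // never cache a socket mid-conversation
      throw e;
    }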
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java b/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
index bcb0786..94a86c4 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
@@ -83,6 +83,7 @@
public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static int WRITE_TIMEOUT = 8 * 60 * 1000;
public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+ public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
/**
* Defines the NameNode role.
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 742486f..2bc8086 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -69,7 +69,8 @@
private long seqno; // sequence number of packet
private boolean transferToAllowed = true;
- private boolean blockReadFully; //set when the whole block is read
+ // set once entire requested byte range has been sent to the client
+ private boolean sentEntireByteRange;
private boolean verifyChecksum; //if true, check is verified while reading
private DataTransferThrottler throttler;
private final String clientTraceFmt; // format of client trace log message
@@ -493,6 +494,8 @@
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
+
+ sentEntireByteRange = true;
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
@@ -501,12 +504,10 @@
close();
}
- blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
return totalRead;
}
- boolean isBlockReadFully() {
- return blockReadFully;
+ boolean didSendEntireByteRange() {
+ return sentEntireByteRange;
}
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index babbb53..c40db5f 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1831,15 +1831,21 @@
A "PACKET" is defined further below.
The client reads data until it receives a packet with
- "LastPacketInBlock" set to true or with a zero length. If there is
- no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+ "LastPacketInBlock" set to true or with a zero length. It then replies
+ to DataNode with one of the status codes:
+ - CHECKSUM_OK: All the chunk checksums have been verified
+ - SUCCESS: Data received; checksums not verified
+ - ERROR_CHECKSUM: (Currently not used) Detected invalid checksums
+
+ +---------------+
+ | 2 byte Status |
+ +---------------+
- Client optional response at the end of data transmission of any length:
- +------------------------------+
- | 2 byte OP_STATUS_CHECKSUM_OK |
- +------------------------------+
- The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
- client connection if it is absent.
+    The DataNode expects all well-behaved clients to send the 2 byte
+    status code. If the client doesn't, the DN will close the
+    connection. The status code is optional only in the sense that it
+    does not affect the correctness of the data. (And the client can
+    always reconnect.)
PACKET : Contains a packet header, checksum and data. Amount of data
======== carried is set by BUFFER_SIZE.
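In wire terms the trailing status is just the 2-byte value in the diagram above. A sketch of both ends of that exchange, assuming the Status enum serializes as a big-endian short (the real code goes through DataTransferProtocol.Status.write()/read(); the numeric code below is an assumption for illustration):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class ReadStatusWire {
      static final short CHECKSUM_OK = 6;  // assumed value; see DataTransferProtocol

      // Client side, after consuming the last packet of the read:
      static void sendStatus(DataOutputStream out) throws IOException {
        out.writeShort(CHECKSUM_OK);
        out.flush();
      }

      // DataNode side, after sending the entire requested byte range:
      static boolean awaitStatus(DataInputStream in) throws IOException {
        return in.readShort() == CHECKSUM_OK;  // blocks until reply or EOF
      }
    }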
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 89ef945..6ba7806 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -27,14 +27,18 @@
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -71,6 +75,7 @@
private final DataNode datanode;
private final DataXceiverServer dataXceiverServer;
+ private int socketKeepaliveTimeout;
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
@@ -83,6 +88,10 @@
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
+ socketKeepaliveTimeout = datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+ DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
+ datanode.getXceiverCount());
@@ -113,24 +122,60 @@
updateCurrentThreadName("Waiting for operation");
DataInputStream in=null;
+ int opsProcessed = 0;
try {
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
- final DataTransferProtocol.Op op = readOp(in);
+ int stdTimeout = s.getSoTimeout();
- // Make sure the xciver count is not exceeded
- int curXceiverCount = datanode.getXceiverCount();
- if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
- throw new IOException("xceiverCount " + curXceiverCount
- + " exceeds the limit of concurrent xcievers "
- + dataXceiverServer.maxXceiverCount);
- }
+      // We process requests in a loop, and stay around for a short timeout.
+      // This optimistic behavior allows the other end to reuse connections.
+      // Setting the keepalive timeout to 0 disables this behavior.
+ do {
+ DataTransferProtocol.Op op;
+ try {
+ if (opsProcessed != 0) {
+ assert socketKeepaliveTimeout > 0;
+ s.setSoTimeout(socketKeepaliveTimeout);
+ }
+ op = readOp(in);
+ } catch (InterruptedIOException ignored) {
+          // Timed out while waiting for the next op from the client.
+ break;
+ } catch (IOException err) {
+ // Since we optimistically expect the next op, it's quite normal to get EOF here.
+ if (opsProcessed > 0 &&
+ (err instanceof EOFException || err instanceof ClosedChannelException)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+ }
+ } else {
+ throw err;
+ }
+ break;
+ }
- opStartTime = now();
- processOp(op, in);
+ // restore normal timeout
+ if (opsProcessed != 0) {
+ s.setSoTimeout(stdTimeout);
+ }
+
+ // Make sure the xceiver count is not exceeded
+ int curXceiverCount = datanode.getXceiverCount();
+ if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+ throw new IOException("xceiverCount " + curXceiverCount
+ + " exceeds the limit of concurrent xcievers "
+ + dataXceiverServer.maxXceiverCount);
+ }
+
+ opStartTime = now();
+ processOp(op, in);
+ ++opsProcessed;
+ } while (s.isConnected() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
- LOG.error(datanode.getMachineName() + ":DataXceiver",t);
+ LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
+ s.toString(), t);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -176,18 +221,36 @@
blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
- ERROR.write(out);
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
throw e;
}
SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
-
+
+ if (blockSender.didSendEntireByteRange()) {
+ // If we sent the entire range, then we should expect the client
+ // to respond with a Status enum.
+ try {
+ DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
+ if (stat == null) {
+ LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+ "code after reading. Will close connection.");
+ IOUtils.closeStream(out);
+ }
+ } catch (IOException ioe) {
+ LOG.debug("Error reading client status response. Will close connection.", ioe);
+ IOUtils.closeStream(out);
+ }
+ } else {
+ IOUtils.closeStream(out);
+ }
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
} catch ( SocketException ignored ) {
// Its ok for remote side to close the connection anytime.
datanode.metrics.incrBlocksRead();
+ IOUtils.closeStream(out);
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
@@ -198,7 +261,6 @@
StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
- IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
}
@@ -690,12 +752,8 @@
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
- try {
- opStatus.write(reply);
- reply.flush();
- } finally {
- IOUtils.closeStream(reply);
- }
+ opStatus.write(reply);
+ reply.flush();
}
private void checkAccess(DataOutputStream out, final boolean reply,
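The keepalive loop generalizes to any request/response server that wants opportunistic connection reuse: wait only briefly for a follow-up request, and treat a timeout or a clean EOF between requests as a normal end of session. A distilled sketch of the pattern (processOp() is a hypothetical dispatch standing in for DataXceiver's op handling):

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.net.Socket;

    void serve(Socket s, DataInputStream in, int keepaliveMs, int stdTimeoutMs)
        throws IOException {
      int opsProcessed = 0;
      do {
        int op;
        try {
          if (opsProcessed != 0) {
            s.setSoTimeout(keepaliveMs);        // short wait between ops
          }
          op = in.readUnsignedByte();           // stand-in for readOp(in)
        } catch (InterruptedIOException timedOut) {
          break;                                // idle client; end the session
        } catch (EOFException closed) {
          if (opsProcessed == 0) throw closed;  // EOF before any op is an error
          break;                                // EOF between ops is normal
        }
        s.setSoTimeout(stdTimeoutMs);           // full timeout while processing
        processOp(op, in);                      // hypothetical dispatch
        ++opsProcessed;
      } while (s.isConnected() && keepaliveMs > 0);
    }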
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 74ef101..64e4998 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -73,28 +73,39 @@
/**
* Create a file of the given size filled with random data.
- * @return List of Blocks of the new file.
+ * @return File data.
*/
- public List<LocatedBlock> writeFile(Path filepath, int sizeKB)
+ public byte[] writeFile(Path filepath, int sizeKB)
throws IOException {
FileSystem fs = cluster.getFileSystem();
// Write a file with the specified amount of data
DataOutputStream os = fs.create(filepath);
- byte data[] = new byte[1024];
+ byte data[] = new byte[1024 * sizeKB];
new Random().nextBytes(data);
- for (int i = 0; i < sizeKB; i++) {
- os.write(data);
- }
+ os.write(data);
os.close();
+ return data;
+ }
+ /**
+ * Get the list of Blocks for a file.
+ */
+ public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB)
+ throws IOException {
// Return the blocks we just wrote
- DFSClient dfsclient = new DFSClient(
- new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+ DFSClient dfsclient = getDFSClient();
return dfsclient.getNamenode().getBlockLocations(
filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
}
+ /**
+ * Get the DFSClient.
+ */
+ public DFSClient getDFSClient() throws IOException {
+ InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
+ return new DFSClient(nnAddr, conf);
+ }
/**
* Exercise the BlockReader and read length bytes.
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index ca9aed3..be6ac1c 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.fs.Path;
@@ -41,23 +42,24 @@
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
- List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+ util.writeFile(TEST_FILE, FILE_SIZE_K);
+ List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K);
testBlock = blkList.get(0); // Use the first block to test
}
/**
- * Verify that if we read an entire block, we send checksumOk
+ * Verify that if we read an entire block, we send CHECKSUM_OK
*/
@Test
public void testBlockVerification() throws Exception {
BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
- verify(reader).checksumOk(reader.dnSock);
+ verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
}
/**
- * Test that if we do an incomplete read, we don't call checksumOk
+   * Test that if we do an incomplete read, we don't send CHECKSUM_OK
*/
@Test
public void testIncompleteRead() throws Exception {
@@ -65,14 +67,14 @@
util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
// We asked the blockreader for the whole file, and only read
- // half of it, so no checksumOk
- verify(reader, never()).checksumOk(reader.dnSock);
+ // half of it, so no CHECKSUM_OK
+ verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
}
/**
* Test that if we ask for a half block, and read it all, we *do*
- * call checksumOk. The DN takes care of knowing whether it was
+ * send CHECKSUM_OK. The DN takes care of knowing whether it was
* the whole block or not.
*/
@Test
@@ -81,7 +83,7 @@
BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
- verify(reader).checksumOk(reader.dnSock);
+ verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
}
@@ -99,7 +101,7 @@
" len=" + length);
BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
- verify(reader).checksumOk(reader.dnSock);
+ verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
reader.close();
}
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
new file mode 100644
index 0000000..311b724
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import org.apache.hadoop.security.token.Token;
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.junit.Assert.*;
+
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This class tests the client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestConnCache {
+ static final Log LOG = LogFactory.getLog(TestConnCache.class);
+
+ static final int BLOCK_SIZE = 4096;
+ static final int FILE_SIZE = 3 * BLOCK_SIZE;
+
+ static Configuration conf = null;
+ static MiniDFSCluster cluster = null;
+ static FileSystem fs = null;
+
+ static final Path testFile = new Path("/testConnCache.dat");
+ static byte authenticData[] = null;
+
+ static BlockReaderTestUtil util = null;
+
+
+ /**
+ * A mock Answer to remember the BlockReader used.
+ *
+   * It verifies that all invocations of DFSInputStream.getBlockReader()
+   * use the same socket.
+ */
+ private class MockGetBlockReader implements Answer<BlockReader> {
+ public BlockReader reader = null;
+ private Socket sock = null;
+
+ public BlockReader answer(InvocationOnMock invocation) throws Throwable {
+ BlockReader prevReader = reader;
+ reader = (BlockReader) invocation.callRealMethod();
+ if (sock == null) {
+ sock = reader.dnSock;
+      } else if (prevReader != null && prevReader.hasSentStatusCode()) {
+        // The socket is only reusable if the previous BlockReader read
+        // till EOS and sent a status code back, so only assert then.
+        assertSame("DFSInputStream should use the same socket",
+                   sock, reader.dnSock);
+      }
+      return reader;
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int REPLICATION_FACTOR = 1;
+
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+ cluster = util.getCluster();
+ conf = util.getConf();
+ fs = cluster.getFileSystem();
+
+ authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+ }
+
+
+ /**
+ * (Optionally) seek to position, read and verify data.
+ *
+ * Seek to specified position if pos is non-negative.
+ */
+ private void pread(DFSInputStream in,
+ long pos,
+ byte[] buffer,
+ int offset,
+ int length)
+ throws IOException {
+ assertTrue("Test buffer too small", buffer.length >= offset + length);
+
+    if (pos >= 0)
+      in.seek(pos);
+
+    // The read loop below mutates offset and length and advances the
+    // stream, so remember the starting values for verification.
+    final long readPos = in.getPos();
+    final int origOffset = offset;
+    final int origLength = length;
+
+    LOG.info("Reading from file of size " + in.getFileLength() +
+             " at offset " + readPos);
+
+    while (length > 0) {
+      int cnt = in.read(buffer, offset, length);
+      assertTrue("Error in read", cnt > 0);
+      offset += cnt;
+      length -= cnt;
+    }
+
+    // Verify
+    for (int i = 0; i < origLength; ++i) {
+      byte actual = buffer[origOffset + i];
+      byte expect = authenticData[(int) readPos + i];
+      assertEquals("Read data mismatch at file offset " + (readPos + i) +
+                   ". Expects " + expect + "; got " + actual,
+                   expect, actual);
+    }
+ }
+
+ /**
+ * Test the SocketCache itself.
+ */
+ @Test
+ public void testSocketCache() throws IOException {
+ final int CACHE_SIZE = 4;
+ SocketCache cache = new SocketCache(CACHE_SIZE);
+
+ // Make a client
+ InetSocketAddress nnAddr =
+ new InetSocketAddress("localhost", cluster.getNameNodePort());
+ DFSClient client = new DFSClient(nnAddr, conf);
+
+ // Find out the DN addr
+ LocatedBlock block =
+ client.getNamenode().getBlockLocations(
+ testFile.toString(), 0, FILE_SIZE)
+ .getLocatedBlocks().get(0);
+ DataNode dn = util.getDataNode(block);
+ InetSocketAddress dnAddr = dn.getSelfAddr();
+
+ // Make some sockets to the DN
+ Socket[] dnSockets = new Socket[CACHE_SIZE];
+ for (int i = 0; i < dnSockets.length; ++i) {
+ dnSockets[i] = client.socketFactory.createSocket(
+ dnAddr.getAddress(), dnAddr.getPort());
+ }
+
+ // Insert a socket to the NN
+ Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
+ cache.put(nnSock);
+ assertSame("Read the write", nnSock, cache.get(nnAddr));
+ cache.put(nnSock);
+
+ // Insert DN socks
+ for (Socket dnSock : dnSockets) {
+ cache.put(dnSock);
+ }
+
+ assertEquals("NN socket evicted", null, cache.get(nnAddr));
+ assertTrue("Evicted socket closed", nnSock.isClosed());
+
+ // Lookup the DN socks
+ for (Socket dnSock : dnSockets) {
+ assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
+ dnSock.close();
+ }
+
+ assertEquals("Cache is empty", 0, cache.size());
+ }
+
+ /**
+ * Read a file served entirely from one DN. Seek around and read from
+ * different offsets. And verify that they all use the same socket.
+ *
+ * @throws java.io.IOException
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testReadFromOneDN() throws IOException {
+ LOG.info("Starting testReadFromOneDN()");
+ DFSClient client = new DFSClient(
+ new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+ DFSInputStream in = spy(client.open(testFile.toString()));
+ LOG.info("opened " + testFile.toString());
+
+ byte[] dataBuf = new byte[BLOCK_SIZE];
+
+ MockGetBlockReader answer = new MockGetBlockReader();
+ Mockito.doAnswer(answer).when(in).getBlockReader(
+ (InetSocketAddress) Matchers.anyObject(),
+ Matchers.anyString(),
+ (ExtendedBlock) Matchers.anyObject(),
+ (Token<BlockTokenIdentifier>) Matchers.anyObject(),
+ Matchers.anyLong(),
+ Matchers.anyLong(),
+ Matchers.anyInt(),
+ Matchers.anyBoolean(),
+ Matchers.anyString());
+
+ // Initial read
+ pread(in, 0, dataBuf, 0, dataBuf.length);
+ // Read again and verify that the socket is the same
+ pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
+ pread(in, 1024, dataBuf, 0, dataBuf.length);
+ pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read
+ pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+
+ in.close();
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdown();
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 70269b6..05fa648 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -384,6 +384,8 @@
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
retries);
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWin);
+ // Disable keepalive
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 0);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(replicationFactor).build();
cluster.waitActive();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
new file mode 100644
index 0000000..eb37422
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.junit.Assert.*;
+
+/**
+ * Test the use of DFSInputStream by multiple concurrent readers.
+ */
+public class TestParallelRead {
+
+ static final Log LOG = LogFactory.getLog(TestParallelRead.class);
+ static BlockReaderTestUtil util = null;
+ static DFSClient dfsClient = null;
+ static final int FILE_SIZE_K = 256;
+ static Random rand = null;
+
+ static {
+    // The client-trace log ends up causing a lot of blocked threads
+    // in this test when it's being used as a performance benchmark.
+ LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
+ .setLevel(Level.WARN);
+ }
+
+ private class TestFileInfo {
+ public DFSInputStream dis;
+ public Path filepath;
+ public byte[] authenticData;
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int REPLICATION_FACTOR = 2;
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+ dfsClient = util.getDFSClient();
+ rand = new Random(System.currentTimeMillis());
+ }
+
+ /**
+ * A worker to do one "unit" of read.
+ */
+ static class ReadWorker extends Thread {
+ static public final int N_ITERATIONS = 1024;
+
+ private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
+
+ private TestFileInfo testInfo;
+ private long fileSize;
+ private long bytesRead;
+ private boolean error;
+
+ ReadWorker(TestFileInfo testInfo, int id) {
+ super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
+ this.testInfo = testInfo;
+ fileSize = testInfo.dis.getFileLength();
+ assertEquals(fileSize, testInfo.authenticData.length);
+ bytesRead = 0;
+ error = false;
+ }
+
+ /**
+     * Randomly do one of: (1) a small regular read; (2) a large pread.
+ */
+ @Override
+ public void run() {
+ for (int i = 0; i < N_ITERATIONS; ++i) {
+ int startOff = rand.nextInt((int) fileSize);
+ int len = 0;
+ try {
+ double p = rand.nextDouble();
+ if (p < PROPORTION_NON_POSITIONAL_READ) {
+ // Do a small regular read. Very likely this will leave unread
+ // data on the socket and make the socket uncacheable.
+ len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
+ read(startOff, len);
+ bytesRead += len;
+ } else {
+ // Do a positional read most of the time.
+ len = rand.nextInt((int) (fileSize - startOff));
+ pRead(startOff, len);
+ bytesRead += len;
+ }
+ } catch (Exception ex) {
+ LOG.error(getName() + ": Error while testing read at " + startOff +
+ " length " + len);
+ error = true;
+ fail(ex.getMessage());
+ }
+ }
+ }
+
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+     * Raising an error in a worker thread doesn't fail the test,
+     * so check for errors afterwards.
+ */
+ public boolean hasError() {
+ return error;
+ }
+
+ /**
+ * Seek to somewhere random and read.
+ */
+ private void read(int start, int len) throws Exception {
+ assertTrue(
+ "Bad args: " + start + " + " + len + " should be < " + fileSize,
+ start + len < fileSize);
+ DFSInputStream dis = testInfo.dis;
+
+ synchronized (dis) {
+ dis.seek(start);
+
+ byte buf[] = new byte[len];
+ int cnt = 0;
+ while (cnt < len) {
+ cnt += dis.read(buf, cnt, buf.length - cnt);
+ }
+ verifyData("Read data corrupted", buf, start, start + len);
+ }
+ }
+
+ /**
+ * Positional read.
+ */
+ private void pRead(int start, int len) throws Exception {
+ assertTrue(
+ "Bad args: " + start + " + " + len + " should be < " + fileSize,
+ start + len < fileSize);
+ DFSInputStream dis = testInfo.dis;
+
+ byte buf[] = new byte[len];
+ int cnt = 0;
+ while (cnt < len) {
+ cnt += dis.read(start, buf, cnt, buf.length - cnt);
+ }
+ verifyData("Pread data corrupted", buf, start, start + len);
+ }
+
+ /**
+ * Verify read data vs authentic data
+ */
+ private void verifyData(String msg, byte actual[], int start, int end)
+ throws Exception {
+ byte auth[] = testInfo.authenticData;
+ if (end > auth.length) {
+ throw new Exception(msg + ": Actual array (" + end +
+ ") is past the end of authentic data (" +
+ auth.length + ")");
+ }
+
+ int j = start;
+ for (int i = 0; i < actual.length; ++i, ++j) {
+ if (auth[j] != actual[i]) {
+ throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
+ j + ") differs: expect " +
+ auth[j] + " got " + actual[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Do parallel read several times with different number of files and threads.
+ *
+ * Note that while this is the only "test" in a junit sense, we're actually
+ * dispatching a lot more. Failures in the other methods (and other threads)
+ * need to be manually collected, which is inconvenient.
+ */
+ @Test
+ public void testParallelRead() throws IOException {
+ if (!runParallelRead(1, 4)) {
+ fail("Check log for errors");
+ }
+ if (!runParallelRead(1, 16)) {
+ fail("Check log for errors");
+ }
+ if (!runParallelRead(2, 4)) {
+ fail("Check log for errors");
+ }
+ }
+
+ /**
+ * Start the parallel read with the given parameters.
+ */
+ boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
+ ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
+ TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
+
+ // Prepare the files and workers
+ int nWorkers = 0;
+ for (int i = 0; i < nFiles; ++i) {
+ TestFileInfo testInfo = new TestFileInfo();
+ testInfoArr[i] = testInfo;
+
+ testInfo.filepath = new Path("/TestParallelRead.dat." + i);
+ testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
+ testInfo.dis = dfsClient.open(testInfo.filepath.toString());
+
+ for (int j = 0; j < nWorkerEach; ++j) {
+ workers[nWorkers++] = new ReadWorker(testInfo, nWorkers);
+ }
+ }
+
+ // Start the workers and wait
+ long starttime = System.currentTimeMillis();
+ for (ReadWorker worker : workers) {
+ worker.start();
+ }
+
+ for (ReadWorker worker : workers) {
+ try {
+ worker.join();
+ } catch (InterruptedException ignored) { }
+ }
+ long endtime = System.currentTimeMillis();
+
+ // Cleanup
+ for (TestFileInfo testInfo : testInfoArr) {
+ testInfo.dis.close();
+ }
+
+ // Report
+ boolean res = true;
+ long totalRead = 0;
+ for (ReadWorker worker : workers) {
+ long nread = worker.getBytesRead();
+ LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
+ "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
+ totalRead += nread;
+ if (worker.hasError()) {
+ res = false;
+ }
+ }
+
+ double timeTakenSec = (endtime - starttime) / 1000.0;
+ long totalReadKB = totalRead / 1024;
+ LOG.info("=== Report: " + nWorkers + " threads read " +
+ totalReadKB + " KB (across " +
+ nFiles + " file(s)) in " +
+ timeTakenSec + "s; average " +
+ totalReadKB / timeTakenSec + " KB/s");
+
+ return res;
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdown();
+ }
+
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 0ede253..7f4eb0f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -23,6 +23,7 @@
import junit.framework.TestCase;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,6 +38,11 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Level;
+
+import org.apache.commons.logging.LogFactory;
+
import static org.apache.hadoop.test.MetricsAsserts.*;
/**
@@ -59,6 +65,9 @@
DFS_REPLICATION_INTERVAL);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
+
+ ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
+ .getLogger().setLevel(Level.DEBUG);
}
private MiniDFSCluster cluster;
@@ -255,9 +264,5 @@
readFile(fs, file1_Path);
updateMetrics();
assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
-
- // Verify total load metrics, total load = Data Node started.
- updateMetrics();
- assertGauge("TotalLoad" ,DATANODE_COUNT, getMetrics(NS_METRICS));
}
}