HDFS-941. The DFS client should cache and reuse open sockets to datanodes while performing reads. Contributed by bc Wong and Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134023 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index b626062..535d5b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -503,6 +503,9 @@
     HDFS-1826. NameNode should save image to name directories in parallel
     during upgrade. (Matt Foley via hairong)
 
+    HDFS-941. The DFS client should cache and reuse open sockets to datanodes
+    while performing reads. (bc Wong and Todd Lipcon via todd)
+
   BUG FIXES
 
     HDFS-1449. Fix test failures - ExtendedBlock must return 
diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 5ff696d..c7578eb 100644
--- a/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -46,12 +46,29 @@
 import org.apache.hadoop.util.DataChecksum;
 
 /** This is a wrapper around connection to datanode
- * and understands checksum, offset etc
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
  */
 @InterfaceAudience.Private
 public class BlockReader extends FSInputChecker {
 
-  Socket dnSock; //for now just sending checksumOk.
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private DataInputStream in;
   private DataChecksum checksum;
 
@@ -77,10 +94,12 @@
    */
   private final long bytesNeededToFinish;
 
-  private boolean gotEOS = false;
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
   
   byte[] skipBuf = null;
   ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
   /* FSInputChecker interface */
@@ -99,7 +118,7 @@
     // This has to be set here, *before* the skip, since we can
     // hit EOS during the skip, in the case that our entire read
     // is smaller than the checksum chunk.
-    boolean eosBefore = gotEOS;
+    boolean eosBefore = eos;
 
     //for the first read, skip the extra bytes at the front.
     if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
@@ -115,11 +134,14 @@
     }
     
     int nRead = super.read(buf, off, len);
-    
-    // if gotEOS was set in the previous read and checksum is enabled :
-    if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
-      //checksum is verified and there are no errors.
-      checksumOk(dnSock);
+
+    // if eos was set in the previous read, send a status code to the DN
+    if (eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(dnSock, CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, SUCCESS);
+      }
     }
     return nRead;
   }
@@ -191,7 +213,7 @@
                                        int len, byte[] checksumBuf) 
                                        throws IOException {
     // Read one chunk.
-    if ( gotEOS ) {
+    if (eos) {
       // Already hit EOF
       return -1;
     }
@@ -246,7 +268,7 @@
 
     if (checksumSize > 0) {
 
-      // How many chunks left in our stream - this is a ceiling
+      // How many chunks left in our packet - this is a ceiling
       // since we may have a partial chunk at the end of the file
       int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
 
@@ -307,7 +329,7 @@
                               ", dataLen : " + dataLen);
       }
 
-      gotEOS = true;
+      eos = true;
     }
 
     if ( bytesToRead == 0 ) {
@@ -335,7 +357,7 @@
     // The total number of bytes that we need to transfer from the DN is
     // the amount that the user wants (bytesToRead), plus the padding at
     // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read ends mid-chunk.
+    // to send more than this amount if the read starts/ends mid-chunk.
     this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
 
     this.firstChunkOffset = firstChunkOffset;
@@ -364,6 +386,21 @@
                           len, bufferSize, verifyChecksum, "");
   }
 
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
   public static BlockReader newBlockReader( Socket sock, String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
@@ -423,6 +460,10 @@
   public synchronized void close() throws IOException {
     startOffset = -1;
     checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
     // in will be closed when its Socket is closed.
   }
   
@@ -432,22 +473,43 @@
   public int readAll(byte[] buf, int offset, int len) throws IOException {
     return readFully(this, buf, offset, len);
   }
-  
-  /* When the reader reaches end of the read and there are no checksum
-   * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
-   * checksum was verified and there was no error.
-   */ 
-  void checksumOk(Socket sock) {
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
     try {
       OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      CHECKSUM_OK.writeOutputStream(out);
+      statusCode.writeOutputStream(out);
       out.flush();
+      sentStatusCode = true;
     } catch (IOException e) {
-      // its ok not to be able to send this.
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Could not write to datanode " + sock.getInetAddress() +
-                  ": " + e.getMessage());
-      }
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               sock.getInetAddress() + ": " + e.getMessage());
     }
   }
   
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 17a9829..9ae968b 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -136,6 +136,8 @@
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
 
+  final SocketCache socketCache;
+  
   /**
    * A map from file names to {@link DFSOutputStream} objects
    * that are currently being written by this client.
@@ -279,6 +281,10 @@
     defaultReplication = (short) 
       conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                   DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    
+    this.socketCache = new SocketCache(
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+            DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
 
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8d51fab..94fbd9a 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -44,6 +44,8 @@
   public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+  public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
+  public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
@@ -83,6 +85,8 @@
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String  DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
   public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+  public static final String  DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
+  public static final int     DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
   public static final String  DFS_NAMENODE_ACCESSTIME_PRECISION_KEY = "dfs.namenode.accesstime.precision";
   public static final long    DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT = 3600000;
   public static final String  DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY = "dfs.namenode.replication.considerLoad";
@@ -112,6 +116,8 @@
   public static final int     DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
   public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
+  public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
+  public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0051cc8..49707ae 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -56,8 +55,9 @@
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream {
+  private final SocketCache socketCache;
+
   private final DFSClient dfsClient;
-  private Socket s = null;
   private boolean closed = false;
 
   private final String src;
@@ -92,7 +92,9 @@
   private int buffersize = 1;
   
   private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-  
+
+  private int nCachedConnRetry;
+
   void addToDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }
@@ -103,9 +105,14 @@
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
+    this.socketCache = dfsClient.socketCache;
     prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
         10 * dfsClient.defaultBlockSize);
-    timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    timeWindow = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    nCachedConnRetry = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
     openInfo();
   }
 
@@ -371,15 +378,11 @@
       throw new IOException("Attempted to read past end of file");
     }
 
-    if ( blockReader != null ) {
-      blockReader.close(); 
+    // Will be getting a new BlockReader.
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
       blockReader = null;
     }
-    
-    if (s != null) {
-      s.close();
-      s = null;
-    }
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -400,14 +403,12 @@
       InetSocketAddress targetAddr = retval.addr;
 
       try {
-        s = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
-        s.setSoTimeout(dfsClient.socketTimeout);
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
-        blockReader = BlockReader.newBlockReader(s, src, blk,
-            accessToken, 
+        blockReader = getBlockReader(
+            targetAddr, src, blk,
+            accessToken,
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
@@ -437,13 +438,6 @@
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }                        
-        }
-        s = null;
       }
     }
   }
@@ -457,16 +451,11 @@
       return;
     }
     dfsClient.checkOpen();
-    
-    if ( blockReader != null ) {
-      blockReader.close();
+
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
       blockReader = null;
     }
-    
-    if (s != null) {
-      s.close();
-      s = null;
-    }
     super.close();
     closed = true;
   }
@@ -479,7 +468,7 @@
 
   /* This is a used by regular read() and handles ChecksumExceptions.
    * name readBuffer() is chosen to imply similarity to readBuffer() in
-   * ChecksuFileSystem
+   * ChecksumFileSystem
    */ 
   private synchronized int readBuffer(byte buf[], int off, int len,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
@@ -659,7 +648,6 @@
     //
     // Connect to best DataNode for desired Block, with potential offset
     //
-    Socket dn = null;
     int refetchToken = 1; // only need to get a new access token once
     
     while (true) {
@@ -673,18 +661,15 @@
       BlockReader reader = null;
           
       try {
-        dn = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
-        dn.setSoTimeout(dfsClient.socketTimeout);
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
             
         int len = (int) (end - start + 1);
-            
-        reader = BlockReader.newBlockReader(dn, src, 
-                                            block.getBlock(),
-                                            blockToken,
-                                            start, len, buffersize, 
-                                            verifyChecksum, dfsClient.clientName);
+
+        reader = getBlockReader(targetAddr, src,
+                                block.getBlock(),
+                                blockToken,
+                                start, len, buffersize,
+                                verifyChecksum, dfsClient.clientName);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -713,8 +698,9 @@
           }
         }
       } finally {
-        IOUtils.closeStream(reader);
-        IOUtils.closeSocket(dn);
+        if (reader != null) {
+          closeBlockReader(reader);
+        }
       }
       // Put chosen node into dead list, continue
       addToDeadNodes(chosenNode);
@@ -722,6 +708,95 @@
   }
 
   /**
+   * Close the given BlockReader and cache its socket.
+   */
+  private void closeBlockReader(BlockReader reader) throws IOException {
+    if (reader.hasSentStatusCode()) {
+      Socket oldSock = reader.takeSocket();
+      socketCache.put(oldSock);
+    }
+    reader.close();
+  }
+
+  /**
+   * Retrieve a BlockReader suitable for reading.
+   * This method will reuse the cached connection to the DN if appropriate.
+   * Otherwise, it will create a new connection.
+   *
+   * @param dnAddr  Address of the datanode
+   * @param file  File location
+   * @param block  The Block object
+   * @param blockToken  The access token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance
+   */
+  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+                                       String file,
+                                       ExtendedBlock block,
+                                       Token<BlockTokenIdentifier> blockToken,
+                                       long startOffset,
+                                       long len,
+                                       int bufferSize,
+                                       boolean verifyChecksum,
+                                       String clientName)
+      throws IOException {
+    IOException err = null;
+    boolean fromCache = true;
+
+    // Allow retry since there is no way of knowing whether the cached socket
+    // is good until we actually use it.
+    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+      Socket sock = socketCache.get(dnAddr);
+      if (sock == null) {
+        fromCache = false;
+
+        sock = dfsClient.socketFactory.createSocket();
+        
+        // TCP_NODELAY is crucial here because of bad interactions between
+        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+        // between the client and DN, the conversation looks like:
+        //   1. Client -> DN: Read block X
+        //   2. DN -> Client: data for block X
+        //   3. Client -> DN: Status OK (successful read)
+        //   4. Client -> DN: Read block Y
+        // The fact that step #3 and #4 are both in the client->DN direction
+        // triggers Nagling. If the DN is using delayed ACKs, this results
+        // in a delay of 40ms or more.
+        //
+        // TCP_NODELAY disables nagling and thus avoids this performance
+        // disaster.
+        sock.setTcpNoDelay(true);
+
+        NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
+        sock.setSoTimeout(dfsClient.socketTimeout);
+      }
+
+      try {
+        // The OP_READ_BLOCK request is sent as we make the BlockReader
+        BlockReader reader =
+            BlockReader.newBlockReader(sock, file, block,
+                                       blockToken,
+                                       startOffset, len,
+                                       bufferSize, verifyChecksum,
+                                       clientName);
+        return reader;
+      } catch (IOException ex) {
+        // Our socket is no good.
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+        sock.close();
+        err = ex;
+      }
+    }
+
+    throw err;
+  }
+
+
+  /**
    * Read bytes starting from the specified position.
    * 
    * @param position start read from this position
diff --git a/src/java/org/apache/hadoop/hdfs/SocketCache.java b/src/java/org/apache/hadoop/hdfs/SocketCache.java
new file mode 100644
index 0000000..508ec61
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/SocketCache.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A cache of sockets.
+ */
+class SocketCache {
+  static final Log LOG = LogFactory.getLog(SocketCache.class);
+
+  private final LinkedListMultimap<SocketAddress, Socket> multimap;
+  private final int capacity;
+
+  /**
+   * Create a SocketCache with the given capacity.
+   * @param capacity  Max cache size.
+   */
+  public SocketCache(int capacity) {
+    multimap = LinkedListMultimap.create();
+    this.capacity = capacity;
+  }
+
+  /**
+   * Get a cached socket to the given address.
+   * @param remote  Remote address the socket is connected to.
+   * @return  A socket with unknown state, possibly closed underneath. Or null.
+   */
+  public synchronized Socket get(SocketAddress remote) {
+    List<Socket> socklist = multimap.get(remote);
+    if (socklist == null) {
+      return null;
+    }
+
+    Iterator<Socket> iter = socklist.iterator();
+    while (iter.hasNext()) {
+      Socket candidate = iter.next();
+      iter.remove();
+      if (!candidate.isClosed()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Give an unused socket to the cache.
+   * @param sock socket not used by anyone.
+   */
+  public synchronized void put(Socket sock) {
+    Preconditions.checkNotNull(sock);
+
+    SocketAddress remoteAddr = sock.getRemoteSocketAddress();
+    if (remoteAddr == null) {
+      LOG.warn("Cannot cache (unconnected) socket with no remote address: " +
+               sock);
+      IOUtils.closeSocket(sock);
+      return;
+    }
+
+    if (capacity == multimap.size()) {
+      evictOldest();
+    }
+    multimap.put(remoteAddr, sock);
+  }
+
+  public synchronized int size() {
+    return multimap.size();
+  }
+
+  /**
+   * Evict the oldest entry in the cache.
+   */
+  private synchronized void evictOldest() {
+    Iterator<Entry<SocketAddress, Socket>> iter =
+      multimap.entries().iterator();
+    if (!iter.hasNext()) {
+      throw new IllegalStateException("Cannot evict from empty cache!");
+    }
+    Entry<SocketAddress, Socket> entry = iter.next();
+    iter.remove();
+    Socket sock = entry.getValue();
+    IOUtils.closeSocket(sock);
+  }
+
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  public synchronized void clear() {
+    for (Socket sock : multimap.values()) {
+      IOUtils.closeSocket(sock);
+    }
+    multimap.clear();
+  }
+
+  protected void finalize() {
+    clear();
+  }
+
+}
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java b/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
index bcb0786..94a86c4 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
@@ -83,6 +83,7 @@
   public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+  public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
 
   /**
    * Defines the NameNode role.
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 742486f..2bc8086 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -69,7 +69,8 @@
   private long seqno; // sequence number of packet
 
   private boolean transferToAllowed = true;
-  private boolean blockReadFully; //set when the whole block is read
+  // set once entire requested byte range has been sent to the client
+  private boolean sentEntireByteRange;
   private boolean verifyChecksum; //if true, check is verified while reading
   private DataTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
@@ -493,6 +494,8 @@
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
       }
+
+      sentEntireByteRange = true;
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();
@@ -501,12 +504,10 @@
       close();
     }
 
-    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
     return totalRead;
   }
   
-  boolean isBlockReadFully() {
-    return blockReadFully;
+  boolean didSendEntireByteRange() {
+    return sentEntireByteRange;
   }
 }
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index babbb53..c40db5f 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1831,15 +1831,21 @@
     A "PACKET" is defined further below.
     
     The client reads data until it receives a packet with 
-    "LastPacketInBlock" set to true or with a zero length. If there is 
-    no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+    "LastPacketInBlock" set to true or with a zero length. It then replies
+    to DataNode with one of the status codes:
+    - CHECKSUM_OK:    All the chunk checksums have been verified
+    - SUCCESS:        Data received; checksums not verified
+    - ERROR_CHECKSUM: (Currently not used) Detected invalid checksums
+
+      +---------------+
+      | 2 byte Status |
+      +---------------+
     
-    Client optional response at the end of data transmission of any length:
-      +------------------------------+
-      | 2 byte OP_STATUS_CHECKSUM_OK |
-      +------------------------------+
-    The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
-    client connection if it is absent.
+    The DataNode expects all well behaved clients to send the 2 byte
+    status code. And if the the client doesn't, the DN will close the
+    connection. So the status code is optional in the sense that it
+    does not affect the correctness of the data. (And the client can
+    always reconnect.)
     
     PACKET : Contains a packet header, checksum and data. Amount of data
     ======== carried is set by BUFFER_SIZE.
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 89ef945..6ba7806 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -27,14 +27,18 @@
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -71,6 +75,7 @@
   private final DataNode datanode;
   private final DataXceiverServer dataXceiverServer;
 
+  private int socketKeepaliveTimeout;
   private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
@@ -83,6 +88,10 @@
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
 
+    socketKeepaliveTimeout = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Number of active connections is: "
           + datanode.getXceiverCount());
@@ -113,24 +122,60 @@
     updateCurrentThreadName("Waiting for operation");
 
     DataInputStream in=null; 
+    int opsProcessed = 0;
     try {
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final DataTransferProtocol.Op op = readOp(in);
+      int stdTimeout = s.getSoTimeout();
 
-      // Make sure the xciver count is not exceeded
-      int curXceiverCount = datanode.getXceiverCount();
-      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
-        throw new IOException("xceiverCount " + curXceiverCount
-                              + " exceeds the limit of concurrent xcievers "
-                              + dataXceiverServer.maxXceiverCount);
-      }
+      // We process requests in a loop, and stay around for a short timeout.
+      // This optimistic behaviour allows the other end to reuse connections.
+      // Setting keepalive timeout to 0 disable this behavior.
+      do {
+        DataTransferProtocol.Op op;
+        try {
+          if (opsProcessed != 0) {
+            assert socketKeepaliveTimeout > 0;
+            s.setSoTimeout(socketKeepaliveTimeout);
+          }
+          op = readOp(in);
+        } catch (InterruptedIOException ignored) {
+          // Time out while we wait for client rpc
+          break;
+        } catch (IOException err) {
+          // Since we optimistically expect the next op, it's quite normal to get EOF here.
+          if (opsProcessed > 0 &&
+              (err instanceof EOFException || err instanceof ClosedChannelException)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+            }
+          } else {
+            throw err;
+          }
+          break;
+        }
 
-      opStartTime = now();
-      processOp(op, in);
+        // restore normal timeout
+        if (opsProcessed != 0) {
+          s.setSoTimeout(stdTimeout);
+        }
+
+        // Make sure the xceiver count is not exceeded
+        int curXceiverCount = datanode.getXceiverCount();
+        if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+          throw new IOException("xceiverCount " + curXceiverCount
+                                + " exceeds the limit of concurrent xcievers "
+                                + dataXceiverServer.maxXceiverCount);
+        }
+
+        opStartTime = now();
+        processOp(op, in);
+        ++opsProcessed;
+      } while (s.isConnected() && socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
-      LOG.error(datanode.getMachineName() + ":DataXceiver",t);
+      LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
+        s.toString(), t);
     } finally {
       if (LOG.isDebugEnabled()) {
         LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -176,18 +221,36 @@
         blockSender = new BlockSender(block, startOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
-        ERROR.write(out);
+        sendResponse(s, ERROR, datanode.socketWriteTimeout);
         throw e;
       }
 
       SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
-      
+
+      if (blockSender.didSendEntireByteRange()) {
+        // If we sent the entire range, then we should expect the client
+        // to respond with a Status enum.
+        try {
+          DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
+          if (stat == null) {
+            LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+                     "code after reading. Will close connection.");
+            IOUtils.closeStream(out);
+          }
+        } catch (IOException ioe) {
+          LOG.debug("Error reading client status response. Will close connection.", ioe);
+          IOUtils.closeStream(out);
+        }
+      } else {
+        IOUtils.closeStream(out);
+      }
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
     } catch ( SocketException ignored ) {
       // Its ok for remote side to close the connection anytime.
       datanode.metrics.incrBlocksRead();
+      IOUtils.closeStream(out);
     } catch ( IOException ioe ) {
       /* What exactly should we do here?
        * Earlier version shutdown() datanode if there is disk error.
@@ -198,7 +261,6 @@
                 StringUtils.stringifyException(ioe) );
       throw ioe;
     } finally {
-      IOUtils.closeStream(out);
       IOUtils.closeStream(blockSender);
     }
 
@@ -690,12 +752,8 @@
       long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
-    try {
-      opStatus.write(reply);
-      reply.flush();
-    } finally {
-      IOUtils.closeStream(reply);
-    }
+    opStatus.write(reply);
+    reply.flush();
   }
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 74ef101..64e4998 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -73,28 +73,39 @@
 
   /**
    * Create a file of the given size filled with random data.
-   * @return  List of Blocks of the new file.
+   * @return  File data.
    */
-  public List<LocatedBlock> writeFile(Path filepath, int sizeKB)
+  public byte[] writeFile(Path filepath, int sizeKB)
       throws IOException {
     FileSystem fs = cluster.getFileSystem();
 
     // Write a file with the specified amount of data
     DataOutputStream os = fs.create(filepath);
-    byte data[] = new byte[1024];
+    byte data[] = new byte[1024 * sizeKB];
     new Random().nextBytes(data);
-    for (int i = 0; i < sizeKB; i++) {
-      os.write(data);
-    }
+    os.write(data);
     os.close();
+    return data;
+  }
 
+  /**
+   * Get the list of Blocks for a file.
+   */
+  public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB)
+      throws IOException {
     // Return the blocks we just wrote
-    DFSClient dfsclient = new DFSClient(
-      new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    DFSClient dfsclient = getDFSClient();
     return dfsclient.getNamenode().getBlockLocations(
       filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
   }
 
+  /**
+   * Get the DFSClient.
+   */
+  public DFSClient getDFSClient() throws IOException {
+    InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
+    return new DFSClient(nnAddr, conf);
+  }
 
   /**
    * Exercise the BlockReader and read length bytes.
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index ca9aed3..be6ac1c 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.fs.Path;
 
@@ -41,23 +42,24 @@
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
     util = new BlockReaderTestUtil(REPLICATION_FACTOR);
-    List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+    util.writeFile(TEST_FILE, FILE_SIZE_K);
+    List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K);
     testBlock = blkList.get(0);     // Use the first block to test
   }
 
   /**
-   * Verify that if we read an entire block, we send checksumOk
+   * Verify that if we read an entire block, we send CHECKSUM_OK
    */
   @Test
   public void testBlockVerification() throws Exception {
     BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
-    verify(reader).checksumOk(reader.dnSock);
+    verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
   }
 
   /**
-   * Test that if we do an incomplete read, we don't call checksumOk
+   * Test that if we do an incomplete read, we don't call CHECKSUM_OK
    */
   @Test
   public void testIncompleteRead() throws Exception {
@@ -65,14 +67,14 @@
     util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
-    // half of it, so no checksumOk
-    verify(reader, never()).checksumOk(reader.dnSock);
+    // half of it, so no CHECKSUM_OK
+    verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
   }
 
   /**
    * Test that if we ask for a half block, and read it all, we *do*
-   * call checksumOk. The DN takes care of knowing whether it was
+   * send CHECKSUM_OK. The DN takes care of knowing whether it was
    * the whole block or not.
    */
   @Test
@@ -81,7 +83,7 @@
     BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
-    verify(reader).checksumOk(reader.dnSock);
+    verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
     reader.close();
   }
 
@@ -99,7 +101,7 @@
                            " len=" + length);
         BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
-        verify(reader).checksumOk(reader.dnSock);
+        verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK);
         reader.close();
       }
     }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
new file mode 100644
index 0000000..311b724
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import org.apache.hadoop.security.token.Token;
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.junit.Assert.*;
+
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This class tests the client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestConnCache {
+  static final Log LOG = LogFactory.getLog(TestConnCache.class);
+
+  static final int BLOCK_SIZE = 4096;
+  static final int FILE_SIZE = 3 * BLOCK_SIZE;
+
+  static Configuration conf = null;
+  static MiniDFSCluster cluster = null;
+  static FileSystem fs = null;
+
+  static final Path testFile = new Path("/testConnCache.dat");
+  static byte authenticData[] = null;
+
+  static BlockReaderTestUtil util = null;
+
+
+  /**
+   * A mock Answer to remember the BlockReader used.
+   *
+   * It verifies that all invocation to DFSInputStream.getBlockReader()
+   * use the same socket.
+   */
+  private class MockGetBlockReader implements Answer<BlockReader> {
+    public BlockReader reader = null;
+    private Socket sock = null;
+
+    public BlockReader answer(InvocationOnMock invocation) throws Throwable {
+      BlockReader prevReader = reader;
+      reader = (BlockReader) invocation.callRealMethod();
+      if (sock == null) {
+        sock = reader.dnSock;
+      } else if (prevReader != null && prevReader.hasSentStatusCode()) {
+        // Can't reuse socket if the previous BlockReader didn't read till EOS.
+        assertSame("DFSInputStream should use the same socket",
+                   sock, reader.dnSock);
+      } return reader;
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 1;
+
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    cluster = util.getCluster();
+    conf = util.getConf();
+    fs = cluster.getFileSystem();
+
+    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+  }
+
+
+  /**
+   * (Optionally) seek to position, read and verify data.
+   *
+   * Seek to specified position if pos is non-negative.
+   */
+  private void pread(DFSInputStream in,
+                     long pos,
+                     byte[] buffer,
+                     int offset,
+                     int length)
+      throws IOException {
+    assertTrue("Test buffer too small", buffer.length >= offset + length);
+
+    if (pos >= 0)
+      in.seek(pos);
+
+    LOG.info("Reading from file of size " + in.getFileLength() +
+             " at offset " + in.getPos());
+
+    while (length > 0) {
+      int cnt = in.read(buffer, offset, length);
+      assertTrue("Error in read", cnt > 0);
+      offset += cnt;
+      length -= cnt;
+    }
+
+    // Verify
+    for (int i = 0; i < length; ++i) {
+      byte actual = buffer[i];
+      byte expect = authenticData[(int)pos + i];
+      assertEquals("Read data mismatch at file offset " + (pos + i) +
+                   ". Expects " + expect + "; got " + actual,
+                   actual, expect);
+    }
+  }
+
+  /**
+   * Test the SocketCache itself.
+   */
+  @Test
+  public void testSocketCache() throws IOException {
+    final int CACHE_SIZE = 4;
+    SocketCache cache = new SocketCache(CACHE_SIZE);
+
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getSelfAddr();
+
+    // Make some sockets to the DN
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+    }
+
+    // Insert a socket to the NN
+    Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
+    cache.put(nnSock);
+    assertSame("Read the write", nnSock, cache.get(nnAddr));
+    cache.put(nnSock);
+
+    // Insert DN socks
+    for (Socket dnSock : dnSockets) {
+      cache.put(dnSock);
+    }
+
+    assertEquals("NN socket evicted", null, cache.get(nnAddr));
+    assertTrue("Evicted socket closed", nnSock.isClosed());
+
+    // Lookup the DN socks
+    for (Socket dnSock : dnSockets) {
+      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
+      dnSock.close();
+    }
+
+    assertEquals("Cache is empty", 0, cache.size());
+  }
+
+  /**
+   * Read a file served entirely from one DN. Seek around and read from
+   * different offsets. And verify that they all use the same socket.
+   *
+   * @throws java.io.IOException
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testReadFromOneDN() throws IOException {
+    LOG.info("Starting testReadFromOneDN()");
+    DFSClient client = new DFSClient(
+        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    DFSInputStream in = spy(client.open(testFile.toString()));
+    LOG.info("opened " + testFile.toString());
+
+    byte[] dataBuf = new byte[BLOCK_SIZE];
+
+    MockGetBlockReader answer = new MockGetBlockReader();
+    Mockito.doAnswer(answer).when(in).getBlockReader(
+                           (InetSocketAddress) Matchers.anyObject(),
+                           Matchers.anyString(),
+                           (ExtendedBlock) Matchers.anyObject(),
+                           (Token<BlockTokenIdentifier>) Matchers.anyObject(),
+                           Matchers.anyLong(),
+                           Matchers.anyLong(),
+                           Matchers.anyInt(),
+                           Matchers.anyBoolean(),
+                           Matchers.anyString());
+
+    // Initial read
+    pread(in, 0, dataBuf, 0, dataBuf.length);
+    // Read again and verify that the socket is the same
+    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
+    pread(in, 1024, dataBuf, 0, dataBuf.length);
+    pread(in, -1, dataBuf, 0, dataBuf.length);            // No seek; just read
+    pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+
+    in.close();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 70269b6..05fa648 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -384,6 +384,8 @@
     conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 
                 retries);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWin);
+    // Disable keepalive
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 0);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(replicationFactor).build();
     cluster.waitActive();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
new file mode 100644
index 0000000..eb37422
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.junit.Assert.*;
+
+/**
+ * Test the use of DFSInputStream by multiple concurrent readers.
+ */
+public class TestParallelRead {
+
+  static final Log LOG = LogFactory.getLog(TestParallelRead.class);
+  static BlockReaderTestUtil util = null;
+  static DFSClient dfsClient = null;
+  static final int FILE_SIZE_K = 256;
+  static Random rand = null;
+  
+  static {
+    // The client-trace log ends up causing a lot of blocking threads
+    // in this when it's being used as a performance benchmark.
+    LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
+      .setLevel(Level.WARN);
+  }
+
+  private class TestFileInfo {
+    public DFSInputStream dis;
+    public Path filepath;
+    public byte[] authenticData;
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 2;
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    dfsClient = util.getDFSClient();
+    rand = new Random(System.currentTimeMillis());
+  }
+
+  /**
+   * A worker to do one "unit" of read.
+   */
+  static class ReadWorker extends Thread {
+    static public final int N_ITERATIONS = 1024;
+
+    private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
+
+    private TestFileInfo testInfo;
+    private long fileSize;
+    private long bytesRead;
+    private boolean error;
+
+    ReadWorker(TestFileInfo testInfo, int id) {
+      super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
+      this.testInfo = testInfo;
+      fileSize = testInfo.dis.getFileLength();
+      assertEquals(fileSize, testInfo.authenticData.length);
+      bytesRead = 0;
+      error = false;
+    }
+
+    /**
+     * Randomly do one of (1) Small read; and (2) Large Pread.
+     */
+    @Override
+    public void run() {
+      for (int i = 0; i < N_ITERATIONS; ++i) {
+        int startOff = rand.nextInt((int) fileSize);
+        int len = 0;
+        try {
+          double p = rand.nextDouble();
+          if (p < PROPORTION_NON_POSITIONAL_READ) {
+            // Do a small regular read. Very likely this will leave unread
+            // data on the socket and make the socket uncacheable.
+            len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
+            read(startOff, len);
+            bytesRead += len;
+          } else {
+            // Do a positional read most of the time.
+            len = rand.nextInt((int) (fileSize - startOff));
+            pRead(startOff, len);
+            bytesRead += len;
+          }
+        } catch (Exception ex) {
+          LOG.error(getName() + ": Error while testing read at " + startOff +
+                    " length " + len);
+          error = true;
+          fail(ex.getMessage());
+        }
+      }
+    }
+
+    public long getBytesRead() {
+      return bytesRead;
+    }
+
+    /**
+     * Raising error in a thread doesn't seem to fail the test.
+     * So check afterwards.
+     */
+    public boolean hasError() {
+      return error;
+    }
+
+    /**
+     * Seek to somewhere random and read.
+     */
+    private void read(int start, int len) throws Exception {
+      assertTrue(
+          "Bad args: " + start + " + " + len + " should be < " + fileSize,
+          start + len < fileSize);
+      DFSInputStream dis = testInfo.dis;
+
+      synchronized (dis) {
+        dis.seek(start);
+
+        byte buf[] = new byte[len];
+        int cnt = 0;
+        while (cnt < len) {
+          cnt += dis.read(buf, cnt, buf.length - cnt);
+        }
+        verifyData("Read data corrupted", buf, start, start + len);
+      }
+    }
+
+    /**
+     * Positional read.
+     */
+    private void pRead(int start, int len) throws Exception {
+      assertTrue(
+          "Bad args: " + start + " + " + len + " should be < " + fileSize,
+          start + len < fileSize);
+      DFSInputStream dis = testInfo.dis;
+
+      byte buf[] = new byte[len];
+      int cnt = 0;
+      while (cnt < len) {
+        cnt += dis.read(start, buf, cnt, buf.length - cnt);
+      }
+      verifyData("Pread data corrupted", buf, start, start + len);
+    }
+
+    /**
+     * Verify read data vs authentic data
+     */
+    private void verifyData(String msg, byte actual[], int start, int end)
+        throws Exception {
+      byte auth[] = testInfo.authenticData;
+      if (end > auth.length) {
+        throw new Exception(msg + ": Actual array (" + end +
+                            ") is past the end of authentic data (" +
+                            auth.length + ")");
+      }
+
+      int j = start;
+      for (int i = 0; i < actual.length; ++i, ++j) {
+        if (auth[j] != actual[i]) {
+          throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
+                              j + ") differs: expect " +
+                              auth[j] + " got " + actual[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Do parallel read several times with different number of files and threads.
+   *
+   * Note that while this is the only "test" in a junit sense, we're actually
+   * dispatching a lot more. Failures in the other methods (and other threads)
+   * need to be manually collected, which is inconvenient.
+   */
+  @Test
+  public void testParallelRead() throws IOException {
+    if (!runParallelRead(1, 4)) {
+      fail("Check log for errors");
+    }
+    if (!runParallelRead(1, 16)) {
+      fail("Check log for errors");
+    }
+    if (!runParallelRead(2, 4)) {
+      fail("Check log for errors");
+    }
+  }
+
+  /**
+   * Start the parallel read with the given parameters.
+   */
+  boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
+    ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
+    TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
+
+    // Prepare the files and workers
+    int nWorkers = 0;
+    for (int i = 0; i < nFiles; ++i) {
+      TestFileInfo testInfo = new TestFileInfo();
+      testInfoArr[i] = testInfo;
+
+      testInfo.filepath = new Path("/TestParallelRead.dat." + i);
+      testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
+      testInfo.dis = dfsClient.open(testInfo.filepath.toString());
+
+      for (int j = 0; j < nWorkerEach; ++j) {
+        workers[nWorkers++] = new ReadWorker(testInfo, nWorkers);
+      }
+    }
+
+    // Start the workers and wait
+    long starttime = System.currentTimeMillis();
+    for (ReadWorker worker : workers) {
+      worker.start();
+    }
+
+    for (ReadWorker worker : workers) {
+      try {
+        worker.join();
+      } catch (InterruptedException ignored) { }
+    }
+    long endtime = System.currentTimeMillis();
+
+    // Cleanup
+    for (TestFileInfo testInfo : testInfoArr) {
+      testInfo.dis.close();
+    }
+
+    // Report
+    boolean res = true;
+    long totalRead = 0;
+    for (ReadWorker worker : workers) {
+      long nread = worker.getBytesRead();
+      LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
+               "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
+      totalRead += nread;
+      if (worker.hasError()) {
+        res = false;
+      }
+    }
+
+    double timeTakenSec = (endtime - starttime) / 1000.0;
+    long totalReadKB = totalRead / 1024;
+    LOG.info("=== Report: " + nWorkers + " threads read " +
+             totalReadKB + " KB (across " +
+             nFiles + " file(s)) in " +
+             timeTakenSec + "s; average " +
+             totalReadKB / timeTakenSec + " KB/s");
+
+    return res;
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 0ede253..7f4eb0f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -23,6 +23,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +38,11 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Level;
+
+import org.apache.commons.logging.LogFactory;
+
 import static org.apache.hadoop.test.MetricsAsserts.*;
 
 /**
@@ -59,6 +65,9 @@
         DFS_REPLICATION_INTERVAL);
     CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
         DFS_REPLICATION_INTERVAL);
+
+    ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
+      .getLogger().setLevel(Level.DEBUG);
   }
   
   private MiniDFSCluster cluster;
@@ -255,9 +264,5 @@
     readFile(fs, file1_Path);
     updateMetrics();
     assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS));
-  
-    // Verify total load metrics, total load = Data Node started.
-    updateMetrics();
-    assertGauge("TotalLoad" ,DATANODE_COUNT, getMetrics(NS_METRICS));
   }
 }