diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index fb6d83f..bb75e3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -432,6 +432,7 @@
       ris = new ReplicaInputStreams(
           blockIn, checksumIn, volumeRef, fileIoProvider);
     } catch (IOException ioe) {
+      IOUtils.cleanupWithLogger(null, volumeRef);
       IOUtils.closeStream(this);
       org.apache.commons.io.IOUtils.closeQuietly(blockIn);
       org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 2a89a80d..706c078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -167,18 +167,26 @@
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(FsVolumeImpl volume, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException("AsyncDiskService is already shutdown");
-    }
-    if (volume == null) {
-      throw new RuntimeException("A null volume does not have a executor");
-    }
-    ThreadPoolExecutor executor = executors.get(volume.getStorageID());
-    if (executor == null) {
-      throw new RuntimeException("Cannot find volume " + volume
-          + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {
+      if (executors == null) {
+        throw new RuntimeException("AsyncDiskService is already shutdown");
+      }
+      if (volume == null) {
+        throw new RuntimeException("A null volume does not have a executor");
+      }
+      ThreadPoolExecutor executor = executors.get(volume.getStorageID());
+      if (executor == null) {
+        throw new RuntimeException("Cannot find volume " + volume
+            + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaFileDeleteTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaFileDeleteTask) task).volumeRef);
+      }
+      throw re;
     }
   }
   
@@ -314,28 +322,31 @@
 
     @Override
     public void run() {
-      final long blockLength = replicaToDelete.getBlockDataLength();
-      final long metaLength = replicaToDelete.getMetadataLength();
-      boolean result;
+      try {
+        final long blockLength = replicaToDelete.getBlockDataLength();
+        final long metaLength = replicaToDelete.getMetadataLength();
+        boolean result;
 
-      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+        result = (trashDirectory == null) ? deleteFiles() : moveFiles();
 
-      if (!result) {
-        LOG.warn("Unexpected error trying to "
-            + (trashDirectory == null ? "delete" : "move")
-            + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
-            + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
-      } else {
-        if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
-          datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+        if (!result) {
+          LOG.warn("Unexpected error trying to "
+              + (trashDirectory == null ? "delete" : "move")
+              + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+              + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
+        } else {
+          if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) {
+            datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+          }
+          volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+          volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
+          LOG.info("Deleted " + block.getBlockPoolId() + " " +
+              block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
         }
-        volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
-        volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
-        LOG.info("Deleted " + block.getBlockPoolId() + " "
-            + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
+        updateDeletedBlockId(block);
+      } finally {
+        IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
-      updateDeletedBlockId(block);
-      IOUtils.cleanupWithLogger(null, volumeRef);
     }
   }
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 07e14fb..6681f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -319,7 +319,7 @@
   }
 
   @VisibleForTesting
-  int getReferenceCount() {
+  public int getReferenceCount() {
     return this.reference.getReferenceCount();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index a77faf2..0d42ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -153,16 +154,24 @@
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(String storageId, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException(
-          "AsyncLazyPersistService is already shutdown");
-    }
-    ThreadPoolExecutor executor = executors.get(storageId);
-    if (executor == null) {
-      throw new RuntimeException("Cannot find root storage volume with id " +
-          storageId + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {
+      if (executors == null) {
+        throw new RuntimeException(
+            "AsyncLazyPersistService is already shutdown");
+      }
+      ThreadPoolExecutor executor = executors.get(storageId);
+      if (executor == null) {
+        throw new RuntimeException("Cannot find root storage volume with id " +
+            storageId + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaLazyPersistTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaLazyPersistTask) task).targetVolume);
+      }
+      throw re;
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index b9da5f4..b1a675c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -33,6 +33,9 @@
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -562,4 +565,56 @@
         checksum, CachingStrategy.newDefaultStrategy(), false, false,
         null, null, new String[0]);
   }
+
+  @Test(timeout = 30000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        numDataNodes).build();
+    try {
+      cluster.waitActive();
+      datanode = cluster.getFileSystem().getDataNodeStats(
+          DatanodeReportType.LIVE)[0];
+      dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      int fileLen = Math.min(
+          conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+      DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+          fileSys.getDefaultBlockSize(file),
+          fileSys.getDefaultReplication(file), 0L);
+
+      // Get the first blockid for the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+      sendBuf.reset();
+      recvBuf.reset();
+
+      // Delete the meta file to create a exception in BlockSender constructor.
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+      FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+          dn).getVolume(blk);
+      int beforeCnt = volume.getReferenceCount();
+
+      sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+      sendRecvData("Copy a block.", false);
+      Thread.sleep(3000);
+
+      int afterCnt = volume.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 6ae6248..778ef97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 
 import java.io.OutputStream;
@@ -1805,4 +1806,37 @@
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new HdfsConfiguration()).build();
+    cluster.waitActive();
+    FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
+    ExtendedBlock eb;
+    ReplicaInfo info;
+    int beforeCnt = 0;
+    try {
+      List<Block> blockList = new ArrayList<Block>();
+      eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      dataset.volumeMap.add(BLOCKPOOL, info);
+      ((LocalReplica) info).getBlockFile().createNewFile();
+      ((LocalReplica) info).getMetaFile().createNewFile();
+      blockList.add(info);
+
+      // Create a runtime exception.
+      dataset.asyncDiskService.shutdown();
+
+      beforeCnt = vol.getReferenceCount();
+      dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+
+    } catch (RuntimeException re) {
+      int afterCnt = vol.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index c0b4b17..14ed26e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Assert;
@@ -280,4 +283,38 @@
       }
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    final String methodName = GenericTestUtils.getMethodName();
+    final int seed = 0xFADED;
+    Path path = new Path("/" + methodName + ".Writer.File.dat");
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetSpi.FsVolumeReferences volumes =
+        DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
+    int[] beforeCnts = new int[volumes.size()];
+    FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+
+    // Create a runtime exception.
+    ds.asyncLazyPersistService.shutdown();
+    for (int i = 0; i < volumes.size(); ++i) {
+      beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+    }
+
+    makeRandomTestFile(path, BLOCK_SIZE, true, seed);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < volumes.size(); ++i) {
+      int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+      // LazyWriter keeps trying to save copies even if
+      // asyncLazyPersistService is already shutdown.
+      // If we do not release references, the number of
+      // references will increase infinitely.
+      Assert.assertTrue(
+          beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
+    }
+  }
 }
