HDFS-15240. Erasure Coding: dirty buffer causes reconstruction block error. Contributed by HuangTao.
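
Root cause: a ByteBuffer handed back to the shared ElasticByteBufferPool by a timed-out
StripedBlockReader could still be written to afterwards, and a later borrower then saw the
stale position/limit ("dirty" buffer) or hit an NPE from an outdated future. The patch
clears recycled buffers in ElasticByteBufferPool#getBuffer, drains stale futures from the
read CompletionService, and closes the block reader before freeing its buffer.

Below is a minimal, self-contained sketch of the ByteBuffer reuse semantics the added
clear() call guards against. BufferReuseDemo is illustrative only and not part of this
change; it only uses the existing ElasticByteBufferPool getBuffer/putBuffer API.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class BufferReuseDemo {
      public static void main(String[] args) {
        ElasticByteBufferPool pool = new ElasticByteBufferPool();
        ByteBuffer buf = pool.getBuffer(false, 64 * 1024);
        buf.put(new byte[1024]);   // a stale reader wrote 1 KB, position is now 1024
        pool.putBuffer(buf);       // returned to the pool without resetting its state
        ByteBuffer reused = pool.getBuffer(false, 64 * 1024);
        // With the clear() added by this patch, position == 0 and limit == capacity,
        // so the next reconstruction task starts from a clean buffer.
        System.out.println(reused.position() + " " + reused.limit());
      }
    }
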
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index bbedf2a..0350db3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -96,6 +96,7 @@
                       ByteBuffer.allocate(length);
     }
     tree.remove(entry.getKey());
+    entry.getValue().clear();
     return entry.getValue();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7e66111..08123c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -97,6 +97,22 @@
   public void stripedBlockReconstruction() throws IOException {}
 
   /**
+   * Used as a hook to inject latency when reading a block
+   * in the erasure coding reconstruction process.
+   */
+  public void delayBlockReader() {}
+
+  /**
+   * Used as a hook to intercept freeing of the block reader buffer.
+   */
+  public void interceptFreeBlockReaderBuffer() {}
+
+  /**
+   * Used as a hook to intercept the completion of a block read.
+   */
+  public void interceptBlockReader() {}
+
+  /**
    * Used as a hook to inject intercept when BPOfferService hold lock.
    */
   public void delayWhenOfferServiceHoldLock() {}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 0db8a6f..ff3306b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -95,6 +96,7 @@
   }
 
   void freeReadBuffer() {
+    DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
     buffer = null;
   }
 
@@ -179,6 +181,8 @@
         } catch (IOException e) {
           LOG.info(e.getMessage());
           throw e;
+        } finally {
+          DataNodeFaultInjector.get().interceptBlockReader();
         }
       }
     };
@@ -188,6 +192,7 @@
    * Perform actual reading of bytes from block.
    */
   private BlockReadStats actualReadFromBlock() throws IOException {
+    DataNodeFaultInjector.get().delayBlockReader();
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 98edf72..070931c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Manage striped readers that performs reading of block data from remote to
@@ -328,14 +329,14 @@
             // cancel remaining reads if we read successfully from minimum
             // number of source DNs required by reconstruction.
             cancelReads(futures.keySet());
-            futures.clear();
+            clearFuturesAndService();
             break;
           }
         }
       } catch (InterruptedException e) {
         LOG.info("Read data interrupted.", e);
         cancelReads(futures.keySet());
-        futures.clear();
+        clearFuturesAndService();
         break;
       }
     }
@@ -429,6 +430,20 @@
     }
   }
 
+  // Remove all stale futures from readService and clear futures.
+  private void clearFuturesAndService() {
+    while (!futures.isEmpty()) {
+      try {
+        Future<BlockReadStats> future = readService.poll(
+            stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
+        );
+        futures.remove(future);
+      } catch (InterruptedException e) {
+        LOG.info("Clear stale futures from service is interrupted.", e);
+      }
+    }
+  }
+
   void close() {
     if (zeroStripeBuffers != null) {
       for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
@@ -438,9 +453,9 @@
     zeroStripeBuffers = null;
 
     for (StripedBlockReader reader : readers) {
+      reader.closeBlockReader();
       reconstructor.freeBuffer(reader.getReadBuffer());
       reader.freeReadBuffer();
-      reader.closeBlockReader();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 4c8be82..48a0747 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -279,4 +280,9 @@
   public ErasureCodingWorker getErasureCodingWorker() {
     return erasureCodingWorker;
   }
+
+  @VisibleForTesting
+  static ByteBufferPool getBufferPool() {
+    return BUFFER_POOL;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index b0b3350..16ce0dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -23,6 +23,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -34,6 +35,12 @@
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -575,4 +582,237 @@
       }
     }
   }
+
+  /**
+   * When a StripedBlockReader times out, the stale future should be ignored.
+   * Otherwise an NPE will be thrown, which stops reading the remaining data,
+   * and the reconstruction task will fail.
+   */
+  @Test(timeout = 120000)
+  public void testTimeoutReadBlockInReconstruction() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    ErasureCodingPolicy policy = ecPolicy;
+    fs.enableErasureCodingPolicy(policy.getName());
+    fs.getClient().setErasureCodingPolicy("/", policy.getName());
+
+    // StripedBlockReconstructor#reconstruct will loop 2 times
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+    String fileName = "/timeout-read-block";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // The DataNode holding the first block; shutting it down triggers reconstruction.
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);
+
+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // In the file's first StripedBlockReconstructor#reconstruct pass,
+        // the first reader will time out.
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                    ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+          }
+        }
+        // Block all subsequent re-reconstruction tasks.
+        if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
+          while (true) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+
+    try {
+      shutdownDataNode(dataNode);
+      // Before HDFS-15240, an NPE caused the reconstruction to fail (test timeout).
+      StripedFileTestUtil
+          .waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  /**
+   * When a block reader times out, the stale future should be ignored.
+   * Otherwise the ByteBuffer could be written to after it has been returned
+   * to the BufferPool. This test ensures that the block reader is closed
+   * before the buffer is freed.
+   */
+  @Test(timeout = 120000)
+  public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    // StripedBlockReconstructor#reconstruct will loop 2 times
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+    String fileName = "/no-dirty-buffer";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // The DataNode holding the first block; shutting it down triggers reconstruction.
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
+
+    ElasticByteBufferPool bufferPool =
+        (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
+    emptyBufferPool(bufferPool, true);
+    emptyBufferPool(bufferPool, false);
+
+    AtomicInteger finishedReadBlock = new AtomicInteger(0);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);
+      private AtomicBoolean continueRead = new AtomicBoolean(false);
+      private AtomicBoolean closeByNPE = new AtomicBoolean(false);
+
+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // In the file's first StripedBlockReconstructor#reconstruct pass,
+        // the first reader will time out.
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                    ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+          }
+        }
+        if (index > ecPolicy.getNumDataUnits() + 1) {
+          try {
+            GenericTestUtils.waitFor(
+                () -> {
+                  LOG.info("Close by NPE: {}, continue read: {}",
+                      closeByNPE, continueRead);
+                  return closeByNPE.get() ? continueRead.get()
+                    : index == finishedReadBlock.get() + 1; }, 5,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's remaining part.");
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+
+      @Override
+      public void interceptBlockReader() {
+        int n = finishedReadBlock.incrementAndGet();
+        LOG.info("Intercept the end of {}th read block.", n);
+      }
+
+      private AtomicInteger numFreeBuffer = new AtomicInteger(0);
+      @Override
+      public void interceptFreeBlockReaderBuffer() {
+        closeByNPE.compareAndSet(false, true);
+        int num = numFreeBuffer.incrementAndGet();
+        LOG.info("Intercept the {} free block buffer.", num);
+        if (num >= ecPolicy.getNumDataUnits() + 1) {
+          continueRead.compareAndSet(false, true);
+          try {
+            GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+                    2 * ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't finish the file's reconstruction.");
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+    try {
+      shutdownDataNode(dataNode);
+      // Wait for all block reads to finish; at least one reader times out.
+      GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+              2 * ecPolicy.getNumDataUnits() + 1, 50,
+          stripedReadTimeoutInMills * 3
+      );
+
+      assertBufferPoolIsEmpty(bufferPool, false);
+      assertBufferPoolIsEmpty(bufferPool, true);
+      StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      // iterate all ByteBuffers in ElasticByteBufferPool
+      ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
+      Assert.assertEquals(0, byteBuffer.position());
+    }
+  }
+
+  private void emptyBufferPool(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      bufferPool.getBuffer(direct, 0);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java
new file mode 100644
index 0000000..da571055
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+public final class ErasureCodingTestHelper {
+
+  private ErasureCodingTestHelper() { }
+
+  public static ByteBufferPool getBufferPool() {
+    return StripedReconstructor.getBufferPool();
+  }
+}