HDFS-15709. Socket file descriptor leak in StripedBlockChecksumReconstructor. (#2518)

(cherry picked from commit 40f7543a6d5765c98d41c78736124b7b7f078aa2)
(cherry picked from commit edd9b659caf77d14caf6df88bac3cedf025e1f25)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
index 3388855..988c463 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -700,24 +700,25 @@
               blockGroup, ecPolicy, blockIndices, datanodes, errIndices);
       BlockChecksumType groupChecksumType =
           getBlockChecksumOptions().getBlockChecksumType();
-      final StripedBlockChecksumReconstructor checksumRecon =
+      try (StripedBlockChecksumReconstructor checksumRecon =
           groupChecksumType == BlockChecksumType.COMPOSITE_CRC ?
           new StripedBlockChecksumCompositeCrcReconstructor(
               getDatanode().getErasureCodingWorker(), stripedReconInfo,
               blockChecksumBuf, blockLength) :
           new StripedBlockChecksumMd5CrcReconstructor(
               getDatanode().getErasureCodingWorker(), stripedReconInfo,
-              blockChecksumBuf, blockLength);
-      checksumRecon.reconstruct();
+              blockChecksumBuf, blockLength)) {
+        checksumRecon.reconstruct();
 
-      DataChecksum checksum = checksumRecon.getChecksum();
-      long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
-          : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
-      setOrVerifyChecksumProperties(errBlkIndex,
-          checksum.getBytesPerChecksum(), crcPerBlock,
-          checksum.getChecksumType());
-      LOG.debug("Recalculated checksum for the block index:{}, checksum={}",
-          errBlkIndex, checksumRecon.getDigestObject());
+        DataChecksum checksum = checksumRecon.getChecksum();
+        long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0
+            : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize();
+        setOrVerifyChecksumProperties(errBlkIndex,
+            checksum.getBytesPerChecksum(), crcPerBlock,
+            checksum.getChecksumType());
+        LOG.debug("Recalculated checksum for the block index:{}, checksum={}",
+            errBlkIndex, checksumRecon.getDigestObject());
+      }
     }
 
     private void setOrVerifyChecksumProperties(int blockIdx, int bpc,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
index d9e2f60..a600626 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -32,7 +33,7 @@
  */
 @InterfaceAudience.Private
 public abstract class StripedBlockChecksumReconstructor
-    extends StripedReconstructor {
+    extends StripedReconstructor implements Closeable {
   private ByteBuffer targetBuffer;
   private final byte[] targetIndices;
 
@@ -73,31 +74,27 @@
   public void reconstruct() throws IOException {
     prepareDigester();
     long maxTargetLength = getMaxTargetLength();
-    try {
-      while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
-        long remaining = maxTargetLength - getPositionInBlock();
-        final int toReconstructLen = (int) Math
-            .min(getStripedReader().getBufferSize(), remaining);
-        // step1: read from minimum source DNs required for reconstruction.
-        // The returned success list is the source DNs we do real read from
-        getStripedReader().readMinimumSources(toReconstructLen);
+    while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) {
+      long remaining = maxTargetLength - getPositionInBlock();
+      final int toReconstructLen = (int) Math
+          .min(getStripedReader().getBufferSize(), remaining);
+      // step1: read from minimum source DNs required for reconstruction.
+      // The returned success list is the source DNs we do real read from
+      getStripedReader().readMinimumSources(toReconstructLen);
 
-        // step2: decode to reconstruct targets
-        reconstructTargets(toReconstructLen);
+      // step2: decode to reconstruct targets
+      reconstructTargets(toReconstructLen);
 
-        // step3: calculate checksum
-        checksumDataLen += checksumWithTargetOutput(
-            getBufferArray(targetBuffer), toReconstructLen);
+      // step3: calculate checksum
+      checksumDataLen += checksumWithTargetOutput(
+          getBufferArray(targetBuffer), toReconstructLen);
 
-        updatePositionInBlock(toReconstructLen);
-        requestedLen -= toReconstructLen;
-        clearBuffers();
-      }
-
-      commitDigest();
-    } finally {
-      cleanup();
+      updatePositionInBlock(toReconstructLen);
+      requestedLen -= toReconstructLen;
+      clearBuffers();
     }
+
+    commitDigest();
   }
 
   /**
@@ -222,4 +219,10 @@
     }
     return buff;
   }
+
+  @Override
+  public void close() throws IOException {
+    getStripedReader().close();
+    cleanup();
+  }
 }
\ No newline at end of file