HDFS-17299. Adding rack failure tolerance when creating a new file (#6614)

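Clients can now rebuild the write pipeline when datanodes fail during the
initial PIPELINE_SETUP_CREATE stage, as long as
dfs.client.block.write.replace-datanode-on-failure.min-replication is still
satisfied. On the datanode side, createRbw() accepts the bumped generation
stamp of a retried create so the stale replica is cleaned up instead of
failing with ReplicaAlreadyExistsException.
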
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index fb7568d..a02444f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -94,6 +94,7 @@
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -706,8 +707,8 @@
 
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          LOG.debug("Allocating new block");
-          setPipeline(nextBlockOutputStream());
+          LOG.debug("Allocating new block: {}", this);
+          setupPipelineForCreate();
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
           LOG.debug("Append to block {}", block);
@@ -1241,7 +1242,8 @@
       streamerClosed = true;
       return false;
     }
-    boolean doSleep = setupPipelineForAppendOrRecovery();
+
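+    // The return value is intentionally ignored here; the streamer state
+    // is re-checked below before continuing.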
+    setupPipelineForAppendOrRecovery();
 
     if (!streamerClosed && dfsClient.clientRunning) {
       if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1275,7 +1277,7 @@
       }
     }
 
-    return doSleep;
+    return false;
   }
 
   void setHflush() {
@@ -1449,9 +1451,11 @@
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns whether the pipeline was set up successfully.
+   * Callers use this to decide whether to keep building the pipeline or to throw an exception.
    */
   private boolean setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " + "Source file \""
           + src + "\" - Aborting...";
@@ -1463,23 +1467,35 @@
 
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
         return false;
       }
 
-      final boolean isRecovery = errorState.hasError();
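+      // During create there is no data in the pipeline yet, so a failed
+      // setup is retried as a fresh connection rather than a recovery.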
+      final boolean isRecovery = errorState.hasError() && !isCreateStage;
+
       if (!handleBadDatanode()) {
         return false;
       }
 
       handleDatanodeReplacement();
 
+      // During create stage, min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+          nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
+      if (isCreateStage) {
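+        // updateBlockForPipeline() bumped the generation stamp; track the
+        // updated block while creating.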
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
 
@@ -1491,7 +1507,7 @@
     if (success) {
       updatePipeline(newGS);
     }
-    return false; // do not sleep, continue processing
+    return success;
   }
 
   /**
@@ -1629,10 +1645,10 @@
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
-    StorageType[] storageTypes;
+    StorageType[] nextStorageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
     final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1640,6 +1656,7 @@
       errorState.reset();
       lastException.clear();
       success = false;
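+      // An earlier failed attempt may have marked the streamer closed;
+      // reset it for this retry.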
+      streamerClosed = false;
 
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
@@ -1649,26 +1666,33 @@
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
-
-      // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
+      nextStorageTypes = lb.getStorageTypes();
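+      // Install the pipeline before connecting so that create-stage
+      // recovery can act on it if the first connect fails.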
+      setPipeline(lb);
+      try {
+        // Connect to first DataNode in the list.
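+        // If that fails, fall back to create-stage pipeline recovery with
+        // the remaining good nodes.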
+        success = createBlockOutputStream(nodes, nextStorageTypes, 0L, false)
+            || setupPipelineForAppendOrRecovery();
+      } catch (IOException ie) {
+        LOG.warn("Exception in setupPipelineForCreate {}", this, ie);
+        success = false;
+      }
       if (!success) {
         LOG.warn("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
             stat.getFileId(), src, dfsClient.clientName);
         block.setCurrentBlock(null);
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
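+        // A bad node index of -1 means no node in the current pipeline was
+        // flagged; exclude the most recently failed node instead.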
+        final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+            ? Iterables.getLast(failed)
+            : nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
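+        // Clear the stale pipeline so the next attempt starts clean.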
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);
 
     if (!success) {
       throw new IOException("Unable to create new block.");
     }
-    return lb;
   }
 
   // connects to the first datanode in the pipeline
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 7f381b1..9f98aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -212,7 +212,10 @@
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist, newGs);
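+          // A non-zero newGs marks a client retry of the create; adopt the
+          // bumped generation stamp.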
+          if (newGs != 0L) {
+            block.setGenerationStamp(newGs);
+          }
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d29d772..d23405b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -334,6 +334,16 @@
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
+   * Creates an RBW replica and returns the meta info of the replica.
+   *
+   * @param b block
+   * @param newGS the new generation stamp of a retried create; 0 means unchanged
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
+
+  /**
    * Recovers a RBW replica and returns the meta info of the replica.
    *
    * @param b block
@@ -466,7 +476,7 @@
   boolean isValidRbw(ExtendedBlock b);
 
   /**
-   * Invalidates the specified blocks
+   * Invalidates the specified blocks.
    * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 81b0d67..1764c38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1472,13 +1472,29 @@
   public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
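+    // Delegate with newGS = 0, i.e. no generation stamp bump (original behavior).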
+    return createRbw(storageType, b, allowLazyPersist, 0L);
+  }
+
+  @Override // FsDatasetSpi
+  public ReplicaHandler createRbw(
+      StorageType storageType, ExtendedBlock b,
+      boolean allowLazyPersist, long newGS) throws IOException {
     try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+        // In case of a retry with the same blockPoolId + blockId but an
+        // updated generation stamp, clean up the old replica so we do not
+        // keep multiple copies with the same blockPoolId + blockId.
+        if (newGS != 0L) {
+          cleanupReplica(replicaInfo, replicaInfo.getBlockFile(), replicaInfo.getMetaFile(),
+              replicaInfo.getBlockFile().length(), replicaInfo.getMetaFile().length(),
+              b.getBlockPoolId());
+        } else {
+          throw new ReplicaAlreadyExistsException("Block " + b +
+              " already exists in state " + replicaInfo.getState() +
+              " and thus cannot be created.");
+        }
       }
       // create a new block
       FsVolumeReference ref = null;
@@ -3198,6 +3214,14 @@
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
+    cleanupReplica(replicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid);
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
+  private void cleanupReplica(ReplicaInfo replicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
     if (blockFile.delete() || !blockFile.exists()) {
       FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
       volume.onBlockFileDeletion(bpid, blockFileUsed);
@@ -3205,9 +3229,6 @@
         volume.onMetaFileDeletion(bpid, metaFileUsed);
       }
     }
-
-    // If deletion failed then the directory scanner will cleanup the blocks
-    // eventually.
   }
 
   class LazyWriter implements Runnable {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e98733c..633d3da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -54,7 +54,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -89,6 +88,8 @@
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -1707,4 +1708,153 @@
       assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
     }
   }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes: 1 node each in rack1 and rack2, 4 nodes in rack3
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stay, each with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+
+      // create a file with replication 3; the pipeline shrinks to the 2
+      // active DNs, which still meets min replication of 2.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
+    conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 3);
+    // 3 racks & 6 nodes: 1 node each in rack1 and rack2, 4 nodes in rack3
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stay, each with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 6 nodes: 1 node each in rack1 and rack2, 4 nodes in rack3
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      // create a file with replication 3; the rack fault tolerant BPP
+      // initially allocates nodes in all 3 racks, and the pipeline then
+      // shrinks to the 1 active DN, which still meets min replication of 1.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes: 1 node each in rack1 and rack2, 4 nodes in rack3
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    // 3 racks & 6 nodes: 1 node each in rack1 and rack2, 4 nodes in rack3
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // shutdown all DNs
+      cluster.shutdownDataNodes();
+      // create a file with replication 3; for the rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks but fail because no DNs are present.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException e) {
+        threw = true;
+      }
+      assertTrue(threw);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index b4526fb..4a054c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1103,6 +1103,12 @@
     return createTemporary(storageType, b, false);
   }
 
+  @Override
+  public ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
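+    // newGS is ignored; the simulated dataset keeps replicas in memory only.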
+    return createRbw(storageType, b, allowLazyPersist);
+  }
+
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(StorageType storageType,
       ExtendedBlock b, boolean isTransfer) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 1f03f50..2486d1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -149,6 +149,12 @@
   }
 
   @Override
+  public ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
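+    // Test stub: ignore newGS and delegate to the three-argument variant.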
+    return createRbw(storageType, b, allowLazyPersist);
+  }
+
+  @Override
   public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS,
       long minBytesRcvd, long maxBytesRcvd) throws IOException {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);