HDFS-17299. Adding rack failure tolerance when creating a new file (#6614)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index fb7568d..a02444f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -94,6 +94,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -706,8 +707,8 @@
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- LOG.debug("Allocating new block");
- setPipeline(nextBlockOutputStream());
+ LOG.debug("Allocating new block: {}", this);
+ setupPipelineForCreate();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
@@ -1241,7 +1242,8 @@
streamerClosed = true;
return false;
}
- boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {
if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1275,7 +1277,7 @@
}
}
- return doSleep;
+ return false;
}
void setHflush() {
@@ -1449,9 +1451,11 @@
* it can be written to.
* This happens when a file is appended or data streaming fails
* It keeps on trying until a pipeline is setup
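+ *
+ * It is also called by setupPipelineForCreate when the initial pipeline
+ * setup for a newly allocated block fails.
+ *
+ * @return true if the pipeline was set up successfully, false otherwise;
+ * on false, setupPipelineForCreate abandons the block and, if retries
+ * remain, allocates a new one.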
*/
private boolean setupPipelineForAppendOrRecovery() throws IOException {
- // check number of datanodes
if (nodes == null || nodes.length == 0) {
String msg = "Could not get block locations. " + "Source file \""
+ src + "\" - Aborting...";
@@ -1463,23 +1467,35 @@
boolean success = false;
long newGS = 0L;
+ boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == stage;
while (!success && !streamerClosed && dfsClient.clientRunning) {
if (!handleRestartingDatanode()) {
return false;
}
- final boolean isRecovery = errorState.hasError();
+ final boolean isRecovery = errorState.hasError() && !isCreateStage;
+
if (!handleBadDatanode()) {
return false;
}
handleDatanodeReplacement();
+ // During create stage, min replication should still be satisfied.
+ if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 &&
+ nodes.length >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+ return false;
+ }
+
// get a new generation stamp and an access token
final LocatedBlock lb = updateBlockForPipeline();
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
+ if (isCreateStage) {
+ block.setCurrentBlock(lb.getBlock());
+ }
+
// set up the pipeline again with the remaining nodes
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
@@ -1491,7 +1507,7 @@
if (success) {
updatePipeline(newGS);
}
- return false; // do not sleep, continue processing
+ return success;
}
/**
@@ -1629,10 +1645,10 @@
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
- protected LocatedBlock nextBlockOutputStream() throws IOException {
+ protected void setupPipelineForCreate() throws IOException {
LocatedBlock lb;
DatanodeInfo[] nodes;
- StorageType[] storageTypes;
+ StorageType[] nextStorageTypes;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
final ExtendedBlock oldBlock = block.getCurrentBlock();
@@ -1640,6 +1656,7 @@
errorState.reset();
lastException.clear();
success = false;
+ streamerClosed = false;
DatanodeInfo[] excluded = getExcludedNodes();
lb = locateFollowingBlock(
@@ -1649,26 +1666,33 @@
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
- storageTypes = lb.getStorageTypes();
-
- // Connect to first DataNode in the list.
- success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
+ nextStorageTypes = lb.getStorageTypes();
+ setPipeline(lb);
+ try {
+ // Connect to first DataNode in the list.
+ success = createBlockOutputStream(nodes, nextStorageTypes, 0L, false)
+ || setupPipelineForAppendOrRecovery();
+ } catch(IOException ie) {
+ LOG.warn("Exception in setupPipelineForCreate " + this, ie);
+ success = false;
+ }
if (!success) {
LOG.warn("Abandoning " + block);
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
stat.getFileId(), src, dfsClient.clientName);
block.setCurrentBlock(null);
- final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+ final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+ ? Iterables.getLast(failed)
+ : nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
+ setPipeline(null, null, null);
}
} while (!success && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
- return lb;
}
// connects to the first datanode in the pipeline
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 7f381b1..9f98aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -212,7 +212,10 @@
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+ replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist, newGs);
+ if (newGs != 0L) {
+ block.setGenerationStamp(newGs);
+ }
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d29d772..d23405b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -334,6 +334,16 @@
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
+   * Creates a RBW replica and returns the meta info of the replica.
+   * A non-zero {@code newGS} marks a create retried with a new generation
+   * stamp; FsDatasetImpl, for example, then cleans up an existing replica of
+   * the same block instead of throwing ReplicaAlreadyExistsException.
+   * A {@code newGS} of 0 means there is no new generation stamp.
+   *
+   * @param storageType storage type for the replica
+   * @param b block
+   * @param allowLazyPersist same meaning as in
+   *     {@link #createRbw(StorageType, ExtendedBlock, boolean)}
+   * @param newGS the new generation stamp of a retried create, or 0 if none
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException;
+
+ /**
* Recovers a RBW replica and returns the meta info of the replica.
*
* @param b block
@@ -466,7 +476,7 @@
boolean isValidRbw(ExtendedBlock b);
/**
- * Invalidates the specified blocks
+ * Invalidates the specified blocks.
* @param bpid Block pool Id
* @param invalidBlks - the blocks to be invalidated
* @throws IOException
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 81b0d67..1764c38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1472,13 +1472,29 @@
public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
+ // 0L = no new generation stamp, so an already existing replica of this
+ // block is still rejected with ReplicaAlreadyExistsException.
+ return createRbw(storageType, b, allowLazyPersist, 0L);
+ }
+
+ @Override // FsDatasetSpi
+ public ReplicaHandler createRbw(
+ StorageType storageType, ExtendedBlock b,
+ boolean allowLazyPersist, long newGS) throws IOException {
try(AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " already exists in state " + replicaInfo.getState() +
- " and thus cannot be created.");
+ // A create retried for the same blockPoolId + blockId with a new
+ // generation stamp (newGS != 0) first cleans up the old replica so
+ // that no two copies with the same blockPoolId + blockId exist.
+ if (newGS != 0L) {
+ cleanupReplica(replicaInfo, replicaInfo.getBlockFile(), replicaInfo.getMetaFile(),
+ replicaInfo.getBlockFile().length(), replicaInfo.getMetaFile().length(),
+ b.getBlockPoolId());
+ } else {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
}
// create a new block
FsVolumeReference ref = null;
@@ -3198,6 +3214,14 @@
newReplicaInfo.isOnTransientStorage());
// Remove the old replicas
+ cleanupReplica(replicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid);
+
+ // If deletion failed then the directory scanner will cleanup the blocks
+ // eventually.
+ }
+
+ private void cleanupReplica(ReplicaInfo replicaInfo, File blockFile, File metaFile,
+ long blockFileUsed, long metaFileUsed, final String bpid) {
if (blockFile.delete() || !blockFile.exists()) {
FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
volume.onBlockFileDeletion(bpid, blockFileUsed);
@@ -3205,9 +3229,6 @@
volume.onMetaFileDeletion(bpid, metaFileUsed);
}
}
-
- // If deletion failed then the directory scanner will cleanup the blocks
- // eventually.
}
class LazyWriter implements Runnable {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e98733c..633d3da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -54,7 +54,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -89,6 +88,8 @@
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -1707,4 +1708,153 @@
assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
}
}
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"})
+        .build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stay with 1 active DN each
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+
+      // Create a file with replication 3 using the rack fault tolerant BPP.
+      // With MIN_REPLICATION = 2 the write must still succeed on the 2
+      // remaining DataNodes.
+      final Path testFile = new Path("/testFile");
+      DFSTestUtil.createFile(fs, testFile, 1024L, (short) 3, 1024L);
+      // The file must be fully written, not just created.
+      assertEquals(1024L, fs.getFileStatus(testFile).getLen());
+    }
+  }
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class, BlockPlacementPolicy.class);
+    conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION, 3);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"})
+        .build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all the DNs in the 3rd rack, so only 2 racks stay with 1 active DN each
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      // Only 2 DataNodes remain, fewer than MIN_REPLICATION (3), so creating a
+      // file with replication 3 is expected to fail.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 1024L);
+      } catch (IOException expected) {
+        threw = true;
+      }
+      assertTrue("createFile should fail when fewer than MIN_REPLICATION DataNodes are alive",
+          threw);
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"})
+        .build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      // Create a file with replication 3; for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks. With MIN_REPLICATION = 1
+      // the write must still succeed on the single remaining DataNode.
+      final Path testFile = new Path("/testFile");
+      DFSTestUtil.createFile(fs, testFile, 1024L, (short) 3, 1024L);
+      // The file must be fully written, not just created.
+      assertEquals(1024L, fs.getFileStatus(testFile).getLen());
+    }
+  }
+
+  @Test
+  public void testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"})
+        .build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill all DNs except 1, so only rack1 stays with 1 active DN
+      cluster.stopDataNode(5);
+      cluster.stopDataNode(4);
+      cluster.stopDataNode(3);
+      cluster.stopDataNode(2);
+      cluster.stopDataNode(1);
+      // Only 1 DataNode remains, fewer than MIN_REPLICATION (2), so creating a
+      // file with replication 3 is expected to fail.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException expected) {
+        threw = true;
+      }
+      assertTrue("createFile should fail when fewer than MIN_REPLICATION DataNodes are alive",
+          threw);
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    // 3 racks & 6 nodes. 1 per rack for 2 racks and 4 nodes in the 3rd rack
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3", "/rack3", "/rack3", "/rack3"})
+        .build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // shutdown all DNs
+      cluster.shutdownDataNodes();
+      // Create a file with replication 3; for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks but fail because no DNs are present.
+      boolean threw = false;
+      try {
+        DFSTestUtil.createFile(fs, new Path("/testFile"),
+            1024L, (short) 3, 1024L);
+      } catch (IOException expected) {
+        threw = true;
+      }
+      assertTrue("createFile should fail when no DataNodes are alive", threw);
+    }
+  }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index b4526fb..4a054c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1103,6 +1103,12 @@
return createTemporary(storageType, b, false);
}
+ @Override // FsDatasetSpi
+ public ReplicaHandler createRbw(StorageType storageType,
+ ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
+ // newGS is ignored here; this simply delegates to the 3-argument createRbw.
+ return createRbw(storageType, b, allowLazyPersist);
+ }
+
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b, boolean isTransfer) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 1f03f50..2486d1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -149,6 +149,12 @@
}
@Override
+ public ReplicaHandler createRbw(StorageType storageType,
+ ExtendedBlock b, boolean allowLazyPersist, long newGS) throws IOException {
+ // newGS is ignored here; this simply delegates to the 3-argument createRbw.
+ return createRbw(storageType, b, allowLazyPersist);
+ }
+
+ @Override
public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS,
long minBytesRcvd, long maxBytesRcvd) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);