/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
/** Tests FSDataset#append, writeToRbw, and writeToTemporary. */
public class TestWriteToReplica {
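  // Indexes into the block array returned by setup(). Each index names the
  // replica state it exercises: FINALIZED, TEMPORARY (replica in pipeline),
  // RBW (replica being written), RWR (replica waiting to be recovered),
  // RUR (replica under recovery), and NON_EXISTENT (never created).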
final private static int FINALIZED = 0;
final private static int TEMPORARY = 1;
final private static int RBW = 2;
final private static int RWR = 3;
final private static int RUR = 4;
final private static int NON_EXISTENT = 5;
// test close
@Test
public void testClose() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration(),
new File(GenericTestUtils.getRandomizedTempPath())).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test close
testClose(dataSet, blocks);
} finally {
cluster.shutdown();
}
}
// test append
@Test
public void testAppend() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration(),
new File(GenericTestUtils.getRandomizedTempPath())).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test append
testAppend(bpid, dataSet, blocks);
} finally {
cluster.shutdown();
}
}
// test writeToRbw
@Test
public void testWriteToRbw() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration(),
new File(GenericTestUtils.getRandomizedTempPath())).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test writeToRbw
testWriteToRbw(dataSet, blocks);
} finally {
cluster.shutdown();
}
}
// test writeToTemporary
@Test
public void testWriteToTemporary() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration(),
new File(GenericTestUtils.getRandomizedTempPath())).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
// test writeToTemporary
testWriteToTemporary(dataSet, blocks);
} finally {
cluster.shutdown();
}
}
/**
* Generate testing environment and return a collection of blocks
* on which to run the tests.
*
* @param bpid Block pool ID to generate blocks for
* @param testUtils FsDatasetTestUtils provides white box access to FsDataset.
* @return Contrived blocks for further testing.
* @throws IOException
*/
private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)
throws IOException {
// setup replicas map
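    // ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp)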
ExtendedBlock[] blocks = new ExtendedBlock[] {
new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
};
testUtils.createFinalizedReplica(blocks[FINALIZED]);
testUtils.createReplicaInPipeline(blocks[TEMPORARY]);
testUtils.createRBW(blocks[RBW]);
testUtils.createReplicaWaitingToBeRecovered(blocks[RWR]);
testUtils.createReplicaUnderRecovery(blocks[RUR], 2007);
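    // No replica is created for blocks[NON_EXISTENT]; the tests use it to
    // exercise the missing-replica code paths.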
return blocks;
}
private void testAppend(String bpid, FsDatasetSpi<?> dataSet,
ExtendedBlock[] blocks) throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
final FsVolumeSpi v = dataSet.getVolume(blocks[FINALIZED]);
if (v instanceof FsVolumeImpl) {
FsVolumeImpl fvi = (FsVolumeImpl) v;
long available = fvi.getCapacity() - fvi.getDfsUsed();
long expectedLen = blocks[FINALIZED].getNumBytes();
try {
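        // Consume the volume's remaining capacity (a negative "deletion"
        // inflates dfsUsed) so that the append below runs out of space; the
        // space is handed back after the catch block.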
fvi.onBlockFileDeletion(bpid, -available);
blocks[FINALIZED].setNumBytes(expectedLen + 100);
dataSet.append(blocks[FINALIZED], newGS, expectedLen);
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
} catch (DiskOutOfSpaceException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Insufficient space for appending to "));
}
fvi.onBlockFileDeletion(bpid, available);
blocks[FINALIZED].setNumBytes(expectedLen);
}
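    // append succeeds only on a finalized replica; every other state below
    // must be rejected with a ReplicaNotFoundException.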
    newGS = blocks[FINALIZED].getGenerationStamp()+1;
dataSet.append(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful
blocks[FINALIZED].setGenerationStamp(newGS);
try {
dataSet.append(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
blocks[TEMPORARY].getNumBytes());
Assert.fail("Should not have appended to a temporary replica "
+ blocks[TEMPORARY]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
blocks[TEMPORARY], e.getMessage());
}
try {
dataSet.append(blocks[RBW], blocks[RBW].getGenerationStamp()+1,
blocks[RBW].getNumBytes());
Assert.fail("Should not have appended to an RBW replica" + blocks[RBW]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
blocks[RBW], e.getMessage());
}
try {
      dataSet.append(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
          blocks[RWR].getNumBytes());
      Assert.fail("Should not have appended to an RWR replica " + blocks[RWR]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
blocks[RWR], e.getMessage());
}
try {
dataSet.append(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
blocks[RUR].getNumBytes());
Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA +
blocks[RUR], e.getMessage());
}
try {
dataSet.append(blocks[NON_EXISTENT],
blocks[NON_EXISTENT].getGenerationStamp(),
blocks[NON_EXISTENT].getNumBytes());
Assert.fail("Should not have appended to a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA +
blocks[NON_EXISTENT], e.getMessage());
}
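    // Unlike append, recoverAppend also accepts an RBW replica; the other
    // unfinalized states are still rejected.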
newGS = blocks[FINALIZED].getGenerationStamp()+1;
dataSet.recoverAppend(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful
blocks[FINALIZED].setGenerationStamp(newGS);
try {
dataSet.recoverAppend(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
blocks[TEMPORARY].getNumBytes());
Assert.fail("Should not have appended to a temporary replica "
+ blocks[TEMPORARY]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
newGS = blocks[RBW].getGenerationStamp()+1;
dataSet.recoverAppend(blocks[RBW], newGS, blocks[RBW].getNumBytes());
blocks[RBW].setGenerationStamp(newGS);
try {
      dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
          blocks[RWR].getNumBytes());
      Assert.fail("Should not have appended to an RWR replica " + blocks[RWR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
try {
dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
blocks[RUR].getNumBytes());
Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
try {
dataSet.recoverAppend(blocks[NON_EXISTENT],
blocks[NON_EXISTENT].getGenerationStamp(),
blocks[NON_EXISTENT].getNumBytes());
Assert.fail("Should not have appended to a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
}
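  // recoverClose mirrors recoverAppend: it succeeds on finalized and RBW
  // replicas and rejects every other state.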
  private void testClose(FsDatasetSpi<?> dataSet, ExtendedBlock[] blocks)
      throws IOException {
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
dataSet.recoverClose(blocks[FINALIZED], newGS,
blocks[FINALIZED].getNumBytes()); // successful
blocks[FINALIZED].setGenerationStamp(newGS);
try {
dataSet.recoverClose(blocks[TEMPORARY], blocks[TEMPORARY].getGenerationStamp()+1,
blocks[TEMPORARY].getNumBytes());
Assert.fail("Should not have recovered close a temporary replica "
+ blocks[TEMPORARY]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
newGS = blocks[RBW].getGenerationStamp()+1;
dataSet.recoverClose(blocks[RBW], newGS, blocks[RBW].getNumBytes());
blocks[RBW].setGenerationStamp(newGS);
try {
      dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
          blocks[RWR].getNumBytes());
      Assert.fail("recoverClose should not have succeeded on an RWR replica "
          + blocks[RWR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
try {
dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
blocks[RUR].getNumBytes());
Assert.fail("Should not have recovered close an RUR replica" + blocks[RUR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
}
try {
dataSet.recoverClose(blocks[NON_EXISTENT],
blocks[NON_EXISTENT].getGenerationStamp(),
blocks[NON_EXISTENT].getNumBytes());
Assert.fail("Should not have recovered close a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
}
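  // recoverRbw succeeds only on an RBW replica, while createRbw fails
  // whenever any replica for the block already exists.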
  private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks)
      throws IOException {
try {
dataSet.recoverRbw(blocks[FINALIZED],
blocks[FINALIZED].getGenerationStamp()+1,
0L, blocks[FINALIZED].getNumBytes());
Assert.fail("Should not have recovered a finalized replica " +
blocks[FINALIZED]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_RBW_REPLICA));
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.recoverRbw(blocks[TEMPORARY],
blocks[TEMPORARY].getGenerationStamp()+1,
0L, blocks[TEMPORARY].getNumBytes());
Assert.fail("Should not have recovered a temporary replica " +
blocks[TEMPORARY]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_RBW_REPLICA));
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
dataSet.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp()+1,
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.recoverRbw(blocks[RWR], blocks[RWR].getGenerationStamp()+1,
0L, blocks[RWR].getNumBytes());
Assert.fail("Should not have recovered a RWR replica " + blocks[RWR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_RBW_REPLICA));
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.recoverRbw(blocks[RUR], blocks[RUR].getGenerationStamp()+1,
0L, blocks[RUR].getNumBytes());
Assert.fail("Should not have recovered a RUR replica " + blocks[RUR]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(e.getMessage().startsWith(
ReplicaNotFoundException.NON_RBW_REPLICA));
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.recoverRbw(blocks[NON_EXISTENT],
blocks[NON_EXISTENT].getGenerationStamp()+1,
0L, blocks[NON_EXISTENT].getNumBytes());
Assert.fail("Cannot recover a non-existent replica " +
blocks[NON_EXISTENT]);
} catch (ReplicaNotFoundException e) {
Assert.assertTrue(
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
}
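  // createTemporary fails whenever any replica for the block already exists,
  // unless the request carries a newer generation stamp (exercised at the
  // end of this method).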
  private void testWriteToTemporary(FsDatasetImpl dataSet,
      ExtendedBlock[] blocks) throws IOException {
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
Assert.assertTrue(
e.getMessage().contains(blocks[NON_EXISTENT].getBlockName()));
Assert.assertTrue(e instanceof ReplicaAlreadyExistsException);
}
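    // A request with a newer generation stamp supersedes the stale temporary
    // replica created above, so this createTemporary call must succeed.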
long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
ReplicaInPipelineInterface replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT],
false).getReplica();
      Assert.assertEquals(newGenStamp, replicaInfo.getGenerationStamp());
      Assert.assertEquals(blocks[NON_EXISTENT].getBlockId(),
          replicaInfo.getBlockId());
} catch (ReplicaAlreadyExistsException e) {
Assert.fail("createTemporary should have allowed the block with newer "
+ " generation stamp to be created " + blocks[NON_EXISTENT]);
}
}
  /**
   * Checks that the replica map is preserved across a quick datanode
   * restart (one completed in under five minutes).
   * @throws Exception
   */
@Test
public void testReplicaMapAfterDatanodeRestart() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf,
new File(GenericTestUtils.getRandomizedTempPath()))
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
try {
cluster.waitActive();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
assertNotNull("cannot create nn1", nn1);
assertNotNull("cannot create nn2", nn2);
// check number of volumes in fsdataset
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
getFSDataset(dn);
List<FsVolumeSpi> volumes = null;
try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
// number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, referredVols.size());
volumes = new ArrayList<>(referredVols.size());
for (FsVolumeSpi vol : referredVols) {
volumes.add(vol);
}
}
ArrayList<String> bpList = new ArrayList<>(Arrays.asList(
cluster.getNamesystem(0).getBlockPoolId(),
cluster.getNamesystem(1).getBlockPoolId()));
Assert.assertTrue("Cluster should have 2 block pools",
bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0);
cluster.waitActive();
dn = cluster.getDataNodes().get(0);
dataSet = (FsDatasetImpl) dn.getFSDataset();
testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList);
} finally {
cluster.shutdown();
}
}
/**
* Test that we can successfully recover a {@link ReplicaBeingWritten}
* which has inconsistent metadata (bytes were written to disk but bytesOnDisk
* was not updated) but that recovery fails when the block is actually
* corrupt (bytes are not present on disk).
*/
@Test
public void testRecoverInconsistentRbw() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf,
new File(GenericTestUtils.getRandomizedTempPath())).build();
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
getReplicaInfo(blocks[RBW]);
long bytesOnDisk = rbw.getBytesOnDisk();
    // Simulate an inconsistent replica-length update by reducing the
    // in-memory value of the on-disk length.
rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
rbw.getNumBytes());
// after the recovery, on disk length should equal acknowledged length.
    Assert.assertEquals(rbw.getBytesAcked(), rbw.getBytesOnDisk());
    // Reduce the on-disk length again; this time actually truncate the block
    // file so that the data really is missing.
rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
try (RandomAccessFile blockRAF =
new RandomAccessFile(rbw.getBlockFile(), "rw")) {
// truncate blockFile
blockRAF.setLength(bytesOnDisk - 1);
fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
rbw.getNumBytes());
Assert.fail("recovery should have failed");
} catch (ReplicaNotFoundException rnfe) {
GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
"bytesAcked for replica", rnfe);
}
}
/**
* Compare the replica map before and after the restart
**/
private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap
newReplicaMap, List<String> bpidList) {
// Traversing through newReplica map and remove the corresponding
// replicaInfo from oldReplicaMap.
for (String bpid: bpidList) {
for (ReplicaInfo info: newReplicaMap.replicas(bpid)) {
assertNotNull("Volume map before restart didn't contain the "
+ "blockpool: " + bpid, oldReplicaMap.replicas(bpid));
ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid,
info.getBlockId());
      // Every replica present after the restart must also have existed in
      // the map captured before the restart.
      assertNotNull("Old replica map didn't contain block with blockId: " +
          info.getBlockId(), oldReplicaInfo);
ReplicaState oldState = oldReplicaInfo.getState();
      // After a restart, RWR, RBW and RUR replicas are all converted to RWR.
if (info.getState() == ReplicaState.RWR) {
if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW
|| oldState == ReplicaState.RUR) {
oldReplicaMap.remove(bpid, oldReplicaInfo);
}
} else if (info.getState() == ReplicaState.FINALIZED &&
oldState == ReplicaState.FINALIZED) {
oldReplicaMap.remove(bpid, oldReplicaInfo);
}
}
}
    // ReplicaInPipeline (TEMPORARY) replicas are not persisted across a
    // restart, so any non-TEMPORARY replica left in the old map was lost
    // by the restart.
for (String bpid: bpidList) {
for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) {
if (replicaInfo.getState() != ReplicaState.TEMPORARY) {
Assert.fail("After datanode restart we lost the block with blockId: "
+ replicaInfo.getBlockId());
}
}
}
}
private void createReplicas(List<String> bpList, List<FsVolumeSpi> volumes,
FsDatasetTestUtils testUtils) throws IOException {
    // Create one replica of each type (finalized, RBW, RWR, in-pipeline)
    // on every volume of each block pool and add it to the volume map.
long id = 1; // This variable is used as both blockId and genStamp
for (String bpId: bpList) {
for (FsVolumeSpi volume: volumes) {
ExtendedBlock eb = new ExtendedBlock(bpId, id, 1, id);
testUtils.createFinalizedReplica(volume, eb);
id++;
eb = new ExtendedBlock(bpId, id, 1, id);
testUtils.createRBW(volume, eb);
id++;
eb = new ExtendedBlock(bpId, id, 1, id);
testUtils.createReplicaWaitingToBeRecovered(volume, eb);
id++;
eb = new ExtendedBlock(bpId, id, 1, id);
testUtils.createReplicaInPipeline(volume, eb);
id++;
}
}
}
}