// blob: c6b2335d31a8370be64263dda362fa4620001b8a
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
/**
 * Tests FSDataset#append, recoverAppend, recoverClose, recoverRbw,
 * createRbw, and createTemporary against replicas in each replica state.
 */
public class TestWriteToReplica {
/**
 * Fixture blocks shared by every test, one per replica state. Each block is
 * looked up through the index constants declared below.
 * NOTE(review): the constructor arguments are presumably
 * (block id, length, generation stamp) -- confirm against Block.
 * These objects are static and some tests mutate their generation stamp, so
 * their state carries over between test methods run in the same JVM.
 */
private static final Block[] blocks = {
  new Block(1, 1, 2001),
  new Block(2, 1, 2002),
  new Block(3, 1, 2003),
  new Block(4, 1, 2004),
  new Block(5, 1, 2005),
  new Block(6, 1, 2006)
};

// Indices into blocks, named after the kind of replica that setup()
// registers for the corresponding block.
private static final int FINALIZED = 0;    // registered as a FinalizedReplica
private static final int TEMPORARY = 1;    // registered as a ReplicaInPipeline
private static final int RBW = 2;          // registered as a ReplicaBeingWritten
private static final int RWR = 3;          // registered as a ReplicaWaitingToBeRecovered
private static final int RUR = 4;          // registered as a ReplicaUnderRecovery
private static final int NON_EXISTENT = 5; // never registered by setup()
// test close
/**
 * Brings up a MiniDFSCluster, populates the replica map of its first
 * datanode via setup, and runs the recoverClose checks against that
 * datanode's dataset. The cluster is always shut down afterwards.
 */
@Test
public void testClose() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  final MiniDFSCluster miniCluster = new MiniDFSCluster(conf, 1, true, null);
  try {
    miniCluster.waitActive();
    final DataNode firstDataNode = miniCluster.getDataNodes().get(0);
    final FSDataset fsDataset = (FSDataset) firstDataNode.data;

    // Register one replica per state, then exercise recoverClose on each.
    setup(fsDataset);
    testClose(fsDataset);
  } finally {
    miniCluster.shutdown();
  }
}
// test append
/**
 * Brings up a MiniDFSCluster, populates the replica map of its first
 * datanode via setup, and runs the append / recoverAppend checks against
 * that datanode's dataset. The cluster is always shut down afterwards.
 */
@Test
public void testAppend() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  final MiniDFSCluster miniCluster = new MiniDFSCluster(conf, 1, true, null);
  try {
    miniCluster.waitActive();
    final DataNode firstDataNode = miniCluster.getDataNodes().get(0);
    final FSDataset fsDataset = (FSDataset) firstDataNode.data;

    // Register one replica per state, then exercise append on each.
    setup(fsDataset);
    testAppend(fsDataset);
  } finally {
    miniCluster.shutdown();
  }
}
// test writeToRbw
/**
 * Brings up a MiniDFSCluster, populates the replica map of its first
 * datanode via setup, and runs the recoverRbw / createRbw checks against
 * that datanode's dataset. The cluster is always shut down afterwards.
 */
@Test
public void testWriteToRbw() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  final MiniDFSCluster miniCluster = new MiniDFSCluster(conf, 1, true, null);
  try {
    miniCluster.waitActive();
    final DataNode firstDataNode = miniCluster.getDataNodes().get(0);
    final FSDataset fsDataset = (FSDataset) firstDataNode.data;

    // Register one replica per state, then exercise the RBW operations.
    setup(fsDataset);
    testWriteToRbw(fsDataset);
  } finally {
    miniCluster.shutdown();
  }
}
// test writeToTemporary
/**
 * Brings up a MiniDFSCluster, populates the replica map of its first
 * datanode via setup, and runs the createTemporary checks against that
 * datanode's dataset. The cluster is always shut down afterwards.
 * NOTE(review): the method name is misspelled ("Tempoary"); it is kept
 * unchanged so that existing references to the test name keep working.
 */
@Test
public void testWriteToTempoary() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  final MiniDFSCluster miniCluster = new MiniDFSCluster(conf, 1, true, null);
  try {
    miniCluster.waitActive();
    final DataNode firstDataNode = miniCluster.getDataNodes().get(0);
    final FSDataset fsDataset = (FSDataset) firstDataNode.data;

    // Register one replica per state, then exercise createTemporary.
    setup(fsDataset);
    testWriteToTemporary(fsDataset);
  } finally {
    miniCluster.shutdown();
  }
}
/**
 * Registers one replica per state in the dataset's replica map, all on the
 * volume returned by dataSet.volumes.getNextVolume(0). blocks[NON_EXISTENT]
 * is deliberately left unregistered.
 *
 * @param dataSet the dataset whose volumeMap is populated
 * @throws IOException if creating any of the backing files fails
 */
private void setup(FSDataset dataSet) throws IOException {
  final ReplicasMap map = dataSet.volumeMap;
  final FSVolume volume = dataSet.volumes.getNextVolume(0);

  // FINALIZED: replica in the volume directory, with empty block and meta
  // files created on disk.
  final ReplicaInfo finalized =
      new FinalizedReplica(blocks[FINALIZED], volume, volume.getDir());
  map.add(finalized);
  finalized.getBlockFile().createNewFile();
  finalized.getMetaFile().createNewFile();

  // TEMPORARY: in-pipeline replica located in the parent directory of the
  // file returned by createTmpFile.
  final Block tmpBlock = blocks[TEMPORARY];
  map.add(new ReplicaInPipeline(
      tmpBlock.getBlockId(),
      tmpBlock.getGenerationStamp(),
      volume,
      volume.createTmpFile(tmpBlock).getParentFile()));

  // RBW: replica being written, located in the parent directory of the file
  // returned by createRbwFile, with block and meta files created on disk.
  // The last constructor argument is passed as null.
  final ReplicaInfo beingWritten = new ReplicaBeingWritten(
      blocks[RBW], volume,
      volume.createRbwFile(blocks[RBW]).getParentFile(), null);
  map.add(beingWritten);
  beingWritten.getBlockFile().createNewFile();
  beingWritten.getMetaFile().createNewFile();

  // RWR: replica waiting to be recovered, also located via createRbwFile.
  map.add(new ReplicaWaitingToBeRecovered(
      blocks[RWR], volume,
      volume.createRbwFile(blocks[RWR]).getParentFile()));

  // RUR: replica under recovery wrapping a finalized replica.
  // NOTE(review): 2007 is presumably the recovery id -- confirm against
  // ReplicaUnderRecovery.
  map.add(new ReplicaUnderRecovery(
      new FinalizedReplica(blocks[RUR], volume, volume.getDir()), 2007));
}
/**
 * Exercises FSDataset#append and FSDataset#recoverAppend against the
 * replicas registered by setup.
 *
 * Expectations checked for append:
 *   - FINALIZED with no free space on its volume: DiskOutOfSpaceException;
 *   - FINALIZED with space: succeeds;
 *   - TEMPORARY, RBW, RWR, RUR: ReplicaNotFoundException (UNFINALIZED_REPLICA);
 *   - NON_EXISTENT: ReplicaNotFoundException (NON_EXISTENT_REPLICA).
 *
 * Expectations checked for recoverAppend:
 *   - FINALIZED and RBW: succeed;
 *   - TEMPORARY, RWR, RUR: ReplicaNotFoundException
 *     (UNFINALIZED_AND_NONRBW_REPLICA);
 *   - NON_EXISTENT: ReplicaNotFoundException (NON_EXISTENT_REPLICA).
 *
 * After each successful call the generation stamp of the corresponding
 * fixture block is advanced to the stamp that was passed in.
 *
 * @param dataSet the dataset under test, already populated by setup
 * @throws IOException if an operation expected to succeed fails
 */
private void testAppend(FSDataset dataSet) throws IOException {
  long newGS = blocks[FINALIZED].getGenerationStamp() + 1;
  final FSVolume v = dataSet.volumeMap.get(blocks[FINALIZED]).getVolume();
  final long available = v.getCapacity() - v.getDfsUsed();
  final long expectedLen = blocks[FINALIZED].getNumBytes();

  // Simulate a full volume: a negative decrement of the volume's remaining
  // capacity (capacity - dfsUsed) is applied to its DFS usage, and the block
  // is made 100 bytes longer than the length passed as expected.
  v.decDfsUsed(-available);
  blocks[FINALIZED].setNumBytes(expectedLen + 100);
  try {
    dataSet.append(blocks[FINALIZED], newGS, expectedLen);
    Assert.fail("Should not have space to append to a finalized replica "
        + blocks[FINALIZED]);
  } catch (DiskOutOfSpaceException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        "Insufficient space for appending to "));
  } finally {
    // Always undo the simulated disk-full state, even if an assertion fails.
    v.decDfsUsed(available);
    blocks[FINALIZED].setNumBytes(expectedLen);
  }

  // The new generation stamp is derived from the block being appended to.
  newGS = blocks[FINALIZED].getGenerationStamp() + 1;
  dataSet.append(blocks[FINALIZED], newGS,
      blocks[FINALIZED].getNumBytes());  // successful
  blocks[FINALIZED].setGenerationStamp(newGS);

  try {
    dataSet.append(blocks[TEMPORARY],
        blocks[TEMPORARY].getGenerationStamp() + 1,
        blocks[TEMPORARY].getNumBytes());
    Assert.fail("Should not have appended to a temporary replica "
        + blocks[TEMPORARY]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA
        + blocks[TEMPORARY], e.getMessage());
  }

  try {
    dataSet.append(blocks[RBW], blocks[RBW].getGenerationStamp() + 1,
        blocks[RBW].getNumBytes());
    Assert.fail("Should not have appended to an RBW replica " + blocks[RBW]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA
        + blocks[RBW], e.getMessage());
  }

  try {
    dataSet.append(blocks[RWR], blocks[RWR].getGenerationStamp() + 1,
        blocks[RWR].getNumBytes());
    Assert.fail("Should not have appended to an RWR replica " + blocks[RWR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA
        + blocks[RWR], e.getMessage());
  }

  try {
    dataSet.append(blocks[RUR], blocks[RUR].getGenerationStamp() + 1,
        blocks[RUR].getNumBytes());
    Assert.fail("Should not have appended to an RUR replica " + blocks[RUR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertEquals(ReplicaNotFoundException.UNFINALIZED_REPLICA
        + blocks[RUR], e.getMessage());
  }

  try {
    dataSet.append(blocks[NON_EXISTENT],
        blocks[NON_EXISTENT].getGenerationStamp(),
        blocks[NON_EXISTENT].getNumBytes());
    Assert.fail("Should not have appended to a non-existent replica "
        + blocks[NON_EXISTENT]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertEquals(ReplicaNotFoundException.NON_EXISTENT_REPLICA
        + blocks[NON_EXISTENT], e.getMessage());
  }

  // ---- recoverAppend ----

  newGS = blocks[FINALIZED].getGenerationStamp() + 1;
  dataSet.recoverAppend(blocks[FINALIZED], newGS,
      blocks[FINALIZED].getNumBytes());  // successful
  blocks[FINALIZED].setGenerationStamp(newGS);

  try {
    dataSet.recoverAppend(blocks[TEMPORARY],
        blocks[TEMPORARY].getGenerationStamp() + 1,
        blocks[TEMPORARY].getNumBytes());
    Assert.fail("Should not have appended to a temporary replica "
        + blocks[TEMPORARY]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  newGS = blocks[RBW].getGenerationStamp() + 1;
  dataSet.recoverAppend(blocks[RBW], newGS,
      blocks[RBW].getNumBytes());  // successful
  blocks[RBW].setGenerationStamp(newGS);

  try {
    dataSet.recoverAppend(blocks[RWR], blocks[RWR].getGenerationStamp() + 1,
        blocks[RWR].getNumBytes());
    Assert.fail("Should not have appended to an RWR replica " + blocks[RWR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  try {
    dataSet.recoverAppend(blocks[RUR], blocks[RUR].getGenerationStamp() + 1,
        blocks[RUR].getNumBytes());
    Assert.fail("Should not have appended to an RUR replica " + blocks[RUR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  try {
    dataSet.recoverAppend(blocks[NON_EXISTENT],
        blocks[NON_EXISTENT].getGenerationStamp(),
        blocks[NON_EXISTENT].getNumBytes());
    Assert.fail("Should not have appended to a non-existent replica "
        + blocks[NON_EXISTENT]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.NON_EXISTENT_REPLICA));
  }
}
/**
 * Exercises FSDataset#recoverClose against the replicas registered by setup.
 *
 * Expectations checked:
 *   - FINALIZED and RBW: succeed; the fixture block's generation stamp is
 *     then advanced to the stamp that was passed in;
 *   - TEMPORARY, RWR, RUR: ReplicaNotFoundException whose message starts
 *     with UNFINALIZED_AND_NONRBW_REPLICA;
 *   - NON_EXISTENT: ReplicaNotFoundException whose message starts with
 *     NON_EXISTENT_REPLICA.
 *
 * @param dataSet the dataset under test, already populated by setup
 * @throws IOException if an operation expected to succeed fails
 */
private void testClose(FSDataset dataSet) throws IOException {
  long newGS = blocks[FINALIZED].getGenerationStamp() + 1;
  dataSet.recoverClose(blocks[FINALIZED], newGS,
      blocks[FINALIZED].getNumBytes());  // successful
  blocks[FINALIZED].setGenerationStamp(newGS);

  try {
    dataSet.recoverClose(blocks[TEMPORARY],
        blocks[TEMPORARY].getGenerationStamp() + 1,
        blocks[TEMPORARY].getNumBytes());
    Assert.fail("Should not have recovered close a temporary replica "
        + blocks[TEMPORARY]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  newGS = blocks[RBW].getGenerationStamp() + 1;
  dataSet.recoverClose(blocks[RBW], newGS,
      blocks[RBW].getNumBytes());  // successful
  blocks[RBW].setGenerationStamp(newGS);

  try {
    // Use the RWR block's own length rather than the RBW block's.
    dataSet.recoverClose(blocks[RWR], blocks[RWR].getGenerationStamp() + 1,
        blocks[RWR].getNumBytes());
    Assert.fail("Should not have recovered close an RWR replica "
        + blocks[RWR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  try {
    dataSet.recoverClose(blocks[RUR], blocks[RUR].getGenerationStamp() + 1,
        blocks[RUR].getNumBytes());
    Assert.fail("Should not have recovered close an RUR replica "
        + blocks[RUR]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA));
  }

  try {
    dataSet.recoverClose(blocks[NON_EXISTENT],
        blocks[NON_EXISTENT].getGenerationStamp(),
        blocks[NON_EXISTENT].getNumBytes());
    Assert.fail("Should not have recovered close a non-existent replica "
        + blocks[NON_EXISTENT]);
  } catch (ReplicaNotFoundException e) {
    Assert.assertTrue(e.getMessage().startsWith(
        ReplicaNotFoundException.NON_EXISTENT_REPLICA));
  }
}
/**
 * Exercises FSDataset#recoverRbw and FSDataset#createRbw against the
 * replicas registered by setup.
 *
 * Expectations checked:
 *   - recoverRbw on FINALIZED, TEMPORARY, RWR, RUR: ReplicaNotFoundException
 *     whose message starts with NON_RBW_REPLICA;
 *   - recoverRbw on RBW: succeeds;
 *   - recoverRbw on NON_EXISTENT: ReplicaNotFoundException whose message
 *     contains NON_EXISTENT_REPLICA;
 *   - createRbw on any registered block: ReplicaAlreadyExistsException;
 *   - createRbw on NON_EXISTENT: succeeds.
 *
 * @param dataSet the dataset under test, already populated by setup
 * @throws IOException if an operation expected to succeed fails
 */
private void testWriteToRbw(FSDataset dataSet) throws IOException {
  // ---- finalized replica ----
  final Block finalized = blocks[FINALIZED];
  try {
    dataSet.recoverRbw(finalized, finalized.getGenerationStamp() + 1,
        0L, finalized.getNumBytes());
    Assert.fail("Should not have recovered a finalized replica " + finalized);
  } catch (ReplicaNotFoundException notRbw) {
    Assert.assertTrue(notRbw.getMessage().startsWith(
        ReplicaNotFoundException.NON_RBW_REPLICA));
  }
  try {
    dataSet.createRbw(finalized);
    Assert.fail("Should not have created a replica that's already finalized "
        + finalized);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: the replica is already registered
  }

  // ---- temporary replica ----
  final Block temporary = blocks[TEMPORARY];
  try {
    dataSet.recoverRbw(temporary, temporary.getGenerationStamp() + 1,
        0L, temporary.getNumBytes());
    Assert.fail("Should not have recovered a temporary replica " + temporary);
  } catch (ReplicaNotFoundException notRbw) {
    Assert.assertTrue(notRbw.getMessage().startsWith(
        ReplicaNotFoundException.NON_RBW_REPLICA));
  }
  try {
    dataSet.createRbw(temporary);
    Assert.fail("Should not have created a replica that had created as temporary "
        + temporary);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: the replica is already registered
  }

  // ---- replica being written ----
  final Block rbw = blocks[RBW];
  // expect to be successful
  dataSet.recoverRbw(rbw, rbw.getGenerationStamp() + 1, 0L, rbw.getNumBytes());
  try {
    dataSet.createRbw(rbw);
    Assert.fail("Should not have created a replica that had created as RBW "
        + rbw);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: the replica is already registered
  }

  // ---- replica waiting to be recovered ----
  final Block rwr = blocks[RWR];
  try {
    dataSet.recoverRbw(rwr, rwr.getGenerationStamp() + 1,
        0L, rwr.getNumBytes());
    Assert.fail("Should not have recovered a RWR replica " + rwr);
  } catch (ReplicaNotFoundException notRbw) {
    Assert.assertTrue(notRbw.getMessage().startsWith(
        ReplicaNotFoundException.NON_RBW_REPLICA));
  }
  try {
    dataSet.createRbw(rwr);
    Assert.fail("Should not have created a replica that was waiting to be recovered "
        + rwr);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: the replica is already registered
  }

  // ---- replica under recovery ----
  final Block rur = blocks[RUR];
  try {
    dataSet.recoverRbw(rur, rur.getGenerationStamp() + 1,
        0L, rur.getNumBytes());
    Assert.fail("Should not have recovered a RUR replica " + rur);
  } catch (ReplicaNotFoundException notRbw) {
    Assert.assertTrue(notRbw.getMessage().startsWith(
        ReplicaNotFoundException.NON_RBW_REPLICA));
  }
  try {
    dataSet.createRbw(rur);
    Assert.fail("Should not have created a replica that was under recovery "
        + rur);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: the replica is already registered
  }

  // ---- block with no registered replica ----
  final Block missing = blocks[NON_EXISTENT];
  try {
    dataSet.recoverRbw(missing, missing.getGenerationStamp() + 1,
        0L, missing.getNumBytes());
    Assert.fail("Cannot recover a non-existent replica " + missing);
  } catch (ReplicaNotFoundException notFound) {
    final String message = notFound.getMessage();
    Assert.assertTrue(
        message.contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
  }
  // Creating an RBW replica for an unregistered block must succeed.
  dataSet.createRbw(missing);
}
/**
 * Exercises FSDataset#createTemporary against the replicas registered by
 * setup.
 *
 * Expectations checked:
 *   - FINALIZED, TEMPORARY, RBW, RWR, RUR: ReplicaAlreadyExistsException;
 *   - NON_EXISTENT: succeeds.
 *
 * @param dataSet the dataset under test, already populated by setup
 * @throws IOException if an operation expected to succeed fails
 */
private void testWriteToTemporary(FSDataset dataSet) throws IOException {
  try {
    dataSet.createTemporary(blocks[FINALIZED]);
    Assert.fail("Should not have created a temporary replica that was "
        + "finalized " + blocks[FINALIZED]);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: a replica for this block is already registered
  }

  try {
    dataSet.createTemporary(blocks[TEMPORARY]);
    Assert.fail("Should not have created a replica that had created as "
        + "temporary " + blocks[TEMPORARY]);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: a replica for this block is already registered
  }

  try {
    dataSet.createTemporary(blocks[RBW]);
    Assert.fail("Should not have created a replica that had created as RBW "
        + blocks[RBW]);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: a replica for this block is already registered
  }

  try {
    dataSet.createTemporary(blocks[RWR]);
    Assert.fail("Should not have created a replica that was waiting to be "
        + "recovered " + blocks[RWR]);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: a replica for this block is already registered
  }

  try {
    dataSet.createTemporary(blocks[RUR]);
    Assert.fail("Should not have created a replica that was under recovery "
        + blocks[RUR]);
  } catch (ReplicaAlreadyExistsException expected) {
    // expected: a replica for this block is already registered
  }

  // Creating a temporary replica for an unregistered block must succeed.
  dataSet.createTemporary(blocks[NON_EXISTENT]);
}
}