/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.event.Level;
/** This class implements some of the tests posted in HADOOP-2658. */
public class TestFileAppend3 {
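// Instance initializer: raise log levels to TRACE so that append-pipeline
// activity shows up in the test output when a case fails.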
{
DFSTestUtil.setNameNodeLogLevel(Level.TRACE);
GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(InterDatanodeProtocol.LOG, Level.TRACE);
}
static final long BLOCK_SIZE = 64 * 1024;
static final short REPLICATION = 3;
static final int DATANODE_NUM = 5;
private static Configuration conf;
private static int buffersize;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
@BeforeClass
public static void setUp() throws Exception {
AppendTestUtil.LOG.info("setUp()");
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDown() throws Exception {
AppendTestUtil.LOG.info("tearDown()");
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
/**
* TC1: Append on block boundary.
* @throws Exception if an error occurs
*/
@Test
public void testTC1() throws Exception {
final Path p = new Path("/TC1/foo");
System.out.println("p=" + p);
//a. Create file and write one block of data. Close file.
final int len1 = (int)BLOCK_SIZE;
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
// Reopen file to append. Append half block of data. Close file.
final int len2 = (int) (BLOCK_SIZE / 2);
{
FSDataOutputStream out = fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//b. Reopen file and read 1.5 blocks worth of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
}
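/**
 * TC1 variant: the file is reopened for append with CreateFlag.NEW_BLOCK,
 * so the appended half block starts a fresh block instead of extending the
 * file's last block.
 */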
@Test
public void testTC1ForAppend2() throws Exception {
final Path p = new Path("/TC1/foo2");
//a. Create file and write one block of data. Close file.
final int len1 = (int) BLOCK_SIZE;
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
// Reopen file to append. Append half block of data. Close file.
final int len2 = (int) (BLOCK_SIZE / 2);
{
FSDataOutputStream out = fs.append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
AppendTestUtil.write(out, len1, len2);
out.close();
}
// b. Reopen file and read 1.5 blocks worth of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
}
/**
* TC2: Append on non-block boundary.
* @throws Exception if an error occurs
*/
@Test
public void testTC2() throws Exception {
final Path p = new Path("/TC2/foo");
System.out.println("p=" + p);
//a. Create file with one and a half block of data. Close file.
final int len1 = (int)(BLOCK_SIZE + BLOCK_SIZE/2);
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
AppendTestUtil.check(fs, p, len1);
// Reopen file to append quarter block of data. Close file.
final int len2 = (int) (BLOCK_SIZE / 4);
{
FSDataOutputStream out = fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//b. Reopen file and read 1.75 blocks of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
}
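/**
 * TC2 variant: the append uses CreateFlag.NEW_BLOCK, so the quarter block
 * lands in a new block and the exact block layout can be asserted.
 */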
@Test
public void testTC2ForAppend2() throws Exception {
final Path p = new Path("/TC2/foo2");
//a. Create file with one and a half block of data. Close file.
final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
AppendTestUtil.check(fs, p, len1);
// Reopen file to append quarter block of data. Close file.
final int len2 = (int) (BLOCK_SIZE / 4);
{
FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
4096, null);
AppendTestUtil.write(out, len1, len2);
out.close();
}
// b. Reopen file and read 1.75 blocks of data. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
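// With NEW_BLOCK the file consists of three blocks: the full first block,
// the half block from the initial create, and the quarter-block append.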
List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
p.toString(), 0L).getLocatedBlocks();
Assert.assertEquals(3, blocks.size());
Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
}
/**
* TC5: Only one simultaneous append.
* @throws Exception if an error occurs
*/
@Test
public void testTC5() throws Exception {
final Path p = new Path("/TC5/foo");
System.out.println("p=" + p);
//a. Create file on Machine M1. Write half block to it. Close file.
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
out.close();
}
//b. Reopen file in "append" mode on Machine M1.
FSDataOutputStream out = fs.append(p);
//c. On Machine M2, reopen file in "append" mode. This should fail.
try {
AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
try {
((DistributedFileSystem) AppendTestUtil
.createHdfsWithDifferentUsername(conf)).append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
//d. On Machine M1, close file.
out.close();
}
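/**
 * TC5 variant: the lease is taken via an append with CreateFlag.NEW_BLOCK;
 * append attempts by a second user must fail no matter which append API
 * they use.
 */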
@Test
public void testTC5ForAppend2() throws Exception {
final Path p = new Path("/TC5/foo2");
// a. Create file on Machine M1. Write half block to it. Close file.
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
BLOCK_SIZE);
AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
out.close();
}
// b. Reopen file in "append" mode on Machine M1.
FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
4096, null);
// c. On Machine M2, reopen file in "append" mode. This should fail.
try {
((DistributedFileSystem) AppendTestUtil
.createHdfsWithDifferentUsername(conf)).append(p,
EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
try {
AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
fail("This should fail.");
} catch(IOException ioe) {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
// d. On Machine M1, close file.
out.close();
}
/**
* TC7: Corrupted replicas are present.
* @throws Exception if an error occurs
*/
private void testTC7(boolean appendToNewBlock) throws Exception {
final short repl = 2;
final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with replication factor of 2. Write half block of data. Close file.
final int len1 = (int)(BLOCK_SIZE/2);
{
FSDataOutputStream out = fs.create(p, false, buffersize, repl, BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
DFSTestUtil.waitReplication(fs, p, repl);
//b. Log into one datanode that has one replica of this block.
// Find the block file on this datanode and truncate it to zero size.
final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(p.toString(), 0L, len1);
assertEquals(1, locatedblocks.locatedBlockCount());
final LocatedBlock lb = locatedblocks.get(0);
final ExtendedBlock blk = lb.getBlock();
assertEquals(len1, lb.getBlockSize());
DatanodeInfo[] datanodeinfos = lb.getLocations();
assertEquals(repl, datanodeinfos.length);
final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
final MaterializedReplica replica = cluster.getMaterializedReplica(dn, blk);
replica.truncateData(0);
//c. Open file in "append mode". Append a new block worth of data. Close file.
final int len2 = (int)BLOCK_SIZE;
{
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//d. Reopen file and read two blocks worth of data.
AppendTestUtil.check(fs, p, len1 + len2);
}
@Test
public void testTC7() throws Exception {
testTC7(false);
}
@Test
public void testTC7ForAppend2() throws Exception {
testTC7(true);
}
/**
 * TC11: Racing rename.
 * Rename the file while a client still holds it open for append, then
 * verify the block sizes after the stream is closed.
 */
private void testTC11(boolean appendToNewBlock) throws Exception {
final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file and write one block of data. Close file.
final int len1 = (int)BLOCK_SIZE;
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
//b. Reopen file in "append" mode. Append half block of data.
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
final int len2 = (int) (BLOCK_SIZE / 2);
AppendTestUtil.write(out, len1, len2);
out.hflush();
//c. Rename file to file.new.
final Path pnew = new Path(p + ".new");
assertTrue(fs.rename(p, pnew));
//d. Close file handle that was opened in (b).
out.close();
//check block sizes
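// Each non-final block must be a full BLOCK_SIZE, and every datanode's
// stored replica length must agree with the size the namenode reports.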
final long len = fs.getFileStatus(pnew).getLen();
final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(pnew.toString(), 0L, len);
final int numblock = locatedblocks.locatedBlockCount();
for(int i = 0; i < numblock; i++) {
final LocatedBlock lb = locatedblocks.get(i);
final ExtendedBlock blk = lb.getBlock();
final long size = lb.getBlockSize();
if (i < numblock - 1) {
assertEquals(BLOCK_SIZE, size);
}
for(DatanodeInfo datanodeinfo : lb.getLocations()) {
final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
final Block metainfo = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
blk.getBlockPoolId(), blk.getBlockId());
assertEquals(size, metainfo.getNumBytes());
}
}
}
@Test
public void testTC11() throws Exception {
testTC11(false);
}
@Test
public void testTC11ForAppend2() throws Exception {
testTC11(true);
}
/**
* TC12: Append to partial CRC chunk
*/
private void testTC12(boolean appendToNewBlock) throws Exception {
final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with a block size of 64KB
// and a default io.bytes.per.checksum of 512 bytes.
// Write 25687 bytes of data. Close file.
final int len1 = 25687;
{
FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
AppendTestUtil.write(out, 0, len1);
out.close();
}
//b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
final int len2 = 5877;
{
FSDataOutputStream out = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//c. Reopen file and read 25687+5877 bytes of data from file. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
if (appendToNewBlock) {
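// With NEW_BLOCK the appended bytes form a second block, so each block's
// size should equal the corresponding write and both ranges must verify.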
LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
Assert.assertEquals(2, blks.getLocatedBlocks().size());
Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
AppendTestUtil.check(fs, p, 0, len1);
AppendTestUtil.check(fs, p, len1, len2);
}
}
@Test
public void testTC12() throws Exception {
testTC12(false);
}
@Test
public void testTC12ForAppend2() throws Exception {
testTC12(true);
}
/**
* Append to a partial CRC chunk when the first write does not fill up the
* partial CRC chunk.
*/
private void testAppendToPartialChunk(boolean appendToNewBlock)
throws IOException {
final Path p = new Path("/partialChunk/foo"
+ (appendToNewBlock ? "0" : "1"));
final int fileLen = 513;
System.out.println("p=" + p);
byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
// create a new file.
FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, 1);
// create 1 byte file
stm.write(fileContents, 0, 1);
stm.close();
System.out.println("Wrote 1 byte and closed the file " + p);
// append to file
stm = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
// Append to a partial CRC chunk.
stm.write(fileContents, 1, 1);
stm.hflush();
// The partial CRC chunk is still not full; close the file anyway.
stm.close();
System.out.println("Appended 1 byte and closed the file " + p);
// write the remainder of the file
stm = appendToNewBlock ?
fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
fs.append(p);
// ensure getPos is set to reflect existing size of the file
assertEquals(2, stm.getPos());
// Append to a partial CRC chunk.
stm.write(fileContents, 2, 1);
// The partial chunk is not full yet; force a packet to be sent to the DN.
stm.hflush();
System.out.println("Appended and flushed 1 byte");
// The partial chunk is still not full; force another packet to the DN.
stm.write(fileContents, 3, 2);
stm.hflush();
System.out.println("Appended and flushed 2 bytes");
// fill up the partial chunk and close the file
stm.write(fileContents, 5, fileLen-5);
stm.close();
System.out.println("Flush 508 byte and closed the file " + p);
// verify that entire file is good
AppendTestUtil.checkFullFile(fs, p, fileLen,
fileContents, "Failed to append to a partial chunk");
}
/**
 * Do small appends in a loop. Iterations that cannot acquire the lease
 * (another thread is appending) are skipped; write or close failures are
 * terminal and propagate to the caller.
 */
void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
throws IOException {
for (int i = 0; i < iterations; i++) {
FSDataOutputStream stm;
try {
stm = fs.append(file);
} catch (IOException e) {
// If another thread is already appending, skip this time.
continue;
}
// Failure in write or close will be terminal.
AppendTestUtil.write(stm, 0, 123);
stm.close();
}
}
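/**
 * Exercise the race between getFileInfo() and append(): the spied client
 * delays getFileInfo() so append() can act on a stale file status while
 * other threads grow the file across a checksum chunk boundary.
 */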
@Test
public void testSmallAppendRace() throws Exception {
final Path file = new Path("/testSmallAppendRace");
final String fName = file.toUri().getPath();
// Create the file and write a small amount of data.
FSDataOutputStream stm = fs.create(file);
AppendTestUtil.write(stm, 0, 123);
stm.close();
// Introduce a delay between getFileInfo and calling append() against NN.
final DFSClient client = DFSClientAdapter.getDFSClient(fs);
DFSClient spyClient = spy(client);
when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
@Override
public HdfsFileStatus answer(InvocationOnMock invocation){
try {
HdfsFileStatus stat = client.getFileInfo(fName);
Thread.sleep(100);
return stat;
} catch (Exception e) {
return null;
}
}
});
DFSClientAdapter.setDFSClient(fs, spyClient);
// Create two threads for doing appends to the same file.
Thread worker1 = new Thread() {
@Override
public void run() {
try {
doSmallAppends(file, fs, 20);
} catch (IOException e) {
  // Ignored: the main thread's own appends act as the failure check.
}
}
};
Thread worker2 = new Thread() {
@Override
public void run() {
try {
doSmallAppends(file, fs, 20);
} catch (IOException e) {
  // Ignored: the main thread's own appends act as the failure check.
}
}
};
worker1.start();
worker2.start();
// Appends from this thread race with the two workers; append will fail when
// the file size crosses the checksum chunk boundary if append was called
// with a stale file stat.
doSmallAppends(file, fs, 20);
// Wait for both workers so the cluster is not shut down while they append.
worker1.join();
worker2.join();
}
@Test
public void testAppendToPartialChunk() throws IOException {
testAppendToPartialChunk(false);
}
@Test
public void testAppendToPartialChunkForAppend2() throws IOException {
testAppendToPartialChunk(true);
}
}