/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
/** This class contains a set of tests to verify the correctness of the
* newly introduced {@link FSDataOutputStream#hflush()} method. */
public class TestHFlush {
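// Raise DataNode and DFSClient log levels to ALL so hflush/hsync activity is fully logged.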
{
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
}
private final String fName = "hflushtest.dat";
/**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a standard block size
*/
@Test
public void hFlush_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, false, EnumSet.noneOf(SyncFlag.class));
}
/**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a custom block size so the writes will
* happen across block boundaries
*/
@Test
public void hFlush_02() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, false,
EnumSet.noneOf(SyncFlag.class));
}
/**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a custom block size so the writes will
* happen across block and checksum boundaries
*/
@Test
public void hFlush_03() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, false,
EnumSet.noneOf(SyncFlag.class));
}
/**
* Test hsync (with updating block length in NameNode) while no data is
* actually written yet
*/
@Test
public void hSyncUpdateLength_00() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
2).build();
DistributedFileSystem fileSystem =
cluster.getFileSystem();
try {
Path path = new Path(fName);
FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
AppendTestUtil.BLOCK_SIZE);
System.out.println("Created file " + path.toString());
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.UPDATE_LENGTH));
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(0L, currentFileLength);
stm.close();
} finally {
fileSystem.close();
cluster.shutdown();
}
}
/**
* Test hsync with END_BLOCK flag.
*/
@Test
public void hSyncEndBlock_00() throws IOException {
final int preferredBlockSize = 1024;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataOutputStream stm = null;
try {
Path path = new Path("/" + fName);
stm = fileSystem.create(path, true, 4096, (short) 2,
AppendTestUtil.BLOCK_SIZE);
System.out.println("Created file " + path.toString());
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(0L, currentFileLength);
LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(0, blocks.getLocatedBlocks().size());
// write a block and call hsync(end_block) at the block boundary
stm.write(new byte[preferredBlockSize]);
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(1, blocks.getLocatedBlocks().size());
// call hsync then call hsync(end_block) immediately
stm.write(new byte[preferredBlockSize / 2]);
stm.hsync();
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2,
currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(2, blocks.getLocatedBlocks().size());
stm.write(new byte[preferredBlockSize / 4]);
stm.hsync();
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2
+ preferredBlockSize / 4, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(3, blocks.getLocatedBlocks().size());
} finally {
IOUtils.cleanup(null, stm, fileSystem);
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
*/
@Test
public void hSyncUpdateLength_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#END_BLOCK}.
*/
@Test
public void hSyncEndBlock_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#END_BLOCK} and
* {@link SyncFlag#UPDATE_LENGTH}.
*/
@Test
public void hSyncEndBlockAndUpdateLength() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar to {@link #hFlush_02()}, it writes a file with a custom block
* size so the writes will happen across block boundaries
*/
@Test
public void hSyncUpdateLength_02() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
@Test
public void hSyncEndBlock_02() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar to {@link #hFlush_03()}, it writes a file with a custom block
* size so the writes will happen across block and checksum
* boundaries.
*/
@Test
public void hSyncUpdateLength_03() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
@Test
public void hSyncEndBlock_03() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
// Modify default filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.END_BLOCK));
}
/**
* The method starts a new cluster with the given Configuration; creates a file
* with the specified block size and writes 10 equal sections to it; it also calls
* hflush/hsync after each write and throws an IOException in case of an error.
*
* @param conf cluster configuration
* @param fileName name of the file to be created and processed as required
* @param block_size value to be used for the file's creation
* @param replicas is the number of replicas
* @param isSync use hsync if true, hflush otherwise
* @param syncFlags specify the semantics of the sync/flush
* @throws IOException in case of any errors
*/
public static void doTheJob(Configuration conf, final String fileName,
long block_size, short replicas, boolean isSync,
EnumSet<SyncFlag> syncFlags) throws IOException {
byte[] fileContent;
final int SECTIONS = 10;
fileContent = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(replicas).build();
// Make sure we work with DFS in order to utilize all its functionality
DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataInputStream is;
try {
Path path = new Path(fileName);
final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
.toUri().getPath();
FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
block_size);
System.out.println("Created file " + fileName);
int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
for (int i=0; i<SECTIONS; i++) {
System.out.println("Writing " + (tenth * i) + " to "
+ (tenth * (i + 1)) + " section to file " + fileName);
// write to the file
stm.write(fileContent, tenth * i, tenth);
// Wait while hflush/hsync pushes all packets through the built pipeline
if (isSync) {
((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
} else {
((DFSOutputStream)stm.getWrappedStream()).hflush();
}
// Check the file length if UPDATE_LENGTH is required
if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(
"File size doesn't match for hsync/hflush with updating the length",
tenth * (i + 1), currentFileLength);
} else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
assertEquals(i + 1, blocks.getLocatedBlocks().size());
}
byte [] toRead = new byte[tenth];
byte [] expected = new byte[tenth];
System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
// Open the same file for reading. A new reader must be created after every write operation(!)
is = fileSystem.open(path);
is.seek(tenth * i);
int readBytes = is.read(toRead, 0, tenth);
System.out.println("Has read " + readBytes);
assertTrue("Should've get more bytes", (readBytes > 0) && (readBytes <= tenth));
is.close();
checkData(toRead, 0, readBytes, expected, "Partial verification");
}
System.out.println("Writing " + (tenth * SECTIONS) + " to " + (tenth * SECTIONS + rounding) + " section to file " + fileName);
stm.write(fileContent, tenth * SECTIONS, rounding);
stm.close();
assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
} finally {
fileSystem.close();
cluster.shutdown();
}
}
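// Compares the first len bytes of actual against expected starting at offset from, and zeroes out each verified byte of actual.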
static void checkData(final byte[] actual, int from, int len,
final byte[] expected, String message) {
for (int idx = 0; idx < len; idx++) {
assertEquals(message+" byte "+(from+idx)+" differs. expected "+
expected[from+idx]+" actual "+actual[idx],
expected[from+idx], actual[idx]);
actual[idx] = 0;
}
}
/** This creates a slow writer and checks to see
* if pipeline heartbeats work fine
*/
@Test
public void testPipelineHeartbeat() throws Exception {
final int DATANODE_NUM = 2;
final int fileLen = 6;
Configuration conf = new HdfsConfiguration();
final int timeout = 2000;
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
timeout);
final Path p = new Path("/pipelineHeartbeat/foo");
System.out.println("p=" + p);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
try {
DistributedFileSystem fs = cluster.getFileSystem();
byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
// create a new file.
FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
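// Sleep for the full client socket timeout between writes so that only pipeline heartbeats keep the connection alive.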
stm.write(fileContents, 0, 1);
Thread.sleep(timeout);
stm.hflush();
System.out.println("Wrote 1 byte and hflush " + p);
// write another byte
Thread.sleep(timeout);
stm.write(fileContents, 1, 1);
stm.hflush();
stm.write(fileContents, 2, 1);
Thread.sleep(timeout);
stm.hflush();
stm.write(fileContents, 3, 1);
Thread.sleep(timeout);
stm.write(fileContents, 4, 1);
stm.hflush();
stm.write(fileContents, 5, 1);
Thread.sleep(timeout);
stm.close();
// verify that entire file is good
AppendTestUtil.checkFullFile(fs, p, fileLen,
fileContents, "Failed to slowly write to a file");
} finally {
cluster.shutdown();
}
}
@Test
public void testHFlushInterrupted() throws Exception {
final int DATANODE_NUM = 2;
final int fileLen = 6;
byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
Configuration conf = new HdfsConfiguration();
final Path p = new Path("/hflush-interrupted");
System.out.println("p=" + p);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
try {
DistributedFileSystem fs = cluster.getFileSystem();
// create a new file.
FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
stm.write(fileContents, 0, 2);
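// Set the interrupt status before hflush(); the flush should either throw InterruptedIOException or complete with the status still set.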
Thread.currentThread().interrupt();
try {
stm.hflush();
// If we made it past the hflush(), then that means that the ack made it back
// from the pipeline before we got to the wait() call. In that case we should
// still have interrupted status.
assertTrue(Thread.interrupted());
} catch (InterruptedIOException ie) {
System.out.println("Got expected exception during flush");
}
assertFalse(Thread.interrupted());
// Flushing again should succeed since we no longer have interrupt status
stm.hflush();
// Write some more data and flush
stm.write(fileContents, 2, 2);
stm.hflush();
// Write some data and close while interrupted
stm.write(fileContents, 4, 2);
Thread.currentThread().interrupt();
try {
stm.close();
// If we made it past the close(), then that means that the ack made it back
// from the pipeline before we got to the wait() call. In that case we should
// still have interrupted status.
assertTrue(Thread.interrupted());
} catch (InterruptedIOException ioe) {
System.out.println("Got expected exception during close");
// If we got the exception, we shouldn't have interrupted status anymore.
assertFalse(Thread.interrupted());
// Wrap the second stm.close() in a try-catch since the streamer may already have been
// closed elsewhere.
try {
// Now do a successful close.
stm.close();
} catch (IOException e) {
System.out.println("Got expected exception during second close");
}
}
// verify that entire file is good
AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
"Failed to deal with thread interruptions", false);
} finally {
cluster.shutdown();
}
}
}