blob: 9a444df827fe8340c781c157cee327c3c445bedd [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestWriteRead {
// junit test settings
private static final int WR_NTIMES = 350;
private static final int WR_CHUNK_SIZE = 10000;
private static final int BUFFER_SIZE = 8192 * 100;
private static final String ROOT_DIR = "/tmp/";
// command-line options
String filenameOption = ROOT_DIR + "fileX1";
int chunkSizeOption = 10000;
int loopOption = 10;
private MiniDFSCluster cluster;
private Configuration conf; // = new HdfsConfiguration();
private FileSystem mfs; // = cluster.getFileSystem();
private FileContext mfc; // = FileContext.getFileContext();
// configuration
final boolean positionRead = false; // position read vs sequential read
private boolean useFCOption = false; // use either FileSystem or FileContext
private boolean verboseOption = true;
static private Log LOG = LogFactory.getLog(TestWriteRead.class);
public void initJunitModeTest() throws Exception {"initJunitModeTest");
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); //100K blocksize
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
mfs = cluster.getFileSystem();
mfc = FileContext.getFileContext();
Path rootdir = new Path(ROOT_DIR);
public void shutdown() {
// Equivalence of @Before for cluster mode testing.
private void initClusterModeTest() throws IOException {
LOG = LogFactory.getLog(TestWriteRead.class);
((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.INFO);"initClusterModeTest");
conf = new Configuration();
mfc = FileContext.getFileContext();
mfs = FileSystem.get(conf);
/** Junit Test reading while writing. */
public void TestWriteRead1() throws IOException {
String fname = filenameOption;
// need to run long enough to fail: takes 25 to 35 seec on Mac
int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
Assert.assertTrue(stat == 0);
// equivalent of TestWriteRead1
private int clusterTestWriteRead1() throws IOException {
int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
return stat;
* Open the file to read from begin to end. Then close the file.
* Return number of bytes read.
* Support both sequential read and position read.
private long readData(String fname, byte[] buffer, long byteExpected)
throws IOException {
long totalByteRead = 0;
long beginPosition = 0;
Path path = getFullyQualifiedPath(fname);
FSDataInputStream in = null;
try {
in = openInputStream(path);
long visibleLenFromReadStream = getVisibleFileLength(in);
totalByteRead = readUntilEnd(in, buffer, buffer.length, fname,
beginPosition, visibleLenFromReadStream, positionRead);
return totalByteRead + beginPosition;
} catch (IOException e) {
throw new IOException("##### Caught Exception in readData. "
+ "Total Byte Read so far = " + totalByteRead
+ " beginPosition = " + beginPosition, e);
} finally {
if (in != null)
* read chunks into buffer repeatedly until total of VisibleLen byte are read
* Return total number of bytes read
private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,String fname,
long pos, long visibleLen, boolean positionRead) throws IOException {
if (pos >= visibleLen || visibleLen <= 0 )
return 0;
int chunkNumber = 0;
long totalByteRead = 0;
long currentPosition = pos;
int byteRead = 0;
long byteLeftToRead = visibleLen - pos;
int byteToReadThisRound = 0;
if (!positionRead){;
currentPosition = in.getPos();
if (verboseOption)"reader begin: position: " + pos
+ " ; currentOffset = " + currentPosition + " ; bufferSize ="
+ buffer.length + " ; Filename = " + fname);
try {
while (byteLeftToRead > 0 && currentPosition < visibleLen ) {
byteToReadThisRound = (int) (byteLeftToRead >= buffer.length ?
buffer.length : byteLeftToRead);
if (positionRead) {
byteRead =, buffer, 0, byteToReadThisRound);
} else {
byteRead =, 0, byteToReadThisRound);
if (byteRead <= 0)
totalByteRead += byteRead;
currentPosition += byteRead;
byteLeftToRead -= byteRead;
if (verboseOption) {"reader: Number of byte read: " + byteRead
+ " ; toatlByteRead = " + totalByteRead + " ; currentPosition="
+ currentPosition + " ; chunkNumber =" + chunkNumber
+ "; File name = " + fname);
} catch (IOException e) {
throw new IOException(
"#### Exception caught in readUntilEnd: reader currentOffset = "
+ currentPosition + " ; totalByteRead =" + totalByteRead
+ " ; latest byteRead = " + byteRead + "; visibleLen= "
+ visibleLen + " ; bufferLen = " + buffer.length
+ " ; Filename = " + fname, e);
if (verboseOption)"reader end: position: " + pos
+ " ; currentOffset = " + currentPosition + " ; totalByteRead ="
+ totalByteRead + " ; Filename = " + fname);
return totalByteRead;
private int writeData(FSDataOutputStream out, byte[] buffer, int length)
throws IOException {
int totalByteWritten = 0;
int remainToWrite = length;
while (remainToWrite > 0) {
int toWriteThisRound = remainToWrite > buffer.length ? buffer.length
: remainToWrite;
out.write(buffer, 0, toWriteThisRound);
totalByteWritten += toWriteThisRound;
remainToWrite -= toWriteThisRound;
return totalByteWritten;
* Common routine to do position read while open the file for write.
* After each iteration of write, do a read of the file from begin to end.
* Return 0 on success, else number of failure.
private int testWriteAndRead(String fname, int loopN, int chunkSize)
throws IOException {
int countOfFailures = 0;
long byteVisibleToRead = 0;
FSDataOutputStream out = null;
byte[] outBuffer = new byte[BUFFER_SIZE];
byte[] inBuffer = new byte[BUFFER_SIZE];
for (int i = 0; i < BUFFER_SIZE; i++) {
outBuffer[i] = (byte) (i & 0x00ff);
try {
Path path = getFullyQualifiedPath(fname);
out = useFCOption ? mfc.create(path, EnumSet.of(CreateFlag.CREATE)) :
long totalByteWritten = 0;
long totalByteVisible = 0;
long totalByteWrittenButNotVisible = 0;
int byteWrittenThisTime;
boolean toFlush;
for (int i = 0; i < loopN; i++) {
toFlush = (i % 2) == 0;
byteWrittenThisTime = writeData(out, outBuffer, chunkSize);
totalByteWritten += byteWrittenThisTime;
if (toFlush) {
totalByteVisible += byteWrittenThisTime
+ totalByteWrittenButNotVisible;
totalByteWrittenButNotVisible = 0;
} else {
totalByteWrittenButNotVisible += byteWrittenThisTime;
if (verboseOption) {"TestReadWrite - Written " + byteWrittenThisTime
+ ". Total written = " + totalByteWritten
+ ". TotalByteVisible = " + totalByteVisible + " to file "
+ fname);
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
String readmsg;
if (byteVisibleToRead >= totalByteVisible
&& byteVisibleToRead <= totalByteWritten) {
readmsg = "pass: reader sees expected number of visible byte "
+ byteVisibleToRead + " of file " + fname + " [pass]";
} else {
readmsg = "fail: reader does not see expected number of visible byte "
+ byteVisibleToRead + " of file " + fname + " [fail]";
// test the automatic flush after close
writeData(out, outBuffer, chunkSize);
totalByteWritten += chunkSize;
totalByteVisible += chunkSize + totalByteWrittenButNotVisible;
totalByteWrittenButNotVisible += 0;
byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
long lenFromFc = getFileLengthFromNN(path);
String readmsg;
if (byteVisibleToRead == totalByteVisible) {
readmsg = "PASS: reader sees expected size of file " + fname
+ " after close. File Length from NN: " + lenFromFc + " [Pass]";
} else {
readmsg = "FAIL: reader sees is different size of file " + fname
+ " after close. File Length from NN: " + lenFromFc + " [Fail]";
} catch (IOException e) {
throw new IOException(
"##### Caught Exception in testAppendWriteAndRead. Close file. "
+ "Total Byte Read so far = " + byteVisibleToRead, e);
} finally {
if (out != null)
return -countOfFailures;
// //////////////////////////////////////////////////////////////////////
// // helper function:
// /////////////////////////////////////////////////////////////////////
private FSDataInputStream openInputStream(Path path) throws IOException {
FSDataInputStream in = useFCOption ? :;
return in;
// length of a file (path name) from NN.
private long getFileLengthFromNN(Path path) throws IOException {
FileStatus fileStatus = useFCOption ?
mfc.getFileStatus(path) : mfs.getFileStatus(path);
return fileStatus.getLen();
private long getVisibleFileLength(FSDataInputStream in) throws IOException {
DFSClient.DFSDataInputStream din = (DFSClient.DFSDataInputStream) in;
return din.getVisibleLength();
private boolean ifExists(Path path) throws IOException {
return useFCOption ? mfc.util().exists(path) : mfs.exists(path);
private Path getFullyQualifiedPath(String pathString) {
return useFCOption ?
mfc.makeQualified(new Path(ROOT_DIR, pathString)) :
mfs.makeQualified(new Path(ROOT_DIR, pathString));
private void usage(){
System.out.println("Usage: -chunkSize nn -loop ntime -f filename");
private void getCmdLineOption(String[] args){
for (int i = 0; i < args.length; i++){
if (args[i].equals("-f")) {
filenameOption = args[++i];
} else if (args[i].equals("-chunkSize")){
chunkSizeOption = Integer.parseInt(args[++i]);
} else if (args[i].equals("-loop")){
loopOption = Integer.parseInt(args[++i]);
} else {
* Entry point of the test when using a real cluster.
* Usage: [-loop ntimes] [-chunkSize nn] [-f filename]
* -loop: iterate ntimes: each iteration consists of a write, then a read
* -chunkSize: number of byte for each write
* -f filename: filename to write and read
* Default: ntimes = 10; chunkSize = 10000; filename = /tmp/fileX1
public static void main(String[] args) {
try {
TestWriteRead trw = new TestWriteRead();
int stat = trw.clusterTestWriteRead1();
if (stat == 0){
System.out.println("Status: clusterTestWriteRead1 test PASS");
} else {
System.out.println("Status: clusterTestWriteRead1 test FAIL");
} catch (IOException e) {"#### Exception in Main");