HDFS-1877. Add a new test for concurrent read and write. Contributed by CW Chung
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1125189 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 3237fb0..b24af07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -445,6 +445,9 @@
HDFS-1941. Remove -genclusterid option from namenode command.
(Bharath Mundlapudi via suresh)
+ HDFS-1877. Add a new test for concurrent read and write. (CW Chung
+ via szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
new file mode 100644
index 0000000..c040b43
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWriteRead {
+
+ // junit test settings
+ private static final int WR_NTIMES = 4;
+ private static final int WR_CHUNK_SIZE = 1000;
+
+
+ private static final int BUFFER_SIZE = 8192 * 100;
+ private static final String ROOT_DIR = "/tmp/";
+
+ // command-line options
+ String filenameOption = ROOT_DIR + "fileX1";
+ int chunkSizeOption = 10000;
+ int loopOption = 10;
+
+
+ private MiniDFSCluster cluster;
+ private Configuration conf; // = new HdfsConfiguration();
+ private FileSystem mfs; // = cluster.getFileSystem();
+ private FileContext mfc; // = FileContext.getFileContext();
+
+ // configuration
+ final boolean positionRead = false; // position read vs sequential read
+ private boolean useFCOption = false; // use either FileSystem or FileContext
+ private boolean verboseOption = true;
+
+ static private Log LOG = LogFactory.getLog(TestWriteRead.class);
+
+ @Before
+ public void initJunitModeTest() throws Exception {
+ LOG.info("initJunitModeTest");
+
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); //100K blocksize
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+
+ mfs = cluster.getFileSystem();
+ mfc = FileContext.getFileContext();
+
+ Path rootdir = new Path(ROOT_DIR);
+ mfs.mkdirs(rootdir);
+ }
+
+ @After
+ public void shutdown() {
+ cluster.shutdown();
+ }
+
+ // Equivalence of @Before for cluster mode testing.
+ private void initClusterModeTest() throws IOException {
+
+ LOG = LogFactory.getLog(TestWriteRead.class);
+ ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.INFO);
+ LOG.info("initClusterModeTest");
+
+ conf = new Configuration();
+ mfc = FileContext.getFileContext();
+ mfs = FileSystem.get(conf);
+ }
+
+ /** Junit Test reading while writing. */
+ @Test
+ public void TestWriteRead1() throws IOException {
+ String fname = filenameOption;
+
+ // need to run long enough to fail: takes 25 to 35 seec on Mac
+ int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+ Assert.assertTrue(stat == 0);
+ }
+
+ // equivalent of TestWriteRead1
+ private int clusterTestWriteRead1() throws IOException {
+ int stat = testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
+ return stat;
+ }
+
+ /**
+ * Open the file to read from begin to end. Then close the file.
+ * Return number of bytes read.
+ * Support both sequential read and position read.
+ */
+ private long readData(String fname, byte[] buffer, long byteExpected)
+ throws IOException {
+ long totalByteRead = 0;
+ long beginPosition = 0;
+ Path path = getFullyQualifiedPath(fname);
+
+ FSDataInputStream in = null;
+ try {
+ in = openInputStream(path);
+
+ long visibleLenFromReadStream = getVisibleFileLength(in);
+
+ totalByteRead = readUntilEnd(in, buffer, buffer.length, fname,
+ beginPosition, visibleLenFromReadStream, positionRead);
+ in.close();
+
+ return totalByteRead + beginPosition;
+
+ } catch (IOException e) {
+ throw new IOException("##### Caught Exception in readData. "
+ + "Total Byte Read so far = " + totalByteRead
+ + " beginPosition = " + beginPosition, e);
+ } finally {
+ if (in != null)
+ in.close();
+ }
+ }
+
+ /**
+ * read chunks into buffer repeatedly until total of VisibleLen byte are read
+ * Return total number of bytes read
+ */
+ private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,String fname,
+ long pos, long visibleLen, boolean positionRead) throws IOException {
+
+ if (pos >= visibleLen || visibleLen <= 0 )
+ return 0;
+
+ int chunkNumber = 0;
+ long totalByteRead = 0;
+ long currentPosition = pos;
+ int byteRead = 0;
+ long byteLeftToRead = visibleLen - pos;
+ int byteToReadThisRound = 0;
+
+ if (!positionRead){
+ in.seek(pos);
+ currentPosition = in.getPos();
+ }
+ if (verboseOption)
+ LOG.info("reader begin: position: " + pos
+ + " ; currentOffset = " + currentPosition + " ; bufferSize ="
+ + buffer.length + " ; Filename = " + fname);
+ try {
+ while (byteLeftToRead > 0 && currentPosition < visibleLen ) {
+ byteToReadThisRound = (int) (byteLeftToRead >= buffer.length ?
+ buffer.length : byteLeftToRead);
+ if (positionRead) {
+ byteRead = in.read(currentPosition, buffer, 0, byteToReadThisRound);
+ } else {
+ byteRead = in.read(buffer, 0, byteToReadThisRound);
+ }
+ if (byteRead <= 0)
+ break;
+ chunkNumber++;
+ totalByteRead += byteRead;
+ currentPosition += byteRead;
+ byteLeftToRead -= byteRead;
+
+ if (verboseOption) {
+ LOG.info("reader: Number of byte read: " + byteRead
+ + " ; toatlByteRead = " + totalByteRead + " ; currentPosition="
+ + currentPosition + " ; chunkNumber =" + chunkNumber
+ + "; File name = " + fname);
+ }
+ }
+ } catch (IOException e) {
+ throw new IOException(
+ "#### Exception caught in readUntilEnd: reader currentOffset = "
+ + currentPosition + " ; totalByteRead =" + totalByteRead
+ + " ; latest byteRead = " + byteRead + "; visibleLen= "
+ + visibleLen + " ; bufferLen = " + buffer.length
+ + " ; Filename = " + fname, e);
+ }
+
+ if (verboseOption)
+ LOG.info("reader end: position: " + pos
+ + " ; currentOffset = " + currentPosition + " ; totalByteRead ="
+ + totalByteRead + " ; Filename = " + fname);
+
+ return totalByteRead;
+ }
+
+ private int writeData(FSDataOutputStream out, byte[] buffer, int length)
+ throws IOException {
+
+ int totalByteWritten = 0;
+ int remainToWrite = length;
+
+ while (remainToWrite > 0) {
+ int toWriteThisRound = remainToWrite > buffer.length ? buffer.length
+ : remainToWrite;
+ out.write(buffer, 0, toWriteThisRound);
+ totalByteWritten += toWriteThisRound;
+ remainToWrite -= toWriteThisRound;
+ }
+ return totalByteWritten;
+ }
+
+ /**
+ * Common routine to do position read while open the file for write.
+ * After each iteration of write, do a read of the file from begin to end.
+ * Return 0 on success, else number of failure.
+ */
+ private int testWriteAndRead(String fname, int loopN, int chunkSize)
+ throws IOException {
+
+ int countOfFailures = 0;
+ long byteVisibleToRead = 0;
+ FSDataOutputStream out = null;
+
+ byte[] outBuffer = new byte[BUFFER_SIZE];
+ byte[] inBuffer = new byte[BUFFER_SIZE];
+
+ for (int i = 0; i < BUFFER_SIZE; i++) {
+ outBuffer[i] = (byte) (i & 0x00ff);
+ }
+
+ try {
+ Path path = getFullyQualifiedPath(fname);
+
+ out = useFCOption ? mfc.create(path, EnumSet.of(CreateFlag.CREATE)) :
+ mfs.create(path);
+
+ long totalByteWritten = 0;
+ long totalByteVisible = 0;
+ long totalByteWrittenButNotVisible = 0;
+ int byteWrittenThisTime;
+
+ boolean toFlush;
+ for (int i = 0; i < loopN; i++) {
+ toFlush = (i % 2) == 0;
+
+ byteWrittenThisTime = writeData(out, outBuffer, chunkSize);
+
+ totalByteWritten += byteWrittenThisTime;
+
+ if (toFlush) {
+ out.hflush();
+ totalByteVisible += byteWrittenThisTime
+ + totalByteWrittenButNotVisible;
+ totalByteWrittenButNotVisible = 0;
+ } else {
+ totalByteWrittenButNotVisible += byteWrittenThisTime;
+ }
+
+ if (verboseOption) {
+ LOG.info("TestReadWrite - Written " + byteWrittenThisTime
+ + ". Total written = " + totalByteWritten
+ + ". TotalByteVisible = " + totalByteVisible + " to file "
+ + fname);
+ }
+ byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+
+ String readmsg;
+
+ if (byteVisibleToRead >= totalByteVisible
+ && byteVisibleToRead <= totalByteWritten) {
+ readmsg = "pass: reader sees expected number of visible byte "
+ + byteVisibleToRead + " of file " + fname + " [pass]";
+ } else {
+ countOfFailures++;
+ readmsg = "fail: reader does not see expected number of visible byte "
+ + byteVisibleToRead + " of file " + fname + " [fail]";
+ }
+ LOG.info(readmsg);
+ }
+
+ // test the automatic flush after close
+ writeData(out, outBuffer, chunkSize);
+ totalByteWritten += chunkSize;
+ totalByteVisible += chunkSize + totalByteWrittenButNotVisible;
+ totalByteWrittenButNotVisible += 0;
+
+ out.close();
+
+ byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+ long lenFromFc = getFileLengthFromNN(path);
+
+ String readmsg;
+ if (byteVisibleToRead == totalByteVisible) {
+ readmsg = "PASS: reader sees expected size of file " + fname
+ + " after close. File Length from NN: " + lenFromFc + " [Pass]";
+ } else {
+ countOfFailures++;
+ readmsg = "FAIL: reader sees is different size of file " + fname
+ + " after close. File Length from NN: " + lenFromFc + " [Fail]";
+ }
+ LOG.info(readmsg);
+
+ } catch (IOException e) {
+ throw new IOException(
+ "##### Caught Exception in testAppendWriteAndRead. Close file. "
+ + "Total Byte Read so far = " + byteVisibleToRead, e);
+ } finally {
+ if (out != null)
+ out.close();
+ }
+ return -countOfFailures;
+ }
+
+ // //////////////////////////////////////////////////////////////////////
+ // // helper function:
+ // /////////////////////////////////////////////////////////////////////
+ private FSDataInputStream openInputStream(Path path) throws IOException {
+ FSDataInputStream in = useFCOption ? mfc.open(path) : mfs.open(path);
+ return in;
+ }
+
+ // length of a file (path name) from NN.
+ private long getFileLengthFromNN(Path path) throws IOException {
+ FileStatus fileStatus = useFCOption ?
+ mfc.getFileStatus(path) : mfs.getFileStatus(path);
+ return fileStatus.getLen();
+ }
+
+ private long getVisibleFileLength(FSDataInputStream in) throws IOException {
+ DFSClient.DFSDataInputStream din = (DFSClient.DFSDataInputStream) in;
+ return din.getVisibleLength();
+ }
+
+ private boolean ifExists(Path path) throws IOException {
+ return useFCOption ? mfc.util().exists(path) : mfs.exists(path);
+ }
+
+ private Path getFullyQualifiedPath(String pathString) {
+ return useFCOption ?
+ mfc.makeQualified(new Path(ROOT_DIR, pathString)) :
+ mfs.makeQualified(new Path(ROOT_DIR, pathString));
+ }
+
+ private void usage(){
+ System.out.println("Usage: -chunkSize nn -loop ntime -f filename");
+ System.exit(0);
+ }
+
+ private void getCmdLineOption(String[] args){
+ for (int i = 0; i < args.length; i++){
+ if (args[i].equals("-f")) {
+ filenameOption = args[++i];
+ } else if (args[i].equals("-chunkSize")){
+ chunkSizeOption = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-loop")){
+ loopOption = Integer.parseInt(args[++i]);
+ } else {
+ usage();
+ }
+ }
+ return;
+ }
+
+ /**
+ * Entry point of the test when using a real cluster.
+ * Usage: [-loop ntimes] [-chunkSize nn] [-f filename]
+ * -loop: iterate ntimes: each iteration consists of a write, then a read
+ * -chunkSize: number of byte for each write
+ * -f filename: filename to write and read
+ * Default: ntimes = 10; chunkSize = 10000; filename = /tmp/fileX1
+ */
+ public static void main(String[] args) {
+ try {
+ TestWriteRead trw = new TestWriteRead();
+ trw.initClusterModeTest();
+ trw.getCmdLineOption(args);
+ int stat = trw.clusterTestWriteRead1();
+
+ if (stat == 0){
+ System.out.println("Status: clusterTestWriteRead1 test PASS");
+ } else {
+ System.out.println("Status: clusterTestWriteRead1 test FAIL");
+ }
+ System.exit(stat);
+ } catch (IOException e) {
+ LOG.info("#### Exception in Main");
+ e.printStackTrace();
+ System.exit(-2);
+ }
+ }
+}