HDFS-1877.  Add a new test for concurrent read and write.  Contributed by CW Chung


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1125189 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 3237fb0..b24af07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -445,6 +445,9 @@
     HDFS-1941. Remove -genclusterid option from namenode command.
     (Bharath Mundlapudi via suresh)
 
+    HDFS-1877.  Add a new test for concurrent read and write.  (CW Chung
+    via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
new file mode 100644
index 0000000..c040b43
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestWriteRead.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWriteRead {
+  
+  // junit test settings
+  private static final int WR_NTIMES = 4;
+  private static final int WR_CHUNK_SIZE = 1000;
+
+  private static final int BUFFER_SIZE = 8192 * 100;
+  private static final String ROOT_DIR = "/tmp/";
+
+  // command-line options
+  String filenameOption = ROOT_DIR + "fileX1";
+  int chunkSizeOption = 10000;
+  int loopOption = 10;
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private FileSystem mfs;
+  private FileContext mfc;
+
+  // configuration
+  final boolean positionRead = false;   // position read vs sequential read
+  private boolean useFCOption = false;  // use either FileSystem or FileContext
+  private boolean verboseOption = true;
+
+  private static final Log LOG = LogFactory.getLog(TestWriteRead.class);
+
+  @Before
+  public void initJunitModeTest() throws Exception {
+    LOG.info("initJunitModeTest");
+   
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 100); //100K blocksize
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+   
+    mfs = cluster.getFileSystem();
+    mfc = FileContext.getFileContext();
+
+    Path rootdir = new Path(ROOT_DIR);
+    mfs.mkdirs(rootdir);
+  }
+
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // Equivalent of @Before for cluster-mode testing.
+  private void initClusterModeTest() throws IOException {
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.INFO);
+    LOG.info("initClusterModeTest");
+
+    conf = new Configuration();
+    mfc = FileContext.getFileContext();
+    mfs = FileSystem.get(conf);
+  }
+
+  /** Junit Test reading while writing. */
+  @Test
+  public void testWriteRead1() throws IOException {
+    String fname = filenameOption;
+
+    // need to run long enough to fail: takes 25 to 35 seconds on Mac
+    int stat = testWriteAndRead(fname, WR_NTIMES, WR_CHUNK_SIZE);
+    Assert.assertEquals(0, stat);
+  }
+
+  // equivalent of testWriteRead1 for cluster mode
+  private int clusterTestWriteRead1() throws IOException {
+    return testWriteAndRead(filenameOption, loopOption, chunkSizeOption);
+  }
+  
+  /**
+   * Open the file, read it from beginning to end, then close it.
+   * Supports both sequential read and position read; a position read
+   * (in.read(pos, buf, off, len)) does not advance the stream offset.
+   * Return the number of bytes read.
+   */
+  private long readData(String fname, byte[] buffer, long byteExpected)
+      throws IOException {
+    long totalByteRead = 0;
+    long beginPosition = 0;
+    Path path = getFullyQualifiedPath(fname);
+
+    FSDataInputStream in = null;
+    try {
+      in = openInputStream(path);
+
+      long visibleLenFromReadStream = getVisibleFileLength(in);
+
+      totalByteRead = readUntilEnd(in, buffer, buffer.length, fname,
+          beginPosition, visibleLenFromReadStream, positionRead);
+
+      return totalByteRead + beginPosition;
+
+    } catch (IOException e) {
+      throw new IOException("##### Caught Exception in readData. "
+          + "Total Byte Read so far = " + totalByteRead 
+          + " beginPosition = " + beginPosition, e);  
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+
+  /**
+   * Read chunks into the buffer repeatedly until a total of visibleLen bytes
+   * are read. Return the total number of bytes read.
+   */
+  private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,
+      String fname, long pos, long visibleLen, boolean positionRead)
+      throws IOException {
+
+    if (pos >= visibleLen || visibleLen <= 0) {
+      return 0;
+    }
+    
+    int chunkNumber = 0;
+    long totalByteRead = 0;
+    long currentPosition = pos;
+    int byteRead = 0;
+    long byteLeftToRead = visibleLen - pos;
+    int byteToReadThisRound = 0;
+    
+    if (!positionRead) {
+      in.seek(pos);
+      currentPosition = in.getPos();
+    }
+    if (verboseOption) {
+      LOG.info("reader begin: position: " + pos
+          + " ; currentOffset = " + currentPosition + " ; bufferSize = "
+          + buffer.length + " ; Filename = " + fname);
+    }
+    try {
+      while (byteLeftToRead > 0 && currentPosition < visibleLen) {
+        byteToReadThisRound = (int) Math.min(byteLeftToRead, buffer.length);
+        if (positionRead) {
+          byteRead = in.read(currentPosition, buffer, 0, byteToReadThisRound);
+        } else {
+          byteRead = in.read(buffer, 0, byteToReadThisRound);  
+        }
+        if (byteRead <= 0) {
+          break;
+        }
+        chunkNumber++;
+        totalByteRead += byteRead;
+        currentPosition += byteRead;
+        byteLeftToRead -= byteRead;
+        
+        if (verboseOption) {
+          LOG.info("reader: Number of byte read: " + byteRead
+              + " ; toatlByteRead = " + totalByteRead + " ; currentPosition="
+              + currentPosition + " ; chunkNumber =" + chunkNumber
+              + "; File name = " + fname);
+        }
+      }
+    } catch (IOException e) {
+      throw new IOException(
+          "#### Exception caught in readUntilEnd: reader  currentOffset = "
+          + currentPosition + " ; totalByteRead =" + totalByteRead
+          + " ; latest byteRead = " + byteRead + "; visibleLen= "
+          + visibleLen + " ; bufferLen = " + buffer.length
+          + " ; Filename = " + fname, e);
+    }
+
+    if (verboseOption) {
+      LOG.info("reader end: position: " + pos
+          + " ; currentOffset = " + currentPosition + " ; totalByteRead = "
+          + totalByteRead + " ; Filename = " + fname);
+    }
+
+    return totalByteRead;
+  }
+
+  private int writeData(FSDataOutputStream out, byte[] buffer, int length)
+      throws IOException {
+
+    int totalByteWritten = 0;
+    int remainToWrite = length;
+
+    while (remainToWrite > 0) {
+      int toWriteThisRound = Math.min(remainToWrite, buffer.length);
+      out.write(buffer, 0, toWriteThisRound);
+      totalByteWritten += toWriteThisRound;
+      remainToWrite -= toWriteThisRound;
+    }
+    return totalByteWritten;
+  }
+
+  /**
+   * Common routine to read the file while it is still open for write.
+   * After each iteration of write, read the file from beginning to end.
+   * Return 0 on success; otherwise return the negative of the failure count.
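+   *
+   * The check applied after every write: a reader opened after an hflush
+   * must see at least totalByteVisible bytes, and can never see more than
+   * totalByteWritten bytes, i.e.
+   *   totalByteVisible &lt;= byteVisibleToRead &lt;= totalByteWritten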
+   */
+  private int testWriteAndRead(String fname, int loopN, int chunkSize)
+      throws IOException {
+    
+    int countOfFailures = 0;
+    long byteVisibleToRead = 0;
+    FSDataOutputStream out = null;
+
+    byte[] outBuffer = new byte[BUFFER_SIZE];
+    byte[] inBuffer = new byte[BUFFER_SIZE];
+    
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+      outBuffer[i] = (byte) (i & 0x00ff);
+    }
+
+    try {
+      Path path = getFullyQualifiedPath(fname);
+
+      out = useFCOption ? mfc.create(path, EnumSet.of(CreateFlag.CREATE))
+          : mfs.create(path);
+
+      long totalByteWritten = 0;
+      long totalByteVisible = 0;
+      long totalByteWrittenButNotVisible = 0;
+      int byteWrittenThisTime;
+
+      boolean toFlush;
+      for (int i = 0; i < loopN; i++) {
+        toFlush = (i % 2) == 0;
+
+        byteWrittenThisTime = writeData(out, outBuffer, chunkSize);
+
+        totalByteWritten += byteWrittenThisTime;
+
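+        // On even iterations, hflush: this pushes the written bytes to every
+        // datanode in the pipeline and guarantees they are visible to new
+        // readers; it does not guarantee they have been persisted to disk.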
+        if (toFlush) {
+          out.hflush();
+          totalByteVisible += byteWrittenThisTime
+              + totalByteWrittenButNotVisible;
+          totalByteWrittenButNotVisible = 0;
+        } else {
+          totalByteWrittenButNotVisible += byteWrittenThisTime;
+        }
+
+        if (verboseOption) {
+         LOG.info("TestReadWrite - Written " + byteWrittenThisTime
+              + ". Total written = " + totalByteWritten
+              + ". TotalByteVisible = " + totalByteVisible + " to file "
+              + fname);
+        }
+        byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+
+        String readmsg;
+        if (byteVisibleToRead >= totalByteVisible
+            && byteVisibleToRead <= totalByteWritten) {
+          readmsg = "pass: reader sees expected number of visible bytes "
+              + byteVisibleToRead + " of file " + fname;
+        } else {
+          countOfFailures++;
+          readmsg = "fail: reader does not see expected number of visible bytes "
+              + byteVisibleToRead + " of file " + fname;
+        }
+        LOG.info(readmsg);
+      }
+
+      // test the automatic flush after close
+      writeData(out, outBuffer, chunkSize);
+      totalByteWritten += chunkSize;
+      totalByteVisible += chunkSize + totalByteWrittenButNotVisible;
+      totalByteWrittenButNotVisible = 0;
+
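+      // close() flushes any remaining buffered data and completes the file
+      // at the NameNode, so every written byte becomes visible to readers.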
+      out.close();
+
+      byteVisibleToRead = readData(fname, inBuffer, totalByteVisible);
+      long lenFromFc = getFileLengthFromNN(path);
+
+      String readmsg;
+      if (byteVisibleToRead == totalByteVisible) {
+        readmsg = "pass: reader sees expected size of file " + fname
+            + " after close. File length from NN: " + lenFromFc;
+      } else {
+        countOfFailures++;
+        readmsg = "fail: reader sees a different size of file " + fname
+            + " after close. File length from NN: " + lenFromFc;
+      }
+      LOG.info(readmsg);
+
+    } catch (IOException e) {
+      throw new IOException(
+          "##### Caught Exception in testWriteAndRead. Closing file. "
+              + "Total bytes read so far = " + byteVisibleToRead, e);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return -countOfFailures;
+  }
+
+  // ////////////////////////////////////////////////////////////////////////
+  // helper functions
+  // ////////////////////////////////////////////////////////////////////////
+  private FSDataInputStream openInputStream(Path path) throws IOException {
+    return useFCOption ? mfc.open(path) : mfs.open(path);
+  }
+
+  // Return the length of the file at the given path, as reported by the NN.
+  private long getFileLengthFromNN(Path path) throws IOException {
+    FileStatus fileStatus = useFCOption ? 
+        mfc.getFileStatus(path) : mfs.getFileStatus(path);
+    return fileStatus.getLen();
+  }
+
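+  // The visible length is the number of bytes guaranteed readable by a new
+  // reader; it may lag behind the bytes written until the next hflush.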
+  private long getVisibleFileLength(FSDataInputStream in) throws IOException {
+    DFSClient.DFSDataInputStream din = (DFSClient.DFSDataInputStream) in;
+    return din.getVisibleLength();
+  }
+
+  private boolean ifExists(Path path) throws IOException {
+    return useFCOption ? mfc.util().exists(path) : mfs.exists(path);
+  }
+
+  private Path getFullyQualifiedPath(String pathString) {
+    return useFCOption ?  
+      mfc.makeQualified(new Path(ROOT_DIR, pathString)) :
+      mfs.makeQualified(new Path(ROOT_DIR, pathString));
+  }
+
+  private void usage() {
+    System.out.println("Usage: -chunkSize nn -loop ntimes -f filename");
+    System.exit(1);
+  }
+  
+  private void getCmdLineOption(String[] args) {
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-f")) {
+        filenameOption = args[++i];
+      } else if (args[i].equals("-chunkSize")) {
+        chunkSizeOption = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-loop")) {
+        loopOption = Integer.parseInt(args[++i]);
+      } else {
+        usage();
+      }
+    }
+  }
+
+  /**
+   * Entry point of the test when using a real cluster.
+   * Usage: [-loop ntimes] [-chunkSize nn] [-f filename]
+   * -loop: iterate ntimes; each iteration consists of a write, then a read
+   * -chunkSize: number of bytes for each write
+   * -f filename: filename to write and read
+   * Defaults: ntimes = 10; chunkSize = 10000; filename = /tmp/fileX1
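+   *
+   * Example invocation (assuming the compiled test classes are on the Hadoop
+   * classpath; the option values here are illustrative):
+   *   bin/hadoop org.apache.hadoop.hdfs.TestWriteRead -loop 100 -chunkSize 4096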
+   */
+  public static void main(String[] args) {
+    try {
+      TestWriteRead trw = new TestWriteRead();
+      trw.initClusterModeTest();
+      trw.getCmdLineOption(args);
+      int stat = trw.clusterTestWriteRead1();
+      
+      if (stat == 0) {
+        System.out.println("Status: clusterTestWriteRead1 test PASS");
+      } else {
+        System.out.println("Status: clusterTestWriteRead1 test FAIL");
+      }
+      System.exit(stat);
+    } catch (IOException e) {
+      LOG.info("#### Exception in Main");
+      e.printStackTrace();
+      System.exit(-2);
+    }
+  }
+}