HDFS-5259. Merging change r1529735 from branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1529738 13f79535-47bb-0310-9956-ffa450edef68
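Reviewer note (not part of the patch): the core of this change is OpenFileCtx.alterWriteRequest(), which trims a write that partially overlaps data already written by advancing the request's ByteBuffer position past the overlapped prefix and shrinking the count. Below is a minimal standalone sketch of that arithmetic; class and variable names are illustrative only, not part of the patch.

import java.nio.ByteBuffer;

// Illustrative sketch only: mirrors the arithmetic in OpenFileCtx.alterWriteRequest().
// Given a write covering [offset, offset + count) and file data already cached up to
// cachedOffset, drop the overlapped prefix and keep only the appended tail.
public class TrimOverlapSketch {
  public static void main(String[] args) {
    long offset = 0;          // start of the incoming write
    int count = 20;           // bytes in the incoming write
    long cachedOffset = 12;   // data already written up to here

    byte[] bytes = new byte[count];
    for (int i = 0; i < count; i++) {
      bytes[i] = (byte) i;
    }
    ByteBuffer data = ByteBuffer.wrap(bytes);

    // Same computation as the patch: the surviving (appended) byte count.
    long trimmedCount = offset + count - cachedOffset;           // 8
    // Skip the overlapped prefix by moving the buffer position.
    data.position((int) (cachedOffset - offset));                // position = 12

    System.out.println("new offset = " + cachedOffset);                    // 12
    System.out.println("new count  = " + trimmedCount);                    // 8
    System.out.println("first kept byte = " + data.get(data.position()));  // 12
  }
}

In the patch itself this trimming is applied only when there are no other pending writes, and the original count is remembered in WriteCtx so the reply to the client still acknowledges the full request.
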
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
index b6b8fd0..b04e7fc 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
@@ -28,8 +28,8 @@
  * WRITE3 Request
  */
 public class WRITE3Request extends RequestWithHandle {
-  private final long offset;
-  private final int count;
+  private long offset;
+  private int count;
   private final WriteStableHow stableHow;
   private final ByteBuffer data;
 
@@ -54,10 +54,18 @@
     return this.offset;
   }
 
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+  
   public int getCount() {
     return this.count;
   }
 
+  public void setCount(int count) {
+    this.count = count;
+  }
+  
   public WriteStableHow getStableHow() {
     return this.stableHow;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 36b57d1..6c87a8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -22,6 +22,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -362,6 +364,30 @@
     }
   }
 
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long smallerCount = offset + count - cachedOffset;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+    }
+    
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+    
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
+  
   /**
    * Creates and adds a WriteCtx into the pendingWrites map. This is a
    * synchronized method to handle concurrent writes.
@@ -374,12 +400,40 @@
     long offset = request.getOffset();
     int count = request.getCount();
     long cachedOffset = nextOffset.get();
-
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("requesed offset=" + offset + " and current offset="
           + cachedOffset);
     }
 
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened for
+      // writing, the client sometimes combines previously written data (which
+      // could still be in the kernel buffer) with newly appended data in one
+      // write. This is usually the first write after the file is reopened.
+      // In this case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
+      
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update local variable
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+    
     // Fail non-append call
     if (offset < cachedOffset) {
       LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
@@ -389,8 +443,9 @@
       DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
           : WriteCtx.DataState.ALLOW_DUMP;
       WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, dataState);
+          request.getOffset(), request.getCount(), originalCount,
+          request.getStableHow(), request.getData(), channel, xid, false,
+          dataState);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Add new write to the list with nextOffset " + cachedOffset
             + " and requesed offset=" + offset);
@@ -421,8 +476,7 @@
     WRITE3Response response;
     long cachedOffset = nextOffset.get();
     if (offset + count > cachedOffset) {
-      LOG.warn("Haven't noticed any partial overwrite for a sequential file"
-          + " write requests. Treat it as a real random write, no support.");
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
           WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
@@ -641,6 +695,7 @@
   private void addWrite(WriteCtx writeCtx) {
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive, and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
   
@@ -753,19 +808,7 @@
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (Exception e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
     
-    Preconditions.checkState(data.length == count);
-
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
@@ -774,8 +817,8 @@
 
     try {
       // The write is not protected by lock. asyncState is used to make sure
-      // there is one thread doing write back at any time
-      fos.write(data, 0, count);
+      // there is one thread doing write back at any time    
+      writeCtx.writeData(fos);
       
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -784,10 +827,6 @@
             + (offset + count));
       }
       
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After writing " + handle.getFileId() + " at offset "
-            + offset + ", update the memory count.");
-      }
 
       // Reduce memory occupation size if request was allowed dumped
       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
@@ -795,6 +834,11 @@
           if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
             writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
             updateNonSequentialWriteInMemory(-count);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After writing " + handle.getFileId() + " at offset "
+                  + offset + ", updated the memory count, new value:"
+                  + nonSequentialWriteInMemory.get());
+            }
           }
         }
       }
@@ -802,6 +846,11 @@
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
@@ -809,7 +858,7 @@
       }
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index f1af652..05e0fb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -20,13 +20,16 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -50,8 +53,17 @@
   private final FileHandle handle;
   private final long offset;
   private final int count;
+  
+  // Only needed for overlapped write; see OpenFileCtx.addWritesToCache()
+  private final int originalCount;
+  public static final int INVALID_ORIGINAL_COUNT = -1;
+  
+  public int getOriginalCount() {
+    return originalCount;
+  }
+
   private final WriteStableHow stableHow;
-  private volatile byte[] data;
+  private volatile ByteBuffer data;
   
   private final Channel channel;
   private final int xid;
@@ -89,9 +101,13 @@
       }
       return 0;
     }
+
+    // Resized write should not allow dump
+    Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
     this.raf = raf;
     dumpFileOffset = dumpOut.getChannel().position();
-    dumpOut.write(data, 0, count);
+    dumpOut.write(data.array(), 0, count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
@@ -127,7 +143,8 @@
     return stableHow;
   }
 
-  byte[] getData() throws IOException {
+  @VisibleForTesting
+  ByteBuffer getData() throws IOException {
     if (dataState != DataState.DUMPED) {
       synchronized (this) {
         if (dataState != DataState.DUMPED) {
@@ -143,15 +160,45 @@
 
   private void loadData() throws IOException {
     Preconditions.checkState(data == null);
-    data = new byte[count];
+    byte[] rawData = new byte[count];
     raf.seek(dumpFileOffset);
-    int size = raf.read(data, 0, count);
+    int size = raf.read(rawData, 0, count);
     if (size != count) {
       throw new IOException("Data count is " + count + ", but read back "
           + size + "bytes");
     }
+    data = ByteBuffer.wrap(rawData);
   }
 
+  public void writeData(HdfsDataOutputStream fos) throws IOException {
+    Preconditions.checkState(fos != null);
+
+    ByteBuffer dataBuffer = null;
+    try {
+      dataBuffer = getData();
+    } catch (Exception e1) {
+      LOG.error("Failed to get request data offset:" + offset + " count:"
+          + count + " error:" + e1);
+      throw new IOException("Can't get WriteCtx.data");
+    }
+
+    byte[] data = dataBuffer.array();
+    int position = dataBuffer.position();
+    int limit = dataBuffer.limit();
+    Preconditions.checkState(limit - position == count);
+    // Modified write has a valid original count
+    if (position != 0) {
+      if (limit != getOriginalCount()) {
+        throw new IOException("Modified write has differnt original size."
+            + "buff position:" + position + " buff limit:" + limit + ". "
+            + toString());
+      }
+    }
+    
+    // Now write data
+    fos.write(data, position, count);
+  }
+  
   Channel getChannel() {
     return channel;
   }
@@ -168,11 +215,13 @@
     this.replied = replied;
   }
   
-  WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
-      byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+  WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+      WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+      boolean replied, DataState dataState) {
     this.handle = handle;
     this.offset = offset;
     this.count = count;
+    this.originalCount = originalCount;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;
@@ -185,7 +234,7 @@
   @Override
   public String toString() {
     return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
-        + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
-        + dataState + " xid:" + xid;
+        + " originalCount:" + originalCount + " stableHow:" + stableHow
+        + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
new file mode 100644
index 0000000..d24e5d1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.junit.Test;
+
+public class TestWrites {
+  @Test
+  public void testAlterWriteRequest() throws IOException {
+    int len = 20;
+    byte[] data = new byte[len];
+    ByteBuffer buffer = ByteBuffer.wrap(data);
+
+    for (int i = 0; i < len; i++) {
+      buffer.put((byte) i);
+    }
+    buffer.flip();
+    int originalCount = buffer.array().length;
+    WRITE3Request request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+
+    WriteCtx writeCtx1 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), WriteCtx.INVALID_ORIGINAL_COUNT,
+        request.getStableHow(), request.getData(), null, 1, false,
+        WriteCtx.DataState.NO_DUMP);
+
+    Assert.assertTrue(writeCtx1.getData().array().length == originalCount);
+
+    // Now change the write request
+    OpenFileCtx.alterWriteRequest(request, 12);
+
+    WriteCtx writeCtx2 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    ByteBuffer appendedData = writeCtx2.getData();
+
+    int position = appendedData.position();
+    int limit = appendedData.limit();
+    Assert.assertTrue(position == 12);
+    Assert.assertTrue(limit - position == 8);
+    Assert.assertTrue(appendedData.get(position) == (byte) 12);
+    Assert.assertTrue(appendedData.get(position + 1) == (byte) 13);
+    Assert.assertTrue(appendedData.get(position + 2) == (byte) 14);
+    Assert.assertTrue(appendedData.get(position + 7) == (byte) 19);
+
+    // Test when the current file write offset is at a boundary
+    buffer.position(0);
+    request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+    OpenFileCtx.alterWriteRequest(request, 1);
+    WriteCtx writeCtx3 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    appendedData = writeCtx3.getData();
+    position = appendedData.position();
+    limit = appendedData.limit();
+    Assert.assertTrue(position == 1);
+    Assert.assertTrue(limit - position == 19);
+    Assert.assertTrue(appendedData.get(position) == (byte) 1);
+    Assert.assertTrue(appendedData.get(position + 18) == (byte) 19);
+
+    // Reset the buffer position before testing the other boundary
+    buffer.position(0);
+    request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+    OpenFileCtx.alterWriteRequest(request, 19);
+    WriteCtx writeCtx4 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    appendedData = writeCtx4.getData();
+    position = appendedData.position();
+    limit = appendedData.limit();
+    Assert.assertTrue(position == 19);
+    Assert.assertTrue(limit - position == 1);
+    Assert.assertTrue(appendedData.get(position) == (byte) 19);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a1decce..ee67ff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -73,6 +73,9 @@
     HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
     (Vinay via jing9)
 
+    HDFS-5259. Support client which combines appended data with old data
+    before sending it to the NFS server. (brandonli)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES