HDFS-10655. Fix path related byte array conversion bugs. (daryn)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 0ba80d9..5ab6978 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -275,14 +275,15 @@
     Preconditions.checkArgument(offset >= 0 && offset < pathComponents.length);
     Preconditions.checkArgument(length >= 0 && offset + length <=
         pathComponents.length);
-    if (pathComponents.length == 1
+    if (offset == 0 && length == 1
         && (pathComponents[0] == null || pathComponents[0].length == 0)) {
       return Path.SEPARATOR;
     }
     StringBuilder result = new StringBuilder();
-    for (int i = offset; i < offset + length; i++) {
+    int lastIndex = offset + length - 1;
+    for (int i = offset; i <= lastIndex; i++) {
       result.append(new String(pathComponents[i], Charsets.UTF_8));
-      if (i < pathComponents.length - 1) {
+      if (i < lastIndex) {
         result.append(Path.SEPARATOR_CHAR);
       }
     }
@@ -348,40 +349,37 @@
   public static byte[][] bytes2byteArray(byte[] bytes,
                                          int len,
                                          byte separator) {
-    assert len <= bytes.length;
-    int splits = 0;
+    Preconditions.checkPositionIndex(len, bytes.length);
     if (len == 0) {
       return new byte[][]{null};
     }
-    // Count the splits. Omit multiple separators and the last one
-    for (int i = 0; i < len; i++) {
-      if (bytes[i] == separator) {
+    // Count the splits. Omit multiple separators and the last one by
+    // peeking at prior byte.
+    int splits = 0;
+    for (int i = 1; i < len; i++) {
+      if (bytes[i-1] == separator && bytes[i] != separator) {
         splits++;
       }
     }
-    int last = len - 1;
-    while (last > -1 && bytes[last--] == separator) {
-      splits--;
-    }
     if (splits == 0 && bytes[0] == separator) {
       return new byte[][]{null};
     }
     splits++;
     byte[][] result = new byte[splits][];
-    int startIndex = 0;
     int nextIndex = 0;
-    int index = 0;
-    // Build the splits
-    while (index < splits) {
+    // Build the splits.
+    for (int i = 0; i < splits; i++) {
+      int startIndex = nextIndex;
+      // find next separator in the bytes.
       while (nextIndex < len && bytes[nextIndex] != separator) {
         nextIndex++;
       }
-      result[index] = new byte[nextIndex - startIndex];
-      System.arraycopy(bytes, startIndex, result[index], 0, nextIndex
-              - startIndex);
-      index++;
-      startIndex = nextIndex + 1;
-      nextIndex = startIndex;
+      result[i] = (nextIndex > 0)
+          ? Arrays.copyOfRange(bytes, startIndex, nextIndex)
+          : DFSUtilClient.EMPTY_BYTES; // reuse empty bytes for root.
+      do { // skip over separators.
+        nextIndex++;
+      } while (nextIndex < len && bytes[nextIndex] == separator);
     }
     return result;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
index d6c5183..dd9ca22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
@@ -191,7 +191,7 @@
       "/user/testHome/FileNameLength", PathComponentTooLongException.class);
 
     renameCheckParentDirectory("/user/testHome/FileNameLength",
-      "/user/testHome/really_big_name_0003_fail", "/user/testHome/",
+      "/user/testHome/really_big_name_0003_fail", "/user/testHome",
       PathComponentTooLongException.class);
 
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathComponents.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathComponents.java
index 3daabb9..189f34c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathComponents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathComponents.java
@@ -17,43 +17,95 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.Assert.assertEquals;
 import java.util.Arrays;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-
-
 /**
  * 
  */
 public class TestPathComponents {
 
   @Test
-  public void testBytes2ByteArray() throws Exception {
-    testString("/");
-    testString("/file");
-    testString("/directory/");
-    testString("//");
-    testString("/dir//file");
-    testString("/dir/dir1//");
+  public void testBytes2ByteArrayFQ() throws Exception {
+    testString("/", new String[]{null});
+    testString("//", new String[]{null});
+    testString("/file", new String[]{"", "file"});
+    testString("/dir/", new String[]{"", "dir"});
+    testString("//file", new String[]{"", "file"});
+    testString("/dir//file", new String[]{"", "dir", "file"});
+    testString("//dir/dir1//", new String[]{"", "dir", "dir1"});
+    testString("//dir//dir1//", new String[]{"", "dir", "dir1"});
+    testString("//dir//dir1//file", new String[]{"", "dir", "dir1", "file"});
   }
 
-  public void testString(String str) throws Exception {
-    String pathString = str;
-    byte[][] oldPathComponents = INode.getPathComponents(pathString);
-    byte[][] newPathComponents = 
-                DFSUtil.bytes2byteArray(pathString.getBytes(Charsets.UTF_8),
-                                        (byte) Path.SEPARATOR_CHAR);
-    if (oldPathComponents[0] == null) {
-      assertTrue(oldPathComponents[0] == newPathComponents[0]);
-    } else {
-      assertTrue("Path components do not match for " + pathString,
-                  Arrays.deepEquals(oldPathComponents, newPathComponents));
+  @Test
+  public void testBytes2ByteArrayRelative() throws Exception {
+    testString("file", new String[]{"file"});
+    testString("dir/", new String[]{"dir"});
+    testString("dir//", new String[]{"dir"});
+    testString("dir//file", new String[]{"dir", "file"});
+    testString("dir/dir1//", new String[]{"dir", "dir1"});
+    testString("dir//dir1//", new String[]{"dir", "dir1"});
+    testString("dir//dir1//file", new String[]{"dir", "dir1", "file"});
+  }
+
+  @Test
+  public void testByteArray2PathStringRoot() {
+    byte[][] components = DFSUtil.getPathComponents("/");
+    assertEquals("", DFSUtil.byteArray2PathString(components, 0, 0));
+    assertEquals("/", DFSUtil.byteArray2PathString(components, 0, 1));
+  }
+
+  @Test
+  public void testByteArray2PathStringFQ() {
+    byte[][] components = DFSUtil.getPathComponents("/1/2/3");
+    assertEquals("/1/2/3", DFSUtil.byteArray2PathString(components));
+
+    assertEquals("", DFSUtil.byteArray2PathString(components, 0, 0));
+    assertEquals("/", DFSUtil.byteArray2PathString(components, 0, 1));
+    assertEquals("/1", DFSUtil.byteArray2PathString(components, 0, 2));
+    assertEquals("/1/2", DFSUtil.byteArray2PathString(components, 0, 3));
+    assertEquals("/1/2/3", DFSUtil.byteArray2PathString(components, 0, 4));
+
+    assertEquals("", DFSUtil.byteArray2PathString(components, 1, 0));
+    assertEquals("1", DFSUtil.byteArray2PathString(components, 1, 1));
+    assertEquals("1/2", DFSUtil.byteArray2PathString(components, 1, 2));
+    assertEquals("1/2/3", DFSUtil.byteArray2PathString(components, 1, 3));
+  }
+
+  @Test
+  public void testByteArray2PathStringRelative() {
+    byte[][] components = DFSUtil.getPathComponents("1/2/3");
+    assertEquals("1/2/3", DFSUtil.byteArray2PathString(components));
+
+    assertEquals("", DFSUtil.byteArray2PathString(components, 0, 0));
+    assertEquals("1", DFSUtil.byteArray2PathString(components, 0, 1));
+    assertEquals("1/2", DFSUtil.byteArray2PathString(components, 0, 2));
+    assertEquals("1/2/3", DFSUtil.byteArray2PathString(components, 0, 3));
+
+    assertEquals("", DFSUtil.byteArray2PathString(components, 1, 0));
+    assertEquals("2", DFSUtil.byteArray2PathString(components, 1, 1));
+    assertEquals("2/3", DFSUtil.byteArray2PathString(components, 1, 2));
+  }
+
+  public void testString(String path, String[] expected) throws Exception {
+    byte[][] components = DFSUtil.getPathComponents(path);
+    String[] actual = new String[components.length];
+    for (int i=0; i < components.length; i++) {
+      if (components[i] != null) {
+        actual[i] = DFSUtil.bytes2String(components[i]);
+      }
     }
+    assertEquals(Arrays.asList(expected), Arrays.asList(actual));
+
+    // test the reconstituted path
+    path = path.replaceAll("/+", "/");
+    if (path.length() > 1) {
+      path = path.replaceAll("/$", "");
+    }
+    assertEquals(path, DFSUtil.byteArray2PathString(components));
   }
 }