HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem

(cherry picked from commit 133d7ca76e3d4b60292d57429d4259e80bec650a)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
index 8acd41f..5867f62 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.viewfs;
 
 import java.net.URI;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
@@ -68,7 +69,72 @@
     addLink( conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, 
         src, target);   
   }
-  
+
+  /**
+   * Add a LinkMergeSlash to the config for the specified mount table.
+   * @param conf configuration in which the entry is set
+   * @param mountTableName mount table whose config prefix is used for the key
+   * @param target URI stored, via toString(), as the entry's value
+   */
+  public static void addLinkMergeSlash(Configuration conf,
+      final String mountTableName, final URI target) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH, target.toString());
+  }
+
+  /**
+   * Add a LinkMergeSlash to the config for the default mount table.
+   * @param conf configuration in which the entry is set
+   * @param target URI handed to the mount-table-specific overload
+   */
+  public static void addLinkMergeSlash(Configuration conf, final URI target) {
+    addLinkMergeSlash(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        target);
+  }
+
+  /**
+   * Add a LinkFallback to the config for the specified mount table.
+   * @param conf configuration in which the entry is set
+   * @param mountTableName mount table whose config prefix is used for the key
+   * @param target URI stored, via toString(), as the entry's value
+   */
+  public static void addLinkFallback(Configuration conf,
+      final String mountTableName, final URI target) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_FALLBACK, target.toString());
+  }
+
+  /**
+   * Add a LinkFallback to the config for the default mount table.
+   * @param conf configuration in which the entry is set
+   * @param target URI handed to the mount-table-specific overload
+   */
+  public static void addLinkFallback(Configuration conf, final URI target) {
+    addLinkFallback(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        target);
+  }
+
+  /**
+   * Add a LinkMerge to the config for the specified mount table.
+   * @param conf configuration in which the entry is set
+   * @param mountTableName mount table whose config prefix is used for the key
+   * @param targets URIs stored as one comma-separated value (no brackets)
+   */
+  public static void addLinkMerge(Configuration conf,
+      final String mountTableName, final URI[] targets) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_MERGE, StringUtils.uriToString(targets));
+  }
+
+  /**
+   * Add a LinkMerge to the config for the default mount table.
+   * @param conf configuration in which the entry is set
+   * @param targets URIs handed to the mount-table-specific overload
+   */
+  public static void addLinkMerge(Configuration conf, final URI[] targets) {
+    addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
+  }
+
   /**
    *
    * @param conf
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 3f9aae2..7a0a6661 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -51,12 +51,17 @@
   /**
    * Config variable for specifying a simple link
    */
-  public static final String CONFIG_VIEWFS_LINK = "link";
-  
+  String CONFIG_VIEWFS_LINK = "link";
+
+  /**
+   * Config variable for specifying a fallback for link mount points.
+   */
+  String CONFIG_VIEWFS_LINK_FALLBACK = "linkFallback";
+
   /**
    * Config variable for specifying a merge link
    */
-  public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
+  String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
 
   /**
    * Config variable for specifying an nfly link. Nfly writes to multiple
@@ -68,10 +73,9 @@
    * Config variable for specifying a merge of the root of the mount-table
    *  with the root of another file system. 
    */
-  public static final String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
+  String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
 
-  static public final FsPermission PERMISSION_555 =
-      new FsPermission((short) 0555);
+  FsPermission PERMISSION_555 = new FsPermission((short) 0555);
 
   String CONFIG_VIEWFS_RENAME_STRATEGY = "fs.viewfs.rename.strategy";
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index 199ccc6..661cc9a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.fs.viewfs;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -62,8 +65,12 @@
   }
 
   static final Path SlashPath = new Path("/");
-  private final INodeDir<T> root;     // the root of the mount table
-  private final String homedirPrefix; // the homedir for this mount table
+  // the root of the mount table
+  private final INode<T> root;
+  // the fallback filesystem
+  private final INodeLink<T> rootFallbackLink;
+  // the homedir for this mount table
+  private final String homedirPrefix;
   private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
 
   static class MountPoint<T> {
@@ -86,7 +93,7 @@
   }
 
   /**
-   * Internal class for inode tree
+   * Internal class for INode tree.
    * @param <T>
    */
   abstract static class INode<T> {
@@ -95,21 +102,58 @@
     public INode(String pathToNode, UserGroupInformation aUgi) {
       fullPath = pathToNode;
     }
+
+    // True when this INode is part of the internal mount table
+    // directory tree for ViewFileSystem. This internal directory
+    // tree is constructed based on the mount table config entries
+    // and is read only.
+    abstract boolean isInternalDir();
+
+    // True when this INode links to another filesystem, as
+    // configured via mount table link config entries.
+    boolean isLink() {
+      return !isInternalDir();
+    }
   }
 
   /**
-   * Internal class to represent an internal dir of the mount table
+   * Internal class to represent an internal dir of the mount table.
    * @param <T>
    */
   static class INodeDir<T> extends INode<T> {
-    final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
-    T InodeDirFs =  null; // file system of this internal directory of mountT
-    boolean isRoot = false;
+    private final Map<String, INode<T>> children = new HashMap<>();
+    private T internalDirFs =  null; //filesystem of this internal directory
+    private boolean isRoot = false;
 
     INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
       super(pathToNode, aUgi);
     }
 
+    @Override
+    boolean isInternalDir() {
+      return true; // an INodeDir is always an internal directory
+    }
+
+    T getInternalDirFs() {
+      return internalDirFs; // null until setInternalDirFs is called
+    }
+
+    void setInternalDirFs(T internalDirFs) {
+      this.internalDirFs = internalDirFs; // replaces any earlier value
+    }
+
+    void setRoot(boolean root) {
+      isRoot = root; // "root" here is the flag value, not an INode
+    }
+
+    boolean isRoot() {
+      return isRoot; // false unless setRoot(true) was called
+    }
+
+    Map<String, INode<T>> getChildren() {
+      return Collections.unmodifiableMap(children); // read-only view
+    }
+
     INode<T> resolveInternal(final String pathComponent) {
       return children.get(pathComponent);
     }
@@ -120,7 +164,7 @@
         throw new FileAlreadyExistsException();
       }
       final INodeDir<T> newDir = new INodeDir<T>(fullPath +
-          (isRoot ? "" : "/") + pathComponent, aUgi);
+          (isRoot() ? "" : "/") + pathComponent, aUgi);
       children.put(pathComponent, newDir);
       return newDir;
     }
@@ -134,10 +178,43 @@
     }
   }
 
+  /**
+   * Mount table link type.
+   */
   enum LinkType {
+    /**
+     * Link entry pointing to a single filesystem uri.
+     * Key: {@code fs.viewfs.mounttable.<mnt_tbl_name>.link.<link_name>}
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK}
+     */
     SINGLE,
+    /**
+     * Fallback filesystem for the paths not mounted by
+     * any single link entries.
+     * Key: {@code fs.viewfs.mounttable.<mnt_tbl_name>.linkFallback}
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_FALLBACK}
+     */
+    SINGLE_FALLBACK,
+    /**
+     * Link entry pointing to a union of two or more filesystem uris.
+     * Key: {@code fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge.<link_name>}
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE}
+     */
     MERGE,
-    NFLY
+    /**
+     * Link entry for merging mount table's root with the
+     * root of another filesystem.
+     * Key: {@code fs.viewfs.mounttable.<mnt_tbl_name>.linkMergeSlash}
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE_SLASH}
+     */
+    MERGE_SLASH,
+    /**
+     * Link entry to write to multiple filesystems and read
+     * from the closest filesystem.
+     * Key prefix: {@code fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly}
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
+     */
+    NFLY;
   }
 
   /**
@@ -195,6 +272,11 @@
       return new Path(result.toString());
     }
 
+    @Override
+    boolean isInternalDir() {
+      return false; // a link is never an internal directory
+    }
+
     /**
      * Get the instance of FileSystem to use, creating one if needed.
      * @return An Initialized instance of T
@@ -236,7 +318,10 @@
     }
 
     final String[] srcPaths = breakIntoPathComponents(src);
-    INodeDir<T> curInode = root;
+    // Make sure root is of INodeDir type before
+    // adding any regular links to it.
+    Preconditions.checkState(root.isInternalDir());
+    INodeDir<T> curInode = getRootDir();
     int i;
     // Ignore first initial slash, process all except last component
     for (i = 1; i < srcPaths.length - 1; i++) {
@@ -244,15 +329,15 @@
       INode<T> nextInode = curInode.resolveInternal(iPath);
       if (nextInode == null) {
         INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
-        newDir.InodeDirFs = getTargetFileSystem(newDir);
+        newDir.setInternalDirFs(getTargetFileSystem(newDir));
         nextInode = newDir;
       }
-      if (nextInode instanceof INodeLink) {
+      if (nextInode.isLink()) {
         // Error - expected a dir but got a link
         throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
             " already exists as link");
       } else {
-        assert (nextInode instanceof INodeDir);
+        assert(nextInode.isInternalDir());
         curInode = (INodeDir<T>) nextInode;
       }
     }
@@ -278,6 +363,11 @@
       newLink = new INodeLink<T>(fullPath, aUgi,
           initAndGetTargetFs(), new URI(target));
       break;
+    case SINGLE_FALLBACK:
+    case MERGE_SLASH:
+      // Link fallback and link merge slash configuration
+      // are handled specially at InodeTree.
+      throw new IllegalArgumentException("Unexpected linkType: " + linkType);
     case MERGE:
     case NFLY:
       final URI[] targetUris = StringUtils.stringToURI(
@@ -305,6 +395,77 @@
   protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
       throws UnsupportedFileSystemException, URISyntaxException, IOException;
 
+  private INodeDir<T> getRootDir() {
+    Preconditions.checkState(root.isInternalDir()); // guards the cast
+    return (INodeDir<T>)root;
+  }
+
+  private INodeLink<T> getRootLink() {
+    Preconditions.checkState(root.isLink()); // guards the cast below
+    return (INodeLink<T>)root;
+  }
+
+  private boolean hasFallbackLink() {
+    return rootFallbackLink != null; // null means no fallback link
+  }
+
+  private INodeLink<T> getRootFallbackLink() {
+    Preconditions.checkState(root.isInternalDir()); // root must be a dir
+    return rootFallbackLink; // null when no fallback link was configured
+  }
+
+  /**
+   * Immutable holder for one mount table link entry read from the
+   * config and the attributes later handed to createLink.
+   * @see LinkType
+   */
+  private static class LinkEntry {
+    private final String src; // source, derived from the config key
+    private final String target; // config value of the entry
+    private final LinkType linkType; // kind of link the entry describes
+    private final String settings; // passed to createLink; may be null
+    private final UserGroupInformation ugi; // passed to createLink
+    private final Configuration config; // config the entry was read from
+
+    LinkEntry(String src, String target, LinkType linkType, String settings,
+        UserGroupInformation ugi, Configuration config) {
+      this.src = src;
+      this.target = target;
+      this.linkType = linkType;
+      this.settings = settings;
+      this.ugi = ugi;
+      this.config = config;
+    }
+
+    String getSrc() {
+      return src;
+    }
+
+    String getTarget() {
+      return target;
+    }
+
+    LinkType getLinkType() {
+      return linkType;
+    }
+
+    boolean isLinkType(LinkType type) {
+      return this.linkType == type; // enum constants compare by identity
+    }
+
+    String getSettings() {
+      return settings;
+    }
+
+    UserGroupInformation getUgi() {
+      return ugi;
+    }
+
+    Configuration getConfig() {
+      return config;
+    }
+  }
+
   /**
    * Create Inode Tree from the specified mount-table specified in Config
    * @param config - the mount table keys are prefixed with 
@@ -318,39 +479,59 @@
   protected InodeTree(final Configuration config, final String viewName)
       throws UnsupportedFileSystemException, URISyntaxException,
       FileAlreadyExistsException, IOException {
-    String vName = viewName;
-    if (vName == null) {
-      vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
+    String mountTableName = viewName;
+    if (mountTableName == null) {
+      mountTableName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
     }
-    homedirPrefix = ConfigUtil.getHomeDirValue(config, vName);
-    root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
-    root.InodeDirFs = getTargetFileSystem(root);
-    root.isRoot = true;
+    homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
 
-    final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." +
-        vName + ".";
+    boolean isMergeSlashConfigured = false;
+    String mergeSlashTarget = null;
+    List<LinkEntry> linkEntries = new LinkedList<>();
+
+    final String mountTablePrefix =
+        Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName + ".";
     final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
+    final String linkFallbackPrefix = Constants.CONFIG_VIEWFS_LINK_FALLBACK;
     final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
+    final String linkMergeSlashPrefix =
+        Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH;
     boolean gotMountTableEntry = false;
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     for (Entry<String, String> si : config) {
       final String key = si.getKey();
-      if (key.startsWith(mtPrefix)) {
+      if (key.startsWith(mountTablePrefix)) {
         gotMountTableEntry = true;
-        LinkType linkType = LinkType.SINGLE;
-        String src = key.substring(mtPrefix.length());
+        LinkType linkType;
+        String src = key.substring(mountTablePrefix.length());
         String settings = null;
         if (src.startsWith(linkPrefix)) {
           src = src.substring(linkPrefix.length());
           if (src.equals(SlashPath.toString())) {
             throw new UnsupportedFileSystemException("Unexpected mount table "
-                + "link entry '" + key + "'. "
-                + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH  + " is not "
-                + "supported yet.");
+                + "link entry '" + key + "'. Use "
+                + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH  + " instead!");
           }
+          linkType = LinkType.SINGLE;
+        } else if (src.startsWith(linkFallbackPrefix)) {
+          if (src.length() != linkFallbackPrefix.length()) {
+            throw new IOException("ViewFs: Mount points initialization error." +
+                " Invalid " + Constants.CONFIG_VIEWFS_LINK_FALLBACK +
+                " entry in config: " + src);
+          }
+          linkType = LinkType.SINGLE_FALLBACK;
         } else if (src.startsWith(linkMergePrefix)) { // A merge link
-          linkType = LinkType.MERGE;
           src = src.substring(linkMergePrefix.length());
+          linkType = LinkType.MERGE;
+        } else if (src.startsWith(linkMergeSlashPrefix)) {
+          // This is a LinkMergeSlash entry. This entry should
+          // not have any additional source path.
+          if (src.length() != linkMergeSlashPrefix.length()) {
+            throw new IOException("ViewFs: Mount points initialization error." +
+                " Invalid " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
+                " entry in config: " + src);
+          }
+          linkType = LinkType.MERGE_SLASH;
         } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
           // prefix.settings.src
           src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
@@ -370,14 +551,69 @@
           throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
               "Mount table in config: " + src);
         }
-        final String target = si.getValue(); // link or merge link
-        createLink(src, target, linkType, settings, ugi, config);
+
+        final String target = si.getValue();
+        if (linkType != LinkType.MERGE_SLASH) {
+          if (isMergeSlashConfigured) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a merge slash link. "
+                + "A regular link should not be added.");
+          }
+          linkEntries.add(
+              new LinkEntry(src, target, linkType, settings, ugi, config));
+        } else {
+          if (!linkEntries.isEmpty()) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with regular links. "
+                + "A merge slash link should not be configured.");
+          }
+          if (isMergeSlashConfigured) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a merge slash link. "
+                + "Multiple merge slash links for the same mount table is "
+                + "not allowed.");
+          }
+          isMergeSlashConfigured = true;
+          mergeSlashTarget = target;
+        }
       }
     }
+
+    if (isMergeSlashConfigured) {
+      Preconditions.checkNotNull(mergeSlashTarget);
+      root = new INodeLink<T>(mountTableName, ugi,
+          initAndGetTargetFs(),
+          new URI(mergeSlashTarget));
+      mountPoints.add(new MountPoint<T>("/", (INodeLink<T>) root));
+      rootFallbackLink = null;
+    } else {
+      root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
+      getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
+      getRootDir().setRoot(true);
+      INodeLink<T> fallbackLink = null;
+      for (LinkEntry le : linkEntries) {
+        if (le.isLinkType(LinkType.SINGLE_FALLBACK)) {
+          if (fallbackLink != null) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a link fallback. "
+                + "Multiple fallback links for the same mount table is "
+                + "not allowed.");
+          }
+          fallbackLink = new INodeLink<T>(mountTableName, ugi,
+              initAndGetTargetFs(),
+              new URI(le.getTarget()));
+        } else {
+          createLink(le.getSrc(), le.getTarget(), le.getLinkType(),
+              le.getSettings(), le.getUgi(), le.getConfig());
+        }
+      }
+      rootFallbackLink = fallbackLink;
+    }
+
     if (!gotMountTableEntry) {
       throw new IOException(
           "ViewFs: Cannot initialize: Empty Mount table in config for " +
-              "viewfs://" + vName + "/");
+              "viewfs://" + mountTableName + "/");
     }
   }
 
@@ -414,7 +650,7 @@
 
   /**
    * Resolve the pathname p relative to root InodeDir
-   * @param p - inout path
+   * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining path
    * @throws FileNotFoundException
@@ -424,26 +660,53 @@
     // TO DO: - more efficient to not split the path, but simply compare
     String[] path = breakIntoPathComponents(p); 
     if (path.length <= 1) { // special case for when path is "/"
-      ResolveResult<T> res =
-          new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-              root.InodeDirFs, root.fullPath, SlashPath);
+      T targetFs = root.isInternalDir() ?
+          getRootDir().getInternalDirFs() : getRootLink().getTargetFileSystem();
+      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          targetFs, root.fullPath, SlashPath);
       return res;
     }
 
-    INodeDir<T> curInode = root;
+    /**
+     * linkMergeSlash has been configured. The root of this mount table has
+     * been linked to the root directory of a file system.
+     * The first non-slash path component should be name of the mount table.
+     */
+    if (root.isLink()) {
+      Path remainingPath;
+      StringBuilder remainingPathStr = new StringBuilder();
+      // ignore first slash
+      for (int i = 1; i < path.length; i++) {
+        remainingPathStr.append("/").append(path[i]);
+      }
+      remainingPath = new Path(remainingPathStr.toString());
+      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
+      return res;
+    }
+    Preconditions.checkState(root.isInternalDir());
+    INodeDir<T> curInode = getRootDir();
+
     int i;
     // ignore first slash
     for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
       INode<T> nextInode = curInode.resolveInternal(path[i]);
       if (nextInode == null) {
-        StringBuilder failedAt = new StringBuilder(path[0]);
-        for (int j = 1; j <= i; ++j) {
-          failedAt.append('/').append(path[j]);
+        if (hasFallbackLink()) {
+          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              getRootFallbackLink().getTargetFileSystem(),
+              root.fullPath, new Path(p));
+        } else {
+          StringBuilder failedAt = new StringBuilder(path[0]);
+          for (int j = 1; j <= i; ++j) {
+            failedAt.append('/').append(path[j]);
+          }
+          throw (new FileNotFoundException(
+              "File/Directory does not exist: " + failedAt.toString()));
         }
-        throw (new FileNotFoundException(failedAt.toString()));
       }
 
-      if (nextInode instanceof INodeLink) {
+      if (nextInode.isLink()) {
         final INodeLink<T> link = (INodeLink<T>) nextInode;
         final Path remainingPath;
         if (i >= path.length - 1) {
@@ -459,7 +722,7 @@
             new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
                 link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
         return res;
-      } else if (nextInode instanceof INodeDir) {
+      } else if (nextInode.isInternalDir()) {
         curInode = (INodeDir<T>) nextInode;
       }
     }
@@ -481,7 +744,7 @@
     }
     final ResolveResult<T> res =
         new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.InodeDirFs, curInode.fullPath, remainingPath);
+            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
     return res;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 9726100..672839b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -1167,12 +1167,12 @@
     public FileStatus[] listStatus(Path f) throws AccessControlException,
         FileNotFoundException, IOException {
       checkPathIsSlash(f);
-      FileStatus[] result = new FileStatus[theInternalDir.children.size()];
+      FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
       int i = 0;
-      for (Entry<String, INode<FileSystem>> iEntry : 
-                                          theInternalDir.children.entrySet()) {
+      for (Entry<String, INode<FileSystem>> iEntry :
+          theInternalDir.getChildren().entrySet()) {
         INode<FileSystem> inode = iEntry.getValue();
-        if (inode instanceof INodeLink ) {
+        if (inode.isLink()) {
           INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
 
           result[i++] = new FileStatus(0, false, 0, 0,
@@ -1195,11 +1195,12 @@
     @Override
     public boolean mkdirs(Path dir, FsPermission permission)
         throws AccessControlException, FileAlreadyExistsException {
-      if (theInternalDir.isRoot && dir == null) {
+      if (theInternalDir.isRoot() && dir == null) {
         throw new FileAlreadyExistsException("/ already exits");
       }
       // Note dir starts with /
-      if (theInternalDir.children.containsKey(dir.toString().substring(1))) {
+      if (theInternalDir.getChildren().containsKey(
+          dir.toString().substring(1))) {
         return true; // this is the stupid semantics of FileSystem
       }
       throw readOnlyMountTable("mkdirs",  dir);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index 26f3a15..9c12baa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -922,13 +922,13 @@
         throws IOException {
       // look up i internalDirs children - ignore first Slash
       INode<AbstractFileSystem> inode =
-        theInternalDir.children.get(f.toUri().toString().substring(1)); 
+          theInternalDir.getChildren().get(f.toUri().toString().substring(1));
       if (inode == null) {
         throw new FileNotFoundException(
             "viewFs internal mount table - missing entry:" + f);
       }
       FileStatus result;
-      if (inode instanceof INodeLink) {
+      if (inode.isLink()) {
         INodeLink<AbstractFileSystem> inodelink = 
           (INodeLink<AbstractFileSystem>) inode;
         result = new FileStatus(0, false, 0, 0, creationTime, creationTime,
@@ -970,14 +970,14 @@
     public FileStatus[] listStatus(final Path f) throws AccessControlException,
         IOException {
       checkPathIsSlash(f);
-      FileStatus[] result = new FileStatus[theInternalDir.children.size()];
+      FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
       int i = 0;
-      for (Entry<String, INode<AbstractFileSystem>> iEntry : 
-                                          theInternalDir.children.entrySet()) {
+      for (Entry<String, INode<AbstractFileSystem>> iEntry :
+          theInternalDir.getChildren().entrySet()) {
         INode<AbstractFileSystem> inode = iEntry.getValue();
 
         
-        if (inode instanceof INodeLink ) {
+        if (inode.isLink()) {
           INodeLink<AbstractFileSystem> link = 
             (INodeLink<AbstractFileSystem>) inode;
 
@@ -1002,7 +1002,7 @@
     public void mkdir(final Path dir, final FsPermission permission,
         final boolean createParent) throws AccessControlException,
         FileAlreadyExistsException {
-      if (theInternalDir.isRoot && dir == null) {
+      if (theInternalDir.isRoot() && dir == null) {
         throw new FileAlreadyExistsException("/ already exits");
       }
       throw readOnlyMountTable("mkdir", dir);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
index 6b398bc..c33a180 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
@@ -1300,8 +1300,8 @@
           + mtPrefix + Constants.CONFIG_VIEWFS_LINK + "." + "/");
     } catch (Exception e) {
       if (e instanceof UnsupportedFileSystemException) {
-        String msg = Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
-            + " is not supported yet.";
+        String msg = " Use " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
+            " instead";
         assertThat(e.getMessage(), containsString(msg));
       } else {
         fail("Unexpected exception: " + e.getMessage());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
index 14d998f..b2f7e57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
@@ -91,7 +91,7 @@
 
 ViewFs implements the Hadoop file system interface just like HDFS and the local file system. It is a trivial file system in the sense that it only allows linking to other file systems. Because ViewFs implements the Hadoop file system interface, it works transparently Hadoop tools. For example, all the shell commands work with ViewFs as with HDFS and local file system.
 
-The mount points of a mount table are specified in the standard Hadoop configuration files. In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
+In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
 
 ```xml
 <property>
@@ -100,7 +100,47 @@
 </property>
 ```
 
-The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommanded that the mount table of a cluster should be named by the cluster name. Then Hadoop system will look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
+The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommended that the mount table of a cluster should be named by the cluster name. Then Hadoop system will look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
+
+The mount points of a mount table are specified in the standard Hadoop configuration files. All the mount table config entries for `viewfs` are prefixed by `fs.viewfs.mounttable.`. The mount points that link to other filesystems are specified using `link` tags. The recommendation is to give each mount point the same name as its target location in the linked filesystem. All namespaces that are not configured in the mount table can fall back to a default filesystem via `linkFallback`.
+
+In the mount table configuration below, namespace `/data` is linked to the filesystem `hdfs://nn1-clusterx.example.com:9820/data`, and `/project` is linked to the filesystem `hdfs://nn2-clusterx.example.com:9820/project`. All namespaces that are not configured in the mount table, like `/logs`, are linked to the filesystem `hdfs://nn5-clusterx.example.com:9820/home`.
+
+```xml
+<configuration>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./data</name>
+    <value>hdfs://nn1-clusterx.example.com:9820/data</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./project</name>
+    <value>hdfs://nn2-clusterx.example.com:9820/project</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./user</name>
+    <value>hdfs://nn3-clusterx.example.com:9820/user</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./tmp</name>
+    <value>hdfs://nn4-clusterx.example.com:9820/tmp</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.linkFallback</name>
+    <value>hdfs://nn5-clusterx.example.com:9820/home</value>
+  </property>
+</configuration>
+```
+
+Alternatively, we can have the mount table's root merged with the root of another filesystem via `linkMergeSlash`. In the mount table configuration below, ClusterY's root is merged with the root filesystem at `hdfs://nn1-clustery.example.com:9820`.
+
+```xml
+<configuration>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterY.linkMergeSlash</name>
+    <value>hdfs://nn1-clustery.example.com:9820/</value>
+  </property>
+</configuration>
+```
 
 ### Pathname Usage Patterns
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
new file mode 100644
index 0000000..5fb7c3b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for viewfs with LinkFallback mount table entries.
+ */
+public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final String LINK_FALLBACK_CLUSTER_1_NAME = "Cluster1";
+  private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
+  private static final Configuration CONF = new Configuration();
+  private static final File TEST_DIR = GenericTestUtils.getTestDir(
+      TestViewFileSystemLinkFallback.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkFallback";
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestViewFileSystemLinkFallback.class);
+
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException,
+      LoginException, URISyntaxException {
+    SupportsBlocks = true;
+    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
+            NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT)
+        .build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under the
+   * root of the FS, and so that we don't try to delete the test dir, but rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+    ConfigUtil.addLinkFallback(conf, LINK_FALLBACK_CLUSTER_1_NAME,
+        targetTestRoot.toUri());
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  @Test
+  public void testConfLinkFallback() throws Exception {
+    Path testBasePath = new Path(TEST_BASE_PATH);
+    Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
+    Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
+    Path testBaseFileRelative = new Path(testLevel2Dir,
+        "../../testBaseFile.log");
+    Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
+    fsTarget.mkdirs(testLevel2Dir);
+
+    fsTarget.createNewFile(testBaseFile);
+    FSDataOutputStream dataOutputStream = fsTarget.append(testBaseFile);
+    dataOutputStream.write(1);
+    dataOutputStream.close();
+
+    fsTarget.createNewFile(testLevel2File);
+    dataOutputStream = fsTarget.append(testLevel2File);
+    dataOutputStream.write("test link fallback".toString().getBytes());
+    dataOutputStream.close();
+
+    String clusterName = "ClusterFallback";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkFallback(conf, clusterName, fsTarget.getUri());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus baseFileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testBaseFile.toUri().toString()));
+    LOG.info("BaseFileStat: " + baseFileStat);
+    FileStatus baseFileRelStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testBaseFileRelative.toUri().toString()));
+    LOG.info("BaseFileRelStat: " + baseFileRelStat);
+    Assert.assertEquals("Unexpected file length for " + testBaseFile,
+        1, baseFileStat.getLen());
+    Assert.assertEquals("Unexpected file length for " + testBaseFileRelative,
+        baseFileStat.getLen(), baseFileRelStat.getLen());
+    FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testLevel2File.toUri().toString()));
+    LOG.info("Level2FileStat: " + level2FileStat);
+    vfs.close();
+  }
+
+  /** Verify linkFallback serves unmounted paths when regular links exist. */
+  @Test
+  public void testConfLinkFallbackWithRegularLinks() throws Exception {
+    Path testBasePath = new Path(TEST_BASE_PATH);
+    Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
+    Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
+    Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
+    fsTarget.mkdirs(testLevel2Dir);
+
+    fsTarget.createNewFile(testBaseFile);
+    fsTarget.createNewFile(testLevel2File);
+    FSDataOutputStream dataOutputStream = fsTarget.append(testLevel2File);
+    dataOutputStream.write("test link fallback".getBytes());
+    dataOutputStream.close();
+
+    String clusterName = "ClusterFallback";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLink(conf, clusterName, "/internalDir/linkToDir2",
+        new Path(targetTestRoot, "dir2").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/internalDir/internalDirB/linkToDir3",
+        new Path(targetTestRoot, "dir3").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/danglingLink",
+        new Path(targetTestRoot, "missingTarget").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/linkToAFile",
+        new Path(targetTestRoot, "aFile").toUri());
+    LOG.info("ViewFs link fallback " + fsTarget.getUri());
+    ConfigUtil.addLinkFallback(conf, clusterName, targetTestRoot.toUri());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus baseFileStat = vfs.getFileStatus(
+        new Path(viewFsUri.toString() + testBaseFile.toUri().toString()));
+    LOG.info("BaseFileStat: " + baseFileStat);
+    Assert.assertEquals("Unexpected file length for " + testBaseFile,
+        0, baseFileStat.getLen());
+    FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testLevel2File.toUri().toString()));
+    LOG.info("Level2FileStat: " + level2FileStat);
+
+    dataOutputStream = vfs.append(testLevel2File);
+    dataOutputStream.write("Writing via viewfs fallback path".getBytes());
+    dataOutputStream.close();
+
+    FileStatus level2FileStatAfterWrite = vfs.getFileStatus(
+        new Path(viewFsUri.toString() + testLevel2File.toUri().toString()));
+    Assert.assertTrue("Unexpected file length for " + testLevel2File,
+        level2FileStatAfterWrite.getLen() > level2FileStat.getLen());
+
+    vfs.close();
+  }
+
+  /** Verify a linkFallback key with a mount point suffix is rejected. */
+  @Test
+  public void testConfLinkFallbackWithMountPoint() throws Exception {
+    TEST_DIR.mkdirs();
+    Configuration conf = new Configuration();
+    String clusterName = "ClusterX";
+    String mountPoint = "/user";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String expectedErrorMsg =  "Invalid linkFallback entry in config: " +
+        "linkFallback./user";
+    String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "." + clusterName
+        + "." + Constants.CONFIG_VIEWFS_LINK_FALLBACK + "." + mountPoint;
+    conf.set(mountTableEntry, TEST_DIR.toURI().toString());
+
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow linkFallback to take extra mount points!");
+    } catch (IOException e) {
+      assertTrue("Unexpected error: " + e.getMessage(),
+          e.getMessage().contains(expectedErrorMsg));
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java
new file mode 100644
index 0000000..606743f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.security.auth.login.LoginException;
+
+/**
+ * Test for viewfs with LinkMergeSlash mount table entries.
+ */
+public class TestViewFileSystemLinkMergeSlash extends ViewFileSystemBaseTest {
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final String LINK_MERGE_SLASH_CLUSTER_1_NAME = "ClusterLMS1";
+  private static final String LINK_MERGE_SLASH_CLUSTER_2_NAME = "ClusterLMS2";
+  private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
+  private static final Configuration CONF = new Configuration();
+  private static final File TEST_DIR = GenericTestUtils.getTestDir(
+      TestViewFileSystemLinkMergeSlash.class.getSimpleName());
+  private static final String TEST_TEMP_PATH =
+      "/tmp/TestViewFileSystemLinkMergeSlash";
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestViewFileSystemLinkMergeSlash.class);
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_TEMP_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException,
+      LoginException, URISyntaxException {
+    SupportsBlocks = true;
+    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
+            NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT)
+        .build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under the
+   * root of the FS, and so that we don't try to delete the test dir, but rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+    ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_1_NAME,
+        targetTestRoot.toUri());
+    ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_2_NAME,
+        targetTestRoot.toUri());
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  @Test
+  public void testConfLinkMergeSlash() throws Exception {
+    TEST_DIR.mkdirs();
+    String clusterName = "ClusterMerge";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String testFileName = "testLinkMergeSlash";
+
+    File infile = new File(TEST_DIR, testFileName);
+    final byte[] content = "HelloWorld".getBytes();
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(infile);
+      fos.write(content);
+    } finally {
+      if (fos != null) {
+        fos.close();
+      }
+    }
+    assertEquals((long)content.length, infile.length());
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus stat = vfs.getFileStatus(new Path(viewFsUri.toString() +
+        testFileName));
+
+    LOG.info("File stat: " + stat);
+    vfs.close();
+  }
+
+  @Test
+  public void testConfLinkMergeSlashWithRegularLinks() throws Exception {
+    TEST_DIR.mkdirs();
+    String clusterName = "ClusterMerge";
+    String expectedErrorMsg1 = "Mount table ClusterMerge has already been " +
+        "configured with a merge slash link";
+    String expectedErrorMsg2 = "Mount table ClusterMerge has already been " +
+        "configured with regular links";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
+    ConfigUtil.addLink(conf, clusterName, "testDir", TEST_DIR.toURI());
+
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow both merge slash link and regular link on same "
+          + "mount table.");
+    } catch (IOException e) {
+      assertTrue("Unexpected error message: " + e.getMessage(),
+          e.getMessage().contains(expectedErrorMsg1) || e.getMessage()
+              .contains(expectedErrorMsg2));
+    }
+  }
+
+  /** Verify a linkMergeSlash key with a mount point suffix is rejected. */
+  @Test
+  public void testConfLinkMergeSlashWithMountPoint() throws Exception {
+    TEST_DIR.mkdirs();
+    Configuration conf = new Configuration();
+    String clusterName = "ClusterX";
+    String mountPoint = "/user";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String expectedErrorMsg =  "Invalid linkMergeSlash entry in config: " +
+        "linkMergeSlash./user";
+    String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "." + clusterName
+        + "." + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + "." + mountPoint;
+    conf.set(mountTableEntry, TEST_DIR.toURI().toString());
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow linkMergeSlash to take extra mount points!");
+    } catch (IOException e) {
+      assertTrue("Unexpected error: " + e.getMessage(),
+          e.getMessage().contains(expectedErrorMsg));
+    }
+  }
+
+  @Test
+  public void testChildFileSystems() throws Exception {
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
+        LINK_MERGE_SLASH_CLUSTER_1_NAME, "/", null, null);
+    FileSystem fs = FileSystem.get(viewFsUri, conf);
+    FileSystem[] childFs = fs.getChildFileSystems();
+    Assert.assertEquals("Unexpected number of child filesystems!",
+        1, childFs.length);
+    Assert.assertEquals("Unexpected child filesystem!",
+        DistributedFileSystem.class, childFs[0].getClass());
+  }
+}