HDFS-13291. RBF: Implement available space based OrderResolver. Contributed by Yiqun Lin.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
index 8dd73ec..e31077e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
@@ -21,6 +21,7 @@
 import java.util.EnumMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
@@ -77,6 +78,8 @@
     addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
     addResolver(DestinationOrder.RANDOM, new RandomResolver());
     addResolver(DestinationOrder.HASH_ALL, new HashResolver());
+    addResolver(DestinationOrder.SPACE,
+        new AvailableSpaceResolver(conf, router));
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java
new file mode 100644
index 0000000..77a35a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Order the destinations based on available space. This resolver uses a
+ * higher probability (instead of "always") to choose the cluster with higher
+ * available space.
+ */
+public class AvailableSpaceResolver
+    extends RouterResolver<String, SubclusterAvailableSpace> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AvailableSpaceResolver.class);
+
+  /** Increases chance of files on subcluster with more available space. */
+  public static final String BALANCER_PREFERENCE_KEY =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX
+      + "available-space-resolver.balanced-space-preference-fraction";
+  public static final float BALANCER_PREFERENCE_DEFAULT = 0.6f;
+
+  /** Random instance used in the subcluster comparison. */
+  private static final Random RAND = new Random();
+
+  /** Customized comparator for SubclusterAvailableSpace. */
+  private SubclusterSpaceComparator comparator;
+
+  public AvailableSpaceResolver(final Configuration conf,
+      final Router routerService) {
+    super(conf, routerService);
+    float balancedPreference = conf.getFloat(BALANCER_PREFERENCE_KEY,
+        BALANCER_PREFERENCE_DEFAULT);
+    if (balancedPreference < 0.5) {
+      LOG.warn("The balancer preference value is less than 0.5. That means more"
+          + " files will be allocated in cluster with lower available space.");
+    }
+
+    this.comparator = new SubclusterSpaceComparator(balancedPreference);
+  }
+
+  /**
+   * Get the mapping from NamespaceId to subcluster space info. It gets this
+   * mapping from the subclusters through expensive calls (e.g., RPC) and uses
+   * caching to avoid too many calls. The cache might be updated asynchronously
+   * to reduce latency.
+   *
+   * @return NamespaceId -> {@link SubclusterAvailableSpace}
+   */
+  @Override
+  protected Map<String, SubclusterAvailableSpace> getSubclusterInfo(
+      MembershipStore membershipStore) {
+    Map<String, SubclusterAvailableSpace> mapping = new HashMap<>();
+    try {
+      // Get the Namenode's available space info from the subclusters
+      // from the Membership store.
+      GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest
+          .newInstance();
+      GetNamenodeRegistrationsResponse response = membershipStore
+          .getNamenodeRegistrations(request);
+      final List<MembershipState> nns = response.getNamenodeMemberships();
+      for (MembershipState nn : nns) {
+        try {
+          String nsId = nn.getNameserviceId();
+          long availableSpace = nn.getStats().getAvailableSpace();
+          mapping.put(nsId, new SubclusterAvailableSpace(nsId, availableSpace));
+        } catch (Exception e) {
+          LOG.error("Cannot get stats info for {}: {}.", nn, e.getMessage());
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.error("Cannot get Namenodes from the State Store.", ioe);
+    }
+    return mapping;
+  }
+
+  @Override
+  protected String chooseFirstNamespace(String path, PathLocation loc) {
+    Map<String, SubclusterAvailableSpace> subclusterInfo =
+        getSubclusterMapping();
+    List<SubclusterAvailableSpace> subclusterList = new LinkedList<>(
+        subclusterInfo.values());
+    Collections.sort(subclusterList, comparator);
+
+    return subclusterList.size() > 0 ? subclusterList.get(0).getNameserviceId()
+        : null;
+  }
+
+  /**
+   * Inner class that stores cluster available space info.
+   */
+  static class SubclusterAvailableSpace {
+    private final String nsId;
+    private final long availableSpace;
+
+    SubclusterAvailableSpace(String nsId, long availableSpace) {
+      this.nsId = nsId;
+      this.availableSpace = availableSpace;
+    }
+
+    public String getNameserviceId() {
+      return this.nsId;
+    }
+
+    public long getAvailableSpace() {
+      return this.availableSpace;
+    }
+  }
+
+  /**
+   * Customized comparator for SubclusterAvailableSpace. If more available
+   * space the one cluster has, the higher priority it will have. But this
+   * is not absolute, there is a balanced preference to make this use a higher
+   * probability (instead of "always") to compare by this way.
+   */
+  static final class SubclusterSpaceComparator
+      implements Comparator<SubclusterAvailableSpace>, Serializable {
+    private int balancedPreference;
+
+    SubclusterSpaceComparator(float balancedPreference) {
+      Preconditions.checkArgument(
+          balancedPreference <= 1 && balancedPreference >= 0,
+          "The balancer preference value should be in the range 0.0 - 1.0");
+
+      this.balancedPreference = (int) (100 * balancedPreference);
+    }
+
+    @Override
+    public int compare(SubclusterAvailableSpace cluster1,
+        SubclusterAvailableSpace cluster2) {
+      int ret = cluster1.getAvailableSpace() > cluster2.getAvailableSpace() ? -1
+          : 1;
+
+      if (ret < 0) {
+        return (RAND.nextInt(100) < balancedPreference) ? -1 : 1;
+      } else {
+        return (RAND.nextInt(100) < balancedPreference) ? 1 : -1;
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 03e68e5..99c5e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -26,5 +26,6 @@
   HASH, // Follow consistent hashing in the first folder level
   LOCAL, // Local first
   RANDOM, // Random order
-  HASH_ALL // Follow consistent hashing
+  HASH_ALL, // Follow consistent hashing
+  SPACE // Available space based order
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
index 4d76c89..b6bd4b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.federation.resolver.order;
 
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HostAndPort;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -30,17 +25,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
-import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@@ -50,40 +42,46 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+
 
 /**
  * The local subcluster (where the writer is) should be tried first. The writer
  * is defined from the RPC query received in the RPC server.
  */
-public class LocalResolver implements OrderedResolver {
+public class LocalResolver extends RouterResolver<String, String> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(LocalResolver.class);
 
-  /** Configuration key to set the minimum time to update the local cache.*/
-  public static final String MIN_UPDATE_PERIOD_KEY =
-      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
-  /** 10 seconds by default. */
-  private static final long MIN_UPDATE_PERIOD_DEFAULT =
-      TimeUnit.SECONDS.toMillis(10);
-
-
-  /** Router service. */
-  private final Router router;
-  /** Minimum update time. */
-  private final long minUpdateTime;
-
-  /** Node IP -> Subcluster. */
-  private Map<String, String> nodeSubcluster = null;
-  /** Last time the subcluster map was updated. */
-  private long lastUpdated;
-
-
   public LocalResolver(final Configuration conf, final Router routerService) {
-    this.minUpdateTime = conf.getTimeDuration(
-        MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.router = routerService;
+    super(conf, routerService);
+  }
+
+  /**
+   * Get the mapping from nodes to subcluster. It gets this mapping from the
+   * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
+   * too many calls. The cache might be updated asynchronously to reduce
+   * latency.
+   *
+   * @return Node IP -> Subcluster.
+   */
+  @Override
+  protected Map<String, String> getSubclusterInfo(
+      MembershipStore membershipStore) {
+    Map<String, String> mapping = new HashMap<>();
+
+    Map<String, String> dnSubcluster = getDatanodesSubcluster();
+    if (dnSubcluster != null) {
+      mapping.putAll(dnSubcluster);
+    }
+
+    Map<String, String> nnSubcluster = getNamenodesSubcluster(membershipStore);
+    if (nnSubcluster != null) {
+      mapping.putAll(nnSubcluster);
+    }
+    return mapping;
   }
 
   /**
@@ -98,12 +96,12 @@
    * @return Local name space. Null if we don't know about this machine.
    */
   @Override
-  public String getFirstNamespace(final String path, final PathLocation loc) {
+  protected String chooseFirstNamespace(String path, PathLocation loc) {
     String localSubcluster = null;
     String clientAddr = getClientAddr();
-    Map<String, String> nodeToSubcluster = getSubclusterMappings();
-    if (nodeToSubcluster != null) {
-      localSubcluster = nodeToSubcluster.get(clientAddr);
+    Map<String, String> subclusterInfo = getSubclusterMapping();
+    if (subclusterInfo != null) {
+      localSubcluster = subclusterInfo.get(clientAddr);
       if (localSubcluster != null) {
         LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
       } else {
@@ -122,52 +120,6 @@
   }
 
   /**
-   * Get the mapping from nodes to subcluster. It gets this mapping from the
-   * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
-   * too many calls. The cache might be updated asynchronously to reduce
-   * latency.
-   *
-   * @return Node IP -> Subcluster.
-   */
-  @VisibleForTesting
-  synchronized Map<String, String> getSubclusterMappings() {
-    if (nodeSubcluster == null ||
-        (monotonicNow() - lastUpdated) > minUpdateTime) {
-      // Fetch the mapping asynchronously
-      Thread updater = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          Map<String, String> mapping = new HashMap<>();
-
-          Map<String, String> dnSubcluster = getDatanodesSubcluster();
-          if (dnSubcluster != null) {
-            mapping.putAll(dnSubcluster);
-          }
-
-          Map<String, String> nnSubcluster = getNamenodesSubcluster();
-          if (nnSubcluster != null) {
-            mapping.putAll(nnSubcluster);
-          }
-          nodeSubcluster = mapping;
-          lastUpdated = monotonicNow();
-        }
-      });
-      updater.start();
-
-      // Wait until initialized
-      if (nodeSubcluster == null) {
-        try {
-          LOG.debug("Wait to get the mapping for the first time");
-          updater.join();
-        } catch (InterruptedException e) {
-          LOG.error("Cannot wait for the updater to finish");
-        }
-      }
-    }
-    return nodeSubcluster;
-  }
-
-  /**
    * Get the Datanode mapping from the subclusters from the Namenodes. This
    * needs to be done as a privileged action to use the user for the Router and
    * not the one from the client in the RPC call.
@@ -221,14 +173,8 @@
    *
    * @return NN IP -> Subcluster.
    */
-  private Map<String, String> getNamenodesSubcluster() {
-
-    final MembershipStore membershipStore = getMembershipStore();
-    if (membershipStore == null) {
-      LOG.error("Cannot access the Membership store");
-      return null;
-    }
-
+  private Map<String, String> getNamenodesSubcluster(
+      MembershipStore membershipStore) {
     // Manage requests from this hostname (127.0.0.1)
     String localIp = "127.0.0.1";
     String localHostname = localIp;
@@ -269,29 +215,4 @@
     }
     return ret;
   }
-
-  /**
-   * Get the Router RPC server.
-   *
-   * @return Router RPC server. Null if not possible.
-   */
-  private RouterRpcServer getRpcServer() {
-    if (this.router == null) {
-      return null;
-    }
-    return router.getRpcServer();
-  }
-
-  /**
-   * Get the Membership store.
-   *
-   * @return Membership store.
-   */
-  private MembershipStore getMembershipStore() {
-    StateStoreService stateStore = router.getStateStore();
-    if (stateStore == null) {
-      return null;
-    }
-    return stateStore.getRegisteredRecordStore(MembershipStore.class);
-  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java
new file mode 100644
index 0000000..91af1ca
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The order resolver that depends upon the Router service.
+ *
+ * @param <K> The key type of subcluster mapping info queried from Router.
+ * @param <V> The value type of subcluster mapping info queried from Router.
+ */
+public abstract class RouterResolver<K, V> implements OrderedResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterResolver.class);
+
+  /** Configuration key to set the minimum time to update subcluster info. */
+  public static final String MIN_UPDATE_PERIOD_KEY =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "router-resolver.update-period";
+  /** 10 seconds by default. */
+  private static final long MIN_UPDATE_PERIOD_DEFAULT = TimeUnit.SECONDS
+      .toMillis(10);
+
+  /** Router service. */
+  private final Router router;
+  /** Minimum update time. */
+  private final long minUpdateTime;
+
+  /** K -> T template mapping. */
+  private Map<K, V> subclusterMapping = null;
+  /** Last time the subcluster mapping was updated. */
+  private long lastUpdated;
+
+  public RouterResolver(final Configuration conf, final Router routerService) {
+    this.minUpdateTime = conf.getTimeDuration(MIN_UPDATE_PERIOD_KEY,
+        MIN_UPDATE_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    this.router = routerService;
+  }
+
+  @Override
+  public String getFirstNamespace(String path, PathLocation loc) {
+    updateSubclusterMapping();
+    return chooseFirstNamespace(path, loc);
+  }
+
+  /**
+   * The implementation for getting desired subcluster mapping info.
+   *
+   * @param membershipStore Membership store the resolver queried from.
+   * @return The map of desired type info.
+   */
+  protected abstract Map<K, V> getSubclusterInfo(
+      MembershipStore membershipStore);
+
+  /**
+   * Choose the first namespace from queried subcluster mapping info.
+   *
+   * @param path Path to check.
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace out of the locations.
+   */
+  protected abstract String chooseFirstNamespace(String path, PathLocation loc);
+
+  /**
+   * Update <NamespaceId, Subcluster Info> mapping info periodically.
+   */
+  private synchronized void updateSubclusterMapping() {
+    if (subclusterMapping == null
+        || (monotonicNow() - lastUpdated) > minUpdateTime) {
+      // Fetch the mapping asynchronously
+      Thread updater = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          final MembershipStore membershipStore = getMembershipStore();
+          if (membershipStore == null) {
+            LOG.error("Cannot access the Membership store.");
+            return;
+          }
+
+          subclusterMapping = getSubclusterInfo(membershipStore);
+          lastUpdated = monotonicNow();
+        }
+      });
+      updater.start();
+
+      // Wait until initialized
+      if (subclusterMapping == null) {
+        try {
+          LOG.debug("Wait to get the mapping for the first time");
+          updater.join();
+        } catch (InterruptedException e) {
+          LOG.error("Cannot wait for the updater to finish");
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the Router RPC server.
+   *
+   * @return Router RPC server. Null if not possible.
+   */
+  protected RouterRpcServer getRpcServer() {
+    if (this.router == null) {
+      return null;
+    }
+    return router.getRpcServer();
+  }
+
+  /**
+   * Get the Membership store.
+   *
+   * @return Membership store.
+   */
+  protected MembershipStore getMembershipStore() {
+    StateStoreService stateStore = router.getStateStore();
+    if (stateStore == null) {
+      return null;
+    }
+    return stateStore.getRegisteredRecordStore(MembershipStore.class);
+  }
+
+  /**
+   * Get subcluster mapping info.
+   *
+   * @return The map of subcluster info.
+   */
+  protected Map<K, V> getSubclusterMapping() {
+    return this.subclusterMapping;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index b3f677d..eaa3951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -534,32 +534,21 @@
   }
 
   /**
-   * Get the permissions for the parent of a child with given permissions. If
-   * the child has r--, we will set it to r-x.
+   * Get the permissions for the parent of a child with given permissions.
+   * Add implicit u+wx permission for parent. This is based on
+   * @{FSDirMkdirOp#addImplicitUwx}.
    * @param mask The permission mask of the child.
    * @return The permission mask of the parent.
    */
   private static FsPermission getParentPermission(final FsPermission mask) {
     FsPermission ret = new FsPermission(
-        applyExecute(mask.getUserAction()),
-        applyExecute(mask.getGroupAction()),
-        applyExecute(mask.getOtherAction()));
+        mask.getUserAction().or(FsAction.WRITE_EXECUTE),
+        mask.getGroupAction(),
+        mask.getOtherAction());
     return ret;
   }
 
   /**
-   * Apply the execute permissions if it can be read.
-   * @param action Input permission.
-   * @return Output permission.
-   */
-  private static FsAction applyExecute(final FsAction action) {
-    if (action.and(FsAction.READ) == FsAction.READ) {
-      return action.or(FsAction.EXECUTE);
-    }
-    return action;
-  }
-
-  /**
    * Get the location to create a file. It checks if the file already existed
    * in one of the locations.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index f8fec87..60496ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -421,7 +421,8 @@
   public boolean isAll() {
     DestinationOrder order = getDestOrder();
     return order == DestinationOrder.HASH_ALL ||
-        order == DestinationOrder.RANDOM;
+        order == DestinationOrder.RANDOM ||
+        order == DestinationOrder.SPACE;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 9667489..48f93bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -297,6 +297,8 @@
       return DestinationOrder.RANDOM;
     case HASH_ALL:
       return DestinationOrder.HASH_ALL;
+    case SPACE:
+      return DestinationOrder.SPACE;
     default:
       return DestinationOrder.HASH;
     }
@@ -310,6 +312,8 @@
       return DestOrder.RANDOM;
     case HASH_ALL:
       return DestOrder.HASH_ALL;
+    case SPACE:
+      return DestOrder.SPACE;
     default:
       return DestOrder.HASH;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index b0d6982..b6a0b12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -130,6 +130,7 @@
     LOCAL = 1;
     RANDOM = 2;
     HASH_ALL = 3;
+    SPACE = 4;
   }
   optional DestOrder destOrder = 6 [default = HASH];
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java
new file mode 100644
index 0000000..dfbdf51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterSpaceComparator;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MembershipStatsPBImpl;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Test the {@link AvailableSpaceResolver}.
+ */
+public class TestAvailableSpaceResolver {
+
+  private static final int SUBCLUSTER_NUM = 10;
+
+  @Test
+  public void testResolverWithNoPreference() throws IOException {
+    MultipleDestinationMountTableResolver mountTableResolver =
+        mockAvailableSpaceResolver(1.0f);
+    // Since we don't have any preference, it will
+    // always chose the maximum-available-space subcluster.
+    PathLocation loc = mountTableResolver.getDestinationForPath("/space");
+    assertEquals("subcluster9",
+        loc.getDestinations().get(0).getNameserviceId());
+
+    loc = mountTableResolver.getDestinationForPath("/space/subdir");
+    assertEquals("subcluster9",
+        loc.getDestinations().get(0).getNameserviceId());
+  }
+
+  @Test
+  public void testResolverWithDefaultPreference() throws IOException {
+    MultipleDestinationMountTableResolver mountTableResolver =
+        mockAvailableSpaceResolver(BALANCER_PREFERENCE_DEFAULT);
+
+    int retries = 10;
+    int retryTimes = 0;
+    // There is chance we won't always chose the
+    // maximum-available-space subcluster.
+    for (retryTimes = 0; retryTimes < retries; retryTimes++) {
+      PathLocation loc = mountTableResolver.getDestinationForPath("/space");
+      if (!"subcluster9"
+          .equals(loc.getDestinations().get(0).getNameserviceId())) {
+        break;
+      }
+    }
+    assertNotEquals(retries, retryTimes);
+  }
+
+  /**
+   * Mock the available space based resolver.
+   *
+   * @param balancerPreference The balancer preference for the resolver.
+   * @throws IOException
+   * @return MultipleDestinationMountTableResolver instance.
+   */
+  @SuppressWarnings("unchecked")
+  private MultipleDestinationMountTableResolver mockAvailableSpaceResolver(
+      float balancerPreference) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setFloat(BALANCER_PREFERENCE_KEY, balancerPreference);
+    Router router = mock(Router.class);
+    StateStoreService stateStore = mock(StateStoreService.class);
+    MembershipStore membership = mock(MembershipStore.class);
+    when(router.getStateStore()).thenReturn(stateStore);
+    when(stateStore.getRegisteredRecordStore(any(Class.class)))
+        .thenReturn(membership);
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    // Set the mapping for each client
+    List<MembershipState> records = new LinkedList<>();
+    for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+      records.add(newMembershipState("subcluster" + i, i));
+    }
+    response.setNamenodeMemberships(records);
+
+    when(membership
+        .getNamenodeRegistrations(any(GetNamenodeRegistrationsRequest.class)))
+            .thenReturn(response);
+
+    // construct available space resolver
+    AvailableSpaceResolver resolver = new AvailableSpaceResolver(conf, router);
+    MultipleDestinationMountTableResolver mountTableResolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    mountTableResolver.addResolver(DestinationOrder.SPACE, resolver);
+
+    // We point /space to subclusters [0,..9] with the SPACE order
+    Map<String, String> destinations = new HashMap<>();
+    for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+      destinations.put("subcluster" + i, "/space");
+    }
+    MountTable spaceEntry = MountTable.newInstance("/space", destinations);
+    spaceEntry.setDestOrder(DestinationOrder.SPACE);
+    mountTableResolver.addEntry(spaceEntry);
+
+    return mountTableResolver;
+  }
+
+  public static MembershipState newMembershipState(String nameservice,
+      long availableSpace) {
+    MembershipState record = MembershipState.newInstance();
+    record.setNameserviceId(nameservice);
+
+    MembershipStats stats = new MembershipStatsPBImpl();
+    stats.setAvailableSpace(availableSpace);
+    record.setStats(stats);
+    return record;
+  }
+
+  @Test
+  public void testSubclusterSpaceComparator() {
+    verifyRank(0.0f, true, false);
+    verifyRank(1.0f, true, true);
+    verifyRank(0.5f, false, false);
+    verifyRank(BALANCER_PREFERENCE_DEFAULT, false, false);
+
+    // test for illegal cases
+    try {
+      verifyRank(2.0f, false, false);
+      fail("Subcluster comparison should be failed.");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "The balancer preference value should be in the range 0.0 - 1.0", e);
+    }
+
+    try {
+      verifyRank(-1.0f, false, false);
+      fail("Subcluster comparison should be failed.");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "The balancer preference value should be in the range 0.0 - 1.0", e);
+    }
+  }
+
+  /**
+   * Verify result rank with {@link SubclusterSpaceComparator}.
+   * @param balancerPreference The balancer preference used
+   *        in {@link SubclusterSpaceComparator}.
+   * @param shouldOrdered The result rank should be ordered.
+   * @param isDesc If the rank result is in a descending order.
+   */
+  private void verifyRank(float balancerPreference, boolean shouldOrdered,
+      boolean isDesc) {
+    List<SubclusterAvailableSpace> subclusters = new LinkedList<>();
+    for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+      subclusters.add(new SubclusterAvailableSpace("subcluster" + i, i));
+    }
+
+    // shuffle the cluster list if we expect rank to be ordered
+    if (shouldOrdered) {
+      Collections.shuffle(subclusters);
+    }
+
+    SubclusterSpaceComparator comparator = new SubclusterSpaceComparator(
+        balancerPreference);
+    Collections.sort(subclusters, comparator);
+
+    int i = SUBCLUSTER_NUM - 1;
+    for (; i >= 0; i--) {
+      SubclusterAvailableSpace cluster = subclusters
+          .get(SUBCLUSTER_NUM - 1 - i);
+
+      if (shouldOrdered) {
+        if (isDesc) {
+          assertEquals("subcluster" + i, cluster.getNameserviceId());
+          assertEquals(i, cluster.getAvailableSpace());
+        } else {
+          assertEquals("subcluster" + (SUBCLUSTER_NUM - 1 - i),
+              cluster.getNameserviceId());
+          assertEquals(SUBCLUSTER_NUM - 1 - i, cluster.getAvailableSpace());
+        }
+      } else {
+        // If catch one cluster is not in ordered, that's expected behavior.
+        if (!cluster.getNameserviceId().equals("subcluster" + i)
+            && cluster.getAvailableSpace() != i) {
+          break;
+        }
+      }
+    }
+
+    // The var i won't reach to 0 since cluster list won't be completely
+    // ordered.
+    if (!shouldOrdered) {
+      assertNotEquals(0, i);
+    }
+    subclusters.clear();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
index 4995de4..715b627 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
@@ -60,8 +60,10 @@
 
   /** Directory that will be in a HASH_ALL mount point. */
   private static final String TEST_DIR_HASH_ALL = "/hashall";
-  /** Directory that will be in a HASH_ALL mount point. */
+  /** Directory that will be in a RANDOM mount point. */
   private static final String TEST_DIR_RANDOM = "/random";
+  /** Directory that will be in a SPACE mount point. */
+  private static final String TEST_DIR_SPACE = "/space";
 
   /** Number of namespaces. */
   private static final int NUM_NAMESPACES = 2;
@@ -103,6 +105,7 @@
     // Setup the test mount point
     createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
     createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
+    createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE);
 
     // Get filesystems for federated and each namespace
     routerFs = routerContext.getFileSystem();
@@ -135,6 +138,11 @@
     testAll(TEST_DIR_RANDOM);
   }
 
+  @Test
+  public void testSpaceAll() throws Exception {
+    testAll(TEST_DIR_SPACE);
+  }
+
   /**
    * Tests that the resolver spreads files across subclusters in the whole
    * tree.