HDFS-13291. RBF: Implement available space based OrderResolver. Contributed by Yiqun Lin.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
index 8dd73ec..e31077e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
@@ -21,6 +21,7 @@
import java.util.EnumMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
@@ -77,6 +78,8 @@
addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
addResolver(DestinationOrder.RANDOM, new RandomResolver());
addResolver(DestinationOrder.HASH_ALL, new HashResolver());
+ addResolver(DestinationOrder.SPACE,
+ new AvailableSpaceResolver(conf, router));
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java
new file mode 100644
index 0000000..77a35a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/AvailableSpaceResolver.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Order the destinations based on available space. This resolver uses a
+ * higher probability (instead of "always") to choose the cluster with higher
+ * available space.
+ */
+public class AvailableSpaceResolver
+ extends RouterResolver<String, SubclusterAvailableSpace> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AvailableSpaceResolver.class);
+
+ /** Increases chance of files on subcluster with more available space. */
+ public static final String BALANCER_PREFERENCE_KEY =
+ RBFConfigKeys.FEDERATION_ROUTER_PREFIX
+ + "available-space-resolver.balanced-space-preference-fraction";
+ public static final float BALANCER_PREFERENCE_DEFAULT = 0.6f;
+
+ /** Random instance used in the subcluster comparison. */
+ private static final Random RAND = new Random();
+
+ /** Customized comparator for SubclusterAvailableSpace. */
+ private SubclusterSpaceComparator comparator;
+
+ public AvailableSpaceResolver(final Configuration conf,
+ final Router routerService) {
+ super(conf, routerService);
+ float balancedPreference = conf.getFloat(BALANCER_PREFERENCE_KEY,
+ BALANCER_PREFERENCE_DEFAULT);
+ if (balancedPreference < 0.5) {
+ LOG.warn("The balancer preference value is less than 0.5. That means more"
+ + " files will be allocated in cluster with lower available space.");
+ }
+
+ this.comparator = new SubclusterSpaceComparator(balancedPreference);
+ }
+
+ /**
+ * Get the mapping from NamespaceId to subcluster space info. It gets this
+ * mapping from the subclusters through expensive calls (e.g., RPC) and uses
+ * caching to avoid too many calls. The cache might be updated asynchronously
+ * to reduce latency.
+ *
+ * @return NamespaceId -> {@link SubclusterAvailableSpace}
+ */
+ @Override
+ protected Map<String, SubclusterAvailableSpace> getSubclusterInfo(
+ MembershipStore membershipStore) {
+ Map<String, SubclusterAvailableSpace> mapping = new HashMap<>();
+ try {
+ // Get the Namenode's available space info from the subclusters
+ // from the Membership store.
+ GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest
+ .newInstance();
+ GetNamenodeRegistrationsResponse response = membershipStore
+ .getNamenodeRegistrations(request);
+ final List<MembershipState> nns = response.getNamenodeMemberships();
+ for (MembershipState nn : nns) {
+ try {
+ String nsId = nn.getNameserviceId();
+ long availableSpace = nn.getStats().getAvailableSpace();
+ mapping.put(nsId, new SubclusterAvailableSpace(nsId, availableSpace));
+ } catch (Exception e) {
+ LOG.error("Cannot get stats info for {}: {}.", nn, e.getMessage());
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.error("Cannot get Namenodes from the State Store.", ioe);
+ }
+ return mapping;
+ }
+
+ @Override
+ protected String chooseFirstNamespace(String path, PathLocation loc) {
+ Map<String, SubclusterAvailableSpace> subclusterInfo =
+ getSubclusterMapping();
+ List<SubclusterAvailableSpace> subclusterList = new LinkedList<>(
+ subclusterInfo.values());
+ Collections.sort(subclusterList, comparator);
+
+ return subclusterList.size() > 0 ? subclusterList.get(0).getNameserviceId()
+ : null;
+ }
+
+ /**
+ * Inner class that stores cluster available space info.
+ */
+ static class SubclusterAvailableSpace {
+ private final String nsId;
+ private final long availableSpace;
+
+ SubclusterAvailableSpace(String nsId, long availableSpace) {
+ this.nsId = nsId;
+ this.availableSpace = availableSpace;
+ }
+
+ public String getNameserviceId() {
+ return this.nsId;
+ }
+
+ public long getAvailableSpace() {
+ return this.availableSpace;
+ }
+ }
+
+  /**
+   * Customized comparator for SubclusterAvailableSpace. The more available
+   * space a cluster has, the higher its priority. This ordering is not
+   * absolute: a balanced preference is applied so the higher-space cluster
+   * is only chosen with a higher probability (instead of "always").
+   */
+ static final class SubclusterSpaceComparator
+ implements Comparator<SubclusterAvailableSpace>, Serializable {
+ private int balancedPreference;
+
+ SubclusterSpaceComparator(float balancedPreference) {
+ Preconditions.checkArgument(
+ balancedPreference <= 1 && balancedPreference >= 0,
+ "The balancer preference value should be in the range 0.0 - 1.0");
+
+ this.balancedPreference = (int) (100 * balancedPreference);
+ }
+
+ @Override
+ public int compare(SubclusterAvailableSpace cluster1,
+ SubclusterAvailableSpace cluster2) {
+ int ret = cluster1.getAvailableSpace() > cluster2.getAvailableSpace() ? -1
+ : 1;
+
+ if (ret < 0) {
+ return (RAND.nextInt(100) < balancedPreference) ? -1 : 1;
+ } else {
+ return (RAND.nextInt(100) < balancedPreference) ? 1 : -1;
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 03e68e5..99c5e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -26,5 +26,6 @@
HASH, // Follow consistent hashing in the first folder level
LOCAL, // Local first
RANDOM, // Random order
- HASH_ALL // Follow consistent hashing
+ HASH_ALL, // Follow consistent hashing
+ SPACE // Available space based order
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
index 4d76c89..b6bd4b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HostAndPort;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -30,17 +25,14 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
-import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@@ -50,40 +42,46 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+
/**
* The local subcluster (where the writer is) should be tried first. The writer
* is defined from the RPC query received in the RPC server.
*/
-public class LocalResolver implements OrderedResolver {
+public class LocalResolver extends RouterResolver<String, String> {
private static final Logger LOG =
LoggerFactory.getLogger(LocalResolver.class);
- /** Configuration key to set the minimum time to update the local cache.*/
- public static final String MIN_UPDATE_PERIOD_KEY =
- RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
- /** 10 seconds by default. */
- private static final long MIN_UPDATE_PERIOD_DEFAULT =
- TimeUnit.SECONDS.toMillis(10);
-
-
- /** Router service. */
- private final Router router;
- /** Minimum update time. */
- private final long minUpdateTime;
-
- /** Node IP -> Subcluster. */
- private Map<String, String> nodeSubcluster = null;
- /** Last time the subcluster map was updated. */
- private long lastUpdated;
-
-
public LocalResolver(final Configuration conf, final Router routerService) {
- this.minUpdateTime = conf.getTimeDuration(
- MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
- TimeUnit.MILLISECONDS);
- this.router = routerService;
+ super(conf, routerService);
+ }
+
+ /**
+ * Get the mapping from nodes to subcluster. It gets this mapping from the
+ * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
+ * too many calls. The cache might be updated asynchronously to reduce
+ * latency.
+ *
+ * @return Node IP -> Subcluster.
+ */
+ @Override
+ protected Map<String, String> getSubclusterInfo(
+ MembershipStore membershipStore) {
+ Map<String, String> mapping = new HashMap<>();
+
+ Map<String, String> dnSubcluster = getDatanodesSubcluster();
+ if (dnSubcluster != null) {
+ mapping.putAll(dnSubcluster);
+ }
+
+ Map<String, String> nnSubcluster = getNamenodesSubcluster(membershipStore);
+ if (nnSubcluster != null) {
+ mapping.putAll(nnSubcluster);
+ }
+ return mapping;
}
/**
@@ -98,12 +96,12 @@
* @return Local name space. Null if we don't know about this machine.
*/
@Override
- public String getFirstNamespace(final String path, final PathLocation loc) {
+ protected String chooseFirstNamespace(String path, PathLocation loc) {
String localSubcluster = null;
String clientAddr = getClientAddr();
- Map<String, String> nodeToSubcluster = getSubclusterMappings();
- if (nodeToSubcluster != null) {
- localSubcluster = nodeToSubcluster.get(clientAddr);
+ Map<String, String> subclusterInfo = getSubclusterMapping();
+ if (subclusterInfo != null) {
+ localSubcluster = subclusterInfo.get(clientAddr);
if (localSubcluster != null) {
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
} else {
@@ -122,52 +120,6 @@
}
/**
- * Get the mapping from nodes to subcluster. It gets this mapping from the
- * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
- * too many calls. The cache might be updated asynchronously to reduce
- * latency.
- *
- * @return Node IP -> Subcluster.
- */
- @VisibleForTesting
- synchronized Map<String, String> getSubclusterMappings() {
- if (nodeSubcluster == null ||
- (monotonicNow() - lastUpdated) > minUpdateTime) {
- // Fetch the mapping asynchronously
- Thread updater = new Thread(new Runnable() {
- @Override
- public void run() {
- Map<String, String> mapping = new HashMap<>();
-
- Map<String, String> dnSubcluster = getDatanodesSubcluster();
- if (dnSubcluster != null) {
- mapping.putAll(dnSubcluster);
- }
-
- Map<String, String> nnSubcluster = getNamenodesSubcluster();
- if (nnSubcluster != null) {
- mapping.putAll(nnSubcluster);
- }
- nodeSubcluster = mapping;
- lastUpdated = monotonicNow();
- }
- });
- updater.start();
-
- // Wait until initialized
- if (nodeSubcluster == null) {
- try {
- LOG.debug("Wait to get the mapping for the first time");
- updater.join();
- } catch (InterruptedException e) {
- LOG.error("Cannot wait for the updater to finish");
- }
- }
- }
- return nodeSubcluster;
- }
-
- /**
* Get the Datanode mapping from the subclusters from the Namenodes. This
* needs to be done as a privileged action to use the user for the Router and
* not the one from the client in the RPC call.
@@ -221,14 +173,8 @@
*
* @return NN IP -> Subcluster.
*/
- private Map<String, String> getNamenodesSubcluster() {
-
- final MembershipStore membershipStore = getMembershipStore();
- if (membershipStore == null) {
- LOG.error("Cannot access the Membership store");
- return null;
- }
-
+ private Map<String, String> getNamenodesSubcluster(
+ MembershipStore membershipStore) {
// Manage requests from this hostname (127.0.0.1)
String localIp = "127.0.0.1";
String localHostname = localIp;
@@ -269,29 +215,4 @@
}
return ret;
}
-
- /**
- * Get the Router RPC server.
- *
- * @return Router RPC server. Null if not possible.
- */
- private RouterRpcServer getRpcServer() {
- if (this.router == null) {
- return null;
- }
- return router.getRpcServer();
- }
-
- /**
- * Get the Membership store.
- *
- * @return Membership store.
- */
- private MembershipStore getMembershipStore() {
- StateStoreService stateStore = router.getStateStore();
- if (stateStore == null) {
- return null;
- }
- return stateStore.getRegisteredRecordStore(MembershipStore.class);
- }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java
new file mode 100644
index 0000000..91af1ca
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RouterResolver.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The order resolver that depends upon the Router service.
+ *
+ * @param <K> The key type of subcluster mapping info queried from Router.
+ * @param <V> The value type of subcluster mapping info queried from Router.
+ */
+public abstract class RouterResolver<K, V> implements OrderedResolver {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterResolver.class);
+
+ /** Configuration key to set the minimum time to update subcluster info. */
+ public static final String MIN_UPDATE_PERIOD_KEY =
+ RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "router-resolver.update-period";
+ /** 10 seconds by default. */
+ private static final long MIN_UPDATE_PERIOD_DEFAULT = TimeUnit.SECONDS
+ .toMillis(10);
+
+ /** Router service. */
+ private final Router router;
+ /** Minimum update time. */
+ private final long minUpdateTime;
+
+  /** K -> V subcluster mapping (e.g., node IP -> subcluster id). */
+ private Map<K, V> subclusterMapping = null;
+ /** Last time the subcluster mapping was updated. */
+ private long lastUpdated;
+
+ public RouterResolver(final Configuration conf, final Router routerService) {
+ this.minUpdateTime = conf.getTimeDuration(MIN_UPDATE_PERIOD_KEY,
+ MIN_UPDATE_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+ this.router = routerService;
+ }
+
+ @Override
+ public String getFirstNamespace(String path, PathLocation loc) {
+ updateSubclusterMapping();
+ return chooseFirstNamespace(path, loc);
+ }
+
+ /**
+ * The implementation for getting desired subcluster mapping info.
+ *
+ * @param membershipStore Membership store the resolver queried from.
+ * @return The map of desired type info.
+ */
+ protected abstract Map<K, V> getSubclusterInfo(
+ MembershipStore membershipStore);
+
+ /**
+ * Choose the first namespace from queried subcluster mapping info.
+ *
+ * @param path Path to check.
+ * @param loc Federated location with multiple destinations.
+ * @return First namespace out of the locations.
+ */
+ protected abstract String chooseFirstNamespace(String path, PathLocation loc);
+
+ /**
+   * Update the (namespace id, subcluster info) mapping periodically.
+ */
+ private synchronized void updateSubclusterMapping() {
+ if (subclusterMapping == null
+ || (monotonicNow() - lastUpdated) > minUpdateTime) {
+ // Fetch the mapping asynchronously
+ Thread updater = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final MembershipStore membershipStore = getMembershipStore();
+ if (membershipStore == null) {
+ LOG.error("Cannot access the Membership store.");
+ return;
+ }
+
+ subclusterMapping = getSubclusterInfo(membershipStore);
+ lastUpdated = monotonicNow();
+ }
+ });
+ updater.start();
+
+ // Wait until initialized
+ if (subclusterMapping == null) {
+ try {
+ LOG.debug("Wait to get the mapping for the first time");
+ updater.join();
+ } catch (InterruptedException e) {
+ LOG.error("Cannot wait for the updater to finish");
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the Router RPC server.
+ *
+ * @return Router RPC server. Null if not possible.
+ */
+ protected RouterRpcServer getRpcServer() {
+ if (this.router == null) {
+ return null;
+ }
+ return router.getRpcServer();
+ }
+
+ /**
+ * Get the Membership store.
+ *
+ * @return Membership store.
+ */
+ protected MembershipStore getMembershipStore() {
+ StateStoreService stateStore = router.getStateStore();
+ if (stateStore == null) {
+ return null;
+ }
+ return stateStore.getRegisteredRecordStore(MembershipStore.class);
+ }
+
+ /**
+ * Get subcluster mapping info.
+ *
+ * @return The map of subcluster info.
+ */
+ protected Map<K, V> getSubclusterMapping() {
+ return this.subclusterMapping;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index b3f677d..eaa3951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -534,32 +534,21 @@
}
/**
- * Get the permissions for the parent of a child with given permissions. If
- * the child has r--, we will set it to r-x.
+ * Get the permissions for the parent of a child with given permissions.
+ * Add implicit u+wx permission for parent. This is based on
+ * @{FSDirMkdirOp#addImplicitUwx}.
* @param mask The permission mask of the child.
* @return The permission mask of the parent.
*/
private static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission(
- applyExecute(mask.getUserAction()),
- applyExecute(mask.getGroupAction()),
- applyExecute(mask.getOtherAction()));
+ mask.getUserAction().or(FsAction.WRITE_EXECUTE),
+ mask.getGroupAction(),
+ mask.getOtherAction());
return ret;
}
/**
- * Apply the execute permissions if it can be read.
- * @param action Input permission.
- * @return Output permission.
- */
- private static FsAction applyExecute(final FsAction action) {
- if (action.and(FsAction.READ) == FsAction.READ) {
- return action.or(FsAction.EXECUTE);
- }
- return action;
- }
-
- /**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index f8fec87..60496ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -421,7 +421,8 @@
public boolean isAll() {
DestinationOrder order = getDestOrder();
return order == DestinationOrder.HASH_ALL ||
- order == DestinationOrder.RANDOM;
+ order == DestinationOrder.RANDOM ||
+ order == DestinationOrder.SPACE;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 9667489..48f93bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -297,6 +297,8 @@
return DestinationOrder.RANDOM;
case HASH_ALL:
return DestinationOrder.HASH_ALL;
+ case SPACE:
+ return DestinationOrder.SPACE;
default:
return DestinationOrder.HASH;
}
@@ -310,6 +312,8 @@
return DestOrder.RANDOM;
case HASH_ALL:
return DestOrder.HASH_ALL;
+ case SPACE:
+ return DestOrder.SPACE;
default:
return DestOrder.HASH;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index b0d6982..b6a0b12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -130,6 +130,7 @@
LOCAL = 1;
RANDOM = 2;
HASH_ALL = 3;
+ SPACE = 4;
}
optional DestOrder destOrder = 6 [default = HASH];
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java
new file mode 100644
index 0000000..dfbdf51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestAvailableSpaceResolver.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.BALANCER_PREFERENCE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterAvailableSpace;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.AvailableSpaceResolver.SubclusterSpaceComparator;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MembershipStatsPBImpl;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Test the {@link AvailableSpaceResolver}.
+ */
+public class TestAvailableSpaceResolver {
+
+ private static final int SUBCLUSTER_NUM = 10;
+
+ @Test
+ public void testResolverWithNoPreference() throws IOException {
+ MultipleDestinationMountTableResolver mountTableResolver =
+ mockAvailableSpaceResolver(1.0f);
+ // With a preference of 1.0 there is no randomness: the resolver
+ // always chooses the subcluster with the maximum available space.
+ PathLocation loc = mountTableResolver.getDestinationForPath("/space");
+ assertEquals("subcluster9",
+ loc.getDestinations().get(0).getNameserviceId());
+
+ loc = mountTableResolver.getDestinationForPath("/space/subdir");
+ assertEquals("subcluster9",
+ loc.getDestinations().get(0).getNameserviceId());
+ }
+
+ @Test
+ public void testResolverWithDefaultPreference() throws IOException {
+ MultipleDestinationMountTableResolver mountTableResolver =
+ mockAvailableSpaceResolver(BALANCER_PREFERENCE_DEFAULT);
+
+ int retries = 10;
+ int retryTimes = 0;
+ // With the default preference there is a chance the resolver does
+ // not choose the maximum-available-space subcluster every time.
+ for (retryTimes = 0; retryTimes < retries; retryTimes++) {
+ PathLocation loc = mountTableResolver.getDestinationForPath("/space");
+ if (!"subcluster9"
+ .equals(loc.getDestinations().get(0).getNameserviceId())) {
+ break;
+ }
+ }
+ assertNotEquals(retries, retryTimes);
+ }
+
+ /**
+ * Mock the available space based resolver.
+ *
+ * @param balancerPreference The balancer preference for the resolver.
+ * @throws IOException If the resolver cannot be initialized.
+ * @return MultipleDestinationMountTableResolver instance.
+ */
+ @SuppressWarnings("unchecked")
+ private MultipleDestinationMountTableResolver mockAvailableSpaceResolver(
+ float balancerPreference) throws IOException {
+ Configuration conf = new Configuration();
+ conf.setFloat(BALANCER_PREFERENCE_KEY, balancerPreference);
+ Router router = mock(Router.class);
+ StateStoreService stateStore = mock(StateStoreService.class);
+ MembershipStore membership = mock(MembershipStore.class);
+ when(router.getStateStore()).thenReturn(stateStore);
+ when(stateStore.getRegisteredRecordStore(any(Class.class)))
+ .thenReturn(membership);
+ GetNamenodeRegistrationsResponse response =
+ GetNamenodeRegistrationsResponse.newInstance();
+ // Subcluster i reports i bytes of available space
+ List<MembershipState> records = new LinkedList<>();
+ for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+ records.add(newMembershipState("subcluster" + i, i));
+ }
+ response.setNamenodeMemberships(records);
+
+ when(membership
+ .getNamenodeRegistrations(any(GetNamenodeRegistrationsRequest.class)))
+ .thenReturn(response);
+
+ // construct available space resolver
+ AvailableSpaceResolver resolver = new AvailableSpaceResolver(conf, router);
+ MultipleDestinationMountTableResolver mountTableResolver =
+ new MultipleDestinationMountTableResolver(conf, router);
+ mountTableResolver.addResolver(DestinationOrder.SPACE, resolver);
+
+ // We point /space to subclusters [0,..9] with the SPACE order
+ Map<String, String> destinations = new HashMap<>();
+ for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+ destinations.put("subcluster" + i, "/space");
+ }
+ MountTable spaceEntry = MountTable.newInstance("/space", destinations);
+ spaceEntry.setDestOrder(DestinationOrder.SPACE);
+ mountTableResolver.addEntry(spaceEntry);
+
+ return mountTableResolver;
+ }
+
+ public static MembershipState newMembershipState(String nameservice,
+ long availableSpace) {
+ MembershipState record = MembershipState.newInstance();
+ record.setNameserviceId(nameservice);
+
+ MembershipStats stats = new MembershipStatsPBImpl();
+ stats.setAvailableSpace(availableSpace);
+ record.setStats(stats);
+ return record;
+ }
+
+ @Test
+ public void testSubclusterSpaceComparator() {
+ verifyRank(0.0f, true, false);
+ verifyRank(1.0f, true, true);
+ verifyRank(0.5f, false, false);
+ verifyRank(BALANCER_PREFERENCE_DEFAULT, false, false);
+
+ // Preference values outside [0.0, 1.0] must be rejected
+ try {
+ verifyRank(2.0f, false, false);
+ fail("Subcluster comparison should be failed.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "The balancer preference value should be in the range 0.0 - 1.0", e);
+ }
+
+ try {
+ verifyRank(-1.0f, false, false);
+ fail("Subcluster comparison should be failed.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains(
+ "The balancer preference value should be in the range 0.0 - 1.0", e);
+ }
+ }
+
+ /**
+ * Verify result rank with {@link SubclusterSpaceComparator}.
+ * @param balancerPreference The balancer preference used
+ * in {@link SubclusterSpaceComparator}.
+ * @param shouldOrdered Whether the sorted result must be fully ordered.
+ * @param isDesc Whether the expected order is descending.
+ */
+ private void verifyRank(float balancerPreference, boolean shouldOrdered,
+ boolean isDesc) {
+ List<SubclusterAvailableSpace> subclusters = new LinkedList<>();
+ for (int i = 0; i < SUBCLUSTER_NUM; i++) {
+ subclusters.add(new SubclusterAvailableSpace("subcluster" + i, i));
+ }
+
+ // shuffle the cluster list when we expect the sort to restore order
+ if (shouldOrdered) {
+ Collections.shuffle(subclusters);
+ }
+
+ SubclusterSpaceComparator comparator = new SubclusterSpaceComparator(
+ balancerPreference);
+ Collections.sort(subclusters, comparator);
+
+ int i = SUBCLUSTER_NUM - 1;
+ for (; i >= 0; i--) {
+ SubclusterAvailableSpace cluster = subclusters
+ .get(SUBCLUSTER_NUM - 1 - i);
+
+ if (shouldOrdered) {
+ if (isDesc) {
+ assertEquals("subcluster" + i, cluster.getNameserviceId());
+ assertEquals(i, cluster.getAvailableSpace());
+ } else {
+ assertEquals("subcluster" + (SUBCLUSTER_NUM - 1 - i),
+ cluster.getNameserviceId());
+ assertEquals(SUBCLUSTER_NUM - 1 - i, cluster.getAvailableSpace());
+ }
+ } else {
+ // Finding one cluster out of order is the expected behavior.
+ if (!cluster.getNameserviceId().equals("subcluster" + i)
+ && cluster.getAvailableSpace() != i) {
+ break;
+ }
+ }
+ }
+
+ // The loop must have exited via break (i >= 0); i only reaches -1 when
+ // the list is completely ordered, which must not happen here.
+ if (!shouldOrdered) {
+ assertNotEquals(-1, i);
+ }
+ subclusters.clear();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
index 4995de4..715b627 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java
@@ -60,8 +60,10 @@
/** Directory that will be in a HASH_ALL mount point. */
private static final String TEST_DIR_HASH_ALL = "/hashall";
- /** Directory that will be in a HASH_ALL mount point. */
+ /** Directory that will be in a RANDOM mount point. */
private static final String TEST_DIR_RANDOM = "/random";
+ /** Directory that will be in a SPACE mount point. */
+ private static final String TEST_DIR_SPACE = "/space";
/** Number of namespaces. */
private static final int NUM_NAMESPACES = 2;
@@ -103,6 +105,7 @@
// Setup the test mount point
createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
+ createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE);
// Get filesystems for federated and each namespace
routerFs = routerContext.getFileSystem();
@@ -135,6 +138,11 @@
testAll(TEST_DIR_RANDOM);
}
+ @Test
+ public void testSpaceAll() throws Exception {
+ testAll(TEST_DIR_SPACE);
+ }
+
/**
* Tests that the resolver spreads files across subclusters in the whole
* tree.