blob: bdd75c70f5c87806bb3307ce14a6cd00576db482 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_CACHE_ENABLE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* Mount table to map between global paths and remote locations. This allows the
* {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map
* the global HDFS view to the remote namespaces. This is similar to
* {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
* This is implemented as a tree.
*/
public class MountTableResolver
implements FileSubclusterResolver, StateStoreCache {
private static final Logger LOG =
LoggerFactory.getLogger(MountTableResolver.class);
/** Reference to Router. */
private final Router router;
/** Reference to the State Store. */
private final StateStoreService stateStore;
/** Interface to the mount table store. */
private MountTableStore mountTableStore;
/** If the tree has been initialized. */
private boolean init = false;
/** Path -> Remote HDFS location. */
private final TreeMap<String, MountTable> tree = new TreeMap<>();
/** Path -> Remote location. */
private final Cache<String, PathLocation> locationCache;
/** Default nameservice when no mount matches the math. */
private String defaultNameService = "";
/** If use default nameservice to read and write files. */
private boolean defaultNSEnable = true;
/** Synchronization for both the tree and the cache. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
@VisibleForTesting
public MountTableResolver(Configuration conf) {
this(conf, (StateStoreService)null);
}
public MountTableResolver(Configuration conf, Router routerService) {
this(conf, routerService, null);
}
public MountTableResolver(Configuration conf, StateStoreService store) {
this(conf, null, store);
}
public MountTableResolver(Configuration conf, Router routerService,
StateStoreService store) {
this.router = routerService;
if (store != null) {
this.stateStore = store;
} else if (this.router != null) {
this.stateStore = this.router.getStateStore();
} else {
this.stateStore = null;
}
boolean mountTableCacheEnable = conf.getBoolean(
FEDERATION_MOUNT_TABLE_CACHE_ENABLE,
FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT);
if (mountTableCacheEnable) {
int maxCacheSize = conf.getInt(
FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE,
FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT);
this.locationCache = CacheBuilder.newBuilder()
.maximumSize(maxCacheSize)
.build();
} else {
this.locationCache = null;
}
registerCacheExternal();
initDefaultNameService(conf);
}
/**
* Request cache updates from the State Store for this resolver.
*/
private void registerCacheExternal() {
if (this.stateStore != null) {
this.stateStore.registerCacheExternal(this);
}
}
/**
* Nameservice for APIs that cannot be resolved to a specific one.
*
* @param conf Configuration for this resolver.
*/
private void initDefaultNameService(Configuration conf) {
this.defaultNameService = conf.get(
DFS_ROUTER_DEFAULT_NAMESERVICE,
DFSUtil.getNamenodeNameServiceId(conf));
this.defaultNSEnable = conf.getBoolean(
DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
if (defaultNameService == null) {
LOG.warn(
"{} and {} is not set. Fallback to {} as the default name service.",
DFS_ROUTER_DEFAULT_NAMESERVICE, DFS_NAMESERVICE_ID, DFS_NAMESERVICES);
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
if (nsIds.isEmpty()) {
this.defaultNameService = "";
} else {
this.defaultNameService = nsIds.iterator().next();
}
}
if (this.defaultNameService.equals("")) {
this.defaultNSEnable = false;
LOG.warn("Default name service is not set.");
} else {
String enable = this.defaultNSEnable ? "enabled" : "disabled";
LOG.info("Default name service: {}, {} to read or write",
this.defaultNameService, enable);
}
}
/**
* Get a reference for the Router for this resolver.
*
* @return Router for this resolver.
*/
protected Router getRouter() {
return this.router;
}
/**
* Get the mount table store for this resolver.
*
* @return Mount table store.
* @throws IOException If it cannot connect to the State Store.
*/
protected MountTableStore getMountTableStore() throws IOException {
if (this.mountTableStore == null) {
this.mountTableStore = this.stateStore.getRegisteredRecordStore(
MountTableStore.class);
if (this.mountTableStore == null) {
throw new IOException("State Store does not have an interface for " +
MountTableStore.class);
}
}
return this.mountTableStore;
}
/**
* Add a mount entry to the table.
*
* @param entry The mount table record to add from the state store.
*/
public void addEntry(final MountTable entry) {
writeLock.lock();
try {
String srcPath = entry.getSourcePath();
this.tree.put(srcPath, entry);
invalidateLocationCache(srcPath);
} finally {
writeLock.unlock();
}
this.init = true;
}
/**
* Remove a mount table entry.
*
* @param srcPath Source path for the entry to remove.
*/
public void removeEntry(final String srcPath) {
writeLock.lock();
try {
this.tree.remove(srcPath);
invalidateLocationCache(srcPath);
} finally {
writeLock.unlock();
}
}
/**
* Invalidates all cache entries below this path. It requires the write lock.
*
* @param path Source path.
*/
private void invalidateLocationCache(final String path) {
LOG.debug("Invalidating {} from {}", path, locationCache);
if (locationCache == null || locationCache.size() == 0) {
return;
}
// Go through the entries and remove the ones from the path to invalidate
ConcurrentMap<String, PathLocation> map = locationCache.asMap();
Set<Entry<String, PathLocation>> entries = map.entrySet();
Iterator<Entry<String, PathLocation>> it = entries.iterator();
while (it.hasNext()) {
Entry<String, PathLocation> entry = it.next();
PathLocation loc = entry.getValue();
String src = loc.getSourcePath();
if (src != null) {
if (isParentEntry(src, path)) {
LOG.debug("Removing {}", src);
it.remove();
}
} else {
String dest = loc.getDefaultLocation().getDest();
if (dest.startsWith(path)) {
LOG.debug("Removing default cache {}", dest);
it.remove();
}
}
}
LOG.debug("Location cache after invalidation: {}", locationCache);
}
/**
* Updates the mount path tree with a new set of mount table entries. It also
* updates the needed caches.
*
* @param entries Full set of mount table entries to update.
*/
@VisibleForTesting
public void refreshEntries(final Collection<MountTable> entries) {
// The tree read/write must be atomic
writeLock.lock();
try {
// New entries
Map<String, MountTable> newEntries = new ConcurrentHashMap<>();
for (MountTable entry : entries) {
String srcPath = entry.getSourcePath();
newEntries.put(srcPath, entry);
}
// Old entries (reversed to sort from the leaves to the root)
Set<String> oldEntries = new TreeSet<>(Collections.reverseOrder());
for (MountTable entry : getTreeValues("/")) {
String srcPath = entry.getSourcePath();
oldEntries.add(srcPath);
}
// Entries that need to be removed
for (String srcPath : oldEntries) {
if (!newEntries.containsKey(srcPath)) {
this.tree.remove(srcPath);
invalidateLocationCache(srcPath);
LOG.info("Removed stale mount point {} from resolver", srcPath);
}
}
// Entries that need to be added
for (MountTable entry : entries) {
String srcPath = entry.getSourcePath();
if (!oldEntries.contains(srcPath)) {
// Add node, it does not exist
this.tree.put(srcPath, entry);
invalidateLocationCache(srcPath);
LOG.info("Added new mount point {} to resolver", srcPath);
} else {
// Node exists, check for updates
MountTable existingEntry = this.tree.get(srcPath);
if (existingEntry != null && !existingEntry.equals(entry)) {
LOG.info("Entry has changed from \"{}\" to \"{}\"",
existingEntry, entry);
this.tree.put(srcPath, entry);
invalidateLocationCache(srcPath);
LOG.info("Updated mount point {} in resolver", srcPath);
}
}
}
} finally {
writeLock.unlock();
}
this.init = true;
}
/**
* Replaces the current in-memory cached of the mount table with a new
* version fetched from the data store.
*/
@Override
public boolean loadCache(boolean force) {
try {
// Our cache depends on the store, update it first
MountTableStore mountTable = this.getMountTableStore();
mountTable.loadCache(force);
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
List<MountTable> records = response.getEntries();
refreshEntries(records);
} catch (IOException e) {
LOG.error("Cannot fetch mount table entries from State Store", e);
return false;
}
return true;
}
/**
* Clears all data.
*/
public void clear() {
LOG.info("Clearing all mount location caches");
writeLock.lock();
try {
if (this.locationCache != null) {
this.locationCache.invalidateAll();
}
this.tree.clear();
} finally {
writeLock.unlock();
}
}
@Override
public PathLocation getDestinationForPath(final String path)
throws IOException {
verifyMountTable();
readLock.lock();
try {
if (this.locationCache == null) {
return lookupLocation(path);
}
Callable<? extends PathLocation> meh = new Callable<PathLocation>() {
@Override
public PathLocation call() throws Exception {
return lookupLocation(path);
}
};
return this.locationCache.get(path, meh);
} catch (ExecutionException e) {
throw new IOException(e);
} finally {
readLock.unlock();
}
}
/**
* Build the path location to insert into the cache atomically. It must hold
* the read lock.
* @param path Path to check/insert.
* @return New remote location.
*/
public PathLocation lookupLocation(final String path) throws IOException {
PathLocation ret = null;
MountTable entry = findDeepest(path);
if (entry != null) {
ret = buildLocation(path, entry);
} else {
// Not found, use default location
if (!defaultNSEnable) {
throw new IOException("Cannot find locations for " + path + ", " +
"because the default nameservice is disabled to read or write");
}
RemoteLocation remoteLocation =
new RemoteLocation(defaultNameService, path, path);
List<RemoteLocation> locations =
Collections.singletonList(remoteLocation);
ret = new PathLocation(null, locations);
}
return ret;
}
/**
* Get the mount table entry for a path.
*
* @param path Path to look for.
* @return Mount table entry the path belongs.
* @throws IOException If the State Store could not be reached.
*/
public MountTable getMountPoint(final String path) throws IOException {
verifyMountTable();
return findDeepest(path);
}
@Override
public List<String> getMountPoints(final String path) throws IOException {
verifyMountTable();
Set<String> children = new TreeSet<>();
readLock.lock();
try {
String from = path;
String to = path + Character.MAX_VALUE;
SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
boolean exists = false;
for (String subPath : subMap.keySet()) {
String child = subPath;
// Special case for /
if (!path.equals(Path.SEPARATOR)) {
// Get the children
int ini = path.length();
child = subPath.substring(ini);
}
if (child.isEmpty()) {
// This is a mount point but without children
exists = true;
} else if (child.startsWith(Path.SEPARATOR)) {
// This is a mount point with children
exists = true;
child = child.substring(1);
// We only return immediate children
int fin = child.indexOf(Path.SEPARATOR);
if (fin > -1) {
child = child.substring(0, fin);
}
if (!child.isEmpty()) {
children.add(child);
}
}
}
if (!exists) {
return null;
}
return new LinkedList<>(children);
} finally {
readLock.unlock();
}
}
/**
* Get all the mount records at or beneath a given path.
* @param path Path to get the mount points from.
* @return List of mount table records under the path or null if the path is
* not found.
* @throws IOException If it's not connected to the State Store.
*/
public List<MountTable> getMounts(final String path) throws IOException {
verifyMountTable();
return getTreeValues(path, false);
}
/**
* Check if the Mount Table is ready to be used.
* @throws StateStoreUnavailableException If it cannot connect to the store.
*/
private void verifyMountTable() throws StateStoreUnavailableException {
if (!this.init) {
throw new StateStoreUnavailableException("Mount Table not initialized");
}
}
@Override
public String toString() {
readLock.lock();
try {
return this.tree.toString();
} finally {
readLock.unlock();
}
}
/**
* Build a location for this result beneath the discovered mount point.
*
* @param path Path to build for.
* @param entry Mount table entry.
* @return PathLocation containing the namespace, local path.
*/
private static PathLocation buildLocation(
final String path, final MountTable entry) {
String srcPath = entry.getSourcePath();
if (!path.startsWith(srcPath)) {
LOG.error("Cannot build location, {} not a child of {}", path, srcPath);
return null;
}
String remainingPath = path.substring(srcPath.length());
if (remainingPath.startsWith(Path.SEPARATOR)) {
remainingPath = remainingPath.substring(1);
}
List<RemoteLocation> locations = new LinkedList<>();
for (RemoteLocation oneDst : entry.getDestinations()) {
String nsId = oneDst.getNameserviceId();
String dest = oneDst.getDest();
String newPath = dest;
if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) {
newPath += Path.SEPARATOR;
}
newPath += remainingPath;
RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath, path);
locations.add(remoteLocation);
}
DestinationOrder order = entry.getDestOrder();
return new PathLocation(srcPath, locations, order);
}
@Override
public String getDefaultNamespace() {
return this.defaultNameService;
}
/**
* Find the deepest mount point for a path.
* @param path Path to look for.
* @return Mount table entry.
*/
private MountTable findDeepest(final String path) {
readLock.lock();
try {
Entry<String, MountTable> entry = this.tree.floorEntry(path);
while (entry != null && !isParentEntry(path, entry.getKey())) {
entry = this.tree.lowerEntry(entry.getKey());
}
if (entry == null) {
return null;
}
return entry.getValue();
} finally {
readLock.unlock();
}
}
/**
* Get the mount table entries under a path.
* @param path Path to search from.
* @return Mount Table entries.
*/
private List<MountTable> getTreeValues(final String path) {
return getTreeValues(path, false);
}
/**
* Get the mount table entries under a path.
* @param path Path to search from.
* @param reverse If the order should be reversed.
* @return Mount Table entries.
*/
private List<MountTable> getTreeValues(final String path, boolean reverse) {
LinkedList<MountTable> ret = new LinkedList<>();
readLock.lock();
try {
String from = path;
String to = path + Character.MAX_VALUE;
SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
for (MountTable entry : subMap.values()) {
if (!reverse) {
ret.add(entry);
} else {
ret.addFirst(entry);
}
}
} finally {
readLock.unlock();
}
return ret;
}
/**
* Get the size of the cache.
* @return Size of the cache.
*/
protected long getCacheSize() throws IOException{
if (this.locationCache != null) {
return this.locationCache.size();
}
throw new IOException("localCache is null");
}
@VisibleForTesting
public String getDefaultNameService() {
return defaultNameService;
}
@VisibleForTesting
public void setDefaultNameService(String defaultNameService) {
this.defaultNameService = defaultNameService;
}
@VisibleForTesting
public boolean isDefaultNSEnable() {
return defaultNSEnable;
}
@VisibleForTesting
public void setDefaultNSEnable(boolean defaultNSRWEnable) {
this.defaultNSEnable = defaultNSRWEnable;
}
}