blob: f5636ceccd11ffb5126efc11183d9125d742c869 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityComparator;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.Time;
/**
* In-memory cache/mock of a namenode and file resolver. Stores the most
* recently updated NN information for each nameservice and block pool. It also
* stores a virtual mount table for resolving global namespace paths to local NN
* paths.
*/
public class MockResolver
implements ActiveNamenodeResolver, FileSubclusterResolver {
private Map<String, List<? extends FederationNamenodeContext>> resolver =
new HashMap<>();
private Map<String, List<RemoteLocation>> locations = new HashMap<>();
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null;
public MockResolver() {
this.cleanRegistrations();
}
public MockResolver(Configuration conf) {
this();
}
public MockResolver(Configuration conf, StateStoreService store) {
this();
}
public MockResolver(Configuration conf, Router router) {
this();
}
public void addLocation(String mount, String nsId, String location) {
List<RemoteLocation> locationsList = this.locations.get(mount);
if (locationsList == null) {
locationsList = new LinkedList<>();
this.locations.put(mount, locationsList);
}
final RemoteLocation remoteLocation =
new RemoteLocation(nsId, location, mount);
if (!locationsList.contains(remoteLocation)) {
locationsList.add(remoteLocation);
}
if (this.defaultNamespace == null) {
this.defaultNamespace = nsId;
}
}
public synchronized void cleanRegistrations() {
this.resolver = new HashMap<>();
this.namespaces = new HashSet<>();
}
@Override
public void updateActiveNamenode(
String nsId, InetSocketAddress successfulAddress) {
String address = successfulAddress.getHostName() + ":" +
successfulAddress.getPort();
String key = nsId;
if (key != null) {
// Update the active entry
@SuppressWarnings("unchecked")
List<FederationNamenodeContext> namenodes =
(List<FederationNamenodeContext>) this.resolver.get(key);
for (FederationNamenodeContext namenode : namenodes) {
if (namenode.getRpcAddress().equals(address)) {
MockNamenodeContext nn = (MockNamenodeContext) namenode;
nn.setState(FederationNamenodeServiceState.ACTIVE);
break;
}
}
// This operation modifies the list so we need to be careful
synchronized(namenodes) {
Collections.sort(namenodes, new NamenodePriorityComparator());
}
}
}
@Override
public List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) {
// Return a copy of the list because it is updated periodically
List<? extends FederationNamenodeContext> namenodes =
this.resolver.get(nameserviceId);
if (namenodes == null) {
namenodes = new ArrayList<>();
}
return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
@Override
public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
String blockPoolId) {
// Return a copy of the list because it is updated periodically
List<? extends FederationNamenodeContext> namenodes =
this.resolver.get(blockPoolId);
return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
private static class MockNamenodeContext
implements FederationNamenodeContext {
private String namenodeId;
private String nameserviceId;
private String webAddress;
private String rpcAddress;
private String serviceAddress;
private String lifelineAddress;
private FederationNamenodeServiceState state;
private long dateModified;
MockNamenodeContext(
String rpc, String service, String lifeline, String web,
String ns, String nn, FederationNamenodeServiceState state) {
this.rpcAddress = rpc;
this.serviceAddress = service;
this.lifelineAddress = lifeline;
this.webAddress = web;
this.namenodeId = nn;
this.nameserviceId = ns;
this.state = state;
this.dateModified = Time.now();
}
public void setState(FederationNamenodeServiceState newState) {
this.state = newState;
this.dateModified = Time.now();
}
@Override
public String getRpcAddress() {
return rpcAddress;
}
@Override
public String getServiceAddress() {
return serviceAddress;
}
@Override
public String getLifelineAddress() {
return lifelineAddress;
}
@Override
public String getWebAddress() {
return webAddress;
}
@Override
public String getNamenodeKey() {
return nameserviceId + " " + namenodeId + " " + rpcAddress;
}
@Override
public String getNameserviceId() {
return nameserviceId;
}
@Override
public String getNamenodeId() {
return namenodeId;
}
@Override
public FederationNamenodeServiceState getState() {
return state;
}
@Override
public long getDateModified() {
return dateModified;
}
}
@Override
public synchronized boolean registerNamenode(NamenodeStatusReport report)
throws IOException {
MockNamenodeContext context = new MockNamenodeContext(
report.getRpcAddress(), report.getServiceAddress(),
report.getLifelineAddress(), report.getWebAddress(),
report.getNameserviceId(), report.getNamenodeId(), report.getState());
String nsId = report.getNameserviceId();
String bpId = report.getBlockPoolId();
String cId = report.getClusterId();
@SuppressWarnings("unchecked")
List<MockNamenodeContext> existingItems =
(List<MockNamenodeContext>) this.resolver.get(nsId);
if (existingItems == null) {
existingItems = new ArrayList<>();
this.resolver.put(bpId, existingItems);
this.resolver.put(nsId, existingItems);
}
boolean added = false;
for (int i=0; i<existingItems.size() && !added; i++) {
MockNamenodeContext existing = existingItems.get(i);
if (existing.getNamenodeKey().equals(context.getNamenodeKey())) {
existingItems.set(i, context);
added = true;
}
}
if (!added) {
existingItems.add(context);
}
Collections.sort(existingItems, new NamenodePriorityComparator());
FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId);
this.namespaces.add(info);
return true;
}
@Override
public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
return this.namespaces;
}
@Override
public Set<String> getDisabledNamespaces() throws IOException {
return new TreeSet<>();
}
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
List<RemoteLocation> remoteLocations = new LinkedList<>();
// We go from the leaves to the root
List<String> keys = new ArrayList<>(this.locations.keySet());
Collections.sort(keys, Collections.reverseOrder());
for (String key : keys) {
if (path.startsWith(key)) {
for (RemoteLocation location : this.locations.get(key)) {
String finalPath = location.getDest();
String extraPath = path.substring(key.length());
if (finalPath.endsWith("/") && extraPath.startsWith("/")) {
extraPath = extraPath.substring(1);
}
finalPath += extraPath;
String nameservice = location.getNameserviceId();
RemoteLocation remoteLocation =
new RemoteLocation(nameservice, finalPath, path);
remoteLocations.add(remoteLocation);
}
break;
}
}
if (remoteLocations.isEmpty()) {
// Path isn't supported, mimic resolver behavior.
return null;
}
return new PathLocation(path, remoteLocations);
}
@Override
public List<String> getMountPoints(String path) throws IOException {
List<String> mounts = new ArrayList<>();
if (path.equals("/")) {
// Mounts only supported under root level
for (String mount : this.locations.keySet()) {
if (mount.length() > 1) {
// Remove leading slash, this is the behavior of the mount tree,
// return only names.
mounts.add(mount.replace("/", ""));
}
}
}
return mounts;
}
@Override
public void setRouterId(String router) {
}
@Override
public String getDefaultNamespace() {
return defaultNamespace;
}
}