| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityComparator; |
| import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; |
| import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; |
| import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; |
| import org.apache.hadoop.hdfs.server.federation.router.Router; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; |
| import org.apache.hadoop.util.Time; |
| |
| /** |
| * In-memory cache/mock of a namenode and file resolver. Stores the most |
| * recently updated NN information for each nameservice and block pool. It also |
| * stores a virtual mount table for resolving global namespace paths to local NN |
| * paths. |
| */ |
| public class MockResolver |
| implements ActiveNamenodeResolver, FileSubclusterResolver { |
| |
| private Map<String, List<? extends FederationNamenodeContext>> resolver = |
| new HashMap<>(); |
| private Map<String, List<RemoteLocation>> locations = new HashMap<>(); |
| private Set<FederationNamespaceInfo> namespaces = new HashSet<>(); |
| private String defaultNamespace = null; |
| private boolean disableDefaultNamespace = false; |
| private volatile boolean disableRegistration = false; |
| private TreeSet<String> disableNamespaces = new TreeSet<>(); |
| |
| public MockResolver() { |
| this.cleanRegistrations(); |
| } |
| |
| public MockResolver(Configuration conf) { |
| this(); |
| } |
| |
| public MockResolver(Configuration conf, StateStoreService store) { |
| this(); |
| } |
| |
| public MockResolver(Configuration conf, Router router) { |
| this(); |
| } |
| |
| public void addLocation(String mount, String nsId, String location) { |
| List<RemoteLocation> locationsList = this.locations.get(mount); |
| if (locationsList == null) { |
| locationsList = new LinkedList<>(); |
| this.locations.put(mount, locationsList); |
| } |
| |
| final RemoteLocation remoteLocation = |
| new RemoteLocation(nsId, location, mount); |
| if (!locationsList.contains(remoteLocation)) { |
| locationsList.add(remoteLocation); |
| } |
| |
| if (this.defaultNamespace == null) { |
| this.defaultNamespace = nsId; |
| } |
| } |
| |
| public boolean removeLocation(String mount, String nsId, String location) { |
| List<RemoteLocation> locationsList = this.locations.get(mount); |
| final RemoteLocation remoteLocation = |
| new RemoteLocation(nsId, location, mount); |
| if (locationsList != null) { |
| return locationsList.remove(remoteLocation); |
| } |
| return false; |
| } |
| |
| public synchronized void cleanRegistrations() { |
| this.resolver = new HashMap<>(); |
| this.namespaces = new HashSet<>(); |
| } |
| |
| /* |
| * Disable NameNode auto registration for test. This method usually used after |
| * {@link MockResolver#cleanRegistrations()}, and before {@link |
| * MockResolver#registerNamenode()} |
| */ |
| public void setDisableRegistration(boolean isDisable) { |
| disableRegistration = isDisable; |
| } |
| |
| @Override public void updateUnavailableNamenode(String ns, |
| InetSocketAddress failedAddress) throws IOException { |
| updateNameNodeState(ns, failedAddress, |
| FederationNamenodeServiceState.UNAVAILABLE); |
| } |
| |
| @Override |
| public void updateActiveNamenode( |
| String nsId, InetSocketAddress successfulAddress) { |
| updateNameNodeState(nsId, successfulAddress, |
| FederationNamenodeServiceState.ACTIVE); |
| } |
| |
| private void updateNameNodeState(String nsId, |
| InetSocketAddress iAddr, |
| FederationNamenodeServiceState state) { |
| String sAddress = iAddr.getHostName() + ":" + |
| iAddr.getPort(); |
| String key = nsId; |
| if (key != null) { |
| // Update the active entry |
| @SuppressWarnings("unchecked") |
| List<FederationNamenodeContext> namenodes = |
| (List<FederationNamenodeContext>) this.resolver.get(key); |
| for (FederationNamenodeContext namenode : namenodes) { |
| if (namenode.getRpcAddress().equals(sAddress)) { |
| MockNamenodeContext nn = (MockNamenodeContext) namenode; |
| nn.setState(state); |
| break; |
| } |
| } |
| // This operation modifies the list, so we need to be careful |
| synchronized(namenodes) { |
| Collections.sort(namenodes, new NamenodePriorityComparator()); |
| } |
| } |
| } |
| |
| @Override |
| public synchronized List<? extends FederationNamenodeContext> |
| getNamenodesForNameserviceId(String nameserviceId, boolean observerRead) { |
| // Return a copy of the list because it is updated periodically |
| List<? extends FederationNamenodeContext> namenodes = |
| this.resolver.get(nameserviceId); |
| if (namenodes == null) { |
| namenodes = new ArrayList<>(); |
| } |
| |
| List<FederationNamenodeContext> ret = new ArrayList<>(); |
| |
| if (observerRead) { |
| Iterator<? extends FederationNamenodeContext> iterator = namenodes |
| .iterator(); |
| List<FederationNamenodeContext> observerNN = new ArrayList<>(); |
| List<FederationNamenodeContext> nonObserverNN = new ArrayList<>(); |
| while (iterator.hasNext()) { |
| FederationNamenodeContext membership = iterator.next(); |
| if (membership.getState() == FederationNamenodeServiceState.OBSERVER) { |
| observerNN.add(membership); |
| } else { |
| nonObserverNN.add(membership); |
| } |
| } |
| Collections.shuffle(observerNN); |
| Collections.sort(nonObserverNN, new NamenodePriorityComparator()); |
| ret.addAll(observerNN); |
| ret.addAll(nonObserverNN); |
| } else { |
| ret.addAll(namenodes); |
| Collections.sort(ret, new NamenodePriorityComparator()); |
| } |
| |
| return Collections.unmodifiableList(ret); |
| } |
| |
| @Override |
| public synchronized List<? extends FederationNamenodeContext> |
| getNamenodesForBlockPoolId(String blockPoolId) { |
| // Return a copy of the list because it is updated periodically |
| List<? extends FederationNamenodeContext> namenodes = |
| this.resolver.get(blockPoolId); |
| return Collections.unmodifiableList(new ArrayList<>(namenodes)); |
| } |
| |
| @SuppressWarnings("checkstyle:ParameterNumber") |
| private static class MockNamenodeContext |
| implements FederationNamenodeContext { |
| |
| private String namenodeId; |
| private String nameserviceId; |
| |
| private String webScheme; |
| private String webAddress; |
| private String rpcAddress; |
| private String serviceAddress; |
| private String lifelineAddress; |
| |
| private FederationNamenodeServiceState state; |
| private long dateModified; |
| |
| MockNamenodeContext( |
| String rpc, String service, String lifeline, String scheme, String web, |
| String ns, String nn, FederationNamenodeServiceState state) { |
| this.rpcAddress = rpc; |
| this.serviceAddress = service; |
| this.lifelineAddress = lifeline; |
| this.webScheme = scheme; |
| this.webAddress = web; |
| this.namenodeId = nn; |
| this.nameserviceId = ns; |
| this.state = state; |
| this.dateModified = Time.now(); |
| } |
| |
| public void setState(FederationNamenodeServiceState newState) { |
| this.state = newState; |
| this.dateModified = Time.now(); |
| } |
| |
| @Override |
| public String getRpcAddress() { |
| return rpcAddress; |
| } |
| |
| @Override |
| public String getServiceAddress() { |
| return serviceAddress; |
| } |
| |
| @Override |
| public String getLifelineAddress() { |
| return lifelineAddress; |
| } |
| |
| @Override |
| public String getWebScheme() { |
| return webScheme; |
| } |
| |
| @Override |
| public String getWebAddress() { |
| return webAddress; |
| } |
| |
| @Override |
| public String getNamenodeKey() { |
| return nameserviceId + " " + namenodeId + " " + rpcAddress; |
| } |
| |
| @Override |
| public String getNameserviceId() { |
| return nameserviceId; |
| } |
| |
| @Override |
| public String getNamenodeId() { |
| return namenodeId; |
| } |
| |
| @Override |
| public FederationNamenodeServiceState getState() { |
| return state; |
| } |
| |
| @Override |
| public long getDateModified() { |
| return dateModified; |
| } |
| } |
| |
| @Override |
| public synchronized boolean registerNamenode(NamenodeStatusReport report) |
| throws IOException { |
| if (disableRegistration) { |
| return false; |
| } |
| |
| MockNamenodeContext context = new MockNamenodeContext( |
| report.getRpcAddress(), report.getServiceAddress(), |
| report.getLifelineAddress(), report.getWebScheme(), |
| report.getWebAddress(), report.getNameserviceId(), |
| report.getNamenodeId(), report.getState()); |
| |
| String nsId = report.getNameserviceId(); |
| String bpId = report.getBlockPoolId(); |
| String cId = report.getClusterId(); |
| |
| @SuppressWarnings("unchecked") |
| List<MockNamenodeContext> existingItems = |
| (List<MockNamenodeContext>) this.resolver.get(nsId); |
| if (existingItems == null) { |
| existingItems = new ArrayList<>(); |
| this.resolver.put(bpId, existingItems); |
| this.resolver.put(nsId, existingItems); |
| } |
| boolean added = false; |
| for (int i=0; i<existingItems.size() && !added; i++) { |
| MockNamenodeContext existing = existingItems.get(i); |
| if (existing.getNamenodeKey().equals(context.getNamenodeKey())) { |
| existingItems.set(i, context); |
| added = true; |
| } |
| } |
| if (!added) { |
| existingItems.add(context); |
| } |
| Collections.sort(existingItems, new NamenodePriorityComparator()); |
| |
| FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId); |
| this.namespaces.add(info); |
| return true; |
| } |
| |
| @Override |
| public synchronized Set<FederationNamespaceInfo> getNamespaces() |
| throws IOException { |
| return Collections.unmodifiableSet(this.namespaces); |
| } |
| |
| public void clearDisableNamespaces() { |
| this.disableNamespaces.clear(); |
| } |
| |
| public void disableNamespace(String nsId) { |
| this.disableNamespaces.add(nsId); |
| } |
| |
| @Override |
| public Set<String> getDisabledNamespaces() throws IOException { |
| return this.disableNamespaces; |
| } |
| |
| @Override |
| public PathLocation getDestinationForPath(String path) throws IOException { |
| List<RemoteLocation> remoteLocations = new LinkedList<>(); |
| // We go from the leaves to the root |
| List<String> keys = new ArrayList<>(this.locations.keySet()); |
| Collections.sort(keys, Collections.reverseOrder()); |
| for (String key : keys) { |
| if (path.startsWith(key)) { |
| for (RemoteLocation location : this.locations.get(key)) { |
| String finalPath = location.getDest(); |
| String extraPath = path.substring(key.length()); |
| if (finalPath.endsWith("/") && extraPath.startsWith("/")) { |
| extraPath = extraPath.substring(1); |
| } |
| finalPath += extraPath; |
| String nameservice = location.getNameserviceId(); |
| RemoteLocation remoteLocation = |
| new RemoteLocation(nameservice, finalPath, path); |
| remoteLocations.add(remoteLocation); |
| } |
| break; |
| } |
| } |
| if (remoteLocations.isEmpty()) { |
| // Path isn't supported, mimic resolver behavior. |
| return null; |
| } |
| return new PathLocation(path, remoteLocations); |
| } |
| |
| @Override |
| public List<String> getMountPoints(String path) throws IOException { |
| List<String> mountPoints = new ArrayList<>(); |
| for (String mp : this.locations.keySet()) { |
| if (mp.startsWith(path)) { |
| mountPoints.add(mp); |
| } |
| } |
| return FileSubclusterResolver.getMountPoints(path, mountPoints); |
| } |
| |
| @Override |
| public void setRouterId(String router) { |
| } |
| |
| /** |
| * Mocks the availability of default namespace. |
| * @param b if true default namespace is unset. |
| */ |
| public void setDisableNamespace(boolean b) { |
| this.disableDefaultNamespace = b; |
| } |
| |
| @Override |
| public String getDefaultNamespace() { |
| if (disableDefaultNamespace) { |
| return ""; |
| } |
| return defaultNamespace; |
| } |
| } |