| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.resolver; |
| |
| import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE; |
| import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED; |
| import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER; |
| import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore; |
| import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; |
| import org.apache.hadoop.hdfs.server.federation.store.RecordStore; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.util.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Implements a cached lookup of the most recently active namenode for a |
| * particular nameservice. Relies on the {@link StateStoreService} to |
| * discover available nameservices and namenodes. |
| */ |
| public class MembershipNamenodeResolver |
| implements ActiveNamenodeResolver, StateStoreCache { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(MembershipNamenodeResolver.class); |
| |
| /** Reference to the State Store. */ |
| private final StateStoreService stateStore; |
| /** Membership State Store interface. */ |
| private MembershipStore membershipInterface; |
| /** Disabled Nameservice State Store interface. */ |
| private DisabledNameserviceStore disabledNameserviceInterface; |
| |
| /** Parent router ID. */ |
| private String routerId; |
| |
| /** Cached lookup of namenodes for nameservice. The keys are a pair of the nameservice |
| * name and a boolean indicating if observer namenodes should be listed first. |
| * If true, observer namenodes are listed first. If false, active namenodes are listed first. |
| * Invalidated on cache refresh. */ |
| private Map<Pair<String,Boolean>, List<? extends FederationNamenodeContext>> cacheNS; |
| /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ |
| private Map<String, List<? extends FederationNamenodeContext>> cacheBP; |
| |
| |
| public MembershipNamenodeResolver( |
| Configuration conf, StateStoreService store) throws IOException { |
| this.stateStore = store; |
| |
| this.cacheNS = new ConcurrentHashMap<>(); |
| this.cacheBP = new ConcurrentHashMap<>(); |
| |
| if (this.stateStore != null) { |
| // Request cache updates from the state store |
| this.stateStore.registerCacheExternal(this); |
| } |
| } |
| |
| private synchronized MembershipStore getMembershipStore() throws IOException { |
| if (this.membershipInterface == null) { |
| this.membershipInterface = getStoreInterface(MembershipStore.class); |
| } |
| return this.membershipInterface; |
| } |
| |
| private synchronized DisabledNameserviceStore getDisabledNameserviceStore() |
| throws IOException { |
| if (this.disabledNameserviceInterface == null) { |
| this.disabledNameserviceInterface = |
| getStoreInterface(DisabledNameserviceStore.class); |
| } |
| return this.disabledNameserviceInterface; |
| } |
| |
| private <T extends RecordStore<?>> T getStoreInterface(Class<T> clazz) |
| throws IOException{ |
| T store = this.stateStore.getRegisteredRecordStore(clazz); |
| if (store == null) { |
| throw new IOException("State Store does not have an interface for " + |
| clazz.getSimpleName()); |
| } |
| return store; |
| } |
| |
| @Override |
| public boolean loadCache(boolean force) { |
| // Our cache depends on the store, update it first |
| try { |
| MembershipStore membership = getMembershipStore(); |
| membership.loadCache(force); |
| DisabledNameserviceStore disabled = getDisabledNameserviceStore(); |
| disabled.loadCache(force); |
| } catch (IOException e) { |
| LOG.error("Cannot update membership from the State Store", e); |
| } |
| |
| // Force refresh of active NN cache |
| cacheBP.clear(); |
| cacheNS.clear(); |
| return true; |
| } |
| |
| @Override public void updateUnavailableNamenode(String nsId, |
| InetSocketAddress address) throws IOException { |
| updateNameNodeState(nsId, address, UNAVAILABLE); |
| } |
| |
| @Override |
| public void updateActiveNamenode( |
| final String nsId, final InetSocketAddress address) throws IOException { |
| updateNameNodeState(nsId, address, ACTIVE); |
| } |
| |
| |
| private void updateNameNodeState(final String nsId, |
| final InetSocketAddress address, FederationNamenodeServiceState state) |
| throws IOException { |
| // Temporarily update our cache, it will be overwritten on the next update. |
| try { |
| MembershipState partial = MembershipState.newInstance(); |
| String rpcAddress = address.getHostName() + ":" + address.getPort(); |
| partial.setRpcAddress(rpcAddress); |
| partial.setNameserviceId(nsId); |
| |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(partial); |
| |
| MembershipStore membership = getMembershipStore(); |
| GetNamenodeRegistrationsResponse response = |
| membership.getNamenodeRegistrations(request); |
| List<MembershipState> records = response.getNamenodeMemberships(); |
| |
| if (records != null && records.size() == 1) { |
| MembershipState record = records.get(0); |
| UpdateNamenodeRegistrationRequest updateRequest = |
| UpdateNamenodeRegistrationRequest.newInstance( |
| record.getNameserviceId(), record.getNamenodeId(), state); |
| membership.updateNamenodeRegistration(updateRequest); |
| |
| cacheNS.remove(Pair.of(nsId, Boolean.TRUE)); |
| cacheNS.remove(Pair.of(nsId, Boolean.FALSE)); |
| // Invalidating the full cacheBp since getting the blockpool id from |
| // namespace id is quite costly. |
| cacheBP.clear(); |
| } |
| } catch (StateStoreUnavailableException e) { |
| LOG.error("Cannot update {} as active, State Store unavailable", address); |
| } |
| } |
| |
| @Override |
| public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId( |
| final String nsId, boolean listObserversFirst) throws IOException { |
| |
| List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst)); |
| if (ret != null) { |
| return ret; |
| } |
| |
| // Not cached, generate the value |
| final List<MembershipState> result; |
| try { |
| MembershipState partial = MembershipState.newInstance(); |
| partial.setNameserviceId(nsId); |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(partial); |
| result = getRecentRegistrationForQuery(request, true, |
| false, listObserversFirst); |
| } catch (StateStoreUnavailableException e) { |
| LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); |
| return null; |
| } |
| if (result == null || result.isEmpty()) { |
| LOG.error("Cannot locate eligible NNs for {}", nsId); |
| return null; |
| } |
| |
| // Mark disabled name services |
| try { |
| Set<String> disabled = |
| getDisabledNameserviceStore().getDisabledNameservices(); |
| if (disabled == null) { |
| LOG.error("Cannot get disabled name services"); |
| } else { |
| for (MembershipState nn : result) { |
| if (disabled.contains(nn.getNameserviceId())) { |
| nn.setState(FederationNamenodeServiceState.DISABLED); |
| } |
| } |
| } |
| } catch (StateStoreUnavailableException e) { |
| LOG.error("Cannot get disabled name services, State Store unavailable"); |
| } |
| |
| // Cache the response |
| ret = Collections.unmodifiableList(result); |
| cacheNS.put(Pair.of(nsId, listObserversFirst), result); |
| return ret; |
| } |
| |
| @Override |
| public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( |
| final String bpId) throws IOException { |
| |
| List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId); |
| if (ret == null) { |
| try { |
| MembershipState partial = MembershipState.newInstance(); |
| partial.setBlockPoolId(bpId); |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(partial); |
| |
| final List<MembershipState> result = |
| getRecentRegistrationForQuery(request, true, false, false); |
| if (result == null || result.isEmpty()) { |
| LOG.error("Cannot locate eligible NNs for {}", bpId); |
| } else { |
| cacheBP.put(bpId, result); |
| ret = result; |
| } |
| } catch (StateStoreUnavailableException e) { |
| LOG.error("Cannot get active NN for {}, State Store unavailable", bpId); |
| return null; |
| } |
| } |
| if (ret == null) { |
| return null; |
| } |
| return Collections.unmodifiableList(ret); |
| } |
| |
| @Override |
| public boolean registerNamenode(NamenodeStatusReport report) |
| throws IOException { |
| |
| if (this.routerId == null) { |
| LOG.warn("Cannot register namenode, router ID is not known {}", report); |
| return false; |
| } |
| |
| MembershipState record = MembershipState.newInstance( |
| routerId, report.getNameserviceId(), report.getNamenodeId(), |
| report.getClusterId(), report.getBlockPoolId(), |
| NetUtils.normalizeIP2HostName(report.getRpcAddress()), |
| report.getServiceAddress(), report.getLifelineAddress(), |
| report.getWebScheme(), report.getWebAddress(), report.getState(), |
| report.getSafemode()); |
| |
| if (report.statsValid()) { |
| MembershipStats stats = MembershipStats.newInstance(); |
| stats.setNumOfFiles(report.getNumFiles()); |
| stats.setNumOfBlocks(report.getNumBlocks()); |
| stats.setNumOfBlocksMissing(report.getNumBlocksMissing()); |
| stats.setNumOfBlocksPendingReplication( |
| report.getNumOfBlocksPendingReplication()); |
| stats.setNumOfBlocksUnderReplicated( |
| report.getNumOfBlocksUnderReplicated()); |
| stats.setNumOfBlocksPendingDeletion( |
| report.getNumOfBlocksPendingDeletion()); |
| stats.setAvailableSpace(report.getAvailableSpace()); |
| stats.setTotalSpace(report.getTotalSpace()); |
| stats.setProvidedSpace(report.getProvidedSpace()); |
| stats.setNumOfDecommissioningDatanodes( |
| report.getNumDecommissioningDatanodes()); |
| stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes()); |
| stats.setNumOfDeadDatanodes(report.getNumDeadDatanodes()); |
| stats.setNumOfStaleDatanodes(report.getNumStaleDatanodes()); |
| stats.setNumOfDecomActiveDatanodes(report.getNumDecomLiveDatanodes()); |
| stats.setNumOfDecomDeadDatanodes(report.getNumDecomDeadDatanodes()); |
| stats.setNumOfInMaintenanceLiveDataNodes( |
| report.getNumInMaintenanceLiveDataNodes()); |
| stats.setNumOfInMaintenanceDeadDataNodes( |
| report.getNumInMaintenanceDeadDataNodes()); |
| stats.setNumOfEnteringMaintenanceDataNodes( |
| report.getNumEnteringMaintenanceDataNodes()); |
| stats.setCorruptFilesCount(report.getCorruptFilesCount()); |
| stats.setScheduledReplicationBlocks( |
| report.getScheduledReplicationBlocks()); |
| stats.setNumberOfMissingBlocksWithReplicationFactorOne( |
| report.getNumberOfMissingBlocksWithReplicationFactorOne()); |
| stats.setHighestPriorityLowRedundancyReplicatedBlocks( |
| report.getHighestPriorityLowRedundancyReplicatedBlocks()); |
| stats.setHighestPriorityLowRedundancyECBlocks( |
| report.getHighestPriorityLowRedundancyECBlocks()); |
| stats.setPendingSPSPaths(report.getPendingSPSPaths()); |
| record.setStats(stats); |
| } |
| |
| if (report.getState() != UNAVAILABLE) { |
| // Set/update our last contact time |
| record.setLastContact(Time.now()); |
| } |
| |
| NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance(); |
| request.setNamenodeMembership(record); |
| return getMembershipStore().namenodeHeartbeat(request).getResult(); |
| } |
| |
| @Override |
| public Set<FederationNamespaceInfo> getNamespaces() throws IOException { |
| GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); |
| GetNamespaceInfoResponse response = |
| getMembershipStore().getNamespaceInfo(request); |
| Set<FederationNamespaceInfo> nss = response.getNamespaceInfo(); |
| |
| // Filter disabled namespaces |
| Set<FederationNamespaceInfo> ret = new TreeSet<>(); |
| Set<String> disabled = getDisabledNamespaces(); |
| for (FederationNamespaceInfo ns : nss) { |
| if (!disabled.contains(ns.getNameserviceId())) { |
| ret.add(ns); |
| } |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| public Set<String> getDisabledNamespaces() throws IOException { |
| DisabledNameserviceStore store = getDisabledNameserviceStore(); |
| return store.getDisabledNameservices(); |
| } |
| |
| /** |
| * Picks the most relevant record registration that matches the query. |
| * If not observer read, |
| * return registrations matching the query in this preference: |
| * 1) Most recently updated ACTIVE registration |
| * 2) Most recently updated Observer registration |
| * 3) Most recently updated STANDBY registration (if showStandby) |
| * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). |
| * |
| * If observer read, |
| * return registrations matching the query in this preference: |
| * 1) Observer registrations, shuffled to disperse queries. |
| * 2) Most recently updated ACTIVE registration |
| * 3) Most recently updated STANDBY registration (if showStandby) |
| * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). |
| * |
| * EXPIRED registrations are ignored. |
| * |
| * @param request The select query for NN registrations. |
| * @param addUnavailable include UNAVAILABLE registrations. |
| * @param addExpired include EXPIRED registrations. |
| * @param observerRead Observer read case, observer NN will be ranked first |
| * @return List of memberships or null if no registrations that |
| * both match the query AND the selected states. |
| * @throws IOException |
| */ |
| private List<MembershipState> getRecentRegistrationForQuery( |
| GetNamenodeRegistrationsRequest request, boolean addUnavailable, |
| boolean addExpired, boolean observerRead) throws IOException { |
| |
| // Retrieve a list of all registrations that match this query. |
| // This may include all NN records for a namespace/blockpool, including |
| // duplicate records for the same NN from different routers. |
| MembershipStore membershipStore = getMembershipStore(); |
| GetNamenodeRegistrationsResponse response = |
| membershipStore.getNamenodeRegistrations(request); |
| |
| List<MembershipState> memberships = response.getNamenodeMemberships(); |
| List<MembershipState> observerMemberships = new ArrayList<>(); |
| Iterator<MembershipState> iterator = memberships.iterator(); |
| while (iterator.hasNext()) { |
| MembershipState membership = iterator.next(); |
| if (membership.getState() == EXPIRED && !addExpired) { |
| iterator.remove(); |
| } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { |
| iterator.remove(); |
| } else if (membership.getState() == OBSERVER && observerRead) { |
| iterator.remove(); |
| observerMemberships.add(membership); |
| } |
| } |
| |
| memberships.sort(new NamenodePriorityComparator()); |
| if(observerRead) { |
| List<MembershipState> ret = new ArrayList<>( |
| memberships.size() + observerMemberships.size()); |
| if(observerMemberships.size() > 1) { |
| Collections.shuffle(observerMemberships); |
| } |
| ret.addAll(observerMemberships); |
| ret.addAll(memberships); |
| memberships = ret; |
| } |
| |
| LOG.debug("Selected most recent NN {} for query", memberships); |
| return memberships; |
| } |
| |
| @Override |
| public void setRouterId(String router) { |
| this.routerId = router; |
| } |
| } |