blob: 0950cde28de95c1d77e4978ea267dec3d477f094 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements a cached lookup of the most recently active namenode for a
* particular nameservice. Relies on the {@link StateStoreService} to
* discover available nameservices and namenodes.
*/
public class MembershipNamenodeResolver
implements ActiveNamenodeResolver, StateStoreCache {
private static final Logger LOG =
LoggerFactory.getLogger(MembershipNamenodeResolver.class);
/** Reference to the State Store. */
private final StateStoreService stateStore;
/** Membership State Store interface. */
private MembershipStore membershipInterface;
/** Parent router ID. */
private String routerId;
/** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
/** Cached lookup of NN for block pool. Invalidated on cache refresh. */
private Map<String, List<? extends FederationNamenodeContext>> cacheBP;
public MembershipNamenodeResolver(
Configuration conf, StateStoreService store) throws IOException {
this.stateStore = store;
this.cacheNS = new ConcurrentHashMap<>();
this.cacheBP = new ConcurrentHashMap<>();
if (this.stateStore != null) {
// Request cache updates from the state store
this.stateStore.registerCacheExternal(this);
}
}
private synchronized MembershipStore getMembershipStore() throws IOException {
if (this.membershipInterface == null) {
this.membershipInterface = this.stateStore.getRegisteredRecordStore(
MembershipStore.class);
if (this.membershipInterface == null) {
throw new IOException("State Store does not have an interface for " +
MembershipStore.class.getSimpleName());
}
}
return this.membershipInterface;
}
@Override
public boolean loadCache(boolean force) {
// Our cache depends on the store, update it first
try {
MembershipStore membership = getMembershipStore();
membership.loadCache(force);
} catch (IOException e) {
LOG.error("Cannot update membership from the State Store", e);
}
// Force refresh of active NN cache
cacheBP.clear();
cacheNS.clear();
return true;
}
@Override
public void updateActiveNamenode(
final String nsId, final InetSocketAddress address) throws IOException {
// Called when we have an RPC miss and successful hit on an alternate NN.
// Temporarily update our cache, it will be overwritten on the next update.
try {
MembershipState partial = MembershipState.newInstance();
String rpcAddress = address.getHostName() + ":" + address.getPort();
partial.setRpcAddress(rpcAddress);
partial.setNameserviceId(nsId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
MembershipStore membership = getMembershipStore();
GetNamenodeRegistrationsResponse response =
membership.getNamenodeRegistrations(request);
List<MembershipState> records = response.getNamenodeMemberships();
if (records != null && records.size() == 1) {
MembershipState record = records.get(0);
UpdateNamenodeRegistrationRequest updateRequest =
UpdateNamenodeRegistrationRequest.newInstance(
record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
membership.updateNamenodeRegistration(updateRequest);
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot update {} as active, State Store unavailable", address);
}
}
@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId) throws IOException {
List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
if (ret == null) {
try {
MembershipState partial = MembershipState.newInstance();
partial.setNameserviceId(nsId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
final List<MembershipState> result =
getRecentRegistrationForQuery(request, true, false);
if (result == null || result.isEmpty()) {
LOG.error("Cannot locate eligible NNs for {}", nsId);
return null;
} else {
cacheNS.put(nsId, result);
ret = result;
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
}
}
if (ret == null) {
return null;
}
return Collections.unmodifiableList(ret);
}
@Override
public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
final String bpId) throws IOException {
List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId);
if (ret == null) {
try {
MembershipState partial = MembershipState.newInstance();
partial.setBlockPoolId(bpId);
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance(partial);
final List<MembershipState> result =
getRecentRegistrationForQuery(request, true, false);
if (result == null || result.isEmpty()) {
LOG.error("Cannot locate eligible NNs for {}", bpId);
} else {
cacheBP.put(bpId, result);
ret = result;
}
} catch (StateStoreUnavailableException e) {
LOG.error("Cannot get active NN for {}, State Store unavailable", bpId);
return null;
}
}
if (ret == null) {
return null;
}
return Collections.unmodifiableList(ret);
}
@Override
public boolean registerNamenode(NamenodeStatusReport report)
throws IOException {
if (this.routerId == null) {
LOG.warn("Cannot register namenode, router ID is not known {}", report);
return false;
}
MembershipState record = MembershipState.newInstance(
routerId, report.getNameserviceId(), report.getNamenodeId(),
report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
report.getServiceAddress(), report.getLifelineAddress(),
report.getWebAddress(), report.getState(), report.getSafemode());
if (report.statsValid()) {
MembershipStats stats = MembershipStats.newInstance();
stats.setNumOfFiles(report.getNumFiles());
stats.setNumOfBlocks(report.getNumBlocks());
stats.setNumOfBlocksMissing(report.getNumBlocksMissing());
stats.setNumOfBlocksPendingReplication(
report.getNumOfBlocksPendingReplication());
stats.setNumOfBlocksUnderReplicated(
report.getNumOfBlocksUnderReplicated());
stats.setNumOfBlocksPendingDeletion(
report.getNumOfBlocksPendingDeletion());
stats.setAvailableSpace(report.getAvailableSpace());
stats.setTotalSpace(report.getTotalSpace());
stats.setNumOfDecommissioningDatanodes(
report.getNumDecommissioningDatanodes());
stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes());
stats.setNumOfDeadDatanodes(report.getNumDeadDatanodes());
stats.setNumOfDecomActiveDatanodes(report.getNumDecomLiveDatanodes());
stats.setNumOfDecomDeadDatanodes(report.getNumDecomDeadDatanodes());
record.setStats(stats);
}
if (report.getState() != UNAVAILABLE) {
// Set/update our last contact time
record.setLastContact(Time.now());
}
NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
request.setNamenodeMembership(record);
return getMembershipStore().namenodeHeartbeat(request).getResult();
}
@Override
public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response =
getMembershipStore().getNamespaceInfo(request);
return response.getNamespaceInfo();
}
/**
* Picks the most relevant record registration that matches the query. Return
* registrations matching the query in this preference: 1) Most recently
* updated ACTIVE registration 2) Most recently updated STANDBY registration
* (if showStandby) 3) Most recently updated UNAVAILABLE registration (if
* showUnavailable). EXPIRED registrations are ignored.
*
* @param query The select query for NN registrations.
* @param excludes List of NNs to exclude from matching results.
* @param addUnavailable include UNAVAILABLE registrations.
* @param addExpired include EXPIRED registrations.
* @return List of memberships or null if no registrations that
* both match the query AND the selected states.
* @throws IOException
*/
private List<MembershipState> getRecentRegistrationForQuery(
GetNamenodeRegistrationsRequest request, boolean addUnavailable,
boolean addExpired) throws IOException {
// Retrieve a list of all registrations that match this query.
// This may include all NN records for a namespace/blockpool, including
// duplicate records for the same NN from different routers.
MembershipStore membershipStore = getMembershipStore();
GetNamenodeRegistrationsResponse response =
membershipStore.getNamenodeRegistrations(request);
List<MembershipState> memberships = response.getNamenodeMemberships();
if (!addExpired || !addUnavailable) {
Iterator<MembershipState> iterator = memberships.iterator();
while (iterator.hasNext()) {
MembershipState membership = iterator.next();
if (membership.getState() == EXPIRED && !addExpired) {
iterator.remove();
} else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
iterator.remove();
}
}
}
List<MembershipState> priorityList = new ArrayList<>();
priorityList.addAll(memberships);
Collections.sort(priorityList, new NamenodePriorityComparator());
LOG.debug("Selected most recent NN {} for query", priorityList);
return priorityList;
}
@Override
public void setRouterId(String router) {
this.routerId = router;
}
}