blob: afc49c76f09b49d4c73dd378ab67b83c87396076 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
/**
* The local subcluster (where the writer is) should be tried first. The writer
* is defined from the RPC query received in the RPC server.
*/
public class LocalResolver extends RouterResolver<String, String> {
private static final Logger LOG =
LoggerFactory.getLogger(LocalResolver.class);
public LocalResolver(final Configuration conf, final Router routerService) {
super(conf, routerService);
}
/**
* Get the mapping from nodes to subcluster. It gets this mapping from the
* subclusters through expensive calls (e.g., RPC) and uses caching to avoid
* too many calls. The cache might be updated asynchronously to reduce
* latency.
*
* @return Node IP -> Subcluster.
*/
@Override
protected Map<String, String> getSubclusterInfo(
MembershipStore membershipStore) {
Map<String, String> mapping = new HashMap<>();
Map<String, String> dnSubcluster = getDatanodesSubcluster();
if (dnSubcluster != null) {
mapping.putAll(dnSubcluster);
}
Map<String, String> nnSubcluster = getNamenodesSubcluster(membershipStore);
if (nnSubcluster != null) {
mapping.putAll(nnSubcluster);
}
return mapping;
}
/**
* Get the local name space. This relies on the RPC Server to get the address
* from the client.
*
* TODO we only support DN and NN locations, we need to add others like
* Resource Managers.
*
* @param path Path ignored by this policy.
* @param loc Federated location with multiple destinations.
* @return Local name space. Null if we don't know about this machine.
*/
@Override
protected String chooseFirstNamespace(String path, PathLocation loc) {
String localSubcluster = null;
String clientAddr = getClientAddr();
Map<String, String> subclusterInfo = getSubclusterMapping();
if (subclusterInfo != null) {
localSubcluster = subclusterInfo.get(clientAddr);
if (localSubcluster != null) {
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
} else {
LOG.error("Cannot get local namespace for {}", clientAddr);
}
} else {
LOG.error("Cannot get node mapping when resolving {} at {} from {}",
path, loc, clientAddr);
}
return localSubcluster;
}
@VisibleForTesting
String getClientAddr() {
return Server.getRemoteAddress();
}
/**
* Get the Datanode mapping from the subclusters from the Namenodes. This
* needs to be done as a privileged action to use the user for the Router and
* not the one from the client in the RPC call.
*
* @return DN IP -> Subcluster.
*/
private Map<String, String> getDatanodesSubcluster() {
final RouterRpcServer rpcServer = getRpcServer();
if (rpcServer == null) {
LOG.error("Cannot access the Router RPC server");
return null;
}
Map<String, String> ret = new HashMap<>();
try {
// We need to get the DNs as a privileged user
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs(
new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() {
@Override
public Map<String, DatanodeStorageReport[]> run() {
try {
return rpcServer.getDatanodeStorageReportMap(
DatanodeReportType.ALL);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
return null;
}
}
});
for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) {
String nsId = entry.getKey();
DatanodeStorageReport[] dns = entry.getValue();
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
String ipAddr = dnInfo.getIpAddr();
ret.put(ipAddr, nsId);
}
}
} catch (IOException e) {
LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage());
}
return ret;
}
/**
* Get the Namenode mapping from the subclusters from the Membership store. As
* the Routers are usually co-located with Namenodes, we also check for the
* local address for this Router here.
*
* @return NN IP -> Subcluster.
*/
private Map<String, String> getNamenodesSubcluster(
MembershipStore membershipStore) {
// Manage requests from this hostname (127.0.0.1)
String localIp = "127.0.0.1";
String localHostname = localIp;
try {
localHostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Cannot get local host name");
}
Map<String, String> ret = new HashMap<>();
try {
// Get the values from the store
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance();
GetNamenodeRegistrationsResponse response =
membershipStore.getNamenodeRegistrations(request);
final List<MembershipState> nns = response.getNamenodeMemberships();
for (MembershipState nn : nns) {
try {
String nsId = nn.getNameserviceId();
String rpcAddress = nn.getRpcAddress();
String hostname = HostAndPort.fromString(rpcAddress).getHostText();
ret.put(hostname, nsId);
if (hostname.equals(localHostname)) {
ret.put(localIp, nsId);
}
InetAddress addr = InetAddress.getByName(hostname);
String ipAddr = addr.getHostAddress();
ret.put(ipAddr, nsId);
} catch (Exception e) {
LOG.error("Cannot get address for {}: {}", nn, e.getMessage());
}
}
} catch (IOException ioe) {
LOG.error("Cannot get Namenodes from the State Store", ioe);
}
return ret;
}
}