blob: 443b9a7d1081624dabd5449ebdf6142268f9aebf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver;
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Router that provides a unified view of multiple federated HDFS clusters. It
* has two main roles: (1) federated interface and (2) NameNode heartbeat.
* <p>
* For the federated interface, the Router receives a client request, checks the
* State Store for the correct subcluster, and forwards the request to the
* active Namenode of that subcluster. The reply from the Namenode then flows in
* the opposite direction. The Routers are stateless and can be behind a load
* balancer. HDFS clients connect to the router using the same interfaces as are
* used to communicate with a namenode, namely the ClientProtocol RPC interface
* and the WebHdfs HTTP interface exposed by the router. {@link RouterRpcServer}
* {@link RouterHttpServer}
* <p>
* For NameNode heartbeat, the Router periodically checks the state of a
* NameNode (usually on the same server) and reports their high availability
* (HA) state and load/space status to the State Store. Note that this is an
* optional role as a Router can be independent of any subcluster.
* {@link StateStoreService} {@link NamenodeHeartbeatService}
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Router extends CompositeService {
private static final Logger LOG = LoggerFactory.getLogger(Router.class);
/** Configuration for the Router. */
private Configuration conf;
/** Router address/identifier. */
private String routerId;
/** RPC interface to the client. */
private RouterRpcServer rpcServer;
private InetSocketAddress rpcAddress;
/** RPC interface for the admin. */
private RouterAdminServer adminServer;
private InetSocketAddress adminAddress;
/** HTTP interface and web application. */
private RouterHttpServer httpServer;
/** Interface with the State Store. */
private StateStoreService stateStore;
/** Interface to map global name space to HDFS subcluster name spaces. */
private FileSubclusterResolver subclusterResolver;
/** Interface to identify the active NN for a nameservice or blockpool ID. */
private ActiveNamenodeResolver namenodeResolver;
/** Updates the namenode status in the namenode resolver. */
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
/** Router metrics. */
private RouterMetricsService metrics;
/** JVM pauses (GC and others). */
private JvmPauseMonitor pauseMonitor;
/////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////
public Router() {
super(Router.class.getName());
}
/////////////////////////////////////////////////////////
// Service management
/////////////////////////////////////////////////////////
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
// Service that maintains the State Store connection
this.stateStore = new StateStoreService();
addService(this.stateStore);
}
// Resolver to track active NNs
this.namenodeResolver = newActiveNamenodeResolver(
this.conf, this.stateStore);
if (this.namenodeResolver == null) {
throw new IOException("Cannot find namenode resolver.");
}
// Lookup interface to map between the global and subcluster name spaces
this.subclusterResolver = newFileSubclusterResolver(this.conf, this);
if (this.subclusterResolver == null) {
throw new IOException("Cannot find subcluster resolver");
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_RPC_ENABLE,
DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
// Create RPC server
this.rpcServer = createRpcServer();
addService(this.rpcServer);
this.setRpcServerAddress(rpcServer.getRpcAddress());
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
DFSConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
// Create admin server
this.adminServer = createAdminServer();
addService(this.adminServer);
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE,
DFSConfigKeys.DFS_ROUTER_HTTP_ENABLE_DEFAULT)) {
// Create HTTP server
this.httpServer = createHttpServer();
addService(this.httpServer);
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
// Create status updater for each monitored Namenode
this.namenodeHearbeatServices = createNamenodeHearbeatServices();
for (NamenodeHeartbeatService hearbeatService :
this.namenodeHearbeatServices) {
addService(hearbeatService);
}
if (this.namenodeHearbeatServices.isEmpty()) {
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
}
}
// Router metrics system
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) {
DefaultMetricsSystem.initialize("Router");
this.metrics = new RouterMetricsService(this);
addService(this.metrics);
// JVM pause monitor
this.pauseMonitor = new JvmPauseMonitor();
this.pauseMonitor.init(conf);
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
if (this.pauseMonitor != null) {
this.pauseMonitor.start();
JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
if (jvmMetrics != null) {
jvmMetrics.setPauseMonitor(pauseMonitor);
}
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
// JVM pause monitor
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
}
super.serviceStop();
}
/**
* Shutdown the router.
*/
public void shutDown() {
new Thread() {
@Override
public void run() {
Router.this.stop();
}
}.start();
}
/////////////////////////////////////////////////////////
// RPC Server
/////////////////////////////////////////////////////////
/**
* Create a new Router RPC server to proxy ClientProtocol requests.
*
* @return New Router RPC Server.
* @throws IOException If the router RPC server was not started.
*/
protected RouterRpcServer createRpcServer() throws IOException {
return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(),
this.getSubclusterResolver());
}
/**
* Get the Router RPC server.
*
* @return Router RPC server.
*/
public RouterRpcServer getRpcServer() {
return this.rpcServer;
}
/**
* Set the current RPC socket for the router.
*
* @param rpcAddress RPC address.
*/
protected void setRpcServerAddress(InetSocketAddress address) {
this.rpcAddress = address;
// Use the RPC address as our unique router Id
if (this.rpcAddress != null) {
try {
String hostname = InetAddress.getLocalHost().getHostName();
setRouterId(hostname + ":" + this.rpcAddress.getPort());
} catch (UnknownHostException ex) {
LOG.error("Cannot set unique router ID, address not resolvable {}",
this.rpcAddress);
}
}
}
/**
* Get the current RPC socket address for the router.
*
* @return InetSocketAddress
*/
public InetSocketAddress getRpcServerAddress() {
return this.rpcAddress;
}
/////////////////////////////////////////////////////////
// Admin server
/////////////////////////////////////////////////////////
/**
* Create a new router admin server to handle the router admin interface.
*
* @return RouterAdminServer
* @throws IOException If the admin server was not successfully started.
*/
protected RouterAdminServer createAdminServer() throws IOException {
return new RouterAdminServer(this.conf, this);
}
/**
* Set the current Admin socket for the router.
*
* @param adminAddress Admin RPC address.
*/
protected void setAdminServerAddress(InetSocketAddress address) {
this.adminAddress = address;
}
/**
* Get the current Admin socket address for the router.
*
* @return InetSocketAddress Admin address.
*/
public InetSocketAddress getAdminServerAddress() {
return adminAddress;
}
/////////////////////////////////////////////////////////
// HTTP server
/////////////////////////////////////////////////////////
/**
* Create an HTTP server for this Router.
*
* @return HTTP server for this Router.
*/
protected RouterHttpServer createHttpServer() {
return new RouterHttpServer(this);
}
/**
* Get the current HTTP socket address for the router.
*
* @return InetSocketAddress HTTP address.
*/
public InetSocketAddress getHttpServerAddress() {
if (httpServer != null) {
return httpServer.getHttpAddress();
}
return null;
}
/////////////////////////////////////////////////////////
// Namenode heartbeat monitors
/////////////////////////////////////////////////////////
/**
* Create each of the services that will monitor a Namenode.
*
* @return List of heartbeat services.
*/
protected Collection<NamenodeHeartbeatService>
createNamenodeHearbeatServices() {
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
// Create a local heartbet service
NamenodeHeartbeatService localHeartbeatService =
createLocalNamenodeHearbeatService();
if (localHeartbeatService != null) {
String nnDesc = localHeartbeatService.getNamenodeDesc();
ret.put(nnDesc, localHeartbeatService);
}
}
// Create heartbeat services for a list specified by the admin
String namenodes = this.conf.get(
DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
if (namenodes != null) {
for (String namenode : namenodes.split(",")) {
String[] namenodeSplit = namenode.split("\\.");
String nsId = null;
String nnId = null;
if (namenodeSplit.length == 2) {
nsId = namenodeSplit[0];
nnId = namenodeSplit[1];
} else if (namenodeSplit.length == 1) {
nsId = namenode;
} else {
LOG.error("Wrong Namenode to monitor: {}", namenode);
}
if (nsId != null) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHearbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
}
}
}
}
return ret.values();
}
/**
* Create a new status updater for the local Namenode.
*
* @return Updater of the status for the local Namenode.
*/
protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
// Detect NN running in this machine
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String nnId = null;
if (HAUtil.isHAEnabled(conf, nsId)) {
nnId = HAUtil.getNameNodeId(conf, nsId);
if (nnId == null) {
LOG.error("Cannot find namenode id for local {}", nsId);
}
}
return createNamenodeHearbeatService(nsId, nnId);
}
/**
* Create a heartbeat monitor for a particular Namenode.
*
* @param nsId Identifier of the nameservice to monitor.
* @param nnId Identifier of the namenode (HA) to monitor.
* @return Updater of the status for the specified Namenode.
*/
protected NamenodeHeartbeatService createNamenodeHearbeatService(
String nsId, String nnId) {
LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
namenodeResolver, nsId, nnId);
return ret;
}
/////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
/**
* Get the State Store service.
*
* @return State Store service.
*/
public StateStoreService getStateStore() {
return this.stateStore;
}
/**
* Get the metrics system for the Router.
*
* @return Router metrics.
*/
public RouterMetrics getRouterMetrics() {
if (this.metrics != null) {
return this.metrics.getRouterMetrics();
}
return null;
}
/**
* Get the federation metrics.
*
* @return Federation metrics.
*/
public FederationMetrics getMetrics() {
if (this.metrics != null) {
return this.metrics.getFederationMetrics();
}
return null;
}
/**
* Get the subcluster resolver for files.
*
* @return Subcluster resolver for files.
*/
public FileSubclusterResolver getSubclusterResolver() {
return this.subclusterResolver;
}
/**
* Get the namenode resolver for a subcluster.
*
* @return The namenode resolver for a subcluster.
*/
public ActiveNamenodeResolver getNamenodeResolver() {
return this.namenodeResolver;
}
/////////////////////////////////////////////////////////
// Router info
/////////////////////////////////////////////////////////
/**
* Unique ID for the router, typically the hostname:port string for the
* router's RPC server. This ID may be null on router startup before the RPC
* server has bound to a port.
*
* @return Router identifier.
*/
public String getRouterId() {
return this.routerId;
}
/**
* Sets a unique ID for this router.
*
* @param router Identifier of the Router.
*/
public void setRouterId(String id) {
this.routerId = id;
if (this.stateStore != null) {
this.stateStore.setIdentifier(this.routerId);
}
if (this.namenodeResolver != null) {
this.namenodeResolver.setRouterId(this.routerId);
}
}
}