// blob: 2c703958704966b5a45a9bd4050d4aa4e02e78e5 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.service.Service.STATE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test utility to mimic a federated HDFS cluster with multiple routers.
 */
public class MiniRouterDFSCluster {
// Class logger.
// NOTE(review): the right-hand side of this initializer is missing in this
// copy of the file - presumably a LoggerFactory.getLogger(...) call; confirm.
private static final Logger LOG =
/** Generic test string constant. */
public static final String TEST_STRING = "teststring";
/** Directory name appended by the *TestDirectoryForNS helpers. */
public static final String TEST_DIR = "testdir";
/** File name appended by the *TestFileForNS helpers. */
public static final String TEST_FILE = "testfile";
/** Shared random generator, used to pick a random nameservice. */
private static final Random RND = new Random();
/** Nameservices in the federated cluster. */
private List<String> nameservices;
/** Namenodes in the federated cluster. */
private List<NamenodeContext> namenodes;
/** Routers in the federated cluster. */
private List<RouterContext> routers;
/** Whether the Namenodes are in high availability. */
private boolean highAvailability;
/** Number of datanodes per nameservice. */
private int numDatanodesPerNameservice = 2;
/** Custom storage type for each datanode. */
private StorageType[][] storageTypes = null;
/** Racks for datanodes. */
private String[] racks = null;
/** Mini cluster. */
private MiniDFSCluster cluster;
// NOTE(review): the values of the two defaults below are missing in this
// copy of the file (each line ends at '='); restore them from upstream.
protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
protected static final long DEFAULT_CACHE_INTERVAL_MS =
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
/** Cache flush interval in milliseconds. */
private long cacheFlushInterval;
/** Base configuration that Router configurations are copied from, if set. */
private Configuration routerConf;
/** Router configuration overrides. */
private Configuration routerOverrides;
/** Namenode configuration overrides. */
private Configuration namenodeOverrides;
/** If the DNs are shared. */
private boolean sharedDNs = true;
/**
 * Router context.
 */
// Bundles one Router instance with the configuration, bound ports and
// client handles that tests use to talk to it.
public static class RouterContext {
// Router owned by this context; created in the constructor.
private Router router;
// File context against the router RPC address; null if initRouter() could
// not create it.
private FileContext fileContext;
private String nameserviceId;
private String namenodeId;
// Ports recorded by initRouter() from the router's server addresses.
private int rpcPort;
private int httpPort;
// Lazily created by getClient().
private DFSClient client;
private Configuration conf;
// Lazily created by getAdminClient(); cleared by resetAdminClient().
private RouterClient adminClient;
// hdfs:// URI of the router RPC endpoint; set by initRouter().
private URI fileSystemUri;
/**
 * Create a context holding a new, not yet initialized Router.
 * @param conf Configuration kept for this router's clients.
 * @param nsId Nameservice identifier.
 * @param nnId Namenode identifier.
 */
public RouterContext(Configuration conf, String nsId, String nnId) {
this.conf = conf;
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.router = new Router();
/** @return The Router held by this context. */
public Router getRouter() {
return this.router;
/** @return Nameservice identifier given at construction. */
public String getNameserviceId() {
return this.nameserviceId;
/** @return Namenode identifier given at construction. */
public String getNamenodeId() {
return this.namenodeId;
/** @return RPC port recorded by initRouter(). */
public int getRpcPort() {
return this.rpcPort;
/** @return HTTP port recorded by initRouter(). */
public int getHttpPort() {
return this.httpPort;
/** @return File context created by initRouter(), possibly null. */
public FileContext getFileContext() {
return this.fileContext;
/** @return URI of the router RPC endpoint set by initRouter(). */
public URI getFileSystemURI() {
return fileSystemUri;
/** @return Host:port string of the router HTTP server address. */
public String getHttpAddress() {
InetSocketAddress httpAddress = router.getHttpServerAddress();
return NetUtils.getHostPortString(httpAddress);
/**
 * Record the router's bound RPC and HTTP ports and point this context's
 * configuration at the router RPC address as the default file system.
 */
public void initRouter() throws URISyntaxException {
// Store the bound points for the router interfaces
InetSocketAddress rpcAddress = router.getRpcServerAddress();
if (rpcAddress != null) {
this.rpcPort = rpcAddress.getPort();
this.fileSystemUri =
URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
// Override the default FS to point to the router RPC
DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(conf);
} catch (UnsupportedFileSystemException e) {
// Leave the file context unset when the file system is unsupported.
this.fileContext = null;
InetSocketAddress httpAddress = router.getHttpServerAddress();
if (httpAddress != null) {
this.httpPort = httpAddress.getPort();
/** @return File system obtained from this context's configuration. */
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
/**
 * @param configuration Configuration to use instead of this context's.
 * @return File system obtained from the given configuration.
 */
public FileSystem getFileSystem(Configuration configuration) throws IOException {
return DistributedFileSystem.get(configuration);
/**
 * Build a file system from a copy of this context's configuration that
 * addresses a "router-service" nameservice through
 * ObserverReadProxyProvider.
 */
public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOException {
Configuration observerReadConf = new Configuration(conf);
// NOTE(review): several statements below are cut off in this copy of the
// file (missing call prefixes and arguments); restore from upstream.
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
+ "." + "router-service", ObserverReadProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");
return DistributedFileSystem.get(observerReadConf);
/**
 * Create a new DFS client to the router RPC endpoint as the given user.
 * @param user User to run the client creation as.
 * @return New, uncached client.
 */
// NOTE(review): the call that consumed the "Connecting to router" message
// (presumably a LOG.info) is missing from the next statement.
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {"Connecting to router at {}", fileSystemUri);
return user.doAs((PrivilegedExceptionAction<DFSClient>)
() -> new DFSClient(fileSystemUri, conf));
/** @return Admin client to the router admin address, created on first use. */
public RouterClient getAdminClient() throws IOException {
if (adminClient == null) {
// NOTE(review): logging call prefix missing on the next line.
InetSocketAddress routerSocket = router.getAdminServerAddress();"Connecting to router admin at {}", routerSocket);
adminClient = new RouterClient(routerSocket, conf);
return adminClient;
/** Drop the cached admin client so the next getAdminClient() recreates it. */
public void resetAdminClient() {
adminClient = null;
/** @return DFS client to the router RPC endpoint, created on first use. */
public DFSClient getClient() throws IOException, URISyntaxException {
// NOTE(review): logging call prefix missing on the next line.
if (client == null) {"Connecting to router at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
return client;
/** @return Configuration held by this context. */
public Configuration getConf() {
return conf;
/** @return RPC server of the router. */
public RouterRpcServer getRouterRpcServer() {
return router.getRpcServer();
/** @return RPC client used by the router's RPC server. */
public RouterRpcClient getRouterRpcClient() {
return getRouterRpcServer().getRPCClient();
/**
 * Namenode context in the federated cluster.
 */
// Bundles one Namenode with its configuration, bound ports and client
// handles. Non-static: getConfSuffix() reads the enclosing cluster's
// highAvailability flag.
public class NamenodeContext {
private Configuration conf;
// Set by setNamenode().
private NameNode namenode;
private String nameserviceId;
private String namenodeId;
// File context against this Namenode; null if setNamenode() could not
// create it.
private FileContext fileContext;
// Ports recorded by setNamenode().
private int rpcPort;
private int servicePort;
private int lifelinePort;
private int httpPort;
private int httpsPort;
// hdfs:// URI of this Namenode; set by setNamenode().
private URI fileSystemUri;
private int index;
// Lazily created by getClient(); cleared by resetClient().
private DFSClient client;
/**
 * @param conf Configuration kept for this Namenode's clients.
 * @param nsId Nameservice identifier.
 * @param nnId Namenode identifier; null is passed for non-HA setups.
 * @param index Index assigned by the enclosing cluster.
 */
public NamenodeContext(
Configuration conf, String nsId, String nnId, int index) {
this.conf = conf;
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.index = index;
/** @return Namenode attached through setNamenode(). */
public NameNode getNamenode() {
return this.namenode;
/** @return Nameservice identifier given at construction. */
public String getNameserviceId() {
return this.nameserviceId;
/** @return Namenode identifier given at construction. */
public String getNamenodeId() {
return this.namenodeId;
/** @return File context created by setNamenode(), possibly null. */
public FileContext getFileContext() {
return this.fileContext;
/**
 * Attach a running Namenode, record its bound ports and make it the
 * default file system of this context's configuration.
 * @param nn Namenode to attach.
 */
public void setNamenode(NameNode nn) throws URISyntaxException {
this.namenode = nn;
// Store the bound ports and override the default FS with the local NN RPC
this.rpcPort = nn.getNameNodeAddress().getPort();
this.servicePort = nn.getServiceRpcAddress().getPort();
// NOTE(review): the lifeline port is read from the service RPC address,
// so it always equals servicePort - confirm this is intended.
this.lifelinePort = nn.getServiceRpcAddress().getPort();
if (nn.getHttpAddress() != null) {
this.httpPort = nn.getHttpAddress().getPort();
if (nn.getHttpsAddress() != null) {
this.httpsPort = nn.getHttpsAddress().getPort();
this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException e) {
// Leave the file context unset when the file system is unsupported.
this.fileContext = null;
/** @return Namenode RPC address as host:port. */
public String getRpcAddress() {
return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort;
/** @return Service RPC address as host:port. */
public String getServiceAddress() {
return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort;
/** @return Service RPC host name combined with the recorded lifeline port. */
public String getLifelineAddress() {
return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort;
/** @return HTTPS or HTTP address, chosen from the configured HTTP policy. */
public String getWebAddress() {
// NOTE(review): the value compared against is missing in this copy of the
// file; presumably an HTTPS-only policy name - confirm against upstream.
if (conf.get(DFS_HTTP_POLICY_KEY)
.equals( {
return getHttpsAddress();
return getHttpAddress();
/** @return HTTP address as host:port. */
public String getHttpAddress() {
return namenode.getHttpAddress().getHostName() + ":" + httpPort;
/** @return HTTPS address as host:port. */
public String getHttpsAddress() {
return namenode.getHttpsAddress().getHostName() + ":" + httpsPort;
/** @return File system obtained from this context's configuration. */
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
/** Drop the cached client so the next getClient() recreates it. */
public void resetClient() {
client = null;
/**
 * Create a new DFS client to this Namenode as the given user.
 * @param user User to run the client creation as.
 * @return New, uncached client.
 */
// NOTE(review): the call that consumed the "Connecting to namenode" message
// (presumably a LOG.info) is missing from the next statement.
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {"Connecting to namenode at {}", fileSystemUri);
return user.doAs((PrivilegedExceptionAction<DFSClient>)
() -> new DFSClient(fileSystemUri, conf));
/** @return DFS client to this Namenode, created on first use. */
public DFSClient getClient() throws IOException, URISyntaxException {
// NOTE(review): logging call prefix missing on the next line.
if (client == null) {"Connecting to namenode at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
return client;
/**
 * @return The nameservice id, followed by "." and the namenode id when the
 *         enclosing cluster is in high availability.
 */
public String getConfSuffix() {
String suffix = nameserviceId;
if (highAvailability) {
suffix += "." + namenodeId;
return suffix;
/** @return Configuration held by this context. */
public Configuration getConf() {
return conf;
/**
 * Create the cluster description and configure its nameservices.
 * @param ha Whether the Namenodes are in high availability.
 * @param numNameservices Number of nameservices.
 * @param numNamenodes Number of Namenodes per nameservice (used when HA).
 * @param heartbeatInterval Heartbeat interval in milliseconds.
 * @param cacheFlushInterval Cache flush interval in milliseconds.
 * @param overrideConf Overrides passed to configureNameservices(); may be null.
 */
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval,
Configuration overrideConf) {
this.highAvailability = ha;
this.heartbeatInterval = heartbeatInterval;
this.cacheFlushInterval = cacheFlushInterval;
configureNameservices(numNameservices, numNamenodes, overrideConf);
/** Same as the full constructor, with no configuration overrides. */
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval) {
this(ha, numNameservices, numNamenodes,
heartbeatInterval, cacheFlushInterval, null);
// NOTE(review): the three delegating constructors below are cut off in this
// copy of the file; the trailing arguments of each this(...) call are missing.
/** Two Namenodes per nameservice. */
public MiniRouterDFSCluster(boolean ha, int numNameservices) {
this(ha, numNameservices, 2,
/** Explicit number of Namenodes per nameservice. */
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes) {
this(ha, numNameservices, numNamenodes,
/** Two Namenodes per nameservice, with configuration overrides. */
public MiniRouterDFSCluster(boolean ha, int numNameservices,
Configuration overrideConf) {
this(ha, numNameservices, 2,
/**
 * Add configuration settings to override default Router settings.
 * @param conf Router configuration overrides.
 */
public void addRouterOverrides(Configuration conf) {
// The first set of overrides is stored by reference, not copied.
if (this.routerOverrides == null) {
this.routerOverrides = conf;
// NOTE(review): the body of this else branch is missing in this copy of the
// file; presumably it merges conf into the existing overrides - confirm.
} else {
/**
 * Add configuration settings to override default Namenode settings.
 * @param conf Namenode configuration overrides.
 */
public void addNamenodeOverrides(Configuration conf) {
// The first set of overrides is stored by reference, not copied.
if (this.namenodeOverrides == null) {
this.namenodeOverrides = conf;
// NOTE(review): the body of this else branch is missing in this copy of the
// file; presumably it merges conf into the existing overrides - confirm.
} else {
/**
 * Generate the configuration for a Namenode.
 * @param nsId Nameservice identifier.
 * @return New namenode configuration.
 */
public Configuration generateNamenodeConfiguration(String nsId) {
// NOTE(review): many statements in this method are cut off in this copy of
// the file (missing call prefixes, arguments and closing braces); the notes
// below describe only what is visible.
Configuration conf = new HdfsConfiguration();
// Advertise every nameservice; the default FS points at the requested one.
conf.set(DFS_NAMESERVICES, getNameservicesKey());
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
for (String ns : nameservices) {
if (highAvailability) {
for (NamenodeContext context : getNamenodes(ns)) {
// Per-namenode keys are suffixed with the context's configuration suffix.
String suffix = context.getConfSuffix();
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
"" + context.rpcPort);
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
"" + context.httpPort);
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY + "." + suffix,
"" + context.httpsPort);
HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns,
// If the service port is enabled by default, we need to set them up
// The flag is a constant false in the visible code, so the guarded branch
// is never taken.
boolean servicePortEnabled = false;
if (servicePortEnabled) {
"" + context.servicePort);
if (this.namenodeOverrides != null) {
return conf;
/**
 * Generate the configuration for a client.
 * @return New configuration for a client.
 */
public Configuration generateClientConfiguration() {
// Start from an empty configuration (defaults are not loaded).
Configuration conf = new HdfsConfiguration(false);
String ns0 = getNameservices().get(0);
// NOTE(review): the statements that use ns0 to populate conf are missing in
// this copy of the file; restore them from upstream.
return conf;
/**
 * Generate the configuration for a Router.
 * @param nsId Nameservice identifier.
 * @param nnId Namenode identifier.
 * @return New configuration for a Router.
 */
public Configuration generateRouterConfiguration(String nsId, String nnId) {
// NOTE(review): several statements in this method are cut off in this copy
// of the file (bind addresses, resolver setters, the monitor-list appends).
Configuration conf;
// Start from routerConf when one has been set, otherwise from an empty
// configuration without defaults.
if (this.routerConf == null) {
conf = new Configuration(false);
} else {
conf = new Configuration(routerConf);
// The first nameservice is the Router's default nameservice.
conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);
// Use mock resolver classes
MockResolver.class, ActiveNamenodeResolver.class);
MockResolver.class, FileSubclusterResolver.class);
// Disable safemode on startup
conf.setBoolean(DFS_ROUTER_SAFEMODE_ENABLE, false);
// Set the nameservice ID for the default NN monitor
conf.set(DFS_NAMESERVICE_ID, nsId);
if (nnId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
// Namenodes to monitor
StringBuilder sb = new StringBuilder();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
if (sb.length() != 0) {
conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
// Add custom overrides if available
if (this.routerOverrides != null) {
// Overrides are applied last, so they win over the values set above.
for (Entry<String, String> entry : this.routerOverrides) {
String confKey = entry.getKey();
String confValue = entry.getValue();
conf.set(confKey, confValue);
return conf;
/**
 * Reset the nameservice and Namenode lists and create one Namenode context
 * per Namenode. Nameservices are named "ns" followed by their index.
 * @param numNameservices Number of nameservices to create.
 * @param numNamenodes Number of Namenodes per nameservice when HA is on.
 * @param overrideConf Optional overrides for the Namenode configuration.
 */
// NOTE(review): the statements that apply overrideConf and that add each new
// context to the namenodes list are missing in this copy of the file.
public void configureNameservices(int numNameservices, int numNamenodes,
Configuration overrideConf) {
this.nameservices = new ArrayList<>();
this.namenodes = new ArrayList<>();
NamenodeContext context = null;
// Running index across all Namenodes of all nameservices.
int nnIndex = 0;
for (int i=0; i<numNameservices; i++) {
String ns = "ns" + i;
this.nameservices.add("ns" + i);
Configuration nnConf = generateNamenodeConfiguration(ns);
if (overrideConf != null) {
// Non-HA: a single Namenode per nameservice with a null namenode id.
if (!highAvailability) {
context = new NamenodeContext(nnConf, ns, null, nnIndex++);
} else {
// HA: Namenode ids are taken from the NAMENODES array.
for (int j=0; j<numNamenodes; j++) {
context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
public void setNumDatanodesPerNameservice(int num) {
this.numDatanodesPerNameservice = num;
* Set custom storage type configuration for each datanode.
* If storageTypes is uninitialized or passed null then
* StorageType.DEFAULT is used.
public void setStorageTypes(StorageType[][] storageTypes) {
this.storageTypes = storageTypes;
* Set racks for each datanode. If racks is uninitialized or passed null then
* default is used.
public void setRacks(String[] racks) {
this.racks = racks;
* Set the DNs to belong to only one subcluster.
public void setIndependentDNs() {
this.sharedDNs = false;
/**
 * Build a single string from all nameservice identifiers.
 * @return The combined nameservices string.
 */
// NOTE(review): the append statements (separator and identifier) are missing
// in this copy of the file; presumably the ids are joined with a separator
// written only between entries - confirm against upstream.
public String getNameservicesKey() {
StringBuilder sb = new StringBuilder();
for (String nsId : this.nameservices) {
if (sb.length() > 0) {
return sb.toString();
public String getRandomNameservice() {
int randIndex = RND.nextInt(nameservices.size());
return nameservices.get(randIndex);
public List<String> getNameservices() {
return nameservices;
/**
 * Get the Namenode contexts that belong to a nameservice.
 * @param nameservice Nameservice identifier to match.
 * @return New list for the matching contexts.
 */
// NOTE(review): the statement that adds a matching context to nns is missing
// in this copy of the file.
public List<NamenodeContext> getNamenodes(String nameservice) {
List<NamenodeContext> nns = new ArrayList<>();
for (NamenodeContext c : namenodes) {
if (c.nameserviceId.equals(nameservice)) {
return nns;
public NamenodeContext getRandomNamenode() {
Random rand = new Random();
int i = rand.nextInt(this.namenodes.size());
return this.namenodes.get(i);
public List<NamenodeContext> getNamenodes() {
return this.namenodes;
public boolean isHighAvailability() {
return highAvailability;
public NamenodeContext getNamenode(String nameservice, String namenode) {
for (NamenodeContext c : this.namenodes) {
if (c.nameserviceId.equals(nameservice)) {
if (namenode == null || namenode.isEmpty() ||
c.namenodeId == null || c.namenodeId.isEmpty()) {
return c;
} else if (c.namenodeId.equals(namenode)) {
return c;
return null;
/**
 * Get the Router contexts that belong to a nameservice.
 * @param nameservice Nameservice identifier to match.
 * @return New list for the matching contexts.
 */
// NOTE(review): the statement that adds a matching context to nns is missing
// in this copy of the file.
public List<RouterContext> getRouters(String nameservice) {
List<RouterContext> nns = new ArrayList<>();
for (RouterContext c : routers) {
if (c.nameserviceId.equals(nameservice)) {
return nns;
public RouterContext getRouterContext(String nsId, String nnId) {
for (RouterContext c : routers) {
if (nnId == null) {
return c;
if (c.namenodeId.equals(nnId) &&
c.nameserviceId.equals(nsId)) {
return c;
return null;
public RouterContext getRandomRouter() {
Random rand = new Random();
return routers.get(rand.nextInt(routers.size()));
public List<RouterContext> getRouters() {
return routers;
public RouterContext buildRouter(String nsId, String nnId)
throws URISyntaxException, IOException {
Configuration config = generateRouterConfiguration(nsId, nnId);
RouterContext rc = new RouterContext(config, nsId, nnId);
return rc;
// Start the mini DFS cluster that backs the federated cluster.
// NOTE(review): both overloads are heavily cut off in this copy of the file
// (topology wiring, override handling, the builder chain, the Namenode
// bookkeeping and the failure path); the notes describe only what is visible.
public void startCluster() {
/**
 * @param overrideConf Optional overrides; when set, a copy also becomes the
 *                     base configuration for Routers.
 */
public void startCluster(Configuration overrideConf) {
try {
MiniDFSNNTopology topology = new MiniDFSNNTopology();
for (String ns : nameservices) {
NSConf conf = new MiniDFSNNTopology.NSConf(ns);
if (highAvailability) {
// Namenodes per nameservice = total Namenodes / number of nameservices.
for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
} else {
NNConf nnConf = new MiniDFSNNTopology.NNConf(null);
// Generate conf for namenodes and datanodes
String ns0 = nameservices.get(0);
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
// Router also uses these configurations as initial values.
routerConf = new Configuration(overrideConf);
// Set independent DNs across subclusters
int numDNs = nameservices.size() * numDatanodesPerNameservice;
Configuration[] dnConfs = null;
if (!sharedDNs) {
dnConfs = new Configuration[numDNs];
int dnId = 0;
for (String nsId : nameservices) {
// Each subcluster's datanodes only see their own nameservice.
Configuration subclusterConf = new Configuration(nnConf);
subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
for (int i = 0; i < numDatanodesPerNameservice; i++) {
dnConfs[dnId] = subclusterConf;
// Start mini DFS cluster
cluster = new MiniDFSCluster.Builder(nnConf)
// Store NN pointers
for (int i = 0; i < namenodes.size(); i++) {
NameNode nn = cluster.getNameNode(i);
} catch (Exception e) {
LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
if (cluster != null) {
/**
 * Build the Routers of the federated cluster, one for every Namenode
 * context of every nameservice.
 */
// NOTE(review): the loop bodies (registering each context in the routers
// list, initializing/starting it, waiting for it) are missing in this copy
// of the file.
public void startRouters()
throws InterruptedException, URISyntaxException, IOException {
// Create one router per nameservice
this.routers = new ArrayList<>();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
RouterContext router = buildRouter(ns, context.namenodeId);
// Start all routers
for (RouterContext router : this.routers) {
// Wait until all routers are active and record their ports
for (RouterContext router : this.routers) {
// Wait for a Namenode to become active.
// NOTE(review): the body of this overload is missing in this copy of the file.
public void waitActive(NamenodeContext nn) throws IOException {
/**
 * Poll a Router, up to 20 times, until its service state is STARTED, and
 * fail the test with a timeout message otherwise.
 * @param router Router context to wait for.
 */
// NOTE(review): the success return and the pause between polls are missing
// in this copy of the file.
public void waitActive(RouterContext router)
throws InterruptedException {
for (int loopCount = 0; loopCount < 20; loopCount++) {
// Validate connection of routers to NNs
if (router.router.getServiceState() == STATE.STARTED) {
fail("Timeout waiting for " + router.router + " to activate");
/**
 * Build a status report for every Namenode and hand it to the Namenode
 * resolver of every Router.
 */
// NOTE(review): the statements that fill the report with nsInfo/haState and
// register it with the resolver are missing in this copy of the file.
public void registerNamenodes() throws IOException {
for (RouterContext r : this.routers) {
ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
for (NamenodeContext nn : this.namenodes) {
// Generate a report
// NOTE(review): the scheme is the literal "http" although getWebAddress()
// can return the HTTPS address - confirm this is intended.
NamenodeStatusReport report = new NamenodeStatusReport(
nn.nameserviceId, nn.namenodeId,
nn.getRpcAddress(), nn.getServiceAddress(),
nn.getLifelineAddress(), "http", nn.getWebAddress());
FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
// Determine HA state from nn public state string
String nnState = nn.namenode.getState();
// ACTIVE is the fallback when no HAServiceState matches the state string.
HAServiceState haState = HAServiceState.ACTIVE;
for (HAServiceState state : HAServiceState.values()) {
if (nnState.equalsIgnoreCase( {
haState = state;
// Register with the resolver
/**
 * Wait until every Namenode is registered in the Namenode resolver of every
 * Router, in any state (null state filter).
 */
// NOTE(review): the name of the call taking the arguments below is missing in
// this copy of the file; presumably waitNamenodeRegistered - confirm.
public void waitNamenodeRegistration() throws Exception {
for (RouterContext r : this.routers) {
Router router = r.router;
for (NamenodeContext nn : this.namenodes) {
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
nnResolver, nn.nameserviceId, nn.namenodeId, null);
/**
 * Wait until a Namenode is registered with the given state in the Namenode
 * resolver of a Router.
 * @param router Router context whose resolver is checked.
 * @param state Expected state of the Namenode.
 * @param nsId Nameservice identifier.
 * @param nnId Namenode identifier.
 */
// NOTE(review): the call that consumed the "Waiting for NN" message
// (presumably a LOG.info) is missing from the next statement.
public void waitRouterRegistrationQuorum(RouterContext router,
FederationNamenodeServiceState state, String nsId, String nnId)
throws Exception {"Waiting for NN {} {} to transition to {}", nsId, nnId, state);
ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
waitNamenodeRegistered(nnResolver, nsId, nnId, state);
/**
 * Wait for name spaces to be active.
 * @throws Exception If we cannot check the status or we timeout.
 */
public void waitActiveNamespaces() throws Exception {
// Every Router must see every namespace known to its resolver as ACTIVE.
for (RouterContext r : this.routers) {
Router router = r.router;
final ActiveNamenodeResolver resolver = router.getNamenodeResolver();
for (FederationNamespaceInfo ns : resolver.getNamespaces()) {
final String nsId = ns.getNameserviceId();
// NOTE(review): the name of the call taking the arguments below is missing
// in this copy of the file.
resolver, nsId, FederationNamenodeServiceState.ACTIVE);
* Get the federated path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Router.
public String getFederatedPathForNS(String nsId) {
return "/" + nsId;
* Get the namenode path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Namenode.
public String getNamenodePathForNS(String nsId) {
return "/target-" + nsId;
* Get the federated test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
* </ul>
public String getFederatedTestDirectoryForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
* Get the namenode test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testdir
* </ul>
public String getNamenodeTestDirectoryForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
* Get the federated test file for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
* </ul>
public String getFederatedTestFileForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
* Get the namenode test file for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testfile
* </ul>
public String getNamenodeTestFileForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
* Switch a namenode in a nameservice to be the active.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
public void switchToActive(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
} catch (Throwable e) {
LOG.error("Cannot transition to active", e);
* Switch a namenode in a nameservice to be in standby.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
public void switchToStandby(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
} catch (Throwable e) {
LOG.error("Cannot transition to standby", e);
* Switch a namenode in a nameservice to be the observer.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
public void switchToObserver(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
} catch (Throwable e) {
LOG.error("Cannot transition to active", e);
* Stop the federated HDFS cluster.
public void shutdown() {
if (cluster != null) {
if (routers != null) {
for (RouterContext context : routers) {
/**
 * Stop a router.
 * @param router Router context.
 */
public void stopRouter(RouterContext router) {
// NOTE(review): the shutdown call, the loop counter update, the pause
// between polls and the exit from the loop are missing in this copy of the
// file; the notes describe only what is visible.
try {
int loopCount = 0;
// Poll until the Router service reports STOPPED.
while (router.router.getServiceState() != STATE.STOPPED) {
// Give up after more than 20 polls and log the Router's RPC port.
if (loopCount > 20) {
LOG.error("Cannot shutdown router {}", router.rpcPort);
// NOTE(review): the handler body is not visible; if it is empty the
// interrupt is swallowed - consider restoring the interrupt flag.
} catch (InterruptedException e) {
// Namespace Test Fixtures
* Creates test directories via the namenode.
* 1) /target-ns0/testfile
* 2) /target-ns1/testfile
* @throws IOException
public void createTestDirectoriesNamenode() throws IOException {
// Add a test dir to each NS and verify
for (String ns : getNameservices()) {
NamenodeContext context = getNamenode(ns, null);
if (!createTestDirectoriesNamenode(context)) {
throw new IOException("Cannot create test directory for ns " + ns);
public boolean createTestDirectoriesNamenode(NamenodeContext nn)
throws IOException {
FileSystem fs = nn.getFileSystem();
String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
return addDirectory(fs, testDir);
public void deleteAllFiles() throws IOException {
// Delete all files via the NNs and verify
for (NamenodeContext context : getNamenodes()) {
FileSystem fs = context.getFileSystem();
FileStatus[] status = fs.listStatus(new Path("/"));
for (int i = 0; i <status.length; i++) {
Path p = status[i].getPath();
fs.delete(p, true);
status = fs.listStatus(new Path("/"));
assertEquals(status.length, 0);
// MockRouterResolver Test Fixtures
* <ul>
* <li>/ -> [ns0->/].
* <li>/nso -> ns0->/target-ns0.
* <li>/ns1 -> ns1->/target-ns1.
* </ul>
public void installMockLocations() {
for (RouterContext r : routers) {
MockResolver resolver =
(MockResolver) r.router.getSubclusterResolver();
// create table entries
for (String nsId : nameservices) {
// Direct path
String routerPath = getFederatedPathForNS(nsId);
String nnPath = getNamenodePathForNS(nsId);
resolver.addLocation(routerPath, nsId, nnPath);
// Root path points to both first nameservice
String ns0 = nameservices.get(0);
resolver.addLocation("/", ns0, "/");
public MiniDFSCluster getCluster() {
return cluster;
/**
 * Wait until the federated cluster is up and ready.
 * @throws IOException If we cannot wait for the cluster to be up.
 */
public void waitClusterUp() throws IOException {
// NOTE(review): the body of the try block (the actual waiting calls) is
// missing in this copy of the file.
try {
// Any failure while waiting is rethrown as an IOException with its cause.
} catch (Exception e) {
throw new IOException("Cannot wait for the namenodes", e);