blob: 896d08f2c49b6792c2c106f122e0b1d3a561495e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test utility to mimic a federated HDFS cluster with multiple routers.
*/
public class MiniRouterDFSCluster {
private static final Logger LOG =
LoggerFactory.getLogger(MiniRouterDFSCluster.class);
public static final String TEST_STRING = "teststring";
public static final String TEST_DIR = "testdir";
public static final String TEST_FILE = "testfile";
private static final Random RND = new Random();
/** Nameservices in the federated cluster. */
private List<String> nameservices;
/** Namenodes in the federated cluster. */
private List<NamenodeContext> namenodes;
/** Routers in the federated cluster. */
private List<RouterContext> routers;
/** If the Namenodes are in high availability.*/
private boolean highAvailability;
/** Number of datanodes per nameservice. */
private int numDatanodesPerNameservice = 2;
/** Custom storage type for each datanode. */
private StorageType[][] storageTypes = null;
/** Racks for datanodes. */
private String[] racks = null;
/** Mini cluster. */
private MiniDFSCluster cluster;
protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
protected static final long DEFAULT_CACHE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
/** Cache flush interval in milliseconds. */
private long cacheFlushInterval;
/** Router configuration initializes. */
private Configuration routerConf;
/** Router configuration overrides. */
private Configuration routerOverrides;
/** Namenode configuration overrides. */
private Configuration namenodeOverrides;
/** If the DNs are shared. */
private boolean sharedDNs = true;
/**
* Router context.
*/
public static class RouterContext {
private Router router;
private FileContext fileContext;
private String nameserviceId;
private String namenodeId;
private int rpcPort;
private int httpPort;
private DFSClient client;
private Configuration conf;
private RouterClient adminClient;
private URI fileSystemUri;
public RouterContext(Configuration conf, String nsId, String nnId) {
this.conf = conf;
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.router = new Router();
this.router.init(conf);
}
public Router getRouter() {
return this.router;
}
public String getNameserviceId() {
return this.nameserviceId;
}
public String getNamenodeId() {
return this.namenodeId;
}
public int getRpcPort() {
return this.rpcPort;
}
public int getHttpPort() {
return this.httpPort;
}
public FileContext getFileContext() {
return this.fileContext;
}
public URI getFileSystemURI() {
return fileSystemUri;
}
public String getHttpAddress() {
InetSocketAddress httpAddress = router.getHttpServerAddress();
return NetUtils.getHostPortString(httpAddress);
}
public void initRouter() throws URISyntaxException {
// Store the bound points for the router interfaces
InetSocketAddress rpcAddress = router.getRpcServerAddress();
if (rpcAddress != null) {
this.rpcPort = rpcAddress.getPort();
this.fileSystemUri =
URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
// Override the default FS to point to the router RPC
DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(conf);
} catch (UnsupportedFileSystemException e) {
this.fileContext = null;
}
}
InetSocketAddress httpAddress = router.getHttpServerAddress();
if (httpAddress != null) {
this.httpPort = httpAddress.getPort();
}
}
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
LOG.info("Connecting to router at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
return new DFSClient(fileSystemUri, conf);
}
});
}
public RouterClient getAdminClient() throws IOException {
if (adminClient == null) {
InetSocketAddress routerSocket = router.getAdminServerAddress();
LOG.info("Connecting to router admin at {}", routerSocket);
adminClient = new RouterClient(routerSocket, conf);
}
return adminClient;
}
public void resetAdminClient() {
adminClient = null;
}
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to router at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
}
public Configuration getConf() {
return conf;
}
}
/**
* Namenode context in the federated cluster.
*/
public class NamenodeContext {
private Configuration conf;
private NameNode namenode;
private String nameserviceId;
private String namenodeId;
private FileContext fileContext;
private int rpcPort;
private int servicePort;
private int lifelinePort;
private int httpPort;
private int httpsPort;
private URI fileSystemUri;
private int index;
private DFSClient client;
public NamenodeContext(
Configuration conf, String nsId, String nnId, int index) {
this.conf = conf;
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.index = index;
}
public NameNode getNamenode() {
return this.namenode;
}
public String getNameserviceId() {
return this.nameserviceId;
}
public String getNamenodeId() {
return this.namenodeId;
}
public FileContext getFileContext() {
return this.fileContext;
}
public void setNamenode(NameNode nn) throws URISyntaxException {
this.namenode = nn;
// Store the bound ports and override the default FS with the local NN RPC
this.rpcPort = nn.getNameNodeAddress().getPort();
this.servicePort = nn.getServiceRpcAddress().getPort();
this.lifelinePort = nn.getServiceRpcAddress().getPort();
if (nn.getHttpAddress() != null) {
this.httpPort = nn.getHttpAddress().getPort();
}
if (nn.getHttpsAddress() != null) {
this.httpsPort = nn.getHttpsAddress().getPort();
}
this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException e) {
this.fileContext = null;
}
}
public String getRpcAddress() {
return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort;
}
public String getServiceAddress() {
return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort;
}
public String getLifelineAddress() {
return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort;
}
public String getWebAddress() {
if (conf.get(DFS_HTTP_POLICY_KEY)
.equals(HttpConfig.Policy.HTTPS_ONLY.name())) {
return getHttpsAddress();
}
return getHttpAddress();
}
public String getHttpAddress() {
return namenode.getHttpAddress().getHostName() + ":" + httpPort;
}
public String getHttpsAddress() {
return namenode.getHttpsAddress().getHostName() + ":" + httpsPort;
}
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}
public void resetClient() {
client = null;
}
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
LOG.info("Connecting to namenode at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
return new DFSClient(fileSystemUri, conf);
}
});
}
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to namenode at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
}
public String getConfSuffix() {
String suffix = nameserviceId;
if (highAvailability) {
suffix += "." + namenodeId;
}
return suffix;
}
public Configuration getConf() {
return conf;
}
}
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval,
Configuration overrideConf) {
this.highAvailability = ha;
this.heartbeatInterval = heartbeatInterval;
this.cacheFlushInterval = cacheFlushInterval;
configureNameservices(numNameservices, numNamenodes, overrideConf);
}
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes,
long heartbeatInterval, long cacheFlushInterval) {
this(ha, numNameservices, numNamenodes,
heartbeatInterval, cacheFlushInterval, null);
}
public MiniRouterDFSCluster(boolean ha, int numNameservices) {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS,
null);
}
public MiniRouterDFSCluster(
boolean ha, int numNameservices, int numNamenodes) {
this(ha, numNameservices, numNamenodes,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS,
null);
}
public MiniRouterDFSCluster(boolean ha, int numNameservices,
Configuration overrideConf) {
this(ha, numNameservices, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, overrideConf);
}
/**
* Add configuration settings to override default Router settings.
*
* @param conf Router configuration overrides.
*/
public void addRouterOverrides(Configuration conf) {
if (this.routerOverrides == null) {
this.routerOverrides = conf;
} else {
this.routerOverrides.addResource(conf);
}
}
/**
* Add configuration settings to override default Namenode settings.
*
* @param conf Namenode configuration overrides.
*/
public void addNamenodeOverrides(Configuration conf) {
if (this.namenodeOverrides == null) {
this.namenodeOverrides = conf;
} else {
this.namenodeOverrides.addResource(conf);
}
}
/**
* Generate the configuration for a client.
*
* @param nsId Nameservice identifier.
* @return New namenode configuration.
*/
public Configuration generateNamenodeConfiguration(String nsId) {
Configuration conf = new HdfsConfiguration();
conf.set(DFS_NAMESERVICES, getNameservicesKey());
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
for (String ns : nameservices) {
if (highAvailability) {
conf.set(
DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
NAMENODES[0] + "," + NAMENODES[1]);
}
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.rpcPort);
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.httpPort);
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0");
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.httpsPort);
conf.set(
HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns,
ConfiguredFailoverProxyProvider.class.getName());
// If the service port is enabled by default, we need to set them up
boolean servicePortEnabled = false;
if (servicePortEnabled) {
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.servicePort);
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0");
}
}
}
if (this.namenodeOverrides != null) {
conf.addResource(this.namenodeOverrides);
}
return conf;
}
/**
* Generate the configuration for a client.
*
* @return New configuration for a client.
*/
public Configuration generateClientConfiguration() {
Configuration conf = new HdfsConfiguration(false);
String ns0 = getNameservices().get(0);
conf.addResource(generateNamenodeConfiguration(ns0));
return conf;
}
/**
* Generate the configuration for a Router.
*
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
* @return New configuration for a Router.
*/
public Configuration generateRouterConfiguration(String nsId, String nnId) {
Configuration conf;
if (this.routerConf == null) {
conf = new Configuration(false);
} else {
conf = new Configuration(routerConf);
}
conf.addResource(generateNamenodeConfiguration(nsId));
conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10);
conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);
// Use mock resolver classes
conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MockResolver.class, ActiveNamenodeResolver.class);
conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MockResolver.class, FileSubclusterResolver.class);
// Disable safemode on startup
conf.setBoolean(DFS_ROUTER_SAFEMODE_ENABLE, false);
// Set the nameservice ID for the default NN monitor
conf.set(DFS_NAMESERVICE_ID, nsId);
if (nnId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
}
// Namenodes to monitor
StringBuilder sb = new StringBuilder();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
if (sb.length() != 0) {
sb.append(",");
}
sb.append(suffix);
}
}
conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
// Add custom overrides if available
if (this.routerOverrides != null) {
for (Entry<String, String> entry : this.routerOverrides) {
String confKey = entry.getKey();
String confValue = entry.getValue();
conf.set(confKey, confValue);
}
}
return conf;
}
public void configureNameservices(int numNameservices, int numNamenodes,
Configuration overrideConf) {
this.nameservices = new ArrayList<>();
this.namenodes = new ArrayList<>();
NamenodeContext context = null;
int nnIndex = 0;
for (int i=0; i<numNameservices; i++) {
String ns = "ns" + i;
this.nameservices.add("ns" + i);
Configuration nnConf = generateNamenodeConfiguration(ns);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
}
if (!highAvailability) {
context = new NamenodeContext(nnConf, ns, null, nnIndex++);
this.namenodes.add(context);
} else {
for (int j=0; j<numNamenodes; j++) {
context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
this.namenodes.add(context);
}
}
}
}
public void setNumDatanodesPerNameservice(int num) {
this.numDatanodesPerNameservice = num;
}
/**
* Set custom storage type configuration for each datanode.
* If storageTypes is uninitialized or passed null then
* StorageType.DEFAULT is used.
*/
public void setStorageTypes(StorageType[][] storageTypes) {
this.storageTypes = storageTypes;
}
/**
* Set racks for each datanode. If racks is uninitialized or passed null then
* default is used.
*/
public void setRacks(String[] racks) {
this.racks = racks;
}
/**
* Set the DNs to belong to only one subcluster.
*/
public void setIndependentDNs() {
this.sharedDNs = false;
}
public String getNameservicesKey() {
StringBuilder sb = new StringBuilder();
for (String nsId : this.nameservices) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(nsId);
}
return sb.toString();
}
public String getRandomNameservice() {
int randIndex = RND.nextInt(nameservices.size());
return nameservices.get(randIndex);
}
public List<String> getNameservices() {
return nameservices;
}
public List<NamenodeContext> getNamenodes(String nameservice) {
List<NamenodeContext> nns = new ArrayList<>();
for (NamenodeContext c : namenodes) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
}
}
return nns;
}
public NamenodeContext getRandomNamenode() {
Random rand = new Random();
int i = rand.nextInt(this.namenodes.size());
return this.namenodes.get(i);
}
public List<NamenodeContext> getNamenodes() {
return this.namenodes;
}
public boolean isHighAvailability() {
return highAvailability;
}
public NamenodeContext getNamenode(String nameservice, String namenode) {
for (NamenodeContext c : this.namenodes) {
if (c.nameserviceId.equals(nameservice)) {
if (namenode == null || namenode.isEmpty() ||
c.namenodeId == null || c.namenodeId.isEmpty()) {
return c;
} else if (c.namenodeId.equals(namenode)) {
return c;
}
}
}
return null;
}
public List<RouterContext> getRouters(String nameservice) {
List<RouterContext> nns = new ArrayList<>();
for (RouterContext c : routers) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
}
}
return nns;
}
public RouterContext getRouterContext(String nsId, String nnId) {
for (RouterContext c : routers) {
if (nnId == null) {
return c;
}
if (c.namenodeId.equals(nnId) &&
c.nameserviceId.equals(nsId)) {
return c;
}
}
return null;
}
public RouterContext getRandomRouter() {
Random rand = new Random();
return routers.get(rand.nextInt(routers.size()));
}
public List<RouterContext> getRouters() {
return routers;
}
public RouterContext buildRouter(String nsId, String nnId)
throws URISyntaxException, IOException {
Configuration config = generateRouterConfiguration(nsId, nnId);
RouterContext rc = new RouterContext(config, nsId, nnId);
return rc;
}
public void startCluster() {
startCluster(null);
}
public void startCluster(Configuration overrideConf) {
try {
MiniDFSNNTopology topology = new MiniDFSNNTopology();
for (String ns : nameservices) {
NSConf conf = new MiniDFSNNTopology.NSConf(ns);
if (highAvailability) {
for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
conf.addNN(nnConf);
}
} else {
NNConf nnConf = new MiniDFSNNTopology.NNConf(null);
conf.addNN(nnConf);
}
topology.addNameservice(conf);
}
topology.setFederation(true);
// Set independent DNs across subclusters
int numDNs = nameservices.size() * numDatanodesPerNameservice;
Configuration[] dnConfs = null;
if (!sharedDNs) {
dnConfs = new Configuration[numDNs];
int dnId = 0;
for (String nsId : nameservices) {
Configuration subclusterConf = new Configuration();
subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
for (int i = 0; i < numDatanodesPerNameservice; i++) {
dnConfs[dnId] = subclusterConf;
dnId++;
}
}
}
// Start mini DFS cluster
String ns0 = nameservices.get(0);
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
// Router also uses this configurations as initial values.
routerConf = new Configuration(overrideConf);
}
cluster = new MiniDFSCluster.Builder(nnConf)
.numDataNodes(numDNs)
.nnTopology(topology)
.dataNodeConfOverlays(dnConfs)
.storageTypes(storageTypes)
.racks(racks)
.build();
cluster.waitActive();
// Store NN pointers
for (int i = 0; i < namenodes.size(); i++) {
NameNode nn = cluster.getNameNode(i);
namenodes.get(i).setNamenode(nn);
}
} catch (Exception e) {
LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
if (cluster != null) {
cluster.shutdown();
}
}
}
public void startRouters()
throws InterruptedException, URISyntaxException, IOException {
// Create one router per nameservice
this.routers = new ArrayList<>();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
RouterContext router = buildRouter(ns, context.namenodeId);
this.routers.add(router);
}
}
// Start all routers
for (RouterContext router : this.routers) {
router.router.start();
}
// Wait until all routers are active and record their ports
for (RouterContext router : this.routers) {
waitActive(router);
router.initRouter();
}
}
public void waitActive(NamenodeContext nn) throws IOException {
cluster.waitActive(nn.index);
}
public void waitActive(RouterContext router)
throws InterruptedException {
for (int loopCount = 0; loopCount < 20; loopCount++) {
// Validate connection of routers to NNs
if (router.router.getServiceState() == STATE.STARTED) {
return;
}
Thread.sleep(1000);
}
fail("Timeout waiting for " + router.router + " to activate");
}
public void registerNamenodes() throws IOException {
for (RouterContext r : this.routers) {
ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
for (NamenodeContext nn : this.namenodes) {
// Generate a report
NamenodeStatusReport report = new NamenodeStatusReport(
nn.nameserviceId, nn.namenodeId,
nn.getRpcAddress(), nn.getServiceAddress(),
nn.getLifelineAddress(), "http", nn.getWebAddress());
FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
report.setNamespaceInfo(nsInfo);
// Determine HA state from nn public state string
String nnState = nn.namenode.getState();
HAServiceState haState = HAServiceState.ACTIVE;
for (HAServiceState state : HAServiceState.values()) {
if (nnState.equalsIgnoreCase(state.name())) {
haState = state;
break;
}
}
report.setHAServiceState(haState);
// Register with the resolver
resolver.registerNamenode(report);
}
}
}
public void waitNamenodeRegistration() throws Exception {
for (RouterContext r : this.routers) {
Router router = r.router;
for (NamenodeContext nn : this.namenodes) {
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
waitNamenodeRegistered(
nnResolver, nn.nameserviceId, nn.namenodeId, null);
}
}
}
public void waitRouterRegistrationQuorum(RouterContext router,
FederationNamenodeServiceState state, String nsId, String nnId)
throws Exception {
LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
waitNamenodeRegistered(nnResolver, nsId, nnId, state);
}
/**
* Wait for name spaces to be active.
* @throws Exception If we cannot check the status or we timeout.
*/
public void waitActiveNamespaces() throws Exception {
for (RouterContext r : this.routers) {
Router router = r.router;
final ActiveNamenodeResolver resolver = router.getNamenodeResolver();
for (FederationNamespaceInfo ns : resolver.getNamespaces()) {
final String nsId = ns.getNameserviceId();
waitNamenodeRegistered(
resolver, nsId, FederationNamenodeServiceState.ACTIVE);
}
}
}
/**
* Get the federated path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Router.
*/
public String getFederatedPathForNS(String nsId) {
return "/" + nsId;
}
/**
* Get the namenode path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Namenode.
*/
public String getNamenodePathForNS(String nsId) {
return "/target-" + nsId;
}
/**
* Get the federated test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
* </ul>
*/
public String getFederatedTestDirectoryForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
}
/**
* Get the namenode test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testdir
* </ul>
*/
public String getNamenodeTestDirectoryForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
}
/**
* Get the federated test file for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
* </ul>
*/
public String getFederatedTestFileForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
}
/**
* Get the namenode test file for a nameservice.
* @param nsId Nameservice identifier.
* @return example:
* <ul>
* <li>/target-ns0/testfile
* </ul>
*/
public String getNamenodeTestFileForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
}
/**
* Switch a namenode in a nameservice to be the active.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
*/
public void switchToActive(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
cluster.transitionToActive(i);
}
}
} catch (Throwable e) {
LOG.error("Cannot transition to active", e);
}
}
/**
* Switch a namenode in a nameservice to be in standby.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
*/
public void switchToStandby(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
cluster.transitionToStandby(i);
}
}
} catch (Throwable e) {
LOG.error("Cannot transition to standby", e);
}
}
/**
* Stop the federated HDFS cluster.
*/
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
if (routers != null) {
for (RouterContext context : routers) {
stopRouter(context);
}
}
}
/**
* Stop a router.
* @param router Router context.
*/
public void stopRouter(RouterContext router) {
try {
router.router.shutDown();
int loopCount = 0;
while (router.router.getServiceState() != STATE.STOPPED) {
loopCount++;
Thread.sleep(1000);
if (loopCount > 20) {
LOG.error("Cannot shutdown router {}", router.rpcPort);
break;
}
}
} catch (InterruptedException e) {
}
}
/////////////////////////////////////////////////////////////////////////////
// Namespace Test Fixtures
/////////////////////////////////////////////////////////////////////////////
/**
* Creates test directories via the namenode.
* 1) /target-ns0/testfile
* 2) /target-ns1/testfile
* @throws IOException
*/
public void createTestDirectoriesNamenode() throws IOException {
// Add a test dir to each NS and verify
for (String ns : getNameservices()) {
NamenodeContext context = getNamenode(ns, null);
if (!createTestDirectoriesNamenode(context)) {
throw new IOException("Cannot create test directory for ns " + ns);
}
}
}
public boolean createTestDirectoriesNamenode(NamenodeContext nn)
throws IOException {
FileSystem fs = nn.getFileSystem();
String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
return addDirectory(fs, testDir);
}
public void deleteAllFiles() throws IOException {
// Delete all files via the NNs and verify
for (NamenodeContext context : getNamenodes()) {
FileSystem fs = context.getFileSystem();
FileStatus[] status = fs.listStatus(new Path("/"));
for (int i = 0; i <status.length; i++) {
Path p = status[i].getPath();
fs.delete(p, true);
}
status = fs.listStatus(new Path("/"));
assertEquals(status.length, 0);
}
}
/////////////////////////////////////////////////////////////////////////////
// MockRouterResolver Test Fixtures
/////////////////////////////////////////////////////////////////////////////
/**
* <ul>
* <li>/ -> [ns0->/].
* <li>/nso -> ns0->/target-ns0.
* <li>/ns1 -> ns1->/target-ns1.
* </ul>
*/
public void installMockLocations() {
for (RouterContext r : routers) {
MockResolver resolver =
(MockResolver) r.router.getSubclusterResolver();
// create table entries
for (String nsId : nameservices) {
// Direct path
String routerPath = getFederatedPathForNS(nsId);
String nnPath = getNamenodePathForNS(nsId);
resolver.addLocation(routerPath, nsId, nnPath);
}
// Root path points to both first nameservice
String ns0 = nameservices.get(0);
resolver.addLocation("/", ns0, "/");
}
}
public MiniDFSCluster getCluster() {
return cluster;
}
/**
* Wait until the federated cluster is up and ready.
* @throws IOException If we cannot wait for the cluster to be up.
*/
public void waitClusterUp() throws IOException {
cluster.waitClusterUp();
registerNamenodes();
try {
waitNamenodeRegistration();
} catch (Exception e) {
throw new IOException("Cannot wait for the namenodes", e);
}
}
}