/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
 * Test the Namenode monitoring behavior in the Router.
*/
public class TestRouterNamenodeMonitoring {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
/** Router for the test. */
private Router router;
/** Namenodes in the cluster. */
private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
/** Nameservices in the federated cluster. */
private List<String> nsIds = asList("ns0", "ns1");
  /** Namenode identifiers within each nameservice. */
private List<String> nnIds = asList("nn0", "nn1");
/** Time the test starts. */
private long initializedTime;
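  /**
   * Create the mock Namenodes for ns0 and ns1 and set nn0 as active and nn1
   * as standby in each nameservice.
   */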
@Before
public void setup() throws Exception {
LOG.info("Initialize the Mock Namenodes to monitor");
for (String nsId : nsIds) {
nns.put(nsId, new HashMap<>());
for (String nnId : nnIds) {
nns.get(nsId).put(nnId, new MockNamenode(nsId));
}
}
LOG.info("Set nn0 to active for all nameservices");
for (Map<String, MockNamenode> nnNS : nns.values()) {
nnNS.get("nn0").transitionToActive();
nnNS.get("nn1").transitionToStandby();
}
initializedTime = Time.now();
}
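  /**
   * Stop the mock Namenodes and the Router used by the test.
   */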
@After
public void cleanup() throws Exception {
for (Map<String, MockNamenode> nnNS : nns.values()) {
for (MockNamenode nn : nnNS.values()) {
nn.stop();
}
}
nns.clear();
if (router != null) {
router.stop();
}
}
/**
   * Get the configuration of the cluster, containing all the Namenodes and
   * their addresses.
* @return Configuration containing all the Namenodes.
*/
private Configuration getNamenodesConfig() {
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES,
StringUtils.join(",", nns.keySet()));
for (String nsId : nns.keySet()) {
Set<String> nsNnIds = nns.get(nsId).keySet();
StringBuilder sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
sb.append(".").append(nsId);
conf.set(sb.toString(), StringUtils.join(",", nsNnIds));
for (String nnId : nsNnIds) {
final MockNamenode nn = nns.get(nsId).get(nnId);
sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
sb.append(".").append(nsId);
sb.append(".").append(nnId);
conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
sb = new StringBuilder();
sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
sb.append(".").append(nsId);
sb.append(".").append(nnId);
conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
}
}
return conf;
}
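  /**
   * Test that the Router only refreshes the state of the Namenodes it
   * monitors: the local ns0.nn1 plus the explicitly configured ns1.nn0 and
   * ns1.nn1, while the unmonitored ns0.nn0 keeps its original modified date.
   */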
@Test
public void testNamenodeMonitoring() throws Exception {
Configuration nsConf = getNamenodesConfig();
// Setup the State Store for the Router to use
Configuration stateStoreConfig = getStateStoreConfiguration();
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
stateStoreConfig.setClass(
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MountTableResolver.class, FileSubclusterResolver.class);
Configuration routerConf = new RouterConfigBuilder(nsConf)
.enableLocalHeartbeat(true)
.heartbeat()
.stateStore()
.rpc()
.build();
// Specify namenodes (ns1.nn0,ns1.nn1) to monitor
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
"ns1.nn0,ns1.nn1");
routerConf.addResource(stateStoreConfig);
// Specify local node (ns0.nn1) to monitor
routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
// Start the Router with the namenodes to monitor
router = new Router();
router.init(routerConf);
router.start();
// Manually trigger the heartbeat and update the values
Collection<NamenodeHeartbeatService> heartbeatServices =
router.getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService service : heartbeatServices) {
service.periodicInvoke();
}
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
resolver.loadCache(true);
// Check that the monitored values are expected
final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
for (String nsId : nns.keySet()) {
List<? extends FederationNamenodeContext> nnReports =
resolver.getNamenodesForNameserviceId(nsId, false);
namespaceInfo.addAll(nnReports);
}
for (FederationNamenodeContext nnInfo : namespaceInfo) {
long modTime = nnInfo.getDateModified();
long diff = modTime - initializedTime;
if ("ns0".equals(nnInfo.getNameserviceId()) &&
"nn0".equals(nnInfo.getNamenodeId())) {
// The modified date won't be updated in ns0.nn0
// since it isn't monitored by the Router.
assertTrue(nnInfo + " shouldn't be updated: " + diff,
modTime < initializedTime);
} else {
        // Other Namenodes should be updated as expected.
assertTrue(nnInfo + " should be updated: " + diff,
modTime > initializedTime);
}
}
}
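  /**
   * Test the parsing of the Namenodes to monitor: empty values, single
   * entries, and lists with extra whitespace or trailing commas.
   */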
@Test
public void testNamenodeMonitoringConfig() throws Exception {
testConfig(asList(), "");
testConfig(asList("ns1.nn0"), "ns1.nn0");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
}
/**
* Test if configuring a Router to monitor particular Namenodes actually
* takes effect.
* @param expectedNNs Namenodes that should be monitored.
* @param confNsIds Router configuration setting for Namenodes to monitor.
*/
private void testConfig(
Collection<String> expectedNNs, String confNsIds) {
// Setup and start the Router
Configuration conf = getNamenodesConfig();
Configuration routerConf = new RouterConfigBuilder(conf)
.heartbeat(true)
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
router = new Router();
router.init(routerConf);
// Test the heartbeat services of the Router
Collection<NamenodeHeartbeatService> heartbeatServices =
router.getNamenodeHeartbeatServices();
assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
}
/**
   * Assert that the Namenodes monitored by the Router are the expected ones.
   * @param expected Expected Namenodes.
   * @param actual Actual heartbeat services of the Router.
*/
private static void assertNamenodeHeartbeatService(
Collection<String> expected,
Collection<NamenodeHeartbeatService> actual) {
final Set<String> actualSet = new TreeSet<>();
for (NamenodeHeartbeatService heartbeatService : actual) {
NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
StringBuilder sb = new StringBuilder();
sb.append(report.getNameserviceId());
sb.append(".");
sb.append(report.getNamenodeId());
actualSet.add(sb.toString());
}
assertTrue(expected + " does not contain all " + actualSet,
expected.containsAll(actualSet));
assertTrue(actualSet + " does not contain all " + expected,
actualSet.containsAll(expected));
}
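  /**
   * Verify that the heartbeat service builds http:// JMX URLs when the
   * HTTP_ONLY policy is configured.
   */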
@Test
public void testJmxUrlHTTP() {
verifyUrlSchemes(HttpConfig.Policy.HTTP_ONLY.name());
}
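  /**
   * Verify that the heartbeat service builds https:// JMX URLs when the
   * HTTPS_ONLY policy is configured.
   */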
@Test
public void testJmxUrlHTTPs() {
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
}
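  /**
   * Verify that the JMX update interval controls how often the heartbeat
   * service fetches JMX: disabled (-1) never fetches, a long interval fetches
   * only once for repeated reports, and the default fetches on every report.
   */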
@Test
public void testJmxRequestFrequency() {
// Disable JMX requests
Configuration conf = getNamenodesConfig();
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS, -1);
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 0, 1);
// Set JMX requests to lower frequency
conf = getNamenodesConfig();
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
TimeUnit.MINUTES.toMillis(5));
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 1, 2);
// Set JMX requests to default frequency
conf = getNamenodesConfig();
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 2, 2);
}
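  /**
   * Verify the JMX URL scheme for a single status report with the default
   * Namenode configuration.
   * @param scheme HTTP policy to configure (HTTP_ONLY or HTTPS_ONLY).
   */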
private void verifyUrlSchemes(String scheme) {
int httpRequests = HttpConfig.Policy.HTTP_ONLY.name().equals(scheme) ? 1 : 0;
int httpsRequests = HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme) ? 1 : 0;
verifyUrlSchemes(scheme, getNamenodesConfig(), httpRequests, httpsRequests, 1);
}
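  /**
   * Request status reports from the heartbeat services and check, via the
   * captured log output, how many JMX URLs of each scheme were logged.
   * @param scheme HTTP policy to configure.
   * @param conf Namenode configuration to build the Router on.
   * @param httpRequests Expected http:// JMX fetches (the log should contain
   *                     twice this many http:// URLs).
   * @param httpsRequests Expected https:// JMX fetches (the log should contain
   *                      twice this many https:// URLs).
   * @param requestsPerService Status reports to request from each service.
   */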
private void verifyUrlSchemes(String scheme, Configuration conf, int httpRequests,
int httpsRequests, int requestsPerService) {
// Attach our own log appender so we can verify output
final LogVerificationAppender appender =
new LogVerificationAppender();
final org.apache.log4j.Logger logger =
org.apache.log4j.Logger.getRootLogger();
logger.addAppender(appender);
GenericTestUtils.setRootLogLevel(Level.DEBUG);
// Setup and start the Router
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
Configuration routerConf = new RouterConfigBuilder(conf)
.heartbeat(true)
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn0");
router = new Router();
router.init(routerConf);
// Test the heartbeat services of the Router
Collection<NamenodeHeartbeatService> heartbeatServices =
router.getNamenodeHeartbeatServices();
for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
for (int request = 0; request < requestsPerService; request++) {
heartbeatService.getNamenodeStatusReport();
}
}
assertEquals(httpsRequests * 2, appender.countLinesWithMessage("JMX URL: https://"));
assertEquals(httpRequests * 2, appender.countLinesWithMessage("JMX URL: http://"));
}
/**
   * Test the view of the Datanodes that the Router sees. If a Datanode is
   * registered in two subclusters, the Router should return the most
   * up-to-date information.
* @throws IOException If the test cannot run.
*/
@Test
public void testDatanodesView() throws IOException {
// Setup the router
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.rpc()
.build();
router = new Router();
router.init(routerConf);
router.start();
// Setup the namenodes
for (String nsId : nsIds) {
registerSubclusters(router, nns.get(nsId).values());
for (String nnId : nnIds) {
MockNamenode nn = nns.get(nsId).get(nnId);
if ("nn0".equals(nnId)) {
nn.transitionToActive();
}
nn.addDatanodeMock();
}
}
// Set different states for the DNs in each namespace
long time = Time.now();
for (String nsId : nsIds) {
for (String nnId : nnIds) {
        // dn0 is DECOMMISSIONED in the most recent report (ns1)
DatanodeInfoBuilder dn0Builder = new DatanodeInfoBuilder()
.setDatanodeUuid("dn0")
.setHostName("dn0")
.setIpAddr("dn0")
.setXferPort(10000);
if ("ns0".equals(nsId)) {
dn0Builder.setLastUpdate(time - 1000);
dn0Builder.setAdminState(AdminStates.NORMAL);
} else if ("ns1".equals(nsId)) {
dn0Builder.setLastUpdate(time - 500);
dn0Builder.setAdminState(AdminStates.DECOMMISSIONED);
}
        // dn1 is NORMAL in the most recent report (ns0)
DatanodeInfoBuilder dn1Builder = new DatanodeInfoBuilder()
.setDatanodeUuid("dn1")
.setHostName("dn1")
.setIpAddr("dn1")
.setXferPort(10000);
if ("ns0".equals(nsId)) {
dn1Builder.setLastUpdate(time - 1000);
dn1Builder.setAdminState(AdminStates.NORMAL);
} else if ("ns1".equals(nsId)) {
dn1Builder.setLastUpdate(time - 5 * 1000);
dn1Builder.setAdminState(AdminStates.DECOMMISSION_INPROGRESS);
}
// Update the mock NameNode with the DN views
MockNamenode nn = nns.get(nsId).get(nnId);
List<DatanodeInfo> dns = nn.getDatanodes();
dns.add(dn0Builder.build());
dns.add(dn1Builder.build());
}
}
// Get the datanodes from the Router and check we get the right view
DistributedFileSystem dfs = (DistributedFileSystem)getFileSystem(router);
DFSClient dfsClient = dfs.getClient();
DatanodeStorageReport[] dns = dfsClient.getDatanodeStorageReport(
DatanodeReportType.ALL);
assertEquals(2, dns.length);
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
if ("dn0".equals(dnInfo.getHostName())) {
assertEquals(AdminStates.DECOMMISSIONED, dnInfo.getAdminState());
} else if ("dn1".equals(dnInfo.getHostName())) {
assertEquals(AdminStates.NORMAL, dnInfo.getAdminState());
} else {
fail("Unexpected DN: " + dnInfo.getHostName());
}
}
}
}