| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.router; |
| |
| import static java.util.Arrays.asList; |
| import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem; |
| import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters; |
| import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import java.util.concurrent.TimeUnit; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.LogVerificationAppender; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.server.federation.MockNamenode; |
| import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; |
| import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; |
| import org.apache.hadoop.http.HttpConfig; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Time; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.event.Level; |
| |
| /** |
| * Test namenodes monitor behavior in the Router. |
| */ |
| public class TestRouterNamenodeMonitoring { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class); |
| |
| |
| /** Router for the test. */ |
| private Router router; |
| /** Namenodes in the cluster. */ |
| private Map<String, Map<String, MockNamenode>> nns = new HashMap<>(); |
| /** Nameservices in the federated cluster. */ |
| private List<String> nsIds = asList("ns0", "ns1"); |
| /** Namenodes in the cluster. */ |
| private List<String> nnIds = asList("nn0", "nn1"); |
| |
| /** Time the test starts. */ |
| private long initializedTime; |
| |
| |
| @Before |
| public void setup() throws Exception { |
| LOG.info("Initialize the Mock Namenodes to monitor"); |
| for (String nsId : nsIds) { |
| nns.put(nsId, new HashMap<>()); |
| for (String nnId : nnIds) { |
| nns.get(nsId).put(nnId, new MockNamenode(nsId)); |
| } |
| } |
| |
| LOG.info("Set nn0 to active for all nameservices"); |
| for (Map<String, MockNamenode> nnNS : nns.values()) { |
| nnNS.get("nn0").transitionToActive(); |
| nnNS.get("nn1").transitionToStandby(); |
| } |
| |
| initializedTime = Time.now(); |
| } |
| |
| @After |
| public void cleanup() throws Exception { |
| for (Map<String, MockNamenode> nnNS : nns.values()) { |
| for (MockNamenode nn : nnNS.values()) { |
| nn.stop(); |
| } |
| } |
| nns.clear(); |
| |
| if (router != null) { |
| router.stop(); |
| } |
| } |
| |
| /** |
| * Get the configuration of the cluster which contains all the Namenodes and |
| * their addresses. |
| * @return Configuration containing all the Namenodes. |
| */ |
| private Configuration getNamenodesConfig() { |
| final Configuration conf = new HdfsConfiguration(); |
| conf.set(DFSConfigKeys.DFS_NAMESERVICES, |
| StringUtils.join(",", nns.keySet())); |
| for (String nsId : nns.keySet()) { |
| Set<String> nsNnIds = nns.get(nsId).keySet(); |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX); |
| sb.append(".").append(nsId); |
| conf.set(sb.toString(), StringUtils.join(",", nsNnIds)); |
| |
| for (String nnId : nsNnIds) { |
| final MockNamenode nn = nns.get(nsId).get(nnId); |
| |
| sb = new StringBuilder(); |
| sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); |
| sb.append(".").append(nsId); |
| sb.append(".").append(nnId); |
| conf.set(sb.toString(), "localhost:" + nn.getRPCPort()); |
| |
| sb = new StringBuilder(); |
| sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); |
| sb.append(".").append(nsId); |
| sb.append(".").append(nnId); |
| conf.set(sb.toString(), "localhost:" + nn.getHTTPPort()); |
| } |
| } |
| return conf; |
| } |
| |
| @Test |
| public void testNamenodeMonitoring() throws Exception { |
| Configuration nsConf = getNamenodesConfig(); |
| |
| // Setup the State Store for the Router to use |
| Configuration stateStoreConfig = getStateStoreConfiguration(); |
| stateStoreConfig.setClass( |
| RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, |
| MembershipNamenodeResolver.class, ActiveNamenodeResolver.class); |
| stateStoreConfig.setClass( |
| RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, |
| MountTableResolver.class, FileSubclusterResolver.class); |
| |
| Configuration routerConf = new RouterConfigBuilder(nsConf) |
| .enableLocalHeartbeat(true) |
| .heartbeat() |
| .stateStore() |
| .rpc() |
| .build(); |
| |
| // Specify namenodes (ns1.nn0,ns1.nn1) to monitor |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0"); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, |
| "ns1.nn0,ns1.nn1"); |
| routerConf.addResource(stateStoreConfig); |
| |
| // Specify local node (ns0.nn1) to monitor |
| routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0"); |
| routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1"); |
| |
| // Start the Router with the namenodes to monitor |
| router = new Router(); |
| router.init(routerConf); |
| router.start(); |
| |
| // Manually trigger the heartbeat and update the values |
| Collection<NamenodeHeartbeatService> heartbeatServices = |
| router.getNamenodeHeartbeatServices(); |
| for (NamenodeHeartbeatService service : heartbeatServices) { |
| service.periodicInvoke(); |
| } |
| MembershipNamenodeResolver resolver = |
| (MembershipNamenodeResolver) router.getNamenodeResolver(); |
| resolver.loadCache(true); |
| |
| // Check that the monitored values are expected |
| final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>(); |
| for (String nsId : nns.keySet()) { |
| List<? extends FederationNamenodeContext> nnReports = |
| resolver.getNamenodesForNameserviceId(nsId, false); |
| namespaceInfo.addAll(nnReports); |
| } |
| for (FederationNamenodeContext nnInfo : namespaceInfo) { |
| long modTime = nnInfo.getDateModified(); |
| long diff = modTime - initializedTime; |
| if ("ns0".equals(nnInfo.getNameserviceId()) && |
| "nn0".equals(nnInfo.getNamenodeId())) { |
| // The modified date won't be updated in ns0.nn0 |
| // since it isn't monitored by the Router. |
| assertTrue(nnInfo + " shouldn't be updated: " + diff, |
| modTime < initializedTime); |
| } else { |
| // other namnodes should be updated as expected |
| assertTrue(nnInfo + " should be updated: " + diff, |
| modTime > initializedTime); |
| } |
| } |
| } |
| |
| @Test |
| public void testNamenodeMonitoringConfig() throws Exception { |
| testConfig(asList(), ""); |
| testConfig(asList("ns1.nn0"), "ns1.nn0"); |
| testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1"); |
| testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1"); |
| testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1"); |
| testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,"); |
| } |
| |
| /** |
| * Test if configuring a Router to monitor particular Namenodes actually |
| * takes effect. |
| * @param expectedNNs Namenodes that should be monitored. |
| * @param confNsIds Router configuration setting for Namenodes to monitor. |
| */ |
| private void testConfig( |
| Collection<String> expectedNNs, String confNsIds) { |
| |
| // Setup and start the Router |
| Configuration conf = getNamenodesConfig(); |
| Configuration routerConf = new RouterConfigBuilder(conf) |
| .heartbeat(true) |
| .build(); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0"); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds); |
| router = new Router(); |
| router.init(routerConf); |
| |
| // Test the heartbeat services of the Router |
| Collection<NamenodeHeartbeatService> heartbeatServices = |
| router.getNamenodeHeartbeatServices(); |
| assertNamenodeHeartbeatService(expectedNNs, heartbeatServices); |
| } |
| |
| /** |
| * Assert that the namenodes monitored by the Router are the expected. |
| * @param expected Expected namenodes. |
| * @param actual Actual heartbeat services for the Router |
| */ |
| private static void assertNamenodeHeartbeatService( |
| Collection<String> expected, |
| Collection<NamenodeHeartbeatService> actual) { |
| |
| final Set<String> actualSet = new TreeSet<>(); |
| for (NamenodeHeartbeatService heartbeatService : actual) { |
| NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport(); |
| StringBuilder sb = new StringBuilder(); |
| sb.append(report.getNameserviceId()); |
| sb.append("."); |
| sb.append(report.getNamenodeId()); |
| actualSet.add(sb.toString()); |
| } |
| assertTrue(expected + " does not contain all " + actualSet, |
| expected.containsAll(actualSet)); |
| assertTrue(actualSet + " does not contain all " + expected, |
| actualSet.containsAll(expected)); |
| } |
| |
| @Test |
| public void testJmxUrlHTTP() { |
| verifyUrlSchemes(HttpConfig.Policy.HTTP_ONLY.name()); |
| } |
| |
| @Test |
| public void testJmxUrlHTTPs() { |
| verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name()); |
| } |
| |
| @Test |
| public void testJmxRequestFrequency() { |
| // Disable JMX requests |
| Configuration conf = getNamenodesConfig(); |
| conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS, -1); |
| verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 0, 1); |
| |
| // Set JMX requests to lower frequency |
| conf = getNamenodesConfig(); |
| conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS, |
| TimeUnit.MINUTES.toMillis(5)); |
| verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 1, 2); |
| |
| // Set JMX requests to default frequency |
| conf = getNamenodesConfig(); |
| verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 2, 2); |
| } |
| |
| private void verifyUrlSchemes(String scheme) { |
| int httpRequests = HttpConfig.Policy.HTTP_ONLY.name().equals(scheme) ? 1 : 0; |
| int httpsRequests = HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme) ? 1 : 0; |
| verifyUrlSchemes(scheme, getNamenodesConfig(), httpRequests, httpsRequests, 1); |
| } |
| |
| private void verifyUrlSchemes(String scheme, Configuration conf, int httpRequests, |
| int httpsRequests, int requestsPerService) { |
| |
| // Attach our own log appender so we can verify output |
| final LogVerificationAppender appender = |
| new LogVerificationAppender(); |
| final org.apache.log4j.Logger logger = |
| org.apache.log4j.Logger.getRootLogger(); |
| logger.addAppender(appender); |
| GenericTestUtils.setRootLogLevel(Level.DEBUG); |
| |
| // Setup and start the Router |
| conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme); |
| Configuration routerConf = new RouterConfigBuilder(conf) |
| .heartbeat(true) |
| .build(); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0"); |
| routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn0"); |
| router = new Router(); |
| router.init(routerConf); |
| |
| // Test the heartbeat services of the Router |
| Collection<NamenodeHeartbeatService> heartbeatServices = |
| router.getNamenodeHeartbeatServices(); |
| for (NamenodeHeartbeatService heartbeatService : heartbeatServices) { |
| for (int request = 0; request < requestsPerService; request++) { |
| heartbeatService.getNamenodeStatusReport(); |
| } |
| } |
| assertEquals(httpsRequests * 2, appender.countLinesWithMessage("JMX URL: https://")); |
| assertEquals(httpRequests * 2, appender.countLinesWithMessage("JMX URL: http://")); |
| } |
| |
| /** |
| * Test the view of the Datanodes that the Router sees. If a Datanode is |
| * registered in two subclusters, it should return the most up to date |
| * information. |
| * @throws IOException If the test cannot run. |
| */ |
| @Test |
| public void testDatanodesView() throws IOException { |
| |
| // Setup the router |
| Configuration routerConf = new RouterConfigBuilder() |
| .stateStore() |
| .rpc() |
| .build(); |
| router = new Router(); |
| router.init(routerConf); |
| router.start(); |
| |
| // Setup the namenodes |
| for (String nsId : nsIds) { |
| registerSubclusters(router, nns.get(nsId).values()); |
| for (String nnId : nnIds) { |
| MockNamenode nn = nns.get(nsId).get(nnId); |
| if ("nn0".equals(nnId)) { |
| nn.transitionToActive(); |
| } |
| nn.addDatanodeMock(); |
| } |
| } |
| |
| // Set different states for the DNs in each namespace |
| long time = Time.now(); |
| for (String nsId : nsIds) { |
| for (String nnId : nnIds) { |
| // dn0 is DECOMMISSIONED in the most recent (ns1) |
| DatanodeInfoBuilder dn0Builder = new DatanodeInfoBuilder() |
| .setDatanodeUuid("dn0") |
| .setHostName("dn0") |
| .setIpAddr("dn0") |
| .setXferPort(10000); |
| if ("ns0".equals(nsId)) { |
| dn0Builder.setLastUpdate(time - 1000); |
| dn0Builder.setAdminState(AdminStates.NORMAL); |
| } else if ("ns1".equals(nsId)) { |
| dn0Builder.setLastUpdate(time - 500); |
| dn0Builder.setAdminState(AdminStates.DECOMMISSIONED); |
| } |
| |
| // dn1 is NORMAL in the most recent (ns0) |
| DatanodeInfoBuilder dn1Builder = new DatanodeInfoBuilder() |
| .setDatanodeUuid("dn1") |
| .setHostName("dn1") |
| .setIpAddr("dn1") |
| .setXferPort(10000); |
| if ("ns0".equals(nsId)) { |
| dn1Builder.setLastUpdate(time - 1000); |
| dn1Builder.setAdminState(AdminStates.NORMAL); |
| } else if ("ns1".equals(nsId)) { |
| dn1Builder.setLastUpdate(time - 5 * 1000); |
| dn1Builder.setAdminState(AdminStates.DECOMMISSION_INPROGRESS); |
| } |
| |
| // Update the mock NameNode with the DN views |
| MockNamenode nn = nns.get(nsId).get(nnId); |
| List<DatanodeInfo> dns = nn.getDatanodes(); |
| dns.add(dn0Builder.build()); |
| dns.add(dn1Builder.build()); |
| } |
| } |
| |
| // Get the datanodes from the Router and check we get the right view |
| DistributedFileSystem dfs = (DistributedFileSystem)getFileSystem(router); |
| DFSClient dfsClient = dfs.getClient(); |
| DatanodeStorageReport[] dns = dfsClient.getDatanodeStorageReport( |
| DatanodeReportType.ALL); |
| assertEquals(2, dns.length); |
| for (DatanodeStorageReport dn : dns) { |
| DatanodeInfo dnInfo = dn.getDatanodeInfo(); |
| if ("dn0".equals(dnInfo.getHostName())) { |
| assertEquals(AdminStates.DECOMMISSIONED, dnInfo.getAdminState()); |
| } else if ("dn1".equals(dnInfo.getHostName())) { |
| assertEquals(AdminStates.NORMAL, dnInfo.getAdminState()); |
| } else { |
| fail("Unexpected DN: " + dnInfo.getHostName()); |
| } |
| } |
| } |
| } |