| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation; |
| |
| import static java.util.Collections.emptySet; |
| import static java.util.Collections.singletonList; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyLong; |
| import static org.mockito.ArgumentMatchers.anyShort; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Map.Entry; |
| import java.util.SortedMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.ContentSummary; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FsServerDefaults; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; |
| import org.apache.hadoop.ha.HAServiceStatus; |
| import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService; |
| import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; |
| import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; |
| import org.apache.hadoop.hdfs.protocol.DirectoryListing; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; |
| import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService; |
| import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; |
| import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; |
| import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; |
| import org.apache.hadoop.hdfs.server.federation.router.Router; |
| import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; |
| import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; |
| import org.apache.hadoop.hdfs.server.namenode.SafeModeException; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.hdfs.server.protocol.StorageReport; |
| import org.apache.hadoop.http.HttpServer2; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.ProtobufRpcEngine2; |
| import org.apache.hadoop.ipc.RPC; |
| import org.apache.hadoop.ipc.RPC.Server; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.authorize.AccessControlList; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.apache.hadoop.util.DataChecksum.Type; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.thirdparty.protobuf.BlockingService; |
| |
| |
| /** |
| * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is |
| * used by the Routers in a mock cluster. |
| */ |
| public class MockNamenode { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(MockNamenode.class); |
| |
| |
| /** Mock implementation of the Namenode. */ |
| private final NamenodeProtocols mockNn; |
| |
| /** Name service identifier (subcluster). */ |
| private String nsId; |
| /** HA state of the Namenode. */ |
| private HAServiceState haState = HAServiceState.STANDBY; |
| /** Datanodes registered in this Namenode. */ |
| private List<DatanodeInfo> dns = new ArrayList<>(); |
| |
| /** RPC server of the Namenode that redirects calls to the mock. */ |
| private Server rpcServer; |
| /** HTTP server of the Namenode that redirects calls to the mock. */ |
| private HttpServer2 httpServer; |
| |
| |
| public MockNamenode(final String nsIdentifier) throws IOException { |
| this(nsIdentifier, new HdfsConfiguration()); |
| } |
| |
| public MockNamenode(final String nsIdentifier, final Configuration conf) |
| throws IOException { |
| this.nsId = nsIdentifier; |
| this.mockNn = mock(NamenodeProtocols.class); |
| setupMock(); |
| setupRPCServer(conf); |
| setupHTTPServer(conf); |
| } |
| |
| /** |
| * Setup the mock of the Namenode. It offers the basic functionality for |
| * Routers to get the status. |
| * @throws IOException If the mock cannot be setup. |
| */ |
| protected void setupMock() throws IOException { |
| NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1); |
| when(mockNn.versionRequest()).thenReturn(nsInfo); |
| |
| when(mockNn.getServiceStatus()). |
| thenAnswer((Answer<HAServiceStatus>) invocation -> { |
| HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState()); |
| haStatus.setNotReadyToBecomeActive(""); |
| return haStatus; |
| }); |
| } |
| |
| /** |
| * Setup the RPC server of the Namenode that redirects calls to the mock. |
| * @param conf Configuration of the server. |
| * @throws IOException If the RPC server cannot be setup. |
| */ |
| private void setupRPCServer(final Configuration conf) throws IOException { |
| RPC.setProtocolEngine( |
| conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); |
| ClientNamenodeProtocolServerSideTranslatorPB |
| clientNNProtoXlator = |
| new ClientNamenodeProtocolServerSideTranslatorPB(mockNn); |
| BlockingService clientNNPbService = |
| ClientNamenodeProtocol.newReflectiveBlockingService( |
| clientNNProtoXlator); |
| |
| int numHandlers = conf.getInt( |
| DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, |
| DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT); |
| |
| rpcServer = new RPC.Builder(conf) |
| .setProtocol(ClientNamenodeProtocolPB.class) |
| .setInstance(clientNNPbService) |
| .setBindAddress("0.0.0.0") |
| .setPort(0) |
| .setNumHandlers(numHandlers) |
| .build(); |
| |
| NamenodeProtocolServerSideTranslatorPB nnProtoXlator = |
| new NamenodeProtocolServerSideTranslatorPB(mockNn); |
| BlockingService nnProtoPbService = |
| NamenodeProtocolService.newReflectiveBlockingService( |
| nnProtoXlator); |
| DFSUtil.addPBProtocol( |
| conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer); |
| |
| DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator = |
| new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000); |
| BlockingService dnProtoPbService = |
| DatanodeProtocolService.newReflectiveBlockingService( |
| dnProtoPbXlator); |
| DFSUtil.addPBProtocol( |
| conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer); |
| |
| HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator = |
| new HAServiceProtocolServerSideTranslatorPB(mockNn); |
| BlockingService haProtoPbService = |
| HAServiceProtocolService.newReflectiveBlockingService( |
| haServiceProtoXlator); |
| DFSUtil.addPBProtocol( |
| conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer); |
| |
| this.rpcServer.addTerseExceptions( |
| RemoteException.class, |
| SafeModeException.class, |
| FileNotFoundException.class, |
| FileAlreadyExistsException.class, |
| AccessControlException.class, |
| LeaseExpiredException.class, |
| NotReplicatedYetException.class, |
| IOException.class, |
| ConnectException.class, |
| StandbyException.class); |
| |
| rpcServer.start(); |
| } |
| |
| /** |
| * Setup the HTTP server of the Namenode that redirects calls to the mock. |
| * @param conf Configuration of the server. |
| * @throws IOException If the HTTP server cannot be setup. |
| */ |
| private void setupHTTPServer(Configuration conf) throws IOException { |
| HttpServer2.Builder builder = new HttpServer2.Builder() |
| .setName("hdfs") |
| .setConf(conf) |
| .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))) |
| .addEndpoint(URI.create("http://0.0.0.0:0")); |
| httpServer = builder.build(); |
| httpServer.start(); |
| } |
| |
| /** |
| * Get the RPC port for the Mock Namenode. |
| * @return RPC port. |
| */ |
| public int getRPCPort() { |
| return rpcServer.getListenerAddress().getPort(); |
| } |
| |
| /** |
| * Get the HTTP port for the Mock Namenode. |
| * @return HTTP port. |
| */ |
| public int getHTTPPort() { |
| return httpServer.getConnectorAddress(0).getPort(); |
| } |
| |
| /** |
| * Get the Mock core. This is used to extend the mock. |
| * @return Mock Namenode protocol to be extended. |
| */ |
| public NamenodeProtocols getMock() { |
| return mockNn; |
| } |
| |
| /** |
| * Get the name service id (subcluster) of the Mock Namenode. |
| * @return Name service identifier. |
| */ |
| public String getNameserviceId() { |
| return nsId; |
| } |
| |
| /** |
| * Get the HA state of the Mock Namenode. |
| * @return HA state (ACTIVE or STANDBY). |
| */ |
| public HAServiceState getHAServiceState() { |
| return haState; |
| } |
| |
| /** |
| * Show the Mock Namenode as Active. |
| */ |
| public void transitionToActive() { |
| this.haState = HAServiceState.ACTIVE; |
| } |
| |
| /** |
| * Show the Mock Namenode as Standby. |
| */ |
| public void transitionToStandby() { |
| this.haState = HAServiceState.STANDBY; |
| } |
| |
| /** |
| * Get the datanodes that this NN will return. |
| * @return The datanodes that this NN will return. |
| */ |
| public List<DatanodeInfo> getDatanodes() { |
| return this.dns; |
| } |
| |
| /** |
| * Stop the Mock Namenode. It stops all the servers. |
| * @throws Exception If it cannot stop the Namenode. |
| */ |
| public void stop() throws Exception { |
| if (rpcServer != null) { |
| rpcServer.stop(); |
| rpcServer = null; |
| } |
| if (httpServer != null) { |
| httpServer.stop(); |
| httpServer = null; |
| } |
| } |
| |
| /** |
| * Add the mock for the FileSystem calls in ClientProtocol. |
| * @throws IOException If it cannot be setup. |
| */ |
| public void addFileSystemMock() throws IOException { |
| final SortedMap<String, String> fs = |
| new ConcurrentSkipListMap<String, String>(); |
| |
| DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean()); |
| when(l).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} getListing({})", nsId, src); |
| if (fs.get(src) == null) { |
| throw new FileNotFoundException("File does not exist " + src); |
| } |
| if (!src.endsWith("/")) { |
| src += "/"; |
| } |
| Map<String, String> files = |
| fs.subMap(src, src + Character.MAX_VALUE); |
| List<HdfsFileStatus> list = new ArrayList<>(); |
| for (String file : files.keySet()) { |
| if (file.substring(src.length()).indexOf('/') < 0) { |
| HdfsFileStatus fileStatus = |
| getMockHdfsFileStatus(file, fs.get(file)); |
| list.add(fileStatus); |
| } |
| } |
| HdfsFileStatus[] array = list.toArray( |
| new HdfsFileStatus[list.size()]); |
| return new DirectoryListing(array, 0); |
| }); |
| when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} getFileInfo({})", nsId, src); |
| return getMockHdfsFileStatus(src, fs.get(src)); |
| }); |
| HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(), |
| anyBoolean(), anyShort(), anyLong(), any(), any(), any()); |
| when(c).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} create({})", nsId, src); |
| boolean createParent = (boolean)invocation.getArgument(4); |
| if (createParent) { |
| Path path = new Path(src).getParent(); |
| while (!path.isRoot()) { |
| LOG.info("{} create parent {}", nsId, path); |
| fs.put(path.toString(), "DIRECTORY"); |
| path = path.getParent(); |
| } |
| } |
| fs.put(src, "FILE"); |
| return getMockHdfsFileStatus(src, "FILE"); |
| }); |
| LocatedBlocks b = mockNn.getBlockLocations( |
| anyString(), anyLong(), anyLong()); |
| when(b).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} getBlockLocations({})", nsId, src); |
| if (!fs.containsKey(src)) { |
| LOG.error("{} cannot find {} for getBlockLocations", nsId, src); |
| throw new FileNotFoundException("File does not exist " + src); |
| } |
| return mock(LocatedBlocks.class); |
| }); |
| boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong()); |
| when(f).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| if (!fs.containsKey(src)) { |
| LOG.error("{} cannot find {} for complete", nsId, src); |
| throw new FileNotFoundException("File does not exist " + src); |
| } |
| return true; |
| }); |
| LocatedBlock a = mockNn.addBlock( |
| anyString(), anyString(), any(), any(), anyLong(), any(), any()); |
| when(a).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| if (!fs.containsKey(src)) { |
| LOG.error("{} cannot find {} for addBlock", nsId, src); |
| throw new FileNotFoundException("File does not exist " + src); |
| } |
| return getMockLocatedBlock(nsId); |
| }); |
| boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean()); |
| when(m).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} mkdirs({})", nsId, src); |
| boolean createParent = (boolean)invocation.getArgument(2); |
| if (createParent) { |
| Path path = new Path(src).getParent(); |
| while (!path.isRoot()) { |
| LOG.info("{} mkdir parent {}", nsId, path); |
| fs.put(path.toString(), "DIRECTORY"); |
| path = path.getParent(); |
| } |
| } |
| fs.put(src, "DIRECTORY"); |
| return true; |
| }); |
| when(mockNn.getServerDefaults()).thenAnswer(invocation -> { |
| LOG.info("{} getServerDefaults", nsId); |
| FsServerDefaults defaults = mock(FsServerDefaults.class); |
| when(defaults.getChecksumType()).thenReturn( |
| Type.valueOf(DataChecksum.CHECKSUM_CRC32)); |
| when(defaults.getKeyProviderUri()).thenReturn(nsId); |
| return defaults; |
| }); |
| when(mockNn.getContentSummary(anyString())).thenAnswer(invocation -> { |
| String src = getSrc(invocation); |
| LOG.info("{} getContentSummary({})", nsId, src); |
| if (fs.get(src) == null) { |
| throw new FileNotFoundException("File does not exist " + src); |
| } |
| if (!src.endsWith("/")) { |
| src += "/"; |
| } |
| Map<String, String> files = |
| fs.subMap(src, src + Character.MAX_VALUE); |
| int numFiles = 0; |
| int numDirs = 0; |
| int length = 0; |
| for (Entry<String, String> entry : files.entrySet()) { |
| String file = entry.getKey(); |
| if (file.substring(src.length()).indexOf('/') < 0) { |
| String type = entry.getValue(); |
| if ("DIRECTORY".equals(type)) { |
| numDirs++; |
| } else if ("FILE".equals(type)) { |
| numFiles++; |
| length += 100; |
| } |
| } |
| } |
| return new ContentSummary.Builder() |
| .fileCount(numFiles) |
| .directoryCount(numDirs) |
| .length(length) |
| .erasureCodingPolicy("") |
| .build(); |
| }); |
| } |
| |
| /** |
| * Add datanode related operations. |
| * @throws IOException If it cannot be setup. |
| */ |
| public void addDatanodeMock() throws IOException { |
| when(mockNn.getDatanodeReport(any(DatanodeReportType.class))).thenAnswer( |
| invocation -> { |
| LOG.info("{} getDatanodeReport()", nsId, invocation.getArgument(0)); |
| return dns.toArray(); |
| }); |
| when(mockNn.getDatanodeStorageReport(any(DatanodeReportType.class))) |
| .thenAnswer(invocation -> { |
| LOG.info("{} getDatanodeStorageReport()", |
| nsId, invocation.getArgument(0)); |
| DatanodeStorageReport[] ret = new DatanodeStorageReport[dns.size()]; |
| for (int i = 0; i < dns.size(); i++) { |
| DatanodeInfo dn = dns.get(i); |
| DatanodeStorage storage = new DatanodeStorage(dn.getName()); |
| StorageReport[] storageReports = new StorageReport[] { |
| new StorageReport(storage, false, 0L, 0L, 0L, 0L, 0L) |
| }; |
| ret[i] = new DatanodeStorageReport(dn, storageReports); |
| } |
| return ret; |
| }); |
| } |
| |
| private static String getSrc(InvocationOnMock invocation) { |
| return (String) invocation.getArguments()[0]; |
| } |
| |
| /** |
| * Get a mock HDFS file status. |
| * @param filename Name of the file. |
| * @param type Type of the file (FILE, DIRECTORY, or null). |
| * @return HDFS file status |
| */ |
| private static HdfsFileStatus getMockHdfsFileStatus( |
| final String filename, final String type) { |
| if (type == null) { |
| return null; |
| } |
| HdfsFileStatus fileStatus = mock(HdfsFileStatus.class); |
| when(fileStatus.getLocalNameInBytes()).thenReturn(filename.getBytes()); |
| when(fileStatus.getPermission()).thenReturn(mock(FsPermission.class)); |
| when(fileStatus.getOwner()).thenReturn("owner"); |
| when(fileStatus.getGroup()).thenReturn("group"); |
| if (type.equals("FILE")) { |
| when(fileStatus.getLen()).thenReturn(100L); |
| when(fileStatus.getReplication()).thenReturn((short) 1); |
| when(fileStatus.getBlockSize()).thenReturn( |
| HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT); |
| } else if (type.equals("DIRECTORY")) { |
| when(fileStatus.isDir()).thenReturn(true); |
| when(fileStatus.isDirectory()).thenReturn(true); |
| } |
| return fileStatus; |
| } |
| |
| /** |
| * Get a mock located block pointing to one of the subclusters. It is |
| * allocated in a fake Datanode. |
| * @param nsId Name service identifier (subcluster). |
| * @return Mock located block. |
| */ |
| private static LocatedBlock getMockLocatedBlock(final String nsId) { |
| LocatedBlock lb = mock(LocatedBlock.class); |
| when(lb.getCachedLocations()).thenReturn(DatanodeInfo.EMPTY_ARRAY); |
| DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", |
| 1111, 1112, 1113, 1114); |
| DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId); |
| DatanodeInfoWithStorage datanodeInfoWithStorage = |
| new DatanodeInfoWithStorage(dnInfo, "storageID", StorageType.DEFAULT); |
| when(lb.getLocations()) |
| .thenReturn(new DatanodeInfoWithStorage[] {datanodeInfoWithStorage}); |
| ExtendedBlock eb = mock(ExtendedBlock.class); |
| when(eb.getBlockPoolId()).thenReturn(nsId); |
| when(lb.getBlock()).thenReturn(eb); |
| @SuppressWarnings("unchecked") |
| Token<BlockTokenIdentifier> tok = mock(Token.class); |
| when(tok.getIdentifier()).thenReturn(nsId.getBytes()); |
| when(tok.getPassword()).thenReturn(nsId.getBytes()); |
| when(tok.getKind()).thenReturn(new Text(nsId)); |
| when(tok.getService()).thenReturn(new Text(nsId)); |
| when(lb.getBlockToken()).thenReturn(tok); |
| return lb; |
| } |
| |
| /** |
| * Register a set of NameNodes in a Router. |
| * @param router Router to register to. |
| * @param namenodes Set of NameNodes. |
| * @throws IOException If it cannot register them. |
| */ |
| public static void registerSubclusters(Router router, |
| Collection<MockNamenode> namenodes) throws IOException { |
| registerSubclusters(singletonList(router), namenodes, emptySet()); |
| } |
| |
| /** |
| * Register a set of NameNodes in a set of Routers. |
| * @param routers Set of Routers. |
| * @param namenodes Set of NameNodes. |
| * @param unavailableSubclusters Set of unavailable subclusters. |
| * @throws IOException If it cannot register them. |
| */ |
| public static void registerSubclusters(List<Router> routers, |
| Collection<MockNamenode> namenodes, |
| Set<String> unavailableSubclusters) throws IOException { |
| |
| for (final Router router : routers) { |
| MembershipNamenodeResolver resolver = |
| (MembershipNamenodeResolver) router.getNamenodeResolver(); |
| for (final MockNamenode nn : namenodes) { |
| String nsId = nn.getNameserviceId(); |
| String rpcAddress = "localhost:" + nn.getRPCPort(); |
| String httpAddress = "localhost:" + nn.getHTTPPort(); |
| String scheme = "http"; |
| NamenodeStatusReport report = new NamenodeStatusReport( |
| nsId, null, rpcAddress, rpcAddress, |
| rpcAddress, scheme, httpAddress); |
| if (unavailableSubclusters.contains(nsId)) { |
| LOG.info("Register {} as UNAVAILABLE", nsId); |
| report.setRegistrationValid(false); |
| } else { |
| LOG.info("Register {} as ACTIVE", nsId); |
| report.setRegistrationValid(true); |
| } |
| report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0)); |
| resolver.registerNamenode(report); |
| } |
| resolver.loadCache(true); |
| } |
| } |
| } |